You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
230 lines
8.8 KiB
230 lines
8.8 KiB
# coding=utf-8
|
|
import asyncio
|
|
import json
|
|
from functools import partial, wraps
|
|
|
|
from glom import glom
|
|
from playwright.async_api import async_playwright, Browser
|
|
|
|
import config
|
|
from abs_spider import AbstractAiSeoSpider
|
|
from domain.ai_seo import AiAnswer, AiSearchResult
|
|
from utils import create_logger
|
|
from utils.ai_seo_api_utils import AiSeoApis
|
|
from utils.image_utils import crop_image_left
|
|
from utils.session_utils import get_spider_session
|
|
|
|
logger = create_logger(__name__)
|
|
|
|
|
|
class YiYanSpider(AbstractAiSeoSpider):
|
|
|
|
def __init__(self, browser: Browser, prompt: str, keyword: str, think: bool = False):
|
|
super().__init__(browser, prompt, keyword, think)
|
|
self.__listen_response = self.handle_listen_response_error(self.__listen_response)
|
|
|
|
def get_home_url(self) -> str:
|
|
return 'https://yiyan.baidu.com/'
|
|
|
|
async def _do_spider(self) -> AiAnswer:
|
|
# 初始化数据
|
|
self.think = True
|
|
self._init_data()
|
|
await self.browser_page.goto(self.get_home_url(), timeout=600000)
|
|
await asyncio.sleep(2)
|
|
# 检查登录状态
|
|
await self.check_login()
|
|
if self.think:
|
|
think_btn = self.browser_page.locator("//span[text()='思考·自动']/parent::div")
|
|
await think_btn.click()
|
|
await self.browser_page.locator("//div[contains(@class, 'dtModeItem__')][2]").click()
|
|
# 开始操作
|
|
chat_input_element = self.browser_page.locator("//div[@class='yc-editor']")
|
|
await chat_input_element.click()
|
|
await asyncio.sleep(2)
|
|
# 输入提问词
|
|
await self.browser_page.keyboard.insert_text(self.prompt)
|
|
await asyncio.sleep(2)
|
|
await self.browser_page.keyboard.press('Enter')
|
|
# 监听请求
|
|
self.browser_page.on('response', partial(self.__listen_response))
|
|
if self.think:
|
|
self.browser_page.on('response', partial(self.__listen_response_thinking))
|
|
await asyncio.sleep(2)
|
|
try:
|
|
await self.browser_page.wait_for_selector("//div[@data-auto-test='anew_response']", state='attached', timeout=600000)
|
|
logger.debug('ai回答完毕')
|
|
except TimeoutError as e:
|
|
logger.error('ai回答超时')
|
|
|
|
# 报错检查
|
|
if self.fail_status:
|
|
raise self.fail_exception
|
|
|
|
# 打开搜索结果
|
|
open_search_btn_element = self.browser_page.locator(f"div:text('参考{len(self.ai_answer.search_result)}个网页')")
|
|
if await open_search_btn_element.count() > 0:
|
|
await open_search_btn_element.click()
|
|
|
|
# 获取回答元素
|
|
answer_element = self.browser_page.locator("//div[contains(@class, 'dialog-card-wrapper')]").nth(-1)
|
|
box = await answer_element.bounding_box()
|
|
logger.debug(f'answer_element: {box}')
|
|
view_port_height = box['height'] + 1000
|
|
|
|
# 调整视口大小
|
|
await self.browser_page.set_viewport_size({
|
|
'width': 1920,
|
|
'height': int(view_port_height)
|
|
})
|
|
|
|
await asyncio.sleep(2)
|
|
# 截图
|
|
screenshot_path = self._get_screenshot_path()
|
|
await self.browser_page.screenshot(path=screenshot_path)
|
|
# 切割图片
|
|
crop_image_left(screenshot_path, 260)
|
|
|
|
self.ai_answer.screenshot_file = screenshot_path
|
|
return self.ai_answer
|
|
|
|
async def __listen_response(self, response):
|
|
if '/chat/history' not in response.url:
|
|
return
|
|
answer = ''
|
|
chat = {}
|
|
json_data = await response.json()
|
|
chats = list(dict.values(glom(json_data, 'data.chats', default={})))
|
|
# 选择目标chat
|
|
for _chat in chats:
|
|
if not _chat.get('role', '') == 'robot':
|
|
continue
|
|
content = glom(_chat, 'message.0.content', default='')
|
|
if not content:
|
|
continue
|
|
chat = _chat
|
|
break
|
|
if not chat:
|
|
return
|
|
answer = glom(chat, 'message.0.content', default="")
|
|
# 搜索结果
|
|
if not self.think:
|
|
search_result_list = glom(chat, 'searchCitations.list', default=[])
|
|
# 保存搜索结果
|
|
ai_search_result_list = []
|
|
for search_result in search_result_list:
|
|
url = search_result.get('url', '')
|
|
title = search_result.get('title', '')
|
|
desc = search_result.get('wild_abstract', '')
|
|
host_name = search_result.get('site', '')
|
|
date = search_result.get('date', '')
|
|
logger.debug(f"ai参考资料: [{host_name}][{date}]{title}({url})")
|
|
ai_search_result_list.append(AiSearchResult(
|
|
url=url,
|
|
title=title,
|
|
host_name=host_name,
|
|
body=desc,
|
|
publish_time=date
|
|
))
|
|
self.ai_answer.search_result = ai_search_result_list
|
|
self.ai_answer.answer = answer
|
|
|
|
async def __listen_response_thinking(self, response):
|
|
if '/chat/conversation/v2' not in response.url:
|
|
return
|
|
# 读取流式数据
|
|
data = {}
|
|
search_list = []
|
|
stream = await response.body()
|
|
response_text = stream.decode('utf-8')
|
|
response_lines = response_text.split("\n\n")
|
|
for line in response_lines:
|
|
sub_lines = line.split("\n")
|
|
for sub_line in sub_lines:
|
|
if sub_line.startswith('data:'):
|
|
json_data = json.loads(sub_line[5:])
|
|
history_need = json_data.get('historyNeed', None)
|
|
search_list = json_data.get('contents', [])
|
|
if history_need == 1 and search_list:
|
|
break
|
|
if search_list:
|
|
break
|
|
ai_search_result_list = []
|
|
for search_result in search_list:
|
|
url = search_result.get('url', '')
|
|
title = search_result.get('title', '')
|
|
desc = search_result.get('siteAbstract', '')
|
|
host_name = search_result.get('name', '')
|
|
date = search_result.get('publishTime', '')
|
|
logger.debug(f"ai参考资料: [{host_name}][{date}]{title}({url})")
|
|
ai_search_result_list.append(AiSearchResult(
|
|
url=url,
|
|
title=title,
|
|
host_name=host_name,
|
|
body=desc,
|
|
publish_time=date
|
|
))
|
|
self.ai_answer.search_result = ai_search_result_list
|
|
|
|
async def check_login(self):
|
|
# 找登录后按钮
|
|
login_btn = self.browser_page.locator("//button[contains(@class, 'ebButton') and normalize-space(text())='登录']")
|
|
if await login_btn.count() > 0:
|
|
# 更新session状态
|
|
await AiSeoApis.update_spider_session(self.session_info['id'], 2)
|
|
raise Exception(f"{self.get_platform_name()}登录失败 session_id: {self.session_info['id']}")
|
|
|
|
|
|
async def do_check_session(self) -> bool:
|
|
try:
|
|
await self.browser_page.goto(self.get_home_url(), timeout=200000)
|
|
await asyncio.sleep(2)
|
|
# 检查登录状态
|
|
await self.check_login()
|
|
# 开始操作
|
|
chat_input_element = self.browser_page.locator("//div[@class='yc-editor']")
|
|
await chat_input_element.click()
|
|
await asyncio.sleep(2)
|
|
# 输入提问词
|
|
await self.browser_page.keyboard.insert_text(self.prompt)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
def get_platform_id(self) -> int:
|
|
return 6
|
|
|
|
def get_platform_name(self) -> str:
|
|
return 'YiYan'
|
|
|
|
def handle_listen_response_error(self, func):
|
|
"""
|
|
装饰器 用于处理请求回调中的异常
|
|
:param func:
|
|
:return:
|
|
"""
|
|
|
|
@wraps(func)
|
|
async def wrapper(*args, **kwargs):
|
|
try:
|
|
return await func(*args, **kwargs)
|
|
except Exception as e:
|
|
logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True)
|
|
# 标记失败状态 记录异常
|
|
self.fail_status = True
|
|
self.fail_exception = e
|
|
self.completed_event.set()
|
|
|
|
return wrapper
|
|
|
|
async def run():
|
|
playwright = await async_playwright().start()
|
|
browser = await playwright.chromium.launch(headless=config.BROWSER_HANDLESS,
|
|
chromium_sandbox=config.BROWSER_ENABLE_SANDBOX,
|
|
ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS,
|
|
channel="chrome",
|
|
args=config.BROWSER_ARGS)
|
|
spider = YiYanSpider(browser, '你好', '')
|
|
await spider.run()
|
|
if __name__ == '__main__':
|
|
asyncio.run(run())
|