# coding=utf-8
import asyncio
import json
from functools import partial, wraps

from glom import glom
from playwright.async_api import async_playwright, Browser
from playwright.async_api import TimeoutError as PlaywrightTimeoutError

import config
from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer, AiSearchResult
from utils import create_logger
from utils.ai_seo_api_utils import AiSeoApis
from utils.image_utils import crop_image_left
from utils.session_utils import get_spider_session

logger = create_logger(__name__)


class YiYanSpider(AbstractAiSeoSpider):
    """Spider that asks a prompt on https://yiyan.baidu.com/ and records the answer.

    The answer text and cited search results are captured by listening to
    the page's network responses; a screenshot of the final answer card is
    saved as well.

    NOTE(review): ``browser_page``, ``ai_answer``, ``session_info``,
    ``fail_status``, ``fail_exception``, ``completed_event``, ``_init_data``
    and ``_get_screenshot_path`` are assumed to be provided by
    ``AbstractAiSeoSpider`` — they are not defined in this file.
    """

    def __init__(self, browser: Browser, prompt: str, keyword: str, think: bool = False):
        super().__init__(browser, prompt, keyword, think)
        # Wrap both response callbacks so that any exception they raise is
        # logged and recorded on the spider instead of being lost inside the
        # Playwright event dispatcher.
        self.__listen_response = self.handle_listen_response_error(self.__listen_response)
        self.__listen_response_thinking = self.handle_listen_response_error(
            self.__listen_response_thinking)

    def get_home_url(self) -> str:
        return 'https://yiyan.baidu.com/'

    async def _do_spider(self) -> AiAnswer:
        """Submit ``self.prompt``, wait for the reply and screenshot it.

        :return: ``self.ai_answer``, filled in by the response listeners and
            given the path of the saved screenshot.
        :raises: whatever a response listener recorded in ``fail_exception``,
            or the exception raised by :meth:`check_login`.
        """
        # Reset per-run data.
        self._init_data()
        await self.browser_page.goto(self.get_home_url(), timeout=600000)
        await asyncio.sleep(2)
        # Verify the session is logged in.
        await self.check_login()

        if self.think:
            await self._enable_deep_thinking()

        # Focus the chat editor and type the prompt.
        chat_input_element = self.browser_page.locator("//div[@class='yc-editor']")
        await chat_input_element.click()
        await asyncio.sleep(2)
        await self.browser_page.keyboard.insert_text(self.prompt)
        await asyncio.sleep(2)

        # Register the response listeners *before* submitting, so a response
        # triggered by the submission cannot arrive before we are listening.
        self.browser_page.on('response', self.__listen_response)
        if self.think:
            self.browser_page.on('response', self.__listen_response_thinking)

        await self.browser_page.keyboard.press('Enter')
        await asyncio.sleep(2)

        try:
            await self.browser_page.wait_for_selector(
                "//div[@data-auto-test='anew_response']", state='attached', timeout=600000)
            logger.debug('ai回答完毕')
        except PlaywrightTimeoutError:
            # Playwright raises its own TimeoutError (not the builtin one).
            # Keep going and screenshot whatever has been rendered so far.
            logger.error('ai回答超时')

        # Surface any error recorded by a response listener.
        if self.fail_status:
            raise self.fail_exception

        # Expand the "N web sources" panel when the page offers one.
        open_search_btn_element = self.browser_page.locator("div:text('条网页信息源')")
        if await open_search_btn_element.count() > 0:
            # .first avoids a strict-mode violation if several elements match.
            await open_search_btn_element.first.click()

        # Grow the viewport so that the last answer card fits on screen.
        answer_element = self.browser_page.locator(
            "//div[contains(@class, 'dialog-card-wrapper')]").nth(-1)
        box = await answer_element.bounding_box()
        logger.debug(f'answer_element: {box}')
        if box is not None:
            view_port_height = box['height'] + 1000
            await self.browser_page.set_viewport_size({
                'width': 1920,
                'height': int(view_port_height),
            })
        else:
            # bounding_box() returns None when the element is not visible.
            logger.warning(f'{self.get_platform_name()} answer card not visible, keeping viewport size')
        await asyncio.sleep(2)

        # Take the screenshot, then trim its left edge.
        screenshot_path = self._get_screenshot_path()
        await self.browser_page.screenshot(path=screenshot_path)
        # NOTE(review): presumably removes a 260 px left strip (sidebar) — confirm.
        crop_image_left(screenshot_path, 260)
        self.ai_answer.screenshot_file = screenshot_path
        return self.ai_answer

    async def _enable_deep_thinking(self) -> None:
        """Click the deep-thinking toggle if it is visible and looks inactive."""
        think_btn = self.browser_page.locator("span:text('深度思考(X1 Turbo)')").locator('..')
        # Check visibility first: get_attribute() on a missing element would
        # block until Playwright's default timeout and then raise.
        if not await think_btn.is_visible():
            logger.warning(f'{self.get_platform_name()} deep-thinking toggle not visible')
            return
        class_names = (await think_btn.get_attribute('class') or '').split(' ')
        # NOTE(review): a single CSS class is taken to mean "toggle off"
        # (same test as before) — confirm against the live page.
        if len(class_names) == 1:
            await think_btn.click()
            await asyncio.sleep(2)

    @staticmethod
    def _to_ai_search_results(items, *, body_key: str, host_key: str, date_key: str) -> list:
        """Convert raw citation dicts into ``AiSearchResult`` objects.

        ``url`` and ``title`` are read from fixed keys. The summary, site
        name and date keys differ between endpoints and are passed in.
        Non-dict items are skipped.
        """
        results = []
        for item in items or []:
            if not isinstance(item, dict):
                continue
            url = item.get('url', '')
            title = item.get('title', '')
            desc = item.get(body_key, '')
            host_name = item.get(host_key, '')
            date = item.get(date_key, '')
            logger.debug(f"ai参考资料: [{host_name}][{date}]{title}({url})")
            results.append(AiSearchResult(
                url=url, title=title, host_name=host_name, body=desc, publish_time=date
            ))
        return results

    async def __listen_response(self, response):
        """Read the answer (and, without deep thinking, its citations) from /chat/history."""
        if '/chat/history' not in response.url:
            return
        json_data = await response.json()
        raw_chats = glom(json_data, 'data.chats', default=None) or {}
        chats = raw_chats.values() if isinstance(raw_chats, dict) else raw_chats

        # Pick the first robot message that actually carries text.
        chat = next(
            (c for c in chats
             if isinstance(c, dict)
             and c.get('role', '') == 'robot'
             and glom(c, 'message.0.content', default='')),
            None,
        )
        if not chat:
            return

        answer = glom(chat, 'message.0.content', default='')
        # In deep-thinking mode the citations come from the streaming
        # endpoint instead (see __listen_response_thinking).
        if not self.think:
            search_result_list = glom(chat, 'searchCitations.list', default=[])
            self.ai_answer.search_result = self._to_ai_search_results(
                search_result_list, body_key='wild_abstract', host_key='site', date_key='date')
        self.ai_answer.answer = answer

    @staticmethod
    def _extract_thinking_search_list(response_text: str) -> list:
        """Return the citation list from a streamed (``data:`` line) response body.

        Events are separated by blank lines. The ``contents`` list of each
        JSON ``data:`` payload is examine. The scan stops in either of two cases:
        - immediately, at a payload with ``historyNeed == 1`` and non-empty ``contents``;
        - at the end of the first event whose last parsed payload had non-empty ``contents``.
        Payloads that are not JSON objects (e.g. ``[DONE]`` markers) are ignored.
        """
        for event in response_text.split('\n\n'):
            contents = []
            for line in event.split('\n'):
                if not line.startswith('data:'):
                    continue
                try:
                    payload = json.loads(line[5:])
                except ValueError:
                    continue
                if not isinstance(payload, dict):
                    continue
                contents = payload.get('contents', [])
                if not isinstance(contents, list):
                    contents = []
                if payload.get('historyNeed', None) == 1 and contents:
                    return contents
            if contents:
                return contents
        return []

    async def __listen_response_thinking(self, response):
        """Read citations from the streaming /chat/conversation/v2 response (deep-thinking mode)."""
        if '/chat/conversation/v2' not in response.url:
            return
        # Waits for the whole streamed body, then parses it.
        stream = await response.body()
        response_text = stream.decode('utf-8', errors='replace')
        search_list = self._extract_thinking_search_list(response_text)
        self.ai_answer.search_result = self._to_ai_search_results(
            search_list, body_key='siteAbstract', host_key='name', date_key='publishTime')

    async def check_login(self):
        """Ensure the page shows the logged-in sidebar.

        :raises Exception: when the sidebar does not appear within 20 s. The
            spider session is marked with status 2 via ``AiSeoApis`` first.
        """
        # The sidebar only appears once the user is logged in.
        try:
            await self.browser_page.locator("//div[@id='eb_sidebar']").wait_for(
                state='attached', timeout=20000)
        except Exception as e:
            # Record the session status before failing.
            await AiSeoApis.update_spider_session(self.session_info['id'], 2)
            raise Exception(
                f"{self.get_platform_name()}登录失败 session_id: {self.session_info['id']}") from e

    def get_platform_id(self) -> int:
        return 6

    def get_platform_name(self) -> str:
        return 'YiYan'

    def handle_listen_response_error(self, func):
        """Wrap an async response callback so that failures are recorded.

        When ``func`` raises, the error is logged and stored in
        ``fail_status`` / ``fail_exception``, and ``completed_event`` is set.
        ``_do_spider`` re-raises the stored exception after the answer wait.

        :param func: async callback taking the Playwright response.
        :return: the wrapped async callback (returns None on failure).
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True)
                # Mark the run as failed and keep the exception.
                self.fail_status = True
                self.fail_exception = e
                self.completed_event.set()

        return wrapper


async def run():
    """Manual smoke test: launch Chrome and ask Yiyan a trivial prompt."""
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(
            headless=config.BROWSER_HANDLESS,
            chromium_sandbox=config.BROWSER_ENABLE_SANDBOX,
            ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS,
            channel="chrome",
            args=config.BROWSER_ARGS,
        )
        try:
            spider = YiYanSpider(browser, '你好', '')
            await spider.run()
        finally:
            # Always release the browser, even when the spider fails.
            await browser.close()


if __name__ == '__main__':
    asyncio.run(run())