# coding=utf-8 import asyncio from functools import partial, wraps from playwright.async_api import Browser from abs_spider import AbstractAiSeoSpider from domain.ai_seo import AiAnswer, AiSearchResult from utils import create_logger from glom import glom, Coalesce from utils.ai_seo_api_utils import AiSeoApis logger = create_logger(__name__) class KimiSpider(AbstractAiSeoSpider): async def do_check_session(self) -> bool: self.completed_event = asyncio.Event() await self.browser_page.goto(self.get_home_url(), timeout=300000) await asyncio.sleep(3) user_name_element = self.browser_page.locator("//span[@class='user-name']") if await user_name_element.is_visible() and not await user_name_element.text_content() == '登录': return True return False def __init__(self, browser: Browser, prompt: str, keyword: str, think: bool = False): super().__init__(browser, prompt, keyword, think) self.__listen_response = self.handle_listen_response_error(self.__listen_response) def get_home_url(self) -> str: return 'https://www.kimi.ai' def get_platform_id(self) -> int: return 4 def get_platform_name(self) -> str: return 'Kimi' async def _do_spider(self) -> AiAnswer: self.completed_event = asyncio.Event() await self.browser_page.goto('https://www.kimi.ai', timeout=600000) self.ai_answer = AiAnswer(self.get_platform_id(), self.get_platform_name(), self.prompt, self.keyword) await asyncio.sleep(3) if self.think: think_btn = self.browser_page.locator("span:text('长思考 (k1.5)')").locator('..') if await think_btn.is_visible(): clazz = (await think_btn.get_attribute('class')).split(' ') if 'open' not in clazz: await think_btn.click() await asyncio.sleep(2) chat_input_element = self.browser_page.locator("//div[@class='chat-input']") await chat_input_element.click() # 输入提问词 await self.browser_page.keyboard.type(self.prompt) await asyncio.sleep(2) await self.browser_page.keyboard.press('Enter') # 监听请求 self.browser_page.on('response', partial(self.__listen_response)) await self.completed_event.wait() await asyncio.sleep(2) # 报错检查 if self.fail_status: await AiSeoApis.update_spider_session(self.session_info['id'], 2) raise self.fail_exception # 关闭侧边栏 sidebar_element = self.browser_page.locator("//div[@class='expand-btn']") if await sidebar_element.is_visible(): await sidebar_element.click() # 获取回答元素 answer_element = self.browser_page.locator("//div[@class='segment-container']").nth(-1) box = await answer_element.bounding_box() logger.debug(f'answer_element: {box}') view_port_height = box['height'] + 500 # 调整视口大小 await self.browser_page.set_viewport_size({ 'width': 1920, 'height': int(view_port_height) }) # 打开搜索结果 search_list_content_element = self.browser_page.locator("//div[contains(@class, 'side-console-container')]") search_list_element = self.browser_page.locator("//div[@class='search-plus']") if await search_list_element.is_visible() and not await search_list_content_element.is_visible(): await search_list_element.click() # 截图 screenshot_path = self._get_screenshot_path() self.ai_answer.screenshot_file = screenshot_path await self.browser_page.screenshot(path=screenshot_path) return self.ai_answer async def __listen_response(self, response): if '/segment/scroll' in response.url: json_data = await response.json() if json_data['items']: logger.debug(json_data) detail = json_data['items'][-1] if 'error' in detail: logger.error(f"kimi回复错误: {detail['error']['detail']}") self.fail_status = True self.fail_exception = Exception(detail['error']['detail']) self.completed_event.set() return content = detail['content'] if self.think: self.ai_answer.search_result = self.get_search_list_enable_think(detail) else: self.ai_answer.search_result = self.get_search_list_disable_think(detail) self.ai_answer.answer = content logger.debug(f"ai回复: {content}") self.completed_event.set() def handle_listen_response_error(self, func): """ 装饰器 用于处理请求回调中的异常 :param func: :return: """ @wraps(func) async def wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except Exception as e: logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True) # 标记失败状态 记录异常 self.fail_status = True self.fail_exception = e self.completed_event.set() return wrapper def get_search_list_disable_think(self, detail): """ 未开启深度思考时 获取搜索结果 :param detail: :return: """ answer_search_list = [] search_result_list = detail.get('search_plus', []) for search_result in search_result_list: event = search_result.get('event', '') msg = search_result.get('msg', {}) msg_type = msg.get('type', '') if event == 'search_plus' and msg_type == 'get_res': answer_search_list.append( AiSearchResult(msg['title'], msg['url'], msg['site_name'], msg['snippet'], msg['date'])) logger.debug(f"ai参考资料: {msg['title']}({msg['url']})") return answer_search_list def get_search_list_enable_think(self, detail): """ 开启深度思考时 获取搜索结果 :param detail: :return: """ answer_search_list = [] keys = 'contents.zones.0.sections.0.k1.search_results' search_result_list = glom(detail, keys, default=[]) for search_result in search_result_list: answer_search_list.append( AiSearchResult(search_result['title'], search_result['url'], search_result['site_name'], search_result['snippet'], search_result['date'])) logger.debug(f"ai参考资料: {search_result['title']}({search_result['url']})") return answer_search_list