You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

208 lines
8.7 KiB

# coding=utf-8
import asyncio
from functools import partial, wraps
import pyperclip
from playwright.async_api import Browser
from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer, AiSearchResult
from utils import create_logger
from glom import glom, Coalesce
from utils.ai_seo_api_utils import AiSeoApis
# Module-level logger shared by everything in this file; built by passing this
# module's name to the project's create_logger helper.
logger = create_logger(__name__)
class KimiSpider(AbstractAiSeoSpider):
    """Spider that drives the Kimi web chat UI through Playwright.

    The spider types ``self.prompt`` into the chat input, copies the answer
    through the page's copy button (read back from the system clipboard),
    scrapes the cited search results from the DOM and stores a screenshot.
    """

    def __init__(self, browser: Browser, prompt: str, keyword: str, think: bool = False):
        """Initialise the spider and wrap the response listener.

        :param browser: Playwright browser, forwarded to the base class.
        :param prompt: question that is typed into the chat input.
        :param keyword: keyword stored on the resulting ``AiAnswer``.
        :param think: selects which search-result parser the response
            listener uses (think-enabled vs. think-disabled payload layout).
        """
        super().__init__(browser, prompt, keyword, think)
        # Replace the bound listener with a wrapped version so that any
        # exception raised inside it is recorded on the spider instead of
        # being lost inside the event callback.
        self.__listen_response = self.handle_listen_response_error(self.__listen_response)

    def get_home_url(self) -> str:
        """Return the URL of the Kimi home page."""
        return 'https://www.kimi.ai'

    def get_platform_id(self) -> int:
        """Return the numeric platform id used for Kimi."""
        return 4

    def get_platform_name(self) -> str:
        """Return the display name of the platform."""
        return 'Kimi'

    async def do_check_session(self) -> bool:
        """Check whether the current browser session is logged in.

        Opens the home page and inspects the ``user-name`` span: the session
        counts as valid when the span is visible and its text is not the
        "log in" label ('登录').

        :return: ``True`` when the session looks logged in, else ``False``.
        """
        self.completed_event = asyncio.Event()
        await self.browser_page.goto(self.get_home_url(), timeout=300000)
        # Fixed pause before probing the DOM (presumably to let the page
        # finish rendering).
        await asyncio.sleep(3)
        user_name_element = self.browser_page.locator("//span[@class='user-name']")
        if not await user_name_element.is_visible():
            return False
        return await user_name_element.text_content() != '登录'

    async def _do_spider(self) -> AiAnswer:
        """Ask the prompt on Kimi and collect answer, sources and screenshot.

        :return: the populated ``AiAnswer`` (also stored on ``self.ai_answer``).
        :raises Exception: re-raises ``self.fail_exception`` when
            ``self.fail_status`` is set, after reporting the session through
            ``AiSeoApis.update_spider_session``.
        """
        self.completed_event = asyncio.Event()
        # Use the same home URL accessor as do_check_session instead of a
        # second hard-coded copy of the address.
        await self.browser_page.goto(self.get_home_url(), timeout=600000)
        self.ai_answer = AiAnswer(self.get_platform_id(), self.get_platform_name(), self.prompt, self.keyword)
        await asyncio.sleep(3)

        # Dismiss the "got it" ('知道了') dialog when it is shown.
        confirm_btn = self.browser_page.locator('//button[text()="知道了"]')
        if await confirm_btn.is_visible():
            await confirm_btn.click()

        # NOTE: the long-thinking toggle is currently disabled; the original
        # code is kept below for reference.
        # if self.think:
        #     think_btn = self.browser_page.locator("span:text('长思考 (k1.5)')").locator('..')
        #     if await think_btn.is_visible():
        #         clazz = (await think_btn.get_attribute('class')).split(' ')
        #         if 'open' not in clazz:
        #             await think_btn.click()
        #             await asyncio.sleep(2)

        chat_input_element = self.browser_page.locator("//div[@class='chat-input']")
        await chat_input_element.click()
        # Type the prompt and submit it with Enter.
        await self.browser_page.keyboard.type(self.prompt)
        await asyncio.sleep(2)
        await self.browser_page.keyboard.press('Enter')

        # NOTE: listening to network responses is currently disabled; the
        # answer is taken from the copy button instead.
        # self.browser_page.on('response', partial(self.__listen_response))
        # await self.completed_event.wait()
        await asyncio.sleep(2)

        # Wait until the copy button is present, then click it so the answer
        # lands on the clipboard.
        copy_btn_xpath = "//div[@class='segment-assistant-actions-content']/div[@class='simple-button size-small'][1]"
        await self.browser_page.wait_for_selector(copy_btn_xpath, timeout=600000)
        copy_btn = self.browser_page.locator(copy_btn_xpath)
        await copy_btn.click()
        # Read the answer back from the system clipboard.
        self.ai_answer.answer = pyperclip.paste()
        logger.debug(f"ai回复: {self.ai_answer.answer}")

        # Failure check: report the session and re-raise the recorded error.
        if self.fail_status:
            # Status value 2 is passed as-is; presumably it marks the session
            # as failed -- confirm against the AiSeoApis contract.
            await AiSeoApis.update_spider_session(self.session_info['id'], 2)
            raise self.fail_exception

        # Click the sidebar toggle when it is visible (closes the sidebar).
        sidebar_element = self.browser_page.locator("//div[@class='expand-btn']")
        if await sidebar_element.is_visible():
            await sidebar_element.click()

        await self.__resize_viewport_to_answer()

        # Open the search-result panel unless it is already showing.
        search_list_content_element = self.browser_page.locator("//div[contains(@class, 'side-console-container')]")
        search_list_element = self.browser_page.locator("//div[@class='search-plus']")
        if await search_list_element.is_visible() and not await search_list_content_element.is_visible():
            await search_list_element.click()

        self.ai_answer.search_result = await self.__collect_search_results()

        # Take the screenshot and remember where it was written.
        screenshot_path = self._get_screenshot_path()
        self.ai_answer.screenshot_file = screenshot_path
        await self.browser_page.screenshot(path=screenshot_path)
        return self.ai_answer

    async def __resize_viewport_to_answer(self) -> None:
        """Resize the viewport to the last answer segment's height plus 500px.

        ``bounding_box()`` yields ``None`` when the element is not visible; in
        that case the viewport is left unchanged instead of failing with a
        ``TypeError`` after the answer has already been captured.
        """
        answer_element = self.browser_page.locator("//div[@class='segment-container']").nth(-1)
        box = await answer_element.bounding_box()
        logger.debug(f'answer_element: {box}')
        if box is None:
            logger.error(f"{self.get_platform_name()} answer element has no bounding box; viewport not resized")
            return
        await self.browser_page.set_viewport_size({
            'width': 1920,
            'height': int(box['height'] + 500),
        })

    async def __collect_search_results(self) -> list:
        """Scrape the cited sources listed in the search-result panel.

        :return: one ``AiSearchResult`` per ``a.site`` element found under
            ``div.sites`` (empty list when there are none).
        """
        search_list = []
        search_elements = await self.browser_page.locator("//div[@class='sites']/a[@class='site']").all()
        for search_element in search_elements:
            result = AiSearchResult()
            result.url = await search_element.get_attribute('href')
            # Direct children by position: [0] source info, [1] title, [2] body.
            children = await search_element.locator("xpath=./child::*").all()
            result.title = await children[1].inner_text()
            result.body = await children[2].inner_text()
            # Children of the source-info element: [1] host name, [2] publish time.
            result_host_elements = await children[0].locator("xpath=./child::*").all()
            try:
                result.host_name = await result_host_elements[1].inner_text()
            except Exception:
                # Best effort: a missing host name must not drop the result.
                result.host_name = ''
            if len(result_host_elements) >= 3:
                try:
                    result.publish_time = await result_host_elements[2].inner_text()
                    # Rewrite date separators from '/' to '-'.
                    result.publish_time = result.publish_time.replace('/', '-')
                except Exception:
                    result.publish_time = 0
                    logger.error(f"{result.title}获取发布时间失败")
            search_list.append(result)
        return search_list

    async def __listen_response(self, response):
        """Handle a page response and extract the answer from it.

        Only responses whose URL contains ``/segment/scroll`` and whose JSON
        payload has a non-empty ``items`` list are processed; the last item is
        treated as the answer. On an ``error`` item the failure is recorded on
        the spider. In both cases ``self.completed_event`` is set.

        The hookup of this listener in ``_do_spider`` is currently commented
        out.

        :param response: response object exposing ``url`` and an awaitable
            ``json()``.
        """
        if '/segment/scroll' not in response.url:
            return
        json_data = await response.json()
        if not json_data['items']:
            return
        logger.debug(json_data)
        detail = json_data['items'][-1]
        if 'error' in detail:
            logger.error(f"kimi回复错误: {detail['error']['detail']}")
            self.fail_status = True
            self.fail_exception = Exception(detail['error']['detail'])
            self.completed_event.set()
            return
        content = detail['content']
        # The payload layout of the search results depends on think mode.
        if self.think:
            self.ai_answer.search_result = self.get_search_list_enable_think(detail)
        else:
            self.ai_answer.search_result = self.get_search_list_disable_think(detail)
        self.ai_answer.answer = content
        logger.debug(f"ai回复: {content}")
        self.completed_event.set()

    def handle_listen_response_error(self, func):
        """Decorator that handles exceptions raised in a response callback.

        :param func: coroutine function to wrap.
        :return: a coroutine function that awaits ``func``; if it raises, the
            error is logged, ``fail_status``/``fail_exception`` are recorded
            on the spider, ``completed_event`` is set and ``None`` is
            returned instead of propagating the exception.
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True)
                # Mark the failure state and keep the exception.
                self.fail_status = True
                self.fail_exception = e
                self.completed_event.set()
        return wrapper

    def get_search_list_disable_think(self, detail) -> list:
        """Extract search results from a payload when think mode is off.

        :param detail: answer item; its ``search_plus`` list is scanned for
            entries with ``event == 'search_plus'`` and a ``msg`` of type
            ``'get_res'``.
        :return: list of ``AiSearchResult`` built from the matching messages.
        """
        answer_search_list = []
        for search_result in detail.get('search_plus', []):
            event = search_result.get('event', '')
            msg = search_result.get('msg', {})
            if event != 'search_plus' or msg.get('type', '') != 'get_res':
                continue
            answer_search_list.append(
                AiSearchResult(msg['title'], msg['url'], msg['site_name'], msg['snippet'], msg['date']))
            logger.debug(f"ai参考资料: {msg['title']}({msg['url']})")
        return answer_search_list

    def get_search_list_enable_think(self, detail) -> list:
        """Extract search results from a payload when think mode is on.

        :param detail: answer item; results are read from the nested path
            ``contents.zones.0.sections.0.k1.search_results`` (empty list when
            the path is absent).
        :return: list of ``AiSearchResult`` built from those entries.
        """
        keys = 'contents.zones.0.sections.0.k1.search_results'
        answer_search_list = []
        for search_result in glom(detail, keys, default=[]):
            answer_search_list.append(
                AiSearchResult(search_result['title'], search_result['url'], search_result['site_name'],
                               search_result['snippet'], search_result['date']))
            logger.debug(f"ai参考资料: {search_result['title']}({search_result['url']})")
        return answer_search_list