Browse Source

refactor(ai_seo): 重构 kimi 爬虫以适应网页结构变化

- 移除长思考功能，增加确认按钮点击
- 修改输入框点击和文本输入逻辑
- 添加复制按钮点击和剪贴板读取功能
- 增加搜索结果获取和解析
- 调整截图功能
master
zzx 1 month ago
parent
commit
d64013d2ec
  1. 60
      spiders/ai_seo/kimi.py

60
spiders/ai_seo/kimi.py

@ -2,6 +2,7 @@
import asyncio import asyncio
from functools import partial, wraps from functools import partial, wraps
import pyperclip
from playwright.async_api import Browser from playwright.async_api import Browser
from abs_spider import AbstractAiSeoSpider from abs_spider import AbstractAiSeoSpider
@ -43,13 +44,16 @@ class KimiSpider(AbstractAiSeoSpider):
await self.browser_page.goto('https://www.kimi.ai', timeout=600000) await self.browser_page.goto('https://www.kimi.ai', timeout=600000)
self.ai_answer = AiAnswer(self.get_platform_id(), self.get_platform_name(), self.prompt, self.keyword) self.ai_answer = AiAnswer(self.get_platform_id(), self.get_platform_name(), self.prompt, self.keyword)
await asyncio.sleep(3) await asyncio.sleep(3)
if self.think:
think_btn = self.browser_page.locator("span:text('长思考 (k1.5)')").locator('..')
if await think_btn.is_visible():
clazz = (await think_btn.get_attribute('class')).split(' ')
if 'open' not in clazz:
await think_btn.click()
await asyncio.sleep(2)
confirm_btn = self.browser_page.locator('//button[text()="知道了"]')
if await confirm_btn.is_visible():
await confirm_btn.click()
# if self.think:
# think_btn = self.browser_page.locator("span:text('长思考 (k1.5)')").locator('..')
# if await think_btn.is_visible():
# clazz = (await think_btn.get_attribute('class')).split(' ')
# if 'open' not in clazz:
# await think_btn.click()
# await asyncio.sleep(2)
chat_input_element = self.browser_page.locator("//div[@class='chat-input']") chat_input_element = self.browser_page.locator("//div[@class='chat-input']")
await chat_input_element.click() await chat_input_element.click()
# 输入提问词 # 输入提问词
@ -57,10 +61,19 @@ class KimiSpider(AbstractAiSeoSpider):
await asyncio.sleep(2) await asyncio.sleep(2)
await self.browser_page.keyboard.press('Enter') await self.browser_page.keyboard.press('Enter')
# 监听请求 # 监听请求
self.browser_page.on('response', partial(self.__listen_response))
await self.completed_event.wait()
# self.browser_page.on('response', partial(self.__listen_response))
# await self.completed_event.wait()
await asyncio.sleep(2) await asyncio.sleep(2)
# 等待复制按钮可见
copy_btn_xpath = "//div[@class='segment-assistant-actions-content']/div[@class='simple-button size-small'][1]"
await self.browser_page.wait_for_selector(copy_btn_xpath, timeout=600000)
copy_btn = self.browser_page.locator(copy_btn_xpath)
await copy_btn.click()
# 读取剪贴板
self.ai_answer.answer = pyperclip.paste()
logger.debug(f"ai回复: {self.ai_answer.answer}")
# 报错检查 # 报错检查
if self.fail_status: if self.fail_status:
await AiSeoApis.update_spider_session(self.session_info['id'], 2) await AiSeoApis.update_spider_session(self.session_info['id'], 2)
@ -85,6 +98,35 @@ class KimiSpider(AbstractAiSeoSpider):
search_list_element = self.browser_page.locator("//div[@class='search-plus']") search_list_element = self.browser_page.locator("//div[@class='search-plus']")
if await search_list_element.is_visible() and not await search_list_content_element.is_visible(): if await search_list_element.is_visible() and not await search_list_content_element.is_visible():
await search_list_element.click() await search_list_element.click()
# 获取搜索结果
search_list = []
search_elements = await self.browser_page.locator("//div[@class='sites']/a[@class='site']").all()
for search_element in search_elements:
result = AiSearchResult()
result.url = await search_element.get_attribute('href')
children = await search_element.locator("xpath=./child::*").all()
result.title = await children[1].inner_text()
result.body = await children[2].inner_text()
# 获取信源信息元素
result_host_elements = await children[0].locator("xpath=./child::*").all()
try:
result.host_name = await result_host_elements[1].inner_text()
except Exception:
result.host_name = ''
if len(result_host_elements) >= 3:
try:
result.publish_time = await result_host_elements[2].inner_text()
result.publish_time = result.publish_time.replace('/', '-')
except Exception:
result.publish_time = 0
logger.error(f"{result.title}获取发布时间失败")
search_list.append(result)
self.ai_answer.search_result = search_list
# 截图 # 截图
screenshot_path = self._get_screenshot_path() screenshot_path = self._get_screenshot_path()
self.ai_answer.screenshot_file = screenshot_path self.ai_answer.screenshot_file = screenshot_path

Loading…
Cancel
Save