6 Commits

Author SHA1 Message Date
zzx d4c34bda64 fix(ai_seo): 修复每日搜索额度判断逻辑 1 month ago
zzx f3a7bd9539 feat:修复纳米盒禁用智能体问题 1 month ago
zzx ac93ca5e3b refactor(ai_seo): 优化一言蜘蛛的思考模式选择逻辑 1 month ago
zzx d64013d2ec refactor(ai_seo): 重构 kimi 爬虫以适应网页结构变化 1 month ago
zzx 90870aaf5d refactor(ai_seo): 重构 DeepSeek 爬虫 1 month ago
zzx 184c7ad851 refactor(ai_seo): 重构通义灵码爬虫 1 month ago
  1. 56
      spiders/ai_seo/deepseek.py
  2. 60
      spiders/ai_seo/kimi.py
  3. 6
      spiders/ai_seo/metaso.py
  4. 10
      spiders/ai_seo/nanometer.py
  5. 50
      spiders/ai_seo/tongyi.py
  6. 7
      spiders/ai_seo/yiyan.py

56
spiders/ai_seo/deepseek.py

@ -37,15 +37,16 @@ class DeepseekSpider(AbstractAiSeoSpider):
search_btn = self.browser_page.locator("span:text('联网搜索')").locator('..')
if await search_btn.is_visible():
await search_btn.click()
self.think = True
if self.think:
# 开启深度思考
think_btn = self.browser_page.locator("span:text('深度思考 (R1)')").locator('..')
# 开启深度思考
think_btn = self.browser_page.locator("span:text('深度思考')").locator('..')
if await think_btn.is_visible():
styles = css_to_dict(await think_btn.get_attribute('style'))
if styles.get('--ds-button-color') == '#fff':
await think_btn.click()
await asyncio.sleep(1)
chat_input_element = self.browser_page.locator("//textarea[@id='chat-input']")
# styles = css_to_dict(await think_btn.get_attribute('style'))
# if styles.get('--ds-button-color') == '#fff':
await think_btn.click()
await asyncio.sleep(1)
chat_input_element = self.browser_page.locator("//textarea[@placeholder='给 DeepSeek 发送消息 ']")
await chat_input_element.click()
# 输入提问词
await self.browser_page.keyboard.type(self.prompt)
@ -72,7 +73,8 @@ class DeepseekSpider(AbstractAiSeoSpider):
await think_element.nth(-1).click()
await asyncio.sleep(2)
# 获取回答元素
answer = self.browser_page.locator("//div[@class='ds-markdown ds-markdown--block']").nth(-1)
# answer = self.browser_page.locator("//div[@class='ds-markdown ds-markdown--block']").nth(-1)
answer = self.browser_page.locator("//div[contains(@class, 'ds-message')]").nth(-1)
box = await answer.bounding_box()
# 设置视口大小
await self.browser_page.set_viewport_size({
@ -130,9 +132,11 @@ class DeepseekSpider(AbstractAiSeoSpider):
body = stream.decode('utf-8')
datas = body.split("\n\n")
for data_str in datas:
# 返回数据为空 跳过
if not data_str:
continue
data_str = data_str.replace('data: ', '')
# 服务器繁忙 跳过
try:
data = json.loads(data_str)
if glom(data, 'v.0.v', default='') == 'TIMEOUT':
@ -145,27 +149,27 @@ class DeepseekSpider(AbstractAiSeoSpider):
logger.debug(f"获取到联网搜索结果")
search_result_list = data.get('v', [])
search_result_lists.extend(search_result_list)
# # 保存搜索结果
# ai_search_result_list = []
# for search_result in search_result_list:
# url = search_result.get('url', '')
# title = search_result.get('title', '')
# body = search_result.get('snippet', '')
# publish_time = search_result.get('published_at', '')
# host_name = search_result.get('site_name', '未知')
# ai_result = AiSearchResult(url=url, title=title, body=body, publish_time=publish_time, host_name=host_name)
# if ai_result.title and ai_result.url:
# ai_search_result_list.append(ai_result)
# logger.debug(f"ai参考资料: [{host_name}]{title}({url})")
# if ai_search_result_list:
# self.ai_answer.search_result = ai_search_result_list
# self.search_result_count = len(self.ai_answer.search_result)
# 保存搜索结果
ai_search_result_list = []
for search_result in search_result_list:
url = search_result.get('url', '')
title = search_result.get('title', '')
body = search_result.get('snippet', '')
publish_time = search_result.get('published_at', '')
host_name = search_result.get('site_name', '未知')
ai_result = AiSearchResult(url=url, title=title, body=body, publish_time=publish_time, host_name=host_name)
if ai_result.title and ai_result.url:
ai_search_result_list.append(ai_result)
logger.debug(f"ai参考资料: [{host_name}]{title}({url})")
if ai_search_result_list:
self.ai_answer.search_result = ai_search_result_list
self.search_result_count = len(self.ai_answer.search_result)
continue
# 是否开始返回深度思考数据
if data.get('p', '') == 'response/thinking_content':
if data.get('p', '') == 'response/fragments/1/content':
start_thinking = True
if data.get('p', '') == 'response/thinking_elapsed_secs':
if data.get('p', '') == 'response/fragments/1/elapsed_secs':
start_thinking = False
if start_thinking:
# 获取深度思考回复
@ -177,7 +181,7 @@ class DeepseekSpider(AbstractAiSeoSpider):
value = glom(data, target, default="")
thinking_text = thinking_text + str(value)
# 是否开始返回回复数据
if data.get('p', '') == 'response/content':
if data.get('p', '') == 'response/fragments/2/content':
start_content = True
if start_content:
# 获取ai回复

60
spiders/ai_seo/kimi.py

@ -2,6 +2,7 @@
import asyncio
from functools import partial, wraps
import pyperclip
from playwright.async_api import Browser
from abs_spider import AbstractAiSeoSpider
@ -43,13 +44,16 @@ class KimiSpider(AbstractAiSeoSpider):
await self.browser_page.goto('https://www.kimi.ai', timeout=600000)
self.ai_answer = AiAnswer(self.get_platform_id(), self.get_platform_name(), self.prompt, self.keyword)
await asyncio.sleep(3)
if self.think:
think_btn = self.browser_page.locator("span:text('长思考 (k1.5)')").locator('..')
if await think_btn.is_visible():
clazz = (await think_btn.get_attribute('class')).split(' ')
if 'open' not in clazz:
await think_btn.click()
await asyncio.sleep(2)
confirm_btn = self.browser_page.locator('//button[text()="知道了"]')
if await confirm_btn.is_visible():
await confirm_btn.click()
# if self.think:
# think_btn = self.browser_page.locator("span:text('长思考 (k1.5)')").locator('..')
# if await think_btn.is_visible():
# clazz = (await think_btn.get_attribute('class')).split(' ')
# if 'open' not in clazz:
# await think_btn.click()
# await asyncio.sleep(2)
chat_input_element = self.browser_page.locator("//div[@class='chat-input']")
await chat_input_element.click()
# 输入提问词
@ -57,10 +61,19 @@ class KimiSpider(AbstractAiSeoSpider):
await asyncio.sleep(2)
await self.browser_page.keyboard.press('Enter')
# 监听请求
self.browser_page.on('response', partial(self.__listen_response))
await self.completed_event.wait()
# self.browser_page.on('response', partial(self.__listen_response))
# await self.completed_event.wait()
await asyncio.sleep(2)
# 等待复制按钮可见
copy_btn_xpath = "//div[@class='segment-assistant-actions-content']/div[@class='simple-button size-small'][1]"
await self.browser_page.wait_for_selector(copy_btn_xpath, timeout=600000)
copy_btn = self.browser_page.locator(copy_btn_xpath)
await copy_btn.click()
# 读取剪贴板
self.ai_answer.answer = pyperclip.paste()
logger.debug(f"ai回复: {self.ai_answer.answer}")
# 报错检查
if self.fail_status:
await AiSeoApis.update_spider_session(self.session_info['id'], 2)
@ -85,6 +98,35 @@ class KimiSpider(AbstractAiSeoSpider):
search_list_element = self.browser_page.locator("//div[@class='search-plus']")
if await search_list_element.is_visible() and not await search_list_content_element.is_visible():
await search_list_element.click()
# 获取搜索结果
search_list = []
search_elements = await self.browser_page.locator("//div[@class='sites']/a[@class='site']").all()
for search_element in search_elements:
result = AiSearchResult()
result.url = await search_element.get_attribute('href')
children = await search_element.locator("xpath=./child::*").all()
result.title = await children[1].inner_text()
result.body = await children[2].inner_text()
# 获取信源信息元素
result_host_elements = await children[0].locator("xpath=./child::*").all()
try:
result.host_name = await result_host_elements[1].inner_text()
except Exception:
result.host_name = ''
if len(result_host_elements) >= 3:
try:
result.publish_time = await result_host_elements[2].inner_text()
result.publish_time = result.publish_time.replace('/', '-')
except Exception:
result.publish_time = 0
logger.error(f"{result.title}获取发布时间失败")
search_list.append(result)
self.ai_answer.search_result = search_list
# 截图
screenshot_path = self._get_screenshot_path()
self.ai_answer.screenshot_file = screenshot_path

6
spiders/ai_seo/metaso.py

@ -37,8 +37,9 @@ class MetasoSpider(AbstractAiSeoSpider):
info = await self.browser_page.wait_for_selector('#left-menu > div > div.LeftMenu_footer__qsJdJ > div > div > div > button', timeout=600000)
await info.click()
# edu = await self.browser_page.wait_for_selector('body > div:nth-child(51) > div > div > div > div > div.MuiBox-root.css-o45jia > div:nth-child(2) > div.MuiListItemText-root.css-rkhw2f', timeout=600000)
edu = self.browser_page.locator('//div[@aria-label="每天有100搜索额度"]/following-sibling::div[1]//span[contains(@class, "MuiTypography-root")]')
edu = await self.browser_page.wait_for_selector(
'//div[@aria-label="每天有100搜索额度"]/following-sibling::div[1]//span[contains(@class, "MuiTypography-root")]',
timeout=600000)
edu_txt= await edu.text_content()
if edu_txt == '0':
await AiSeoApis.update_spider_session(self.session_info['id'], 3)
@ -54,7 +55,6 @@ class MetasoSpider(AbstractAiSeoSpider):
await self.browser_page.reload()
# await self.completed_event.wait()
# 等待指定元素
#//*[@id="search-content-container-8626530479804592128"]/div[2]/button
copy_button = await self.browser_page.wait_for_selector('//*[starts-with(@id, "search-content-container-")]/div[2]/div[3]/button', timeout=600000)
# 点击复制按钮
await copy_button.click()

10
spiders/ai_seo/nanometer.py

@ -35,8 +35,7 @@ class NanometerSpider(AbstractAiSeoSpider):
# 开始操作
await self.browser_page.goto(self.get_home_url(), timeout=600000)
#开启深度思考
# await self.browser_page.locator('//*[@id="NMAI_SIDEBAR_MENU"]/div/div[2]').click()
chat_input_element = self.browser_page.locator('//*[@id="NM-ASSISTANT_chat_input"]')
chat_input_element = self.browser_page.locator('//*[@id="NM-ASSISTANT_chat_input"]//textarea')
# 输入提问词
await chat_input_element.press_sequentially(self.prompt)
await self.browser_page.keyboard.press('Enter')
@ -68,6 +67,13 @@ class NanometerSpider(AbstractAiSeoSpider):
div_height = div_box['height'] if div_box else None
logger.debug(f'answer_element: {div_height}')
view_port_height = div_box['height']+ 500
# 修改标题
title = iframe.locator("//h1[@id='message-prompt']")
title_text = await title.inner_text()
new_title = title_text.replace('(禁用智能体)', '')
await title.evaluate(f"node => node.innerHTML = '{new_title}'")
# 调整视口大小
await self.browser_page.set_viewport_size({
'width': 1920,

50
spiders/ai_seo/tongyi.py

@ -30,6 +30,10 @@ class TongyiSpider(AbstractAiSeoSpider):
# 初始化信息
self._init_data()
await self.browser_page.goto(self.get_home_url(), timeout=600000)
# 点掉提示框
confirm_btn = self.browser_page.locator('//button[.//span[text()="我知道了"]]')
if await confirm_btn.is_visible():
await confirm_btn.click()
if self.think:
search_btn = self.browser_page.locator("div:text('深度思考')")
if await search_btn.is_visible():
@ -94,7 +98,6 @@ class TongyiSpider(AbstractAiSeoSpider):
stream = await response.body()
response_text = stream.decode('utf-8')
datas = response_text.split("\n")
# print("datas:",datas)
# 合规数据转成字典
for data_str in datas:
if not data_str or data_str == 'data: [DONE]':
@ -109,46 +112,29 @@ class TongyiSpider(AbstractAiSeoSpider):
contents = data.get('contents', [])
# 保存搜索内容
ai_search_result_list = []
search_result_list = list()
for content in contents:
content_type = content.get('contentType', '')
if content_type == 'plugin':
if content_type == 'referenceLink':
logger.debug(f"获取到联网搜索结果")
if self.think:
search_result_list = glom(content, 'content.pluginResult.links', default=[])
else:
search_result_list = glom(content, 'content.pluginResult.links.-1.search_results', default=[])
# for search_result in search_result_list:
# url = search_result.get('url', '')
# title = search_result.get('title', '')
# body = search_result.get('body', '')
# host_name = search_result.get('host_name', '未知')
# publish_time = search_result.get('time', 0)
# logger.debug(f"ai参考资料: [{host_name}]{title}({url})")
# ai_search_result_list.append(
# AiSearchResult(title=title, url=url, body=body, host_name=host_name, publish_time=publish_time)
# )
if content_type == 'think':
search_result_list = glom(content, 'content.links', default=[])
for search_result in search_result_list:
url = search_result.get('url', '')
title = search_result.get('title', '')
body = search_result.get('body', '')
host_name =title.rsplit('-', 1)[1] if '-' in title else '未知'
publish_time = search_result.get('time', 0)
logger.debug(f"ai参考资料: [{host_name}]{title}({url})")
ai_search_result_list.append(
AiSearchResult(title=title, url=url, body=body, host_name=host_name, publish_time=publish_time, is_referenced='1')
)
if content_type == 'text':
logger.debug(f'获取到ai回复结果')
answer = content.get('content', '').get('content', '')
answer = content.get('content', '')
logger.debug(f"ai回复: {answer}")
self.ai_answer.answer = answer
pattern = r'ty-reference]\((\d+)\)'
index_data = list(set(re.findall(pattern, self.ai_answer.answer)))
for index, search_result in enumerate(search_result_list):
url = search_result.get('url', '')
title = search_result.get('title', '')
body = search_result.get('body', '')
host_name = search_result.get('host_name', '未知')
publish_time = search_result.get('time', 0)
if str(index+1) in index_data:
is_referenced = "1"
else:
is_referenced = "0"
logger.debug(f"ai参考资料: [{host_name}]{title}({url})")
ai_search_result_list.append(
AiSearchResult(title=title, url=url, body=body, host_name=host_name, publish_time=publish_time , is_referenced=is_referenced)
)
if ai_search_result_list:
self.ai_answer.search_result = ai_search_result_list
self.completed_event.set()

7
spiders/ai_seo/yiyan.py

@ -35,10 +35,9 @@ class YiYanSpider(AbstractAiSeoSpider):
# 检查登录状态
await self.check_login()
if self.think:
think_btn = self.browser_page.locator("//span[text()='思考(X1 Turbo)']/parent::div")
clazz = await think_btn.get_attribute('class')
if 'active' not in clazz:
await think_btn.click()
think_btn = self.browser_page.locator("//span[text()='思考·自动']/parent::div")
await think_btn.click()
await self.browser_page.locator("//div[contains(@class, 'dtModeItem__')][2]").click()
# 开始操作
chat_input_element = self.browser_page.locator("//div[@class='yc-editor']")
await chat_input_element.click()

Loading…
Cancel
Save