# Metaso (metaso.cn) AI-SEO spider module.
# -*- coding: utf-8 -*-
import asyncio
import json
import re
from functools import partial, wraps
from json import JSONDecodeError

import ftfy
import pyperclip
from playwright.async_api import Browser, TimeoutError as PlaywrightTimeoutError, async_playwright

import config
from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer, AiSearchResult
from utils import create_logger
from utils.ai_seo_api_utils import AiSeoApis
|
|
|
|
logger = create_logger(__name__)
|
|
|
|
|
|
class MetasoSpider(AbstractAiSeoSpider):
    """Spider for metaso.cn (Metaso AI search).

    Drives a Playwright page: submits the prompt, copies the generated
    answer via the page's copy button, scrapes the cited source list, and
    stores a full-height screenshot of the result area.
    """

    def __init__(self, browser: Browser, prompt: str, keyword: str, load_session: bool = True):
        super().__init__(browser, prompt, keyword, load_session)
        # Wrap the response listener so exceptions raised inside the
        # Playwright callback are recorded (fail_status / fail_exception)
        # instead of vanishing in the event loop.
        self.__listen_response = self.handle_listen_response_error(self.__listen_response)

    def get_home_url(self) -> str:
        """Return the Metaso home page URL."""
        return 'https://metaso.cn/'

    async def _do_spider(self) -> AiAnswer:
        """Run one scrape cycle and return the populated AiAnswer.

        :return: the AiAnswer with answer text, sources and screenshot path
        :raises RuntimeError: when the session quota is exhausted
        :raises Exception: whatever the wrapped response listener recorded
        """
        # Initialize per-run state (provided by AbstractAiSeoSpider).
        self._init_data()
        await self.browser_page.goto(self.get_home_url(), timeout=600000)
        await asyncio.sleep(2)

        # Open the account popover from the left-menu footer.
        info = await self.browser_page.wait_for_selector(
            '#left-menu > div > div.LeftMenu_footer__qsJdJ > div > div > div > button',
            timeout=600000)
        await info.click()

        # Read the remaining quota shown in the popover; '0' means exhausted.
        # NOTE(review): the nth-child(51) selector is brittle — confirm it
        # still matches after site layout changes.
        edu = await self.browser_page.wait_for_selector(
            'body > div:nth-child(51) > div > div > div > div > div.MuiBox-root.css-o45jia > div:nth-child(2) > div.MuiListItemText-root.css-rkhw2f',
            timeout=600000)
        edu_txt = await edu.text_content()
        if edu_txt == '0':
            # Mark the session as exhausted (status 3) on the backend.
            await AiSeoApis.update_spider_session(self.session_info['id'], 3)
            # BUG FIX: `raise "..."` is a TypeError in Python 3 (exceptions
            # must derive from BaseException); raise a real exception instead.
            raise RuntimeError("session额度已用完!")

        # Type the prompt into the search textarea and submit with Enter.
        chat_input_element = self.browser_page.locator(
            "//textarea[contains(@class, 'search-consult-textarea')]")
        await chat_input_element.fill(self.prompt)
        await self.browser_page.keyboard.press('Enter')

        await asyncio.sleep(2)
        # self.browser_page.on('response', partial(self.__listen_response))
        await self.browser_page.reload()
        # await self.completed_event.wait()

        # Wait for the copy button under the answer container, click it, and
        # read the answer back from the system clipboard.
        copy_button = await self.browser_page.wait_for_selector(
            '//*[starts-with(@id, "search-content-container-")]/div[2]/button',
            timeout=600000)
        await copy_button.click()
        self.ai_answer.answer = pyperclip.paste()
        logger.debug(f'ai回复内容: {self.ai_answer}')

        # Scrape the cited sources (title / url / publish time).
        try:
            await self.browser_page.wait_for_selector(
                "//div[contains(@class, 'meta-ordered-list_list-item')]/span",
                timeout=60000)
            search_items = self.browser_page.locator(
                "//div[contains(@class, 'meta-ordered-list_list-item')]/span")
            search_item_count = await search_items.count()
            logger.debug(f'来源数据: {search_item_count}')
            await asyncio.sleep(5)
            search_results = []
            for i in range(search_item_count):
                search_result = AiSearchResult()
                search_item = search_items.nth(i)
                # The <a> child carries the title and URL.
                a = search_item.locator("xpath=./a")
                # The last <span> child carries the publish date wrapped in [].
                publish_date_element = search_item.locator("xpath=./span")
                if await a.is_visible():
                    search_result.title = await a.text_content()
                    search_result.url = await a.get_attribute('href')
                if await publish_date_element.count() > 0:
                    publish_date_element = search_item.locator("xpath=./span").nth(-1)
                    publish_str = await publish_date_element.text_content()
                    search_result.publish_time = publish_str.replace('[', '').replace(']', '')
                search_results.append(search_result)
            self.ai_answer.search_result = search_results
        # BUG FIX: Playwright's TimeoutError does not inherit from the
        # builtin TimeoutError, so the original `except TimeoutError`
        # never fired on a wait_for_selector timeout; catch both.
        except (TimeoutError, PlaywrightTimeoutError):
            logger.error('没有搜索结果')

        # Re-raise any exception the response listener recorded.
        if self.fail_status:
            raise self.fail_exception

        # Resize the viewport to the full answer height (+300px margin) so
        # the screenshot captures the entire result area.
        answer_element = self.browser_page.locator(
            "//div[contains(@class, 'Search_search-result-container')]")
        box = await answer_element.bounding_box()
        logger.debug(f'answer_element: {box}')
        view_port_height = box['height'] + 300
        await self.browser_page.set_viewport_size({
            'width': 1920,
            'height': int(view_port_height)
        })
        # Take the screenshot and record its path on the answer.
        screenshot_path = self._get_screenshot_path()
        await self.browser_page.screenshot(path=screenshot_path)
        self.ai_answer.screenshot_file = screenshot_path
        return self.ai_answer

    def get_platform_id(self) -> int:
        """Return the numeric platform id for Metaso."""
        return 13

    def get_platform_name(self) -> str:
        """Return the human-readable platform name."""
        return 'Metaso'

    async def __listen_response(self, response):
        """Parse the SSE stream of Metaso's searchV2 endpoint.

        Accumulates the streamed answer text, collects the reference list,
        marks each reference as cited ("1") or not ("0") depending on
        whether its [n] citation index appears in the answer, then signals
        completed_event.

        :param response: Playwright Response object for a network request
        """
        url = response.url
        logger.debug(f'url: {url}')
        if 'searchV2' not in url:
            return
        answer = ''
        results = []
        search_results = []
        # ftfy repairs mojibake in the raw SSE payload before parsing.
        response_text = ftfy.fix_text(await response.text())
        event_lines = response_text.split('\n\n')
        # NOTE(review): the event is set before parsing completes —
        # presumably to unblock waiters early; confirm this is intentional.
        self.completed_event.set()
        for line in event_lines:
            if not line.startswith('data:'):
                continue
            try:
                event_json = json.loads(line[5:])
            except JSONDecodeError:
                continue
            # Renamed from `type` to avoid shadowing the builtin.
            event_type = event_json.get('type')
            if event_type == 'set-reference':
                # Full list of candidate sources for this answer.
                search_results = event_json.get('list', [])
            if event_type == 'append-text':
                # Streamed answer fragment.
                answer = answer + event_json.get('text', '')
        # Citation markers in the answer look like "[3]"; collect indices.
        pattern = r'\[(\d+)\]'
        index_data = set(re.findall(pattern, answer))
        for index, search_result in enumerate(search_results):
            # Citation indices in the answer text are 1-based.
            is_referenced = "1" if str(index + 1) in index_data else "0"
            results.append(AiSearchResult(title=search_result.get('title', ''),
                                          url=search_result.get('url', ''),
                                          host_name=search_result.get('author', ''),
                                          body=search_result.get('displaySource'),
                                          publish_time=search_result.get('publish_time', ''),
                                          is_referenced=is_referenced))
        self.ai_answer.search_result = results
        self.ai_answer.answer = answer
        self.completed_event.set()

    def handle_listen_response_error(self, func):
        """Decorator that records exceptions raised in response callbacks.

        On error it logs the exception, sets fail_status / fail_exception so
        _do_spider can re-raise it, and signals completed_event so nothing
        waiting on the event hangs.

        :param func: async callback to wrap
        :return: wrapped coroutine function
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True)
                # Record the failure for the main spider flow to surface.
                self.fail_status = True
                self.fail_exception = e
                self.completed_event.set()

        return wrapper
|
|
|
|
|
|
|
|
async def run():
    """Manual smoke test: launch Firefox and run the Metaso spider once."""
    pw = await async_playwright().start()
    launch_kwargs = {
        'headless': False,
        'ignore_default_args': config.BROWSER_IGNORE_DEFAULT_ARGS,
        'args': config.BROWSER_ARGS,
    }
    browser = await pw.firefox.launch(**launch_kwargs)
    spider = MetasoSpider(browser, '2025前端工具库top5', '')
    await spider.run()
|
|
|
|
|
if __name__ == '__main__':
    # asyncio.run() is the supported entry point since Python 3.7;
    # asyncio.get_event_loop() outside a running loop is deprecated (3.10+).
    asyncio.run(run())
|