You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

254 lines
12 KiB

# -*- coding: utf-8 -*-
import asyncio
import json
import re
import time
from functools import partial, wraps
from json import JSONDecodeError

import ftfy
import pyperclip
from playwright.async_api import Browser, TimeoutError as PlaywrightTimeoutError, async_playwright

import config
from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer, AiSearchResult
from utils import create_logger
from utils.ai_seo_api_utils import AiSeoApis
from utils.captcha import get_slide_offset_from_base64
# Module-level logger, named after this module via the project helper.
logger = create_logger(__name__)
class MetasoSpider(AbstractAiSeoSpider):
    """Spider for the Metaso (metaso.cn) AI search platform.

    Drives a Playwright page to submit a prompt, reads the generated answer
    through the page's copy button + system clipboard, scrapes the cited
    sources, and captures a full-height screenshot of the result. A response
    listener solves the slide captcha when the site presents one.
    """

    def __init__(self, browser: Browser, prompt: str, keyword: str, load_session: bool = True):
        super().__init__(browser, prompt, keyword, load_session)
        # Playwright swallows exceptions raised inside event handlers, so the
        # listener is wrapped to record failures on the spider instead.
        self.__listen_response = self.handle_listen_response_error(self.__listen_response)

    def get_home_url(self) -> str:
        """Return the platform landing-page URL."""
        return 'https://metaso.cn/'

    async def _do_spider(self) -> AiAnswer:
        """Run one scrape cycle and return the populated AiAnswer.

        Raises whatever exception the response listener recorded if the
        captcha/solve path failed (see handle_listen_response_error).
        """
        # Reset per-run state and register the (wrapped) response listener.
        self._init_data()
        self.browser_page.on('response', partial(self.__listen_response))
        await self.browser_page.goto(self.get_home_url(), timeout=600000)
        await asyncio.sleep(2)
        info = await self.browser_page.wait_for_selector(
            '//*[@id="left-menu"]/div/div[3]/div/div/div', timeout=600000)
        await info.click()
        # Fill the search textarea with the prompt and submit with Enter.
        chat_input_element = self.browser_page.locator(
            "//textarea[contains(@class, 'search-consult-textarea')]")
        await chat_input_element.fill(self.prompt)
        await self.browser_page.keyboard.press('Enter')
        await asyncio.sleep(2)
        await self.browser_page.reload()
        # Wait for the copy button under the finished answer, click it, and
        # read the answer text back from the system clipboard.
        copy_button = await self.browser_page.wait_for_selector(
            '//*[starts-with(@id, "search-content-container-")]/div[2]/div[3]/button',
            timeout=600000)
        await copy_button.click()
        self.ai_answer.answer = pyperclip.paste()
        logger.debug(f'ai回复内容: {self.ai_answer}')
        # Scrape the cited sources (title / url / publish time).
        try:
            await self.browser_page.wait_for_selector(
                "//div[contains(@class, 'meta-ordered-list_list-item')]/span", timeout=60000)
            search_items = self.browser_page.locator(
                "//div[contains(@class, 'meta-ordered-list_list-item')]/span")
            search_item_count = await search_items.count()
            logger.debug(f'来源数据: {search_item_count}')
            await asyncio.sleep(5)
            search_results = []
            for i in range(search_item_count):
                search_result = AiSearchResult()
                search_item = search_items.nth(i)
                # Link (title + href) and publish date are siblings of the item.
                a = search_item.locator("xpath=./a")
                publish_date_element = search_item.locator("xpath=./span")
                if await a.is_visible():
                    search_result.title = await a.text_content()
                    search_result.url = await a.get_attribute('href')
                if await publish_date_element.count() > 0:
                    # The date is the last <span>, rendered as "[YYYY-MM-DD]".
                    publish_date_element = search_item.locator("xpath=./span").nth(-1)
                    publish_str = await publish_date_element.text_content()
                    search_result.publish_time = publish_str.replace('[', '').replace(']', '')
                search_results.append(search_result)
            self.ai_answer.search_result = search_results
        except PlaywrightTimeoutError:
            # BUGFIX: playwright raises its own TimeoutError, which does NOT
            # subclass the builtin TimeoutError the original code caught.
            logger.error('没有搜索结果')
        # Re-raise any error the response listener recorded.
        if self.fail_status:
            raise self.fail_exception
        # Grow the viewport to the full answer height, then screenshot.
        answer_element = self.browser_page.locator(
            "//div[contains(@class, 'Search_search-result-container')]")
        box = await answer_element.bounding_box()
        logger.debug(f'answer_element: {box}')
        view_port_height = box['height'] + 300
        await self.browser_page.set_viewport_size({
            'width': 1920,
            'height': int(view_port_height)
        })
        screenshot_path = self._get_screenshot_path()
        await self.browser_page.screenshot(path=screenshot_path)
        self.ai_answer.screenshot_file = screenshot_path
        return self.ai_answer

    def get_platform_id(self) -> int:
        """Return this platform's numeric id in the project registry."""
        return 13

    def get_platform_name(self) -> str:
        """Return this platform's display name."""
        return 'Metaso'

    async def __listen_response(self, response):
        """Playwright response callback: solve the slide captcha when served.

        :param response: playwright Response object for every page request.
        """
        url = response.url
        if response.status == 200:
            if 'api/captcha/get' in url:
                # BUGFIX: parse the body once instead of awaiting json() twice.
                captcha_data = await response.json()
                logger.info(captcha_data)
                rep_data = captcha_data.get("repData") or {}
                bg = rep_data.get('originalImageBase64')
                jigsaw = rep_data.get('jigsawImageBase64')
                # Compute how far the jigsaw piece must slide (pixels).
                x_box = get_slide_offset_from_base64(bg, jigsaw)
                slider_handle = await self.browser_page.query_selector('.verify-move-block')
                if slider_handle:
                    box = await slider_handle.bounding_box()
                    start_x = box['x'] + box['width'] / 2
                    start_y = box['y'] + box['height'] / 2
                    target_x = start_x + int(x_box)
                    target_y = start_y
                    # Drag the slider in small steps to mimic a human.
                    await self.browser_page.mouse.move(start_x, start_y)
                    await self.browser_page.mouse.down()
                    await self.browser_page.mouse.move(target_x, target_y, steps=20)
                    await self.browser_page.mouse.up()
                    # BUGFIX: time.sleep() blocked the event loop inside an
                    # async callback; asyncio.sleep yields control instead.
                    await asyncio.sleep(5)

    def handle_listen_response_error(self, func):
        """Wrap an async response callback so its exceptions are recorded.

        Failures are stored as fail_status / fail_exception on the spider and
        completed_event is set so any waiter wakes up.

        :param func: async callback to wrap.
        :return: wrapped async callable.
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True)
                # Mark the failure and release anyone waiting on completion.
                self.fail_status = True
                self.fail_exception = e
                self.completed_event.set()
        return wrapper

    async def do_check_session(self) -> bool:
        """Return True when the saved session still has search quota left.

        Best-effort: any navigation/selector failure is treated as an
        unusable session rather than an error.
        """
        try:
            await self.browser_page.goto(self.get_home_url(), timeout=30000)
            await asyncio.sleep(2)
            info = await self.browser_page.wait_for_selector(
                '#left-menu > div > div.LeftMenu_footer__qsJdJ > div > div > div > button', timeout=10000)
            await info.click()
            edu = self.browser_page.locator(
                '//div[@aria-label="每天有100搜索额度"]/following-sibling::div[1]//span[contains(@class, "MuiTypography-root")]')
            edu_txt = await edu.text_content()
            if edu_txt == '0':
                return False
            # Pre-fill the prompt so a follow-up run can submit immediately.
            chat_input_element = self.browser_page.locator(
                "//textarea[contains(@class, 'search-consult-textarea')]")
            await chat_input_element.fill(self.prompt)
            logger.info(f"[{self.get_platform_name()}]查询还剩{edu_txt} 次")
            return True
        except Exception:
            return False
async def run():
    """Manual test entry point: launch Firefox and run one Metaso scrape.

    BUGFIX: the original never stopped the Playwright driver, leaking the
    browser process on exit; try/finally guarantees cleanup.
    """
    playwright = await async_playwright().start()
    try:
        browser = await playwright.firefox.launch(
            headless=False,
            ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS,
            args=config.BROWSER_ARGS
        )
        spider = MetasoSpider(browser, '2025前端工具库top5', '')
        await spider.run()
    finally:
        # Closes any launched browsers and the driver transport.
        await playwright.stop()
if __name__ == '__main__':
    # asyncio.run() creates and tears down its own event loop;
    # get_event_loop().run_until_complete() is deprecated since Python 3.10.
    asyncio.run(run())