# -*- coding: utf-8 -*-
"""Metaso (metaso.cn) AI-search spider built on Playwright.

Drives the Metaso web UI end to end: checks the session's remaining daily
quota, submits a prompt, copies the rendered answer via the page's copy
button (read back from the host clipboard), scrapes the cited sources,
and captures a full-height screenshot of the answer container.
"""
import asyncio
import json
import re
import time
from functools import partial, wraps
from json import JSONDecodeError

import ftfy
import pyperclip
from playwright.async_api import Browser, async_playwright
from playwright.async_api import TimeoutError as PlaywrightTimeoutError

import config
from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer, AiSearchResult
from utils import create_logger
from utils.ai_seo_api_utils import AiSeoApis
from utils.captcha import get_slide_offset_from_base64

logger = create_logger(__name__)


class MetasoSpider(AbstractAiSeoSpider):
    """Spider implementation for the Metaso AI search platform."""

    def __init__(self, browser: Browser, prompt: str, keyword: str, load_session: bool = True):
        super().__init__(browser, prompt, keyword, load_session)
        # Wrap the network listener so exceptions raised inside it are
        # recorded on the spider (fail_status / fail_exception) instead of
        # vanishing inside Playwright's event dispatcher.
        self.__listen_response = self.handle_listen_response_error(self.__listen_response)

    def get_home_url(self) -> str:
        """Landing page of the Metaso platform."""
        return 'https://metaso.cn/'

    async def _do_spider(self) -> AiAnswer:
        """Run one full prompt -> answer -> sources -> screenshot cycle.

        Returns:
            AiAnswer: populated with answer text, source list and the
            screenshot file path.

        Raises:
            RuntimeError: when the session's daily search quota is exhausted.
            Exception: re-raised from the response listener if it recorded
                a failure (see ``handle_listen_response_error``).
        """
        # Initialise result holders and attach the network listener.
        self._init_data()
        self.browser_page.on('response', partial(self.__listen_response))
        await self.browser_page.goto(self.get_home_url(), timeout=600000)
        await asyncio.sleep(2)

        # Open the account popover and read the remaining daily quota.
        info = await self.browser_page.wait_for_selector(
            '#left-menu > div > div.LeftMenu_footer__qsJdJ > div > div > div > button',
            timeout=600000)
        await info.click()
        edu = self.browser_page.locator(
            '//div[@aria-label="每天有100次搜索额度"]/following-sibling::div[1]'
            '//span[contains(@class, "MuiTypography-root")]')
        edu_txt = await edu.text_content()
        if edu_txt == '0':
            # Quota exhausted: mark the session as unusable (state 3) and abort.
            await AiSeoApis.update_spider_session(self.session_info['id'], 3)
            # FIX: the original `raise "..."` raised a plain str, which is a
            # TypeError at runtime; raise a real exception with the same message.
            raise RuntimeError("session额度已用完!")

        # Submit the prompt.
        chat_input_element = self.browser_page.locator(
            "//textarea[contains(@class, 'search-consult-textarea')]")
        await chat_input_element.fill(self.prompt)
        await self.browser_page.keyboard.press('Enter')
        await asyncio.sleep(2)
        await self.browser_page.reload()

        # Wait for the copy button that appears once the answer has rendered,
        # click it, and read the answer back from the host clipboard.
        copy_button = await self.browser_page.wait_for_selector(
            '//*[starts-with(@id, "search-content-container-")]/div[2]/button',
            timeout=600000)
        await copy_button.click()
        self.ai_answer.answer = pyperclip.paste()
        logger.debug(f'ai回复内容: {self.ai_answer}')

        # Scrape the cited-sources list (best effort: absence is not fatal).
        try:
            await self.browser_page.wait_for_selector(
                "//div[contains(@class, 'meta-ordered-list_list-item')]/span",
                timeout=60000)
            search_items = self.browser_page.locator(
                "//div[contains(@class, 'meta-ordered-list_list-item')]/span")
            search_item_count = await search_items.count()
            logger.debug(f'来源数据: {search_item_count}')
            await asyncio.sleep(5)
            search_results = []
            for i in range(search_item_count):
                search_result = AiSearchResult()
                search_item = search_items.nth(i)
                # Title and URL come from the anchor; publish date from the
                # last <span> child, wrapped in square brackets.
                a = search_item.locator("xpath=./a")
                publish_date_element = search_item.locator("xpath=./span")
                if await a.is_visible():
                    search_result.title = await a.text_content()
                    search_result.url = await a.get_attribute('href')
                if await publish_date_element.count() > 0:
                    publish_date_element = search_item.locator("xpath=./span").nth(-1)
                    publish_str = await publish_date_element.text_content()
                    search_result.publish_time = publish_str.replace('[', '').replace(']', '')
                search_results.append(search_result)
            self.ai_answer.search_result = search_results
        except PlaywrightTimeoutError:
            # FIX: Playwright raises its own TimeoutError (not the builtin),
            # so the original `except TimeoutError` never matched and the
            # spider crashed when no sources were present.
            logger.error('没有搜索结果')

        # Surface any failure the response listener recorded.
        if self.fail_status:
            raise self.fail_exception

        # Grow the viewport to fit the whole answer, then screenshot it.
        answer_element = self.browser_page.locator(
            "//div[contains(@class, 'Search_search-result-container')]")
        box = await answer_element.bounding_box()
        logger.debug(f'answer_element: {box}')
        view_port_height = box['height'] + 300  # padding below the answer
        await self.browser_page.set_viewport_size({
            'width': 1920,
            'height': int(view_port_height)
        })
        screenshot_path = self._get_screenshot_path()
        await self.browser_page.screenshot(path=screenshot_path)
        self.ai_answer.screenshot_file = screenshot_path
        return self.ai_answer

    def get_platform_id(self) -> int:
        """Numeric platform identifier used by the backend."""
        return 13

    def get_platform_name(self) -> str:
        """Human-readable platform name."""
        return 'Metaso'

    async def __listen_response(self, response):
        """Network-response hook: solves the slide captcha when it appears.

        When the captcha endpoint responds, compute the slide offset from
        the background/jigsaw images and replay a human-like drag on the
        slider element.
        """
        url = response.url
        if response.status == 200:
            if 'api/captcha/get' in url:
                logger.info(await response.json())
                captcha_data = await response.json()
                bg = captcha_data.get("repData").get('originalImageBase64')
                jigsaw = captcha_data.get("repData").get('jigsawImageBase64')
                x_box = get_slide_offset_from_base64(bg, jigsaw)
                # Slider handle element of the captcha widget.
                slider = await self.browser_page.query_selector('.verify-move-block')
                if slider:
                    box = await slider.bounding_box()
                    # Start from the centre of the slider handle.
                    start_x = box['x'] + box['width'] / 2
                    start_y = box['y'] + box['height'] / 2
                    target_x = start_x + int(x_box)
                    target_y = start_y
                    await self.browser_page.mouse.move(start_x, start_y)
                    await self.browser_page.mouse.down()
                    # Drag in steps to mimic a human slide.
                    await self.browser_page.mouse.move(target_x, target_y, steps=20)
                    await self.browser_page.mouse.up()
                    # FIX: time.sleep(5) blocked the event loop inside an
                    # async callback; use asyncio.sleep instead.
                    await asyncio.sleep(5)

    def handle_listen_response_error(self, func):
        """Decorator: capture exceptions raised in the response callback.

        Marks the spider as failed (``fail_status`` / ``fail_exception``)
        and releases ``completed_event`` so waiters do not hang.

        :param func: async callback to wrap
        :return: wrapped coroutine function
        """
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True)
                # Record the failure so _do_spider can re-raise it.
                self.fail_status = True
                self.fail_exception = e
                self.completed_event.set()
        return wrapper


async def run():
    """Ad-hoc manual entry point: launch Firefox and run one spider pass."""
    playwright = await async_playwright().start()
    browser = await playwright.firefox.launch(
        headless=False,
        ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS,
        args=config.BROWSER_ARGS
    )
    spider = MetasoSpider(browser, '2025前端工具库top5', '')
    await spider.run()


if __name__ == '__main__':
    # FIX: asyncio.get_event_loop().run_until_complete() is deprecated;
    # asyncio.run() creates and closes a fresh loop correctly.
    asyncio.run(run())