You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							237 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							237 lines
						
					
					
						
							11 KiB
						
					
					
				| # -*- coding: utf-8 -*- | |
| import asyncio | |
| import json | |
| import re | |
| import time | |
| from functools import partial, wraps | |
| from json import JSONDecodeError | |
| 
 | |
| import ftfy | |
| import pyperclip | |
| from playwright.async_api import Browser, async_playwright | |
| from utils.ai_seo_api_utils import AiSeoApis | |
| import config | |
| from abs_spider import AbstractAiSeoSpider | |
| from domain.ai_seo import AiAnswer, AiSearchResult | |
| from utils import create_logger | |
| from utils.captcha import get_slide_offset_from_base64 | |
| 
 | |
| logger = create_logger(__name__) | |
| 
 | |
| 
 | |
class MetasoSpider(AbstractAiSeoSpider):
    """Spider for the Metaso AI search site (https://metaso.cn/).

    Submits ``self.prompt`` through the page's search box, copies the answer
    with the page's copy button, scrapes the cited sources from the DOM and
    saves a screenshot of the result.

    NOTE(review): ``browser_page``, ``session_info``, ``prompt``,
    ``ai_answer``, ``fail_status``, ``fail_exception``, ``completed_event``,
    ``_init_data`` and ``_get_screenshot_path`` are not defined in this class;
    they are presumably provided by ``AbstractAiSeoSpider`` -- confirm there.
    """

    def __init__(self, browser: Browser, prompt: str, keyword: str, load_session: bool = True):
        """Create the spider and install the error-recording response callback.

        :param browser: Playwright browser passed through to the base class.
        :param prompt: question passed through to the base class.
        :param keyword: keyword passed through to the base class.
        :param load_session: passed through to the base class.
        """
        super().__init__(browser, prompt, keyword, load_session)
        # Wrap the response callback once so that any exception raised inside
        # it is recorded on the spider instead of being lost in the event loop.
        self.__listen_response = self.handle_listen_response_error(self.__listen_response)

    def get_home_url(self) -> str:
        """Return the URL the spider opens first."""
        return 'https://metaso.cn/'

    async def _do_spider(self) -> AiAnswer:
        """Run one search on Metaso and return the populated answer object.

        :return: ``self.ai_answer`` with ``answer``, ``search_result`` (when
            sources were found) and ``screenshot_file`` filled in.
        :raises RuntimeError: when the page reports a remaining quota of 0.
        :raises Exception: re-raises ``self.fail_exception`` when the response
            callback recorded a failure.
        """
        # Imported locally: Playwright's TimeoutError does not derive from the
        # builtin TimeoutError, so it has to be caught under its own name.
        from playwright.async_api import TimeoutError as PlaywrightTimeoutError

        # Initialise per-run state and start listening to network responses.
        self._init_data()
        self.browser_page.on('response', self.__listen_response)
        await self.browser_page.goto(self.get_home_url(), timeout=600000)
        await asyncio.sleep(2)

        # Open the panel behind the footer button of the left menu; the quota
        # element is looked up right after this click.
        info_button = await self.browser_page.wait_for_selector(
            '#left-menu > div > div.LeftMenu_footer__qsJdJ > div > div > div > button',
            timeout=600000,
        )
        await info_button.click()

        # Read the remaining daily search quota shown next to the quota label.
        quota_locator = self.browser_page.locator(
            '//div[@aria-label="每天有100次搜索额度"]/following-sibling::div[1]'
            '//span[contains(@class, "MuiTypography-root")]'
        )
        quota_text = await quota_locator.text_content()
        if quota_text is not None and quota_text.strip() == '0':
            # NOTE(review): status 3 presumably marks the session as exhausted
            # -- confirm against AiSeoApis.update_spider_session.
            await AiSeoApis.update_spider_session(self.session_info['id'], 3)
            # Raising a plain string is itself a TypeError in Python 3; raise a
            # real exception that carries the message.
            raise RuntimeError("session额度已用完!")

        # Type the prompt into the search box and submit it.
        chat_input_element = self.browser_page.locator(
            "//textarea[contains(@class, 'search-consult-textarea')]"
        )
        await chat_input_element.fill(self.prompt)
        await self.browser_page.keyboard.press('Enter')
        await asyncio.sleep(2)

        await self.browser_page.reload()
        # Wait (up to 10 minutes) for the copy button of the answer container,
        # then click it.
        copy_button = await self.browser_page.wait_for_selector(
            '//*[starts-with(@id, "search-content-container-")]/div[2]/div[3]/button',
            timeout=600000,
        )
        await copy_button.click()
        # The answer text is taken from the system clipboard after the click.
        self.ai_answer.answer = pyperclip.paste()
        logger.debug(f'ai回复内容: {self.ai_answer}')

        # Collect the cited sources; a timeout here only means there are none.
        try:
            self.ai_answer.search_result = await self.__collect_search_results()
        except PlaywrightTimeoutError:
            logger.error('没有搜索结果')

        # Surface any failure recorded by the response callback.
        if self.fail_status:
            raise self.fail_exception

        # Grow the viewport so the whole answer container fits in the screenshot.
        answer_element = self.browser_page.locator(
            "//div[contains(@class, 'Search_search-result-container')]"
        )
        box = await answer_element.bounding_box()
        logger.debug(f'answer_element: {box}')
        if box is not None:
            await self.browser_page.set_viewport_size({
                'width': 1920,
                'height': int(box['height'] + 300),
            })
        else:
            # bounding_box() returns None for an element that is not visible;
            # keep the current viewport rather than failing on box['height'].
            logger.warning('answer container has no bounding box; viewport left unchanged')

        screenshot_path = self._get_screenshot_path()
        await self.browser_page.screenshot(path=screenshot_path)
        self.ai_answer.screenshot_file = screenshot_path
        return self.ai_answer

    async def __collect_search_results(self) -> list:
        """Scrape the source list rendered below the answer.

        :return: one ``AiSearchResult`` per list item; ``title``/``url`` are set
            when the item's link is visible, ``publish_time`` when the item has
            a nested ``span``.
        :raises playwright.async_api.TimeoutError: when no list item appears
            within 60 seconds.
        """
        item_xpath = "//div[contains(@class, 'meta-ordered-list_list-item')]/span"
        await self.browser_page.wait_for_selector(item_xpath, timeout=60000)
        search_items = self.browser_page.locator(item_xpath)
        search_item_count = await search_items.count()
        logger.debug(f'来源数据: {search_item_count}')
        await asyncio.sleep(5)

        search_results = []
        for i in range(search_item_count):
            search_result = AiSearchResult()
            search_item = search_items.nth(i)
            # Link element carrying the title and the URL.
            link = search_item.locator("xpath=./a")
            # Nested spans; the last one holds the publish date.
            date_spans = search_item.locator("xpath=./span")
            if await link.is_visible():
                search_result.title = await link.text_content()
                search_result.url = await link.get_attribute('href')
            if await date_spans.count() > 0:
                publish_str = await date_spans.nth(-1).text_content()
                # The date is rendered wrapped in square brackets; strip them.
                search_result.publish_time = (publish_str or '').replace('[', '').replace(']', '')
            search_results.append(search_result)
        return search_results

    def get_platform_id(self) -> int:
        """Return the numeric platform id used for Metaso."""
        return 13

    def get_platform_name(self) -> str:
        """Return the platform name used in log messages."""
        return 'Metaso'

    async def __listen_response(self, response):
        """Response callback: solve the slider captcha when one is served.

        Only successful (HTTP 200) responses whose URL contains
        ``api/captcha/get`` are handled; everything else is ignored.

        An earlier implementation also parsed the ``searchV2`` event stream
        here; the answer and sources are now read from the page in
        ``_do_spider``.

        :param response: Playwright response object delivered by the page.
        :raises ValueError: when the captcha payload lacks the two images.
        """
        if response.status != 200:
            return
        if 'api/captcha/get' not in response.url:
            return

        # Read the body once and reuse it for both logging and parsing.
        captcha_data = await response.json()
        logger.info(captcha_data)
        rep_data = captcha_data.get("repData") or {}
        background_b64 = rep_data.get('originalImageBase64')
        jigsaw_b64 = rep_data.get('jigsawImageBase64')
        if not background_b64 or not jigsaw_b64:
            # Fail loudly: the decorator records this on the spider, which is
            # clearer than an AttributeError on a missing "repData".
            raise ValueError('captcha response does not contain both captcha images')
        x_offset = get_slide_offset_from_base64(background_b64, jigsaw_b64)

        slider = await self.browser_page.query_selector('.verify-move-block')
        if slider is None:
            return
        box = await slider.bounding_box()
        if box is None:
            # Slider exists but is not visible; there is nothing to drag.
            return

        # Start the drag from the centre of the slider handle.
        start_x = box['x'] + box['width'] / 2
        start_y = box['y'] + box['height'] / 2
        # Move horizontally by the computed offset; the y coordinate is kept.
        target_x = start_x + int(x_offset)
        target_y = start_y

        await self.browser_page.mouse.move(start_x, start_y)
        await self.browser_page.mouse.down()
        # Move in several steps instead of one jump.
        await self.browser_page.mouse.move(target_x, target_y, steps=20)
        await self.browser_page.mouse.up()
        # Non-blocking pause: time.sleep() here would stall the event loop that
        # Playwright itself runs on.
        await asyncio.sleep(5)

    def handle_listen_response_error(self, func):
        """Decorator that records exceptions raised by a response callback.

        :param func: coroutine function to wrap.
        :return: coroutine function that awaits ``func`` and, on any exception,
            logs it, sets ``fail_status``/``fail_exception`` and sets
            ``completed_event``.
        """

        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{self.get_platform_name()}响应异常: {e}", exc_info=True)
                # Mark the run as failed and keep the exception for _do_spider.
                self.fail_status = True
                self.fail_exception = e
                self.completed_event.set()

        return wrapper
| 
 | |
| 
 | |
| 
 | |
async def run():
    """Launch Firefox, run one ``MetasoSpider`` search, then release the browser.

    Manual entry point used when the module is executed as a script. The
    Playwright driver and the browser are always shut down, even when the
    spider raises.
    """
    # To debug against Chrome instead, launch playwright.chromium with
    # channel="chrome" and chromium_sandbox=config.BROWSER_ENABLE_SANDBOX.
    async with async_playwright() as playwright:
        browser = await playwright.firefox.launch(
            headless=False,
            ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS,
            args=config.BROWSER_ARGS,
        )
        try:
            spider = MetasoSpider(browser, '2025前端工具库top5', '')
            await spider.run()
        finally:
            # NOTE(review): if spider.run() already closes the browser this is
            # expected to be a harmless second close -- confirm.
            await browser.close()
| 
 | |
| 
 | |
if __name__ == '__main__':
    # asyncio.run() creates and closes its own event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(run())
 |