2 Commits

  1. 122
      spiders/ai_seo/metaso.py

122
spiders/ai_seo/metaso.py

@@ -13,6 +13,7 @@ import config
from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer, AiSearchResult
from utils import create_logger
from utils.captcha import get_slide_offset_from_base64
logger = create_logger(__name__)
@@ -29,6 +30,7 @@ class MetasoSpider(AbstractAiSeoSpider):
async def _do_spider(self) -> AiAnswer:
# 初始化信息
self._init_data()
self.browser_page.on('response', partial(self.__listen_response))
await self.browser_page.goto(self.get_home_url(), timeout=600000)
await asyncio.sleep(2)
info = await self.browser_page.wait_for_selector('#left-menu > div > div.LeftMenu_footer__qsJdJ > div > div > div > button', timeout=600000)
@@ -47,7 +49,7 @@ class MetasoSpider(AbstractAiSeoSpider):
await self.browser_page.keyboard.press('Enter')
# 监听请求
await asyncio.sleep(2)
# self.browser_page.on('response', partial(self.__listen_response))
await self.browser_page.reload()
# await self.completed_event.wait()
# 等待指定元素
@@ -109,61 +111,87 @@ class MetasoSpider(AbstractAiSeoSpider):
def get_platform_name(self) -> str:
    """Return the name of the platform this spider targets."""
    return 'Metaso'
async def __listen_response(self, response):
    """Solve the slider captcha when the captcha endpoint responds.

    Used as the handler for the page's ``response`` event. Only responses
    with HTTP status 200 whose URL contains ``api/captcha/get`` are acted
    on; every other response is ignored after the debug log line.

    The captcha payload is expected to carry two base64 images under
    ``repData`` (``originalImageBase64`` and ``jigsawImageBase64``). They
    are handed to ``get_slide_offset_from_base64`` and the element matching
    ``.verify-move-block`` is then dragged horizontally by the returned
    offset.

    Args:
        response: The response object passed to the listener; it must
            expose ``url``, ``status`` and an awaitable ``json()``.
    """
    url = response.url
    logger.debug(f'url: {url}')
    if response.status != 200 or 'api/captcha/get' not in url:
        return

    # Parse the body once and reuse it for both logging and extraction.
    captcha_data = await response.json()
    logger.info(captcha_data)

    # A payload without ``repData`` (or without either image) cannot be
    # solved; bail out instead of failing on a chained ``.get`` of None.
    rep_data = (captcha_data or {}).get("repData") or {}
    background_b64 = rep_data.get('originalImageBase64')
    jigsaw_b64 = rep_data.get('jigsawImageBase64')
    if not background_b64 or not jigsaw_b64:
        logger.info(f'captcha response without image data, url: {url}')
        return
    offset_x = get_slide_offset_from_base64(background_b64, jigsaw_b64)

    # Locate the draggable slider handle on the page.
    slider_handle = await self.browser_page.query_selector('.verify-move-block')
    if not slider_handle:
        return
    box = await slider_handle.bounding_box()
    if not box:
        # No bounding box is available (e.g. the element is not visible),
        # so there is nothing to grab.
        return

    # Start the drag from the centre of the slider handle.
    start_x = box['x'] + box['width'] / 2
    start_y = box['y'] + box['height'] / 2
    # Drag right by the computed offset, keeping the same vertical position.
    target_x = start_x + int(offset_x)
    target_y = start_y

    await self.browser_page.mouse.move(start_x, start_y)
    await self.browser_page.mouse.down()
    # Move in several intermediate steps rather than one jump.
    await self.browser_page.mouse.move(target_x, target_y, steps=20)
    await self.browser_page.mouse.up()
def handle_listen_response_error(self, func):
"""

Loading…
Cancel
Save