You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
163 lines
5.2 KiB
163 lines
5.2 KiB
# coding=utf-8
|
|
import asyncio
|
|
import uuid
|
|
from abc import ABC, abstractmethod
|
|
from asyncio import Event
|
|
|
|
from playwright.async_api import Browser, BrowserContext, Page
|
|
|
|
import config
|
|
from domain.ai_seo import AiAnswer
|
|
from utils import create_logger
|
|
from utils.ai_seo_api_utils import AiSeoApis
|
|
from utils.session_utils import get_spider_session
|
|
|
|
# Module-level logger used by the abstract spider below; created under the name "abs_spider".
logger = create_logger("abs_spider")
|
|
|
|
|
|
|
|
|
|
class AbstractAiSeoSpider(ABC):
    """Abstract base for Playwright-driven spiders that query an AI platform.

    The base class owns the browser context/page lifecycle (creation, optional
    restoring of a stored session, anti-detection init scripts, teardown) and
    exposes three entry points: ``run`` (execute the spider), ``check_session``
    (validate a stored session and report its status) and ``_login`` (manual
    login that saves a new storage-state file).

    Subclasses provide the platform specifics through the abstract methods
    ``_do_spider``, ``get_platform_id``, ``get_platform_name``,
    ``get_home_url`` and ``do_check_session``.
    """

    # Playwright handles; ``browser_content`` / ``browser_page`` are only
    # assigned once ``__init_page`` has run.
    browser: Browser
    browser_content: BrowserContext
    browser_page: Page
    # Cached results of get_platform_id() / get_platform_name().
    platform_id: int
    platform_name: str
    # Question text and keyword handed to AiAnswer in _init_data().
    prompt: str
    keyword: str
    # Both are created by _init_data(); they stay None until it is called.
    completed_event: Event | None = None
    ai_answer: AiAnswer | None = None
    # Failure bookkeeping; not written by this base class.
    fail_status: bool = False
    fail_exception: Exception | None = None
    # When True, __init_page fetches session info via get_spider_session().
    load_session: bool = True
    session_info: dict | None = None
    # Not read or written by this base class.
    task_id: int = 0
    # Stored from the constructor; not otherwise used by this base class.
    think: bool = False

    def __init__(self, browser: Browser, prompt: str, keyword: str, think: bool = False, load_session: bool = True):
        """Store the browser and query parameters and cache the platform identity.

        :param browser: Playwright browser used to create contexts and pages.
        :param prompt: question text for the spider.
        :param keyword: keyword associated with the prompt.
        :param think: flag stored on the instance for subclasses.
        :param load_session: whether ``__init_page`` should fetch session info.
        """
        self.browser = browser
        self.platform_id = self.get_platform_id()
        self.platform_name = self.get_platform_name()
        self.prompt = prompt
        self.keyword = keyword
        self.load_session = load_session
        self.think = think

    def _init_data(self):
        """Create a fresh completion event and answer object and reset ``index_data``."""
        self.completed_event = asyncio.Event()
        self.ai_answer = AiAnswer(self.get_platform_id(), self.get_platform_name(), self.prompt, self.keyword)
        self.index_data = None

    def _get_session_path(self):
        """Return the single-file session path for this platform.

        Platform ids without an entry in the table fall back to ``deepseek``.
        """
        sessions = {
            1: "deepseek",
            5: "doubao",
            4: "kimi",
            2: "tongyi",
            6: "yiyan",
            3: "yuanbao",
            8: "wenxiaoyan",
        }
        # TODO: support managing multiple sessions
        return f"./data/session/{sessions.get(self.platform_id, 'deepseek')}.json"

    def _get_screenshot_path(self):
        """Return a unique PNG path under ``config.SCREENSHOT_BASE_PATH``."""
        unique_id = uuid.uuid4().hex
        return f'{config.SCREENSHOT_BASE_PATH}/{self.platform_name}_{unique_id}.png'

    async def __init_page(self, id=''):
        """Create the browser context and page and install the init scripts.

        When ``load_session`` is set, session info is fetched with
        ``get_spider_session(platform_id, id)``; its ``session_path`` is used
        as the context's storage state, except for platform id 8, which always
        gets a blank context.

        :param id: session id forwarded to ``get_spider_session``.
        """
        import os

        if self.load_session:
            self.session_info = await get_spider_session(self.platform_id, id)

        if self.load_session and self.platform_id != 8:
            session_path = self.session_info['session_path']
            logger.debug(f"[{self.platform_name}]session path: {session_path}")
            self.browser_content = await self.browser.new_context(storage_state=session_path)
        else:
            self.browser_content = await self.browser.new_context()

        self.browser_page = await self.browser_content.new_page()
        await self.browser_page.set_viewport_size(config.PAGE_INIT_VIEWPORT_SIZE)

        # Load the disguise (anti-detection) scripts.
        await self.browser_page.add_init_script("""
        Object.defineProperties(navigator, {webdriver:{get:()=>false}});
        """)
        # The stealth bundle must be passed via ``path=``: the first positional
        # argument of add_init_script is the script *source*, so passing the
        # file name positionally would evaluate the path text as JavaScript
        # instead of loading the file.
        stealth_script = 'static/stealth.min.js'
        if os.path.isfile(stealth_script):
            await self.browser_page.add_init_script(path=stealth_script)
        else:
            logger.warning(f"[{self.platform_name}]stealth script not found, skipped: {stealth_script}")

    async def _close(self):
        """Close the page and then the context, tolerating partial initialisation.

        Either attribute may be missing when ``__init_page`` failed early, and
        the context is still closed when closing the page raises.
        """
        page = getattr(self, 'browser_page', None)
        context = getattr(self, 'browser_content', None)
        try:
            if page is not None:
                await page.close()
        finally:
            if context is not None:
                await context.close()

    async def _login(self):
        """Open the platform home page, wait for a manual login and save the session.

        The storage state is written to
        ``./data/session/<platform name>/<uuid>.json``.

        :return: None
        """
        import os

        try:
            await self.__init_page()
            await self.browser_page.goto(self.get_home_url())
            unique_id = uuid.uuid4().hex
            session_path = f"./data/session/{self.get_platform_name()}/{unique_id}.json"
            # Wait for the operator in a worker thread so the event loop is
            # not blocked while the prompt is open.
            await asyncio.to_thread(input, "请手动登录后按回车继续...")
            # Make sure the per-platform directory exists so saving does not
            # depend on it having been created beforehand.
            os.makedirs(os.path.dirname(session_path), exist_ok=True)
            await self.browser_content.storage_state(path=session_path)
            logger.info(f"[{self.platform_name}]登录成功: {session_path}")
        finally:
            await self._close()

    async def run(self) -> AiAnswer | None:
        """Run the spider.

        Initialises the page, delegates to ``_do_spider`` and always closes the
        page and context afterwards.

        :return: the value returned by ``_do_spider``.
        :raises Exception: any exception from initialisation or ``_do_spider``
            is logged and re-raised.
        """
        try:
            await self.__init_page()
            logger.info(f"{self.platform_name}爬虫开始运行 提问词: {self.prompt}")
            return await self._do_spider()
        except Exception as e:
            logger.error(f"{self.platform_name}爬虫运行异常 参数: {self.prompt, self.keyword}")
            logger.error(f"异常信息: {str(e)}")
            raise
        finally:
            await self._close()

    async def check_session(self, session_id) -> bool:
        """Check whether the stored session is still valid and report its status.

        :param session_id: id of the session to load and check.
        :return: the result of ``do_check_session``.
        """
        try:
            await self.__init_page(session_id)
            result = await self.do_check_session()
        finally:
            # Release the page/context even when the check itself fails.
            await self._close()

        if result:
            logger.success(f"[{self.get_platform_name()}]session状态有效! ✅ id: {session_id}")
        else:
            logger.error(f"[{self.get_platform_name()}]session状态无效! ❌ id: {session_id}")

        # Update the session status: 1 when valid; when invalid, 3 for
        # platform id 13 and 2 for every other platform.
        invalid_status = 3 if self.get_platform_id() == 13 else 2
        status = 1 if result else invalid_status
        await AiSeoApis.update_spider_session(session_id, status)
        return result

    @abstractmethod
    async def _do_spider(self) -> AiAnswer:
        """Platform-specific spider logic, invoked by ``run`` after the page is ready.

        :return: the answer produced by the spider.
        """
        pass

    @abstractmethod
    def get_platform_id(self) -> int:
        """Return the numeric id of the platform this spider targets."""
        pass

    @abstractmethod
    def get_platform_name(self) -> str:
        """Return the platform name used in log messages and file paths."""
        pass

    @abstractmethod
    def get_home_url(self) -> str:
        """Return the URL that ``_login`` opens for the manual login."""
        pass

    @abstractmethod
    async def do_check_session(self) -> bool:
        """Return whether the session loaded into the current page is valid."""
        pass
|