# coding=utf-8
import asyncio
import os
import uuid
from abc import ABC, abstractmethod
from asyncio import Event

from playwright.async_api import Browser, BrowserContext, Page

import config
from domain.ai_seo import AiAnswer
from utils import create_logger
from utils.ai_seo_api_utils import AiSeoApis
from utils.session_utils import get_spider_session

logger = create_logger("abs_spider")


class AbstractAiSeoSpider(ABC):
    """Base class for Playwright-driven spiders that query an AI platform.

    Subclasses provide the platform identity (``get_platform_id``,
    ``get_platform_name``, ``get_home_url``) and the actual scraping logic
    (``_do_spider``, ``do_check_session``). This class owns the browser
    context/page lifecycle around those hooks.
    """

    # Playwright handles; the context and page are created by __init_page().
    browser: Browser
    browser_content: BrowserContext
    browser_page: Page
    # Cached results of get_platform_id() / get_platform_name().
    platform_id: int
    platform_name: str
    # Inputs supplied by the caller through __init__.
    prompt: str
    keyword: str
    # Created by _init_data().
    completed_event: Event | None = None
    ai_answer: AiAnswer | None = None
    # Not referenced in this class; presumably maintained by subclasses.
    fail_status: bool = False
    fail_exception: Exception | None = None
    # When True, __init_page() fetches a stored session for the platform.
    load_session: bool = True
    session_info: dict | None = None
    # Not referenced in this class; presumably set by the task scheduler.
    task_id: int = 0
    think: bool = False

    def __init__(self, browser: Browser, prompt: str, keyword: str, think: bool = False, load_session: bool = True):
        """Store the browser handle and the query parameters.

        :param browser: Playwright browser used to create contexts.
        :param prompt: prompt text (logged by ``run`` and passed to ``AiAnswer``).
        :param keyword: keyword passed to ``AiAnswer`` alongside the prompt.
        :param think: stored on the instance; not used by this base class.
        :param load_session: whether ``__init_page`` should load a stored session.
        """
        self.browser = browser
        self.platform_id = self.get_platform_id()
        self.platform_name = self.get_platform_name()
        self.prompt = prompt
        self.keyword = keyword
        self.load_session = load_session
        self.think = think

    def _init_data(self):
        """Reset the per-run state: completion event, answer object and index data."""
        self.completed_event = asyncio.Event()
        self.ai_answer = AiAnswer(self.get_platform_id(), self.get_platform_name(), self.prompt, self.keyword)
        self.index_data = None

    def _get_session_path(self):
        """Return the session file path for this platform.

        Unknown platform ids fall back to the ``deepseek`` session file.
        """
        sessions = {
            1: "deepseek",
            5: "doubao",
            4: "kimi",
            2: "tongyi",
            6: "yiyan",
            3: "yuanbao",
            8: "wenxiaoyan"
        }
        # TODO: support managing multiple sessions
        session_path = f"./data/session/{sessions.get(self.platform_id, 'deepseek')}.json"
        return session_path

    def _get_screenshot_path(self):
        """Return a unique screenshot path under ``config.SCREENSHOT_BASE_PATH``."""
        unique_id = str(uuid.uuid4()).replace('-', '')
        screenshot_path = f'{config.SCREENSHOT_BASE_PATH}/{self.platform_name}_{unique_id}.png'
        return screenshot_path

    async def __init_page(self, id=''):
        """Create the browser context and page, optionally restoring a session.

        :param id: session id forwarded to ``get_spider_session``.
        """
        if self.load_session:
            self.session_info = await get_spider_session(self.platform_id, id)
            if self.platform_id != 8:
                print(self.session_info['session_path'])
                self.browser_content = await self.browser.new_context(
                    storage_state=self.session_info['session_path'])
            else:
                # Platform 8 always starts from a fresh context, even though
                # the session info has been fetched.
                self.browser_content = await self.browser.new_context()
        else:
            self.browser_content = await self.browser.new_context()
        self.browser_page = await self.browser_content.new_page()
        await self.browser_page.set_viewport_size(config.PAGE_INIT_VIEWPORT_SIZE)
        # Load the stealth scripts.
        await self.browser_page.add_init_script("""
            Object.defineProperties(navigator, {webdriver:{get:()=>false}});
        """)
        # The file must be passed via ``path=``: the first positional argument
        # of add_init_script is the script *source*, so passing the file name
        # positionally would evaluate the file name itself as JavaScript.
        await self.browser_page.add_init_script(path='static/stealth.min.js')

    async def _close(self):
        """Close the page and the context, tolerating a partially built state.

        Either attribute may be missing when ``__init_page`` failed midway, and
        the context is closed even if closing the page raises.
        """
        page = getattr(self, "browser_page", None)
        context = getattr(self, "browser_content", None)
        try:
            if page is not None:
                await page.close()
        finally:
            if context is not None:
                await context.close()

    async def _login(self):
        """Open the home page, wait for a manual login and save the session.

        The storage state is written to a new uniquely named JSON file under
        ``./data/session/<platform name>/``.
        """
        try:
            await self.__init_page()
            await self.browser_page.goto(self.get_home_url())
            unique_id = str(uuid.uuid4()).replace('-', '')
            session_path = f"./data/session/{self.get_platform_name()}/{unique_id}.json"
            input("请手动登录后按回车继续...")
            # Make sure the per-platform directory exists before the state
            # file is written into it.
            os.makedirs(os.path.dirname(session_path), exist_ok=True)
            await self.browser_content.storage_state(path=session_path)
            logger.info(f"[{self.platform_name}]登录成功: {session_path}")
        finally:
            await self._close()

    async def run(self) -> AiAnswer | None:
        """Run the spider.

        :return: whatever ``_do_spider`` returns.
        :raises Exception: any error from page setup or ``_do_spider`` is
            logged and re-raised; the page and context are always closed.
        """
        try:
            await self.__init_page()
            logger.info(f"{self.platform_name}爬虫开始运行 提问词: {self.prompt}")
            return await self._do_spider()
        except Exception as e:
            logger.error(f"{self.platform_name}爬虫运行异常 参数: {self.prompt, self.keyword}")
            logger.error(f"异常信息: {str(e)}")
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            await self._close()

    async def check_session(self, session_id) -> bool:
        """Check whether a stored session is still valid and report the result.

        :param session_id: id of the session to load and to update.
        :return: the result of ``do_check_session``.
        """
        try:
            await self.__init_page(session_id)
            result = await self.do_check_session()
        finally:
            # Release the page/context even when the check itself fails.
            await self._close()
        if result:
            logger.success(f"[{self.get_platform_name()}]session状态有效! ✅ id: {session_id}")
        else:
            logger.error(f"[{self.get_platform_name()}]session状态无效! ❌ id: {session_id}")
        # Update the session status: 1 when valid; when invalid, 3 for
        # platform 13 and 2 for every other platform.
        # NOTE(review): the meaning of these codes is defined by the API — confirm.
        invalid_status = 3 if self.get_platform_id() == 13 else 2
        status = 1 if result else invalid_status
        await AiSeoApis.update_spider_session(session_id, status)
        return result

    @abstractmethod
    async def _do_spider(self) -> AiAnswer:
        """Platform-specific spider logic.

        :return: the answer collected from the platform.
        """
        pass

    @abstractmethod
    def get_platform_id(self) -> int:
        """Return the numeric id of the platform."""
        pass

    @abstractmethod
    def get_platform_name(self) -> str:
        """Return the platform name (used in logs and file paths)."""
        pass

    @abstractmethod
    def get_home_url(self) -> str:
        """Return the URL opened by ``_login``."""
        pass

    @abstractmethod
    async def do_check_session(self) -> bool:
        """Return True when the session loaded into the current page is valid."""
        pass