# coding=utf-8 import asyncio import json import uuid from dataclasses import asdict from datetime import datetime, timedelta, time import requests from apscheduler.schedulers.asyncio import AsyncIOScheduler from playwright.async_api import async_playwright, Browser import config from spiders.ai_seo import * from utils import create_logger from utils.ai import read_rank from utils.ai_seo_api_utils import AiSeoApis logger = create_logger(__name__) scheduler = AsyncIOScheduler() SPIDER_CLS = { 1: DeepseekSpider, 2: TongyiSpider, 3: YuanBaoSpider, 4: KimiSpider, 5: DouBaoSpider, 6: YiYanSpider, 7: NanometerSpider, 13: MetasoSpider } spider_pool: dict = {} async def init_browser() -> tuple: """ 初始化浏览器实例 :return: """ playwright = await async_playwright().start() browser = await playwright.chromium.launch(headless=config.BROWSER_HANDLESS, chromium_sandbox=config.BROWSER_ENABLE_SANDBOX, ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS, channel=config.BROWSER_CHANNEL, args=config.BROWSER_ARGS) return playwright, browser def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | TongyiSpider | YuanBaoSpider | KimiSpider | DouBaoSpider | YiYanSpider | NanometerSpider: """ 根据平台ID获取相应的爬虫实例。 参数: - platform_id: 平台标识符,用于选择合适的爬虫类。 - prompt: 用户查询提示,用于爬虫处理。 - brand: 品牌信息,用于爬虫处理。 - browser: 浏览器实例,供爬虫使用。 返回: - AbstractAiSeoSpider: 返回一个抽象的AI SEO爬虫实例。 异常: - ValueError: 如果未找到对应的爬虫类,则抛出此异常。 """ # 根据平台ID获取对应的爬虫类 cls = SPIDER_CLS.get(int(platform_id), None) # 如果没有找到对应的爬虫类,抛出异常 if not cls: return None # 创建并返回爬虫实例 return cls(browser, prompt, brand) async def ai_seo_job(browser, platform_ids, time_range, job_id, type_name, run_id): status, date = calc_task_date(time_range) if not status: # 是否有紧急任务 task_result = await AiSeoApis.get_urgent_task_count() if task_result['count'] <= 0: return current_job = scheduler.get_job(job_id) # current_job.pause() platform_str = ','.join(platform_ids) # 获取任务信息 task_data = await AiSeoApis.get_one_task(date=date, platform_ids=platform_str) if not task_data: logger.info(f'[{type_name}]未获取到任务信息') # current_job.resume() return task_id = task_data['id'] logger.info(f"获取到{task_data['project_id']}项目任务: id: {task_data['id']} 平台id: {task_data['platform_id']} " f"关键词: {task_data['keyword']} 品牌词: {task_data['brand']}") # 记录开始时间 start_time = datetime.now() # 创建爬虫实例 spider = get_spider(task_data['platform_id'], task_data['keyword'], task_data['brand'], browser) # 记录任务id spider.task_id = task_id spider_pool[run_id] = spider logger.info(f"RunId注册成功: TaskId: {task_id} 平台: {spider.platform_name}") # 是否开启深度思考 if task_data['thinking'] == 1: spider.think = True if not spider: await AiSeoApis.update_task_status(task_id, 5) logger.error(f"未找到对应的爬虫类 请检查任务信息: id: {task_data['id']} platform_id: {task_data['platform_id']}") return ai_answer = None try: # 运行爬虫并获取结果 ai_answer = await spider.run() except Exception as e: await AiSeoApis.update_task_status(task_id, 4) logger.info(f"回滚任务状态: id: {task_id}") spider_pool.pop(run_id, None) return if not ai_answer: await AiSeoApis.update_task_status(task_id, 4) logger.error(f"爬虫运行失败 id: {task_data['id']} platform_id: {task_data['platform_id']}") spider_pool.pop(run_id, None) return # 记录结束时间 end_time = datetime.now() # 提交爬虫结果 answer_data = asdict(ai_answer) # 上传截图 upload_data = await AiSeoApis.upload_screenshot_file(answer_data['screenshot_file']) # 结果参数 answer_data = { **config.AI_SEO_API_AUTH, **answer_data, 'task_id': task_data['id'], 'rank': 0, 'start_time': start_time.strftime("%Y-%m-%d %H:%M:%S"), 'end_time': end_time.strftime("%Y-%m-%d %H:%M:%S"), 'screenshot_url': upload_data['url'] } if not answer_data.get('answer', ''): answer_data['answer'] = '未知' answer_data['rank'] = 0 else: brands, rank = await read_rank(answer_data['answer'], task_data['brand']) answer_data['rank'] = rank answer_data['words'] = brands # print('answer_data',answer_data) # search_results = list() # for data in answer_data.get("search_result"): # data_ = {**config.AI_SEO_API_AUTH,"content": data.get("title")} # rest = '' # try: # resp = requests.post(url='https://geo-api.neicela.com/api/third/getSentimentType',json=data_,timeout=600) # # print(resp.text) # rest = resp.json() # except Exception as e: # print(str(e)) # # logger.info("调用getSentimentType接口出现异常:",str(e)) # # print("rest",rest) # if rest.get("code") == 0: # data.update(rest.get("data")) # search_results.append(data) # answer_data['search_result'] = search_results if len(answer_data.get('answer', '')) <= 20: await AiSeoApis.update_task_status(task_id, 4) logger.error(f"爬虫结果长度过短: id: {task_data['id']} platform_id: {task_data['platform_id']}") logger.error(f"回滚任务状态: id: {task_id}") logger.info(f"{answer_data.get('answer', '')}") return result = await AiSeoApis.submit_task(answer_data) logger.debug(json.dumps(answer_data, ensure_ascii=False)) logger.info(f"任务提交成功: id: {task_data['id']}") spider_pool.pop(run_id, None) async def ai_seo_job_with_timeout(browser, platform_ids, time_range, job_id, type_name, timeout=1200): # 生成一个唯一的run_id run_id = str(uuid.uuid4()).replace("-", "") try: await asyncio.wait_for(ai_seo_job(browser, platform_ids, time_range, job_id, type_name, run_id), timeout) except asyncio.TimeoutError: spider = spider_pool.get(run_id, None) if spider: await spider._close() logger.error(f"任务超时: 平台: {spider.platform_id}") spider_pool.pop(run_id, None) await AiSeoApis.update_task_status(spider.task_id, 4) logger.info(f"回滚任务状态: id: {spider.task_id}") async def heartbeat(browser: Browser): load_count = len(browser.contexts) result = await AiSeoApis.heartbeat(config.DC_ID, load_count) logger.success(f"心跳: 机器id: {config.DC_ID} 负载量: {load_count} 发送时间: {result.get('send_time', '')}") def calc_task_date(time_range): # 解析时间 start_time = datetime.strptime(time_range['start_time'], "%H:%M").time() end_time = datetime.strptime(time_range['end_time'], "%H:%M").time() # 获取带时区的当前时间 now = datetime.now() current_time = now.time() # 判断逻辑 if end_time < start_time: # 跨天时间段 if current_time >= start_time or current_time <= end_time: # 如果当前时间在次日结束时间前,开始日期是昨天 start_date = (now - timedelta(days=1)).date() if current_time <= end_time else now.date() return True, start_date.strftime("%Y-%m-%d") else: # 非跨天时间段 if start_time <= current_time <= end_time: return True, now.date().strftime("%Y-%m-%d") return False, None async def check_session_by_platform_id(platform_id): sessions = await AiSeoApis.list_spider_session(platform_id) playwright, browser = await init_browser() for session in sessions: spider = get_spider(session['platform_id'], '你好', '品牌词', browser) await spider.check_session(session['id']) async def check_session(platform_ids=None): if platform_ids is None: platform_ids = [] logger.info(f"开始检查session {len(platform_ids)}个平台") for platform_id in platform_ids: await check_session_by_platform_id(platform_id) logger.info(f"检查session完成") async def main(): # 初始化浏览器实例 playwright, browser = await init_browser() logger.info('初始化浏览器成功') if config.AI_SEO_JOB_ENABLE: # 启动一般平台aiseo任务 scheduler.add_job(ai_seo_job_with_timeout, 'interval', id='ai_seo_job', seconds=config.AI_SEO_JOB_INTERVAL, max_instances=config.AI_SEO_JOB_MAX_INSTANCES, coalesce=False, args=[browser, config.AI_SEO_JOB_PLATFORM_IDS, config.AI_SEO_JOB_RANGE, 'ai_seo_job', '一般AI平台']) logger.success('启动一般AI平台任务成功') if config.DEEPSEEK_JOB_ENABLE: # 启动deepseek-aiseo任务 scheduler.add_job(ai_seo_job_with_timeout, 'interval', id='deepseek_ai_seo_job', seconds=config.DEEPSEEK_JOB_INTERVAL, max_instances=config.DEEPSEEK_JOB_MAX_INSTANCES, coalesce=False, args=[browser, config.DEEPSEEK_JOB_PLATFORM_IDS, config.DEEPSEEK_SEO_JOB_RANGE, 'deepseek_ai_seo_job', 'DeepSeek']) logger.success('启动deepseek任务成功') # 启动心跳任务 # scheduler.add_job(heartbeat, 'interval', id='heartbeat', seconds=30,args=[browser]) # logger.info('启动心跳任务成功') if config.CHECK_SESSION_JOB_ENABLE: # 启动session检查任务 scheduler.add_job( check_session, 'cron', id='check_session', hour=15, minute=14, args=[[1,4,6,13]] ) logger.info('启动session检查任务成功') scheduler.start() await asyncio.Future() # 保持事件循环运行 if __name__ == '__main__': asyncio.run(main()) # 启动事件循环