From 1f110c160c27a93b9743b364962a9e97122e0f0e Mon Sep 17 00:00:00 2001
From: zzx
Date: Wed, 15 Oct 2025 19:53:56 +0800
Subject: [PATCH] feat(ai_seo): support datetime-typed publish_time and
 integrate the workflow spider
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Extend `publish_time` in the `AiSearchResult` dataclass to accept `datetime`
  instances, which are automatically formatted to strings
- Add the `WorkFlowApiSpider` class, which fetches AI answers from the
  workflow platform via its API
- Update `main.py` and `run.py` to register and invoke the workflow spider
- Add a scheduled job for workflow platform tasks
- Dispatch to either the legacy spiders or the workflow API spider based on
  the platform ID
- Handle short or empty results and roll back the task status on failure
- Start the browser instance only when a job actually needs it
---
 domain/ai_seo.py           |   6 ++-
 main.py                    |   7 ++-
 run.py                     | 102 +++++++++++++++++++++++++++++++++++++++++--
 spiders/ai_seo/workflow.py | 105 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 215 insertions(+), 5 deletions(-)
 create mode 100644 spiders/ai_seo/workflow.py

diff --git a/domain/ai_seo.py b/domain/ai_seo.py
index 3c3db4b..88602e6 100644
--- a/domain/ai_seo.py
+++ b/domain/ai_seo.py
@@ -21,7 +21,7 @@ class AiSearchResult:
     # Description
     body: str = ''
     # Publish time
-    publish_time: str|int|float = ''
+    publish_time: str|int|float|datetime = ''
     # Whether the result was referenced by the AI
     is_referenced: str = '0'
     # Sentiment: 1 = neutral, 2 = positive, 3 = negative
@@ -29,6 +29,10 @@
     # Sentiment type
     type = 0
     def __post_init__(self):
+
+        if isinstance(self.publish_time, datetime):
+            self.publish_time = self.publish_time.strftime('%Y-%m-%d %H:%M:%S')
+
         if isinstance(self.publish_time, float):
             self.publish_time = int(self.publish_time)

diff --git a/main.py b/main.py
index d0a7964..3c93dc4 100644
--- a/main.py
+++ b/main.py
@@ -13,6 +13,7 @@ from abs_spider import AbstractAiSeoSpider
 from domain.ai_seo import AiAnswer
 from spiders.ai_seo import *
 from spiders.ai_seo.wenxiaoyan import WenxiaoyanSpider
+from spiders.ai_seo.workflow import WorkFlowApiSpider
 from utils.logger_utils import create_logger
 from utils.ai import AiSeoApis
@@ -30,6 +31,8 @@
     8: WenxiaoyanSpider
 }

+WORKFLOW_PLATFORM_IDS = [2, 5]
+

 async def init_browser() -> tuple:
     """
@@ -46,7 +49,9 @@
     return playwright, browser


-def get_spider(platform_id, prompt, brand, browser) -> AbstractAiSeoSpider:
+def get_spider(platform_id, prompt, brand, browser) -> AbstractAiSeoSpider | WorkFlowApiSpider:
+    if int(platform_id) in WORKFLOW_PLATFORM_IDS:
+        return WorkFlowApiSpider(prompt, brand, platform_id)
     cls = SPIDER_CLS.get(int(platform_id), None)
     if not cls:
         raise ValueError(f"No spider class found for platform_id={platform_id}")

diff --git a/run.py b/run.py
index d8647d0..b7d5b68 100644
--- a/run.py
+++ b/run.py
@@ -5,12 +5,12 @@ import uuid
 from dataclasses import asdict
 from datetime import datetime, timedelta, time

-import requests
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from playwright.async_api import async_playwright, Browser

 import config
 from spiders.ai_seo import *
+from spiders.ai_seo.workflow import WorkFlowApiSpider
 from utils import create_logger
 from utils.ai import read_rank
 from utils.ai_seo_api_utils import AiSeoApis
@@ -30,6 +30,7 @@
 }

 spider_pool: dict = {}
+WORKFLOW_PLATFORM_IDS = [2, 5]

 async def init_browser() -> tuple:
     """
@@ -44,7 +45,7 @@
                                           args=config.BROWSER_ARGS)
     return playwright, browser

-def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | TongyiSpider | YuanBaoSpider | KimiSpider | DouBaoSpider | YiYanSpider | NanometerSpider:
+def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | TongyiSpider | YuanBaoSpider | KimiSpider | DouBaoSpider | YiYanSpider | NanometerSpider | WorkFlowApiSpider:
     """
     Get the spider instance for the given platform ID.
@@ -61,6 +62,8 @@ def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | T
     - ValueError: raised when no matching spider class is found.
     """
     # Look up the spider class for the platform ID
+    if str(platform_id) in config.AISEO_WORKFLOW_JOB_PLATFORM_IDS:
+        return WorkFlowApiSpider(prompt, brand, platform_id)
     cls = SPIDER_CLS.get(int(platform_id), None)
     # Raise if no spider class matches
     if not cls:
@@ -188,6 +191,87 @@ async def ai_seo_job_with_timeout(browser, platform_ids, time_range, job_id, typ
         await AiSeoApis.update_task_status(spider.task_id, 4)
         logger.info(f"Rolled back task status: id: {spider.task_id}")

+async def workflow_job(browser, platform_ids, time_range, type_name, run_id):
+    status, date = calc_task_date(time_range)
+
+    if not status:
+        # Check whether there is an urgent task
+        task_result = await AiSeoApis.get_urgent_task_count()
+        if task_result['count'] <= 0:
+            return
+    platform_str = ','.join(platform_ids)
+    # Fetch one task
+    task_data = await AiSeoApis.get_one_task(date=date, platform_ids=platform_str)
+    if not task_data:
+        logger.info(f'[{type_name}] no task available')
+        return
+    task_id = task_data['id']
+    logger.info(f"Got task for project {task_data['project_id']}: id: {task_data['id']} platform_id: {task_data['platform_id']} "
+                f"keyword: {task_data['keyword']} brand: {task_data['brand']}")
+
+    # Record the start time
+    start_time = datetime.now()
+    # Create the spider instance and check it before use
+    spider = get_spider(task_data['platform_id'], task_data['keyword'], task_data['brand'], browser)
+    if not spider:
+        await AiSeoApis.update_task_status(task_id, 5)
+        logger.error(
+            f"No spider class found, check the task: id: {task_data['id']} platform_id: {task_data['platform_id']}")
+        return
+    # Track the task id and register the run
+    spider.task_id = task_id
+    spider_pool[run_id] = spider
+    logger.info(f"RunId registered: TaskId: {task_id} platform: {spider.platform_name}")
+    ai_answer = None
+    try:
+        # Run the spider and collect the result
+        ai_answer = await spider.run()
+    except Exception as e:
+        await AiSeoApis.update_task_status(task_id, 4)
+        logger.error(f"Spider run failed: {e}")
+        logger.info(f"Rolled back task status: id: {task_id}")
+        spider_pool.pop(run_id, None)
+        return
+    if not ai_answer:
+        await AiSeoApis.update_task_status(task_id, 4)
+        logger.error(f"Spider returned no result: id: {task_data['id']} platform_id: {task_data['platform_id']}")
+        spider_pool.pop(run_id, None)
+        return
+    # Record the end time
+    end_time = datetime.now()

+    # Build the submission payload from the spider result
+    answer_data = asdict(ai_answer)
+    answer_data = {
+        **config.AI_SEO_API_AUTH,
+        **answer_data,
+        'task_id': task_data['id'],
+        'rank': 0,
+        'start_time': start_time.strftime("%Y-%m-%d %H:%M:%S"),
+        'end_time': end_time.strftime("%Y-%m-%d %H:%M:%S"),
+        'screenshot_url': ''
+    }
+    if not answer_data.get('answer', ''):
+        answer_data['answer'] = '未知'
+        answer_data['rank'] = 0
+    else:
+        brands, rank = await read_rank(answer_data['answer'], task_data['brand'])
+        answer_data['rank'] = rank
+        answer_data['words'] = brands
+
+    if len(answer_data.get('answer', '')) <= 20:
+        await AiSeoApis.update_task_status(task_id, 4)
+        logger.error(f"Spider result is too short: id: {task_data['id']} platform_id: {task_data['platform_id']}")
+        logger.error(f"Rolled back task status: id: {task_id}")
+        logger.info(f"{answer_data.get('answer', '')}")
+        spider_pool.pop(run_id, None)
+        return
+
+    result = await AiSeoApis.submit_task(answer_data)
+    logger.debug(json.dumps(answer_data, ensure_ascii=False))
+    logger.info(f"Task submitted: id: {task_data['id']}")
+    spider_pool.pop(run_id, None)
+
 async def heartbeat(browser: Browser):
     load_count = len(browser.contexts)
     result = await AiSeoApis.heartbeat(config.DC_ID, load_count)
@@ -237,7 +321,11 @@ async def check_session(platform_ids=None):

 async def main():
     # Initialize the browser instance
-    playwright, browser = await init_browser()
+    if config.AI_SEO_JOB_ENABLE or config.DEEPSEEK_JOB_ENABLE:
+        playwright, browser = await init_browser()
+    else:
+        playwright = None
+        browser = None
     logger.info('Browser initialized')
     if config.AI_SEO_JOB_ENABLE:
         # Start the standard-platform AI SEO job
@@ -253,6 +341,14 @@
                           args=[browser, config.DEEPSEEK_JOB_PLATFORM_IDS, config.DEEPSEEK_SEO_JOB_RANGE,
                                 'deepseek_ai_seo_job', 'DeepSeek'])
         logger.success('Started the DeepSeek job')
+    if config.AISEO_WORKFLOW_JOB_ENABLE:
+        # Start the workflow job that fetches AI results via API
+        scheduler.add_job(workflow_job, 'interval',
+                          id='workflow_job', seconds=config.AISEO_WORKFLOW_JOB_INTERVAL,
+                          max_instances=config.AISEO_WORKFLOW_JOB_MAX_INSTANCES, coalesce=False,
+                          args=[browser, config.AISEO_WORKFLOW_JOB_PLATFORM_IDS, config.AI_SEO_JOB_RANGE,
+                                'workflow_job', 'Workflow'])
+        logger.success('Started the workflow job')
     # Start the heartbeat job
     # scheduler.add_job(heartbeat, 'interval', id='heartbeat', seconds=30, args=[browser])
     # logger.info('Started the heartbeat job')
diff --git a/spiders/ai_seo/workflow.py b/spiders/ai_seo/workflow.py
new file mode 100644
index 0000000..ccce3eb
--- /dev/null
+++ b/spiders/ai_seo/workflow.py
@@ -0,0 +1,105 @@
+# coding=utf-8
+from datetime import datetime
+
+import httpx
+
+import config
+from domain.ai_seo import AiAnswer, AiSearchResult
+from utils import create_logger
+
+# Platform metadata
+PLATFORMS = {
+    1: {'id': 1, 'name': 'Deepseek', 'api_key': ''},
+    2: {'id': 2, 'name': '通义千问', 'api_key': 'app-mQE0lOxB0G49r4tQSv2LgIOV'},
+    3: {'id': 3, 'name': '腾讯元宝', 'api_key': ''},
+    4: {'id': 4, 'name': 'Kimi', 'api_key': ''},
+    5: {'id': 5, 'name': '豆包', 'api_key': 'app-lD5HbD03EW7pamzIV2VEIyR6'},
+    6: {'id': 6, 'name': '文心一言', 'api_key': ''},
+}
+
+logger = create_logger(__name__)
+
+class WorkFlowApiSpider:
+    platform_id: int
+    platform_name: str
+    prompt: str
+    keyword: str
+    ai_answer: AiAnswer | None = None
+    fail_status: bool = False
+    fail_exception: Exception | None = None
+    load_session: bool = True
+    task_id: int = 0
+    think: bool = False
+    api_key: str = ''
+
+    def __init__(self, prompt: str, keyword: str, platform_id: int):
+        self.platform_id = platform_id
+        self.prompt = prompt
+        self.keyword = keyword
+
+        platform = PLATFORMS.get(int(platform_id), {})
+        if not platform:
+            raise Exception('Unknown platform')
+        self.platform_name = platform.get('name', '')
+        self.api_key = platform.get('api_key', '')
+        if not self.api_key:
+            raise Exception('No api_key configured for this platform')
+
+    async def run(self):
+        logger.info(f"{self.platform_name} API fetching data, prompt: {self.prompt}")
+        # Build the request payload
+        params = {
+            "response_mode": "blocking",
+            "user": config.DIFY_USER,
+            "inputs": {
+                "prompt": self.prompt
+            }
+        }
+        headers = {
+            "Authorization": f"Bearer {self.api_key}"
+        }
+        # Send the request
+        async with httpx.AsyncClient() as client:
+            response = await client.post(f"{config.DIFY_BASE_URL}/workflows/run", json=params, headers=headers, timeout=300)
+        json_result = response.json()
+        result = json_result.get('data', {})
+        if not result or result.get('status') != 'succeeded':
+            logger.error(f"{self.platform_name} API request failed: {json_result}")
+            raise Exception(f"{self.platform_name} API request failed")
+        # Unpack the workflow outputs
+        workflow_result = result['outputs']['work_flow_result']
+        # Token usage data
+        usage = workflow_result['usage']
+        # AI answer, with markup
+        answer = workflow_result['answer']
+        # AI answer, plain text
+        pure_answer = workflow_result['pure_answer']
+        logger.debug(f"AI answer: {pure_answer}")
+        # Web search results
+        web_searches = workflow_result.get('web_search', [])
+        # Converted results
+        search_items = []
+        for item in web_searches:
+            # Extract publish_time; datePublished may be absent or empty
+            if not item.get('datePublished'):
+                publish_time = ''
+            else:
+                publish_time = datetime.strptime(item['datePublished'], "%Y-%m-%dT%H:%M:%S%z")
+            search_item = AiSearchResult(
+                title=item['name'],
+                url=item['url'],
+                host_name=item['siteName'],
+                body=item['summary'],
+                publish_time=publish_time,
+                is_referenced=item['is_ref'],
+            )
+            logger.debug(f"AI reference: [{search_item.host_name}]{search_item.title}({search_item.url})")
+            search_items.append(search_item)
+
+        # Assemble the final answer
+        self.ai_answer = AiAnswer(self.platform_id, self.platform_name, self.prompt, self.keyword, answer, search_items, '', True)
+        logger.info(f"Usage:\nTotal tokens: {usage['total_tokens']}\nTotal cost: {round(float(usage['total_price']), 3)}")
+        return self.ai_answer
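
Note on the response format: run() unpacks a Dify-style blocking response. The
patch itself only shows the parsing code, so the envelope below is a
reconstruction from the field accesses, with made-up values, not a confirmed
API contract:

    # Illustrative response shape handled by run(); values are invented.
    example_response = {
        "data": {
            "status": "succeeded",
            "outputs": {
                "work_flow_result": {
                    "usage": {"total_tokens": 1234, "total_price": "0.0123"},
                    "answer": "<p>answer with markup</p>",
                    "pure_answer": "answer as plain text",
                    "web_search": [
                        {"name": "page title",
                         "url": "https://example.com/post",
                         "siteName": "example.com",
                         "summary": "snippet text",
                         "datePublished": "2025-10-15T19:53:56+08:00",
                         "is_ref": "1"}
                    ]
                }
            }
        }
    }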
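
A minimal smoke test for the new spider, assuming the repository's config
defines DIFY_BASE_URL and DIFY_USER as used in the patch, and that platform
id 5 (豆包) has an api_key in PLATFORMS; prompt and keyword are placeholder
values:

    # Hypothetical standalone check, not part of the patch.
    import asyncio
    from spiders.ai_seo.workflow import WorkFlowApiSpider

    async def smoke_test():
        spider = WorkFlowApiSpider(prompt='example prompt', keyword='example brand', platform_id=5)
        answer = await spider.run()
        # AiAnswer carries the answer text, as seen in workflow_job's asdict() usage
        print(answer.answer if answer else 'no answer')

    asyncio.run(smoke_test())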