
feat(ai_seo): support datetime publish_time values and integrate the workflow API crawler

- Extend the `publish_time` field of the `AiSearchResult` dataclass to accept `datetime` instances and format them to strings automatically
- Add the `WorkFlowApiSpider` class, which fetches AI answers from the workflow platform through its API
- Update `main.py` and `run.py` to register and invoke the workflow crawler
- Add a scheduled job for running workflow-platform tasks
- Choose between the traditional crawlers and the workflow API crawler based on the platform ID
- Handle short or empty results and roll back the task status on failure
- Start a browser instance only when a job actually needs one
Branch: master
zzx, 2 weeks ago
Commit: 1f110c160c
Changed files:
  1. domain/ai_seo.py (6)
  2. main.py (7)
  3. run.py (100)
  4. spiders/ai_seo/workflow.py (105)

domain/ai_seo.py (6)

@@ -21,7 +21,7 @@ class AiSearchResult:
# Description
body: str = ''
# Publish time
publish_time: str|int|float = ''
publish_time: str|int|float|datetime = ''
# Whether the result was referenced by the AI
is_referenced: str = '0'
# Sentiment: 1 - neutral, 2 - positive, 3 - negative
@@ -29,6 +29,10 @@ class AiSearchResult:
# Sentiment type
type = 0
def __post_init__(self):
if isinstance(self.publish_time, datetime):
self.publish_time = self.publish_time.strftime('%Y-%m-%d %H:%M:%S')
if isinstance(self.publish_time, float):
self.publish_time = int(self.publish_time)
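For reference, a minimal sketch of how the extended field behaves after this change; the keyword arguments mirror the constructor call in spiders/ai_seo/workflow.py, and the concrete values are illustrative only:

from datetime import datetime
from domain.ai_seo import AiSearchResult

# A datetime value is normalized to a '%Y-%m-%d %H:%M:%S' string by __post_init__
item = AiSearchResult(
    title='demo', url='https://example.com', host_name='example.com',
    body='summary', publish_time=datetime(2024, 5, 1, 12, 30, 0), is_referenced='1',
)
print(item.publish_time)  # '2024-05-01 12:30:00'

# A float timestamp is truncated to an int; plain strings and ints pass through unchanged
item2 = AiSearchResult(
    title='demo', url='https://example.com', host_name='example.com',
    body='summary', publish_time=1714537800.5, is_referenced='0',
)
print(item2.publish_time)  # 1714537800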

main.py (7)

@@ -13,6 +13,7 @@ from abs_spider import AbstractAiSeoSpider
from domain.ai_seo import AiAnswer
from spiders.ai_seo import *
from spiders.ai_seo.wenxiaoyan import WenxiaoyanSpider
from spiders.ai_seo.workflow import WorkFlowApiSpider
from utils.logger_utils import create_logger
from utils.ai import AiSeoApis
@@ -30,6 +31,8 @@ SPIDER_CLS = {
8: WenxiaoyanSpider
}
WORKFLOW_PLATFORM_IDS = [2, 5]
async def init_browser() -> tuple:
"""
@@ -46,7 +49,9 @@ async def init_browser() -> tuple:
return playwright, browser
def get_spider(platform_id, prompt, brand, browser) -> AbstractAiSeoSpider:
def get_spider(platform_id, prompt, brand, browser) -> AbstractAiSeoSpider|WorkFlowApiSpider:
if platform_id in WORKFLOW_PLATFORM_IDS:
return WorkFlowApiSpider(prompt, brand, platform_id)
cls = SPIDER_CLS.get(int(platform_id), None)
if not cls:
raise ValueError(f"未找到对应的爬虫类,platform_id={platform_id}")
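To illustrate the dispatch above, a hypothetical call sequence (the prompt, brand, and browser arguments are placeholders):

# Platform IDs listed in WORKFLOW_PLATFORM_IDS bypass the browser-based spiders entirely
spider = get_spider(2, 'example prompt', 'example brand', browser=None)
assert isinstance(spider, WorkFlowApiSpider)

# Any other registered platform ID resolves to its Playwright-based spider class
spider = get_spider(8, 'example prompt', 'example brand', browser=browser)  # -> WenxiaoyanSpider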

run.py (100)

@@ -5,12 +5,12 @@ import uuid
from dataclasses import asdict
from datetime import datetime, timedelta, time
import requests
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from playwright.async_api import async_playwright, Browser
import config
from spiders.ai_seo import *
from spiders.ai_seo.workflow import WorkFlowApiSpider
from utils import create_logger
from utils.ai import read_rank
from utils.ai_seo_api_utils import AiSeoApis
@@ -30,6 +30,7 @@ SPIDER_CLS = {
}
spider_pool: dict = {}
WORKFLOW_PLATFORM_IDS = [2, 5]
async def init_browser() -> tuple:
"""
@@ -44,7 +45,7 @@ async def init_browser() -> tuple:
args=config.BROWSER_ARGS)
return playwright, browser
def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | TongyiSpider | YuanBaoSpider | KimiSpider | DouBaoSpider | YiYanSpider | NanometerSpider:
def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | TongyiSpider | YuanBaoSpider | KimiSpider | DouBaoSpider | YiYanSpider | NanometerSpider | WorkFlowApiSpider:
"""
Get the spider instance for the given platform ID
@@ -61,6 +62,8 @@ def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | T
- ValueError:
"""
# Look up the spider class for the platform ID
if str(platform_id) in config.AISEO_WORKFLOW_JOB_PLATFORM_IDS:
return WorkFlowApiSpider(prompt, brand, platform_id)
cls = SPIDER_CLS.get(int(platform_id), None)
# Raise an exception if no matching spider class is found
if not cls:
@@ -188,6 +191,87 @@ async def ai_seo_job_with_timeout(browser, platform_ids, time_range, job_id, typ
await AiSeoApis.update_task_status(spider.task_id, 4)
logger.info(f"回滚任务状态: id: {spider.task_id}")
async def workflow_job(browser, platform_ids, time_range, type_name, run_id):
status, date = calc_task_date(time_range)
if not status:
# Check whether there are urgent tasks
task_result = await AiSeoApis.get_urgent_task_count()
if task_result['count'] <= 0:
return
platform_str = ','.join(platform_ids)
# Fetch one task
task_data = await AiSeoApis.get_one_task(date=date, platform_ids=platform_str)
if not task_data:
logger.info(f'[{type_name}]未获取到任务信息')
return
task_id = task_data['id']
logger.info(f"获取到{task_data['project_id']}项目任务: id: {task_data['id']} 平台id: {task_data['platform_id']} "
f"关键词: {task_data['keyword']} 品牌词: {task_data['brand']}")
# Record the start time
start_time = datetime.now()
# Create the spider instance and fail fast if none could be created
spider = get_spider(task_data['platform_id'], task_data['keyword'], task_data['brand'], browser)
if not spider:
await AiSeoApis.update_task_status(task_id, 5)
logger.error(
f"未找到对应的爬虫类 请检查任务信息: id: {task_data['id']} platform_id: {task_data['platform_id']}")
return
# Record the task id and register the spider in the pool
spider.task_id = task_id
spider_pool[run_id] = spider
logger.info(f"RunId注册成功: TaskId: {task_id} 平台: {spider.platform_name}")
ai_answer = None
try:
# Run the spider and collect the result
ai_answer = await spider.run()
except Exception as e:
await AiSeoApis.update_task_status(task_id, 4)
logger.info(f"回滚任务状态: id: {task_id}")
spider_pool.pop(run_id, None)
return
if not ai_answer:
await AiSeoApis.update_task_status(task_id, 4)
logger.error(f"爬虫运行失败 id: {task_data['id']} platform_id: {task_data['platform_id']}")
spider_pool.pop(run_id, None)
return
# Record the end time
end_time = datetime.now()
# Submit the crawler result
answer_data = asdict(ai_answer)
# Result parameters
answer_data = {
**config.AI_SEO_API_AUTH,
**answer_data,
'task_id': task_data['id'],
'rank': 0,
'start_time': start_time.strftime("%Y-%m-%d %H:%M:%S"),
'end_time': end_time.strftime("%Y-%m-%d %H:%M:%S"),
'screenshot_url': ''
}
if not answer_data.get('answer', ''):
answer_data['answer'] = '未知'
answer_data['rank'] = 0
else:
brands, rank = await read_rank(answer_data['answer'], task_data['brand'])
answer_data['rank'] = rank
answer_data['words'] = brands
if len(answer_data.get('answer', '')) <= 20:
await AiSeoApis.update_task_status(task_id, 4)
logger.error(f"爬虫结果长度过短: id: {task_data['id']} platform_id: {task_data['platform_id']}")
logger.error(f"回滚任务状态: id: {task_id}")
logger.info(f"{answer_data.get('answer', '')}")
# Remove the spider from the pool before returning, as the other early-return paths do
spider_pool.pop(run_id, None)
return
result = await AiSeoApis.submit_task(answer_data)
logger.debug(json.dumps(answer_data, ensure_ascii=False))
logger.info(f"任务提交成功: id: {task_data['id']}")
spider_pool.pop(run_id, None)
async def heartbeat(browser: Browser):
load_count = len(browser.contexts)
result = await AiSeoApis.heartbeat(config.DC_ID, load_count)
@@ -237,7 +321,11 @@ async def check_session(platform_ids=None):
async def main():
# Initialize the browser instance only when a browser-based job needs it
if config.AI_SEO_JOB_ENABLE or config.DEEPSEEK_JOB_ENABLE:
playwright, browser = await init_browser()
logger.info('初始化浏览器成功')
else:
playwright = None
browser = None
if config.AI_SEO_JOB_ENABLE:
# Start the regular-platform AI SEO job
@@ -253,6 +341,14 @@ async def main():
args=[browser, config.DEEPSEEK_JOB_PLATFORM_IDS, config.DEEPSEEK_SEO_JOB_RANGE,
'deepseek_ai_seo_job', 'DeepSeek'])
logger.success('启动deepseek任务成功')
if config.AISEO_WORKFLOW_JOB_ENABLE:
# Start the workflow job that fetches AI results
scheduler.add_job(workflow_job, 'interval',
id='workflow_job', seconds=config.AISEO_WORKFLOW_JOB_INTERVAL,
max_instances=config.AISEO_WORKFLOW_JOB_MAX_INSTANCES, coalesce=False,
args=[browser, config.AISEO_WORKFLOW_JOB_PLATFORM_IDS, config.AI_SEO_JOB_RANGE,
'workflow_job', 'Workflow'])
logger.success('启动工作流获取AI结果任务成功')
# Start the heartbeat job
# scheduler.add_job(heartbeat, 'interval', id='heartbeat', seconds=30,args=[browser])
# logger.info('启动心跳任务成功')
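The job registration above depends on a handful of config switches; a hypothetical excerpt of config.py (the names come from this diff, the values are illustrative assumptions only):

# config.py (illustrative values only)
AISEO_WORKFLOW_JOB_ENABLE = True              # master switch for the workflow job
AISEO_WORKFLOW_JOB_INTERVAL = 30              # seconds between scheduler runs
AISEO_WORKFLOW_JOB_MAX_INSTANCES = 2          # concurrent workflow_job instances allowed
AISEO_WORKFLOW_JOB_PLATFORM_IDS = ['2', '5']  # compared as strings in get_spider()
DIFY_BASE_URL = 'https://dify.example.com/v1' # base URL for the workflow API
DIFY_USER = 'ai-seo-runner'                   # user identifier sent with each workflow run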

spiders/ai_seo/workflow.py (105, new file)

@@ -0,0 +1,105 @@
# coding=utf-8
from datetime import datetime
import httpx
import config
from domain.ai_seo import AiAnswer, AiSearchResult
from utils import create_logger
# Platform metadata
PLATFORMS = {
1: {'id': 1, 'name': 'Deepseek', 'api_key': ''},
2: {'id': 2, 'name': '通义千问', 'api_key': 'app-mQE0lOxB0G49r4tQSv2LgIOV'},
3: {'id': 3, 'name': '腾讯元宝', 'api_key': ''},
4: {'id': 4, 'name': 'Kimi', 'api_key': ''},
5: {'id': 5, 'name': '豆包', 'api_key': 'app-lD5HbD03EW7pamzIV2VEIyR6'},
6: {'id': 6, 'name': '文心一言', 'api_key': ''},
}
logger = create_logger(__name__)
class WorkFlowApiSpider:
platform_id: int
platform_name: str
prompt: str
keyword: str
ai_answer: AiAnswer | None = None
fail_status: bool = False
fail_exception: Exception | None = None
load_session: bool = True
task_id: int = 0
think: bool = False
api_key: str = ''
def __init__(self, prompt: str, keyword: str, platform_id: int):
self.platform_id = platform_id
self.prompt = prompt
self.keyword = keyword
platform = PLATFORMS.get(int(platform_id), {})
if not platform:
raise Exception('平台不存在')
self.platform_name = platform.get('name', '')
self.api_key = platform.get('api_key', '')
if not self.api_key:
raise Exception('平台未配置api_key')
async def run(self):
logger.info(f"{self.platform_name}Api开始获取数据 提问词: {self.prompt}")
# Build the request parameters
params = {
"response_mode": "blocking",
"user": config.DIFY_USER,
"inputs": {
"prompt": self.prompt
}
}
headers = {
"Authorization": f"Bearer {self.api_key}"
}
# Send the request
async with httpx.AsyncClient() as client:
response = await client.post(f"{config.DIFY_BASE_URL}/workflows/run", json=params, headers=headers, timeout=300)
json_result = response.json()
result = json_result.get('data', [])
if not result or result['status'] != 'succeeded':
logger.error(f"{self.platform_name}Api获取数据失败: {json_result}")
raise Exception(f"{self.platform_name}Api获取数据失败")
# Extract the data returned by the workflow
workflow_result = result['outputs']['work_flow_result']
# Usage data
usage = workflow_result['usage']
# AI answer, with markup
answer = workflow_result['answer']
# AI answer, plain text
pure_answer = workflow_result['pure_answer']
logger.debug(f"ai回复: {pure_answer}")
# Web search results
web_searches = workflow_result.get('web_search', [])
# Converted results
search_items = []
for item in web_searches:
# Extract publish_time
if not item.get('datePublished'):
publish_time = None
else:
publish_time = datetime.strptime(item['datePublished'], "%Y-%m-%dT%H:%M:%S%z")
search_item = AiSearchResult(
title=item['name'],
url=item['url'],
host_name=item['siteName'],
body=item['summary'],
publish_time=publish_time,
is_referenced=item['is_ref'],
)
logger.debug(f"ai参考资料: [{search_item.host_name}]{search_item.title}({search_item.url})")
search_items.append(search_item)
# Assemble the final answer
self.ai_answer = AiAnswer(self.platform_id, self.platform_name, self.prompt, self.keyword, answer, search_items, '', True)
logger.info(f"本次用量:\n总Token: {usage['total_tokens']}\n总资费: {round(float(usage['total_price']), 3)}")
return self.ai_answer
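A minimal driver sketch for the new spider, together with the blocking-mode response shape that run() expects; only the fields the parser actually reads are shown, and all values are illustrative:

import asyncio

from spiders.ai_seo.workflow import WorkFlowApiSpider

# Expected response shape (fields read by run()):
# {"data": {"status": "succeeded",
#           "outputs": {"work_flow_result": {
#               "answer": "...", "pure_answer": "...",
#               "usage": {"total_tokens": 1234, "total_price": "0.0123"},
#               "web_search": [{"name": "...", "url": "...", "siteName": "...",
#                               "summary": "...", "datePublished": "2024-05-01T12:30:00+08:00",
#                               "is_ref": "1"}]}}}}

async def main():
    # Platform 5 is one of the workflow-backed platforms configured in PLATFORMS
    spider = WorkFlowApiSpider(prompt='example prompt', keyword='example brand', platform_id=5)
    answer = await spider.run()
    print(answer)

if __name__ == '__main__':
    asyncio.run(main())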