# coding=utf-8
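"""Scheduling entry point for the AI SEO collection jobs.

Polls the task API on a fixed interval, runs the matching platform spider
in a shared Playwright browser, and submits the answer, brand rank and
screenshot back through AiSeoApis.
"""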
import asyncio
import uuid
from dataclasses import asdict
from datetime import datetime, timedelta

import requests
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from playwright.async_api import async_playwright, Browser

import config
from spiders.ai_seo import *
from utils import create_logger
from utils.ai import read_rank
from utils.ai_seo_api_utils import AiSeoApis

logger = create_logger(__name__)

scheduler = AsyncIOScheduler()
# Map each platform_id to its spider class
SPIDER_CLS = {
    1: DeepseekSpider,
    2: TongyiSpider,
    3: YuanBaoSpider,
    4: KimiSpider,
    5: DouBaoSpider,
    6: YiYanSpider,
    7: NanometerSpider,
    13: MetasoSpider
}

# Spiders that are currently running, keyed by run_id (used for timeout cleanup)
spider_pool: dict = {}


async def init_browser() -> tuple:
    """
    Initialize the browser instance.
    :return: (playwright, browser)
    """
    playwright = await async_playwright().start()
    browser = await playwright.chromium.launch(headless=config.BROWSER_HANDLESS,
                                               chromium_sandbox=config.BROWSER_ENABLE_SANDBOX,
                                               ignore_default_args=config.BROWSER_IGNORE_DEFAULT_ARGS,
                                               channel=config.BROWSER_CHANNEL,
                                               args=config.BROWSER_ARGS)
    return playwright, browser


def get_spider(platform_id, prompt, brand, browser) -> None | DeepseekSpider | TongyiSpider | YuanBaoSpider | KimiSpider | DouBaoSpider | YiYanSpider | NanometerSpider | MetasoSpider:
    """
    Return the spider instance for the given platform ID.

    Parameters:
    - platform_id: platform identifier used to pick the spider class.
    - prompt: the user query prompt the spider should ask.
    - brand: the brand name the answer is checked against.
    - browser: the browser instance the spider runs in.

    Returns:
    - a spider instance for the platform, or None if no spider class is
      registered for platform_id.
    """
    # Look up the spider class for this platform ID
    cls = SPIDER_CLS.get(int(platform_id), None)
    # No registered class: the caller is responsible for handling None
    if not cls:
        return None
    # Create and return the spider instance
    return cls(browser, prompt, brand)
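
# For example, get_spider(1, "考研班哪个机构好", "高顿教育", browser) returns a
# DeepseekSpider for that prompt/brand, while an unregistered platform_id yields None.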


async def ai_seo_job(browser, platform_ids, time_range, job_id, type_name, run_id):
    status, date = calc_task_date(time_range)

    # if not status:
    #     # Are there any urgent tasks?
    #     task_result = await AiSeoApis.get_urgent_task_count()
    #     if task_result['count'] <= 0:
    #         return
    current_job = scheduler.get_job(job_id)
    # current_job.pause()
    platform_str = ','.join(platform_ids)
    # Fetch one pending task for the given date and platforms
    task_data = await AiSeoApis.get_one_task(date=date, platform_ids=platform_str)
    # Example of a task_data payload:
    # task_data = {
    #     "id": "778877538494716115",
    #     "project_id": "778865951931242406",
    #     "keyword_id": "778865954200359933",
    #     "keyword": "考研班哪个机构好",
    #     "brand": "高顿教育",
    #     "platform_id": "13",
    #     "gather_date": "2025-06-26",
    #     "gather_time": "00:00",
    #     "gather_filter": "2025-06-26 00:00:00",
    #     "status": 2,
    #     "retry_count": 0,
    #     "screen_flag": 2,
    #     "thinking": 1,
    #     "is_deal": 1,
    #     "screen_url": "",
    #     "priority": 999,
    #     "start_time": "null",
    #     "end_time": "null",
    #     "create_time": "2025-06-26 15:09:24",
    #     "update_time": "2025-06-26 18:19:55",
    #     "delete_time": 0,
    #     "create_by": "777786292267261539",
    #     "update_by": "777786292267261539"
    # }
    if not task_data:
        logger.info(f'[{type_name}]未获取到任务信息')
        # current_job.resume()
        return
    task_id = task_data['id']
    logger.info(f"获取到{task_data['project_id']}项目任务: id: {task_data['id']} 平台id: {task_data['platform_id']} "
                f"关键词: {task_data['keyword']} 品牌词: {task_data['brand']}")

    # Record the start time
    start_time = datetime.now()
    # Create the spider instance for this platform
    spider = get_spider(task_data['platform_id'], task_data['keyword'], task_data['brand'], browser)
    if not spider:
        # No spider class registered for this platform_id
        await AiSeoApis.update_task_status(task_id, 5)
        logger.error(f"未找到对应的爬虫类 请检查任务信息: id: {task_data['id']} platform_id: {task_data['platform_id']}")
        return
    # Attach the task id and register the spider under this run_id
    spider.task_id = task_id
    spider_pool[run_id] = spider
    logger.info(f"RunId注册成功: TaskId: {task_id} 平台: {spider.platform_name}")
    # Enable deep-thinking mode if the task requests it
    if task_data['thinking'] == 1:
        spider.think = True

    ai_answer = None
    try:
        # Run the spider and collect its answer
        ai_answer = await spider.run()
    except Exception as e:
        logger.error(f"爬虫运行异常: id: {task_id} {e}")
        await AiSeoApis.update_task_status(task_id, 4)
        logger.info(f"回滚任务状态: id: {task_id}")
        spider_pool.pop(run_id, None)
        return

    if not ai_answer:
        await AiSeoApis.update_task_status(task_id, 4)
        logger.error(f"爬虫运行失败 id: {task_data['id']} platform_id: {task_data['platform_id']}")
        spider_pool.pop(run_id, None)
        return

    # Record the end time
    end_time = datetime.now()

    # Submit the spider result
    answer_data = asdict(ai_answer)
    # Upload the screenshot
    upload_data = await AiSeoApis.upload_screenshot_file(answer_data['screenshot_file'])
    # Result payload
    answer_data = {
        **config.AI_SEO_API_AUTH,
        **answer_data,
        'task_id': task_data['id'],
        'rank': 0,
        'start_time': start_time.strftime("%Y-%m-%d %H:%M:%S"),
        'end_time': end_time.strftime("%Y-%m-%d %H:%M:%S"),
        'screenshot_url': upload_data['url']
    }
    if not answer_data.get('answer', ''):
        answer_data['answer'] = '未知'
        answer_data['rank'] = 0
    else:
        # Extract the brand mentions and the brand's rank from the answer text
        brands, rank = await read_rank(answer_data['answer'], task_data['brand'])
        answer_data['rank'] = rank
        answer_data['words'] = brands
    # print('answer_data', answer_data)
    # search_results = list()
    # for data in answer_data.get("search_result"):
    #     data_ = {**config.AI_SEO_API_AUTH, "content": data.get("title")}
    #     rest = ''
    #     try:
    #         resp = requests.post(url='https://geo-api.neicela.com/api/third/getSentimentType', json=data_, timeout=600)
    #         # print(resp.text)
    #         rest = resp.json()
    #     except Exception as e:
    #         print(str(e))
    #         # logger.info("调用getSentimentType接口出现异常:", str(e))
    #
    #     print("rest", rest)
    #     if rest.get("code") == 0:
    #         data.update(rest.get("data"))
    #     search_results.append(data)
    # answer_data['search_result'] = search_results
    result = await AiSeoApis.submit_task(answer_data)
    logger.info(f"任务提交成功: id: {task_data['id']}")
    spider_pool.pop(run_id, None)


async def ai_seo_job_with_timeout(browser, platform_ids, time_range, job_id, type_name, timeout=1200):
    # Generate a unique run_id for this invocation
    run_id = str(uuid.uuid4()).replace("-", "")
    try:
        await asyncio.wait_for(ai_seo_job(browser, platform_ids, time_range, job_id, type_name, run_id), timeout)
    except asyncio.TimeoutError:
        spider = spider_pool.get(run_id, None)
        if spider:
            # Close the spider and roll the task back so it can be retried
            await spider._close()
            logger.error(f"任务超时: 平台: {spider.platform_id}")
            spider_pool.pop(run_id, None)
            await AiSeoApis.update_task_status(spider.task_id, 4)
            logger.info(f"回滚任务状态: id: {spider.task_id}")


async def heartbeat(browser: Browser):
    load_count = len(browser.contexts)
    result = await AiSeoApis.heartbeat(config.DC_ID, load_count)
    logger.success(f"心跳: 机器id: {config.DC_ID} 负载量: {load_count} 发送时间: {result.get('send_time', '')}")


def calc_task_date(time_range):
    """
    Decide whether the current time falls inside the configured collection
    window and, if so, return the task date that window belongs to.
    :return: (in_window, "YYYY-MM-DD" or None)
    """
    # Parse the window boundaries
    start_time = datetime.strptime(time_range['start_time'], "%H:%M").time()
    end_time = datetime.strptime(time_range['end_time'], "%H:%M").time()

    # Current local time
    now = datetime.now()
    current_time = now.time()

    if end_time < start_time:
        # Window crosses midnight
        if current_time >= start_time or current_time <= end_time:
            # Before the end time on the following day, the window started yesterday
            start_date = (now - timedelta(days=1)).date() if current_time <= end_time else now.date()
            return True, start_date.strftime("%Y-%m-%d")
    else:
        # Window within a single day
        if start_time <= current_time <= end_time:
            return True, now.date().strftime("%Y-%m-%d")

    return False, None
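
# For example (illustrative time_range, not taken from config): with
# time_range = {"start_time": "22:00", "end_time": "02:00"}, calc_task_date()
# treats the window as crossing midnight, so a call at 01:30 returns
# (True, "<yesterday's date>") and a call at 12:00 returns (False, None).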


async def main():
    # Initialize the browser instance
    playwright, browser = await init_browser()
    logger.info('初始化浏览器成功')
    if config.AI_SEO_JOB_ENABLE:
        # Start the AI SEO job for the general platforms
        scheduler.add_job(ai_seo_job_with_timeout, 'interval',
                          id='ai_seo_job', seconds=config.AI_SEO_JOB_INTERVAL,
                          max_instances=config.AI_SEO_JOB_MAX_INSTANCES, coalesce=False,
                          args=[browser, config.AI_SEO_JOB_PLATFORM_IDS, config.AI_SEO_JOB_RANGE, 'ai_seo_job', '一般AI平台'])
        logger.success('启动一般AI平台任务成功')
    if config.DEEPSEEK_JOB_ENABLE:
        # Start the DeepSeek AI SEO job
        scheduler.add_job(ai_seo_job_with_timeout, 'interval',
                          id='deepseek_ai_seo_job', seconds=config.DEEPSEEK_JOB_INTERVAL,
                          max_instances=config.DEEPSEEK_JOB_MAX_INSTANCES, coalesce=False,
                          args=[browser, config.DEEPSEEK_JOB_PLATFORM_IDS, config.DEEPSEEK_SEO_JOB_RANGE,
                                'deepseek_ai_seo_job', 'DeepSeek'])
        logger.success('启动deepseek任务成功')
    # Heartbeat job (currently disabled)
    # scheduler.add_job(heartbeat, 'interval', id='heartbeat', seconds=30, args=[browser])
    # logger.info('启动心跳任务成功')
    scheduler.start()
    await asyncio.Future()  # keep the event loop running


if __name__ == '__main__':
    asyncio.run(main())  # start the event loop