You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
3.9 KiB

# -*- coding: utf-8 -*-
import datetime
import random
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from typing import Dict, List
import config
from models.monitor_task_db import MonitorTask
from spiders.xinhua.spider import XinHuaSpider
from spiders.renmin.spider import RenMinSpider
from spiders.yang_shi.spider import YangShiSpider
from utils.utils import logger
from base.base_spider import AbstractSpider
import utils.date_format as date_format
from base.enums import Platform
import logging
from datetime import timedelta
import copy
def singleton(cls):
    """Class decorator that makes ``cls`` yield a single shared instance.

    The decorated name is rebound to a zero-argument factory function.
    The first call constructs ``cls()`` and caches it; every later call
    returns that same cached object.

    :param cls: class to wrap; it must be constructible with no arguments.
    :return: a no-argument callable returning the shared instance.
    """
    created = {}

    def getinstance():
        try:
            return created[cls]
        except KeyError:
            # First request for this class: build the instance and remember it.
            instance = created[cls] = cls()
            return instance

    return getinstance
@singleton
class SchedulerManager:
    """Wrapper around one ``AsyncIOScheduler`` that registers spider jobs.

    The class is decorated with ``@singleton``, so calling
    ``SchedulerManager()`` always returns the same shared instance.
    """

    # The APScheduler instance every job is registered on.
    scheduler: AsyncIOScheduler

    def __init__(self):
        # Raise the level of APScheduler's own logger so that only warnings
        # and above are emitted by the library.
        ap_logger = logging.getLogger('apscheduler')
        ap_logger.setLevel(logging.WARNING)
        self.scheduler = AsyncIOScheduler()

    def get_scheduler(self):
        """Return the scheduler object, creating one if the attribute is falsy.

        :return: the ``AsyncIOScheduler`` held by this manager.
        """
        if not self.scheduler:
            self.scheduler = AsyncIOScheduler()
        return self.scheduler

    def start(self, paused=False):
        """Start the underlying scheduler.

        :param paused: forwarded unchanged to ``AsyncIOScheduler.start``.
        """
        self.scheduler.start(paused)

    def add_task(self, task: MonitorTask, offset=0, is_random=False):
        """Register a one-shot ("date" trigger) job for ``task``.

        The spider class is chosen from ``task.platform``. The run time is
        derived from ``task.gather_time`` and shifted later by ``offset``
        seconds. If the resulting time is already in the past, the job is
        scheduled 60 seconds from now instead.

        Nothing is scheduled when the platform is not one of the handled
        values, or when ``task.gather_time`` is empty (an error is logged in
        the latter case).

        :param task: the monitor task to schedule.
        :param offset: number of seconds to delay the run time by.
        :param is_random: when true, add a random 1-29 second jitter to
            ``offset``.
        :return: None
        """
        scheduler = self.get_scheduler()

        # Pick the spider implementation for the task's platform. Plain
        # equality comparisons are kept on purpose rather than a dict lookup.
        spider = None
        if task.platform == Platform.XIN_HUA:
            spider = XinHuaSpider()
        elif task.platform == Platform.REN_MIN:
            spider = RenMinSpider()
        elif task.platform == Platform.YANG_SHI:
            spider = YangShiSpider()
        if spider is None:
            # Unhandled platform: silently skip, there is nothing to schedule.
            return

        if not task.gather_time:
            logger.error(F"[调度器]采集时间不存在 任务id: {task.id}")
            # Without a gather time there is no run date to compute, so stop
            # here instead of passing an empty value to the date helper.
            return

        if is_random:
            offset = offset + random.randint(1, 29)

        # Shift the scheduled time later by the requested offset.
        run_at = date_format.gen_job_datetime(task.gather_time) + timedelta(seconds=offset)

        # Read the clock once so the comparison and the fallback agree.
        now = datetime.datetime.now()
        if run_at < now:
            run_at = now + datetime.timedelta(seconds=60)

        # Register the one-shot job.
        scheduler.add_job(spider.start, "date", run_date=run_at, kwargs={"task_id": task.id})
        offset_note = F'偏移{offset}秒后执行' if offset > 0 else ''
        logger.info(
            F"[调度器]注册定时任务 ID: {task.id} 执行时间: {run_at} {offset_note}")

    def add_tasks(self, tasks: List[MonitorTask], is_random=False):
        """Register a batch of tasks, grouped by platform and keyword.

        Within each (platform, keyword) group the tasks are ordered by
        ``date_format.parse_time(gather_time)``. If
        ``date_format.lt_time(last.gather_time, config.MAX_GATHER_TIME)`` is
        truthy for the last task of a group, a deep copy of that task with
        ``gather_time`` set to ``config.MAX_GATHER_TIME`` is appended as a
        compensating task. Every task in the group is then passed to
        ``add_task`` with an offset of 0.

        NOTE(review): ``parse_time`` is now called on every task's own
        ``gather_time``; confirm it tolerates whatever values tasks may carry.

        :param tasks: tasks to schedule.
        :param is_random: forwarded to ``add_task`` for every task.
        :return: None
        """
        # Group by platform, then by keyword.
        grouped = {}
        for task in tasks:
            grouped.setdefault(task.platform, {}).setdefault(task.keyword, []).append(task)

        # Walk every keyword group.
        for keyword_groups in grouped.values():
            for task_list in keyword_groups.values():
                # The key must use the lambda's own argument; reading the
                # outer loop variable would give every element the same key
                # and leave the list unsorted.
                ordered = sorted(task_list, key=lambda t: date_format.parse_time(t.gather_time))

                # Check the last task against the configured limit time.
                last = ordered[-1]
                if date_format.lt_time(last.gather_time, config.MAX_GATHER_TIME):
                    # Create a compensating task at the limit time.
                    extra = copy.deepcopy(last)
                    extra.gather_time = config.MAX_GATHER_TIME
                    ordered.append(extra)

                for item in ordered:
                    self.add_task(item, 0, is_random)