You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
109 lines
3.9 KiB
109 lines
3.9 KiB
# -*- coding: utf-8 -*-
|
|
import datetime
|
|
import random
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from typing import Dict, List
|
|
|
|
import config
|
|
from models.monitor_task_db import MonitorTask
|
|
from spiders.xinhua.spider import XinHuaSpider
|
|
from spiders.renmin.spider import RenMinSpider
|
|
from spiders.yang_shi.spider import YangShiSpider
|
|
from utils.utils import logger
|
|
from base.base_spider import AbstractSpider
|
|
import utils.date_format as date_format
|
|
from base.enums import Platform
|
|
import logging
|
|
from datetime import timedelta
|
|
import copy
|
|
|
|
|
|
def singleton(cls):
    """Class decorator that makes ``cls`` a lazily created singleton.

    The decorated name is replaced by a zero-argument factory. The first call
    constructs ``cls()`` and caches it; every later call returns that same
    cached object. Each decoration gets its own private cache.
    """
    cache = {}

    def getinstance():
        # Fast path: the instance has already been built.
        if cls in cache:
            return cache[cls]
        # First call: construct once and remember the result.
        cache[cls] = created = cls()
        return created

    return getinstance
|
|
|
|
|
|
@singleton
class SchedulerManager:
    """Wrapper around an APScheduler ``AsyncIOScheduler`` for monitor tasks.

    Decorated with ``@singleton``, so every ``SchedulerManager()`` call returns
    the same instance. Each monitor task is registered as a one-shot ``"date"``
    job that calls the matching platform spider's ``start(task_id=...)``.
    """

    scheduler: AsyncIOScheduler

    def __init__(self):
        # Raise APScheduler's logger threshold so only WARNING and above are emitted.
        ap_logger = logging.getLogger('apscheduler')
        ap_logger.setLevel(logging.WARNING)
        self.scheduler = AsyncIOScheduler()

    def get_scheduler(self):
        """Return the scheduler, creating a new one if none is set.

        :return: the ``AsyncIOScheduler`` used to register jobs
        """
        if not self.scheduler:
            self.scheduler = AsyncIOScheduler()
        return self.scheduler

    def start(self, paused=False):
        """Start the underlying scheduler.

        :param paused: forwarded to APScheduler's ``start(paused=...)``
        """
        self.scheduler.start(paused=paused)

    @staticmethod
    def _create_spider(platform) -> "AbstractSpider | None":
        """Instantiate the spider for ``platform``; return None if unsupported."""
        # Match with ``==`` rather than a dict lookup, so that whatever type
        # ``task.platform`` holds is compared exactly as before.
        for known_platform, spider_cls in (
            (Platform.XIN_HUA, XinHuaSpider),
            (Platform.REN_MIN, RenMinSpider),
            (Platform.YANG_SHI, YangShiSpider),
        ):
            if platform == known_platform:
                return spider_cls()
        return None

    def add_task(self, task: MonitorTask, offset=0, is_random=False):
        """Register ``task`` as a one-shot scheduled job.

        The job runs ``spider.start(task_id=task.id)`` at the datetime produced by
        ``date_format.gen_job_datetime(task.gather_time)`` plus ``offset`` seconds.
        If that moment is already in the past, the job is scheduled 60 seconds
        from now instead.

        :param task: monitor task to schedule
        :param offset: number of seconds to delay execution
        :param is_random: if true, add a further random delay of 1-29 seconds
        :return: None. Tasks for unsupported platforms and tasks without a
            gather time are not scheduled.
        """
        scheduler = self.get_scheduler()

        spider: AbstractSpider = self._create_spider(task.platform)
        if not spider:
            # Platforms without a spider implementation are skipped silently.
            return

        if not task.gather_time:
            logger.error(F"[调度器]采集时间不存在 任务id: {task.id}")
            # Nothing to derive a run time from; do not pass None on to
            # gen_job_datetime.
            return

        if is_random:
            # Random jitter, presumably to spread out tasks sharing a gather time.
            offset = offset + random.randint(1, 29)

        # Shift the run time later by ``offset`` seconds.
        task_date_time = date_format.gen_job_datetime(task.gather_time)
        task_date_time = task_date_time + timedelta(seconds=offset)

        now = datetime.datetime.now()
        if task_date_time < now:
            # The computed time has already passed; run one minute from now.
            task_date_time = now + timedelta(seconds=60)

        # Register the one-shot job.
        scheduler.add_job(spider.start, "date", run_date=task_date_time, kwargs={"task_id": task.id})
        logger.info(
            F"[调度器]注册定时任务 ID: {task.id} 执行时间: {task_date_time} {F'偏移{offset}秒后执行' if offset > 0 else ''}")

    def add_tasks(self, tasks: List[MonitorTask], is_random=False):
        """Register a batch of tasks, grouped by platform and keyword.

        Within each (platform, keyword) group, tasks are sorted by gather time.
        If the latest one is earlier than ``config.MAX_GATHER_TIME`` (according
        to ``date_format.lt_time``), a deep copy of it with ``gather_time`` set to
        ``config.MAX_GATHER_TIME`` is appended as a compensation task. Every
        resulting task is passed to ``add_task``.

        :param tasks: monitor tasks to schedule
        :param is_random: forwarded to ``add_task`` to add random jitter
        """
        # Group tasks by platform, then by keyword.
        group: Dict = {}
        for task in tasks:
            group.setdefault(task.platform, {}).setdefault(task.keyword, []).append(task)

        for keyword_groups in group.values():
            for task_list in keyword_groups.values():
                # Tasks without a gather time cannot be ordered; add_task logs an
                # error for them and skips scheduling.
                timed_tasks = []
                for item in task_list:
                    if item.gather_time:
                        timed_tasks.append(item)
                    else:
                        self.add_task(item, 0, is_random)
                if not timed_tasks:
                    continue

                # Sort by each task's OWN gather time (the key uses the lambda
                # argument, not a leftover outer loop variable).
                sorted_task_list = sorted(
                    timed_tasks, key=lambda e: date_format.parse_time(e.gather_time))

                # If the latest task is before the cut-off time, add a
                # compensation task that runs at the cut-off.
                if date_format.lt_time(sorted_task_list[-1].gather_time, config.MAX_GATHER_TIME):
                    new_task = copy.deepcopy(sorted_task_list[-1])
                    new_task.gather_time = config.MAX_GATHER_TIME
                    sorted_task_list.append(new_task)

                for sorted_task in sorted_task_list:
                    self.add_task(sorted_task, 0, is_random)
|