You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

88 lines
2.6 KiB

# -*- coding: utf-8 -*-
import argparse
import asyncio
import os
import sys
import config
import db
import utils.date_format as date_format
from base.enums import Platform
from models import monitor_task_model
from utils.scheduler import SchedulerManager
from utils.utils import logger
def task_group(tasks):
    """Bucket tasks by the platform they belong to.

    An empty bucket is created for every member of ``Platform``, keyed by
    the member's ``value``. Each task is then appended to the bucket that
    matches its ``platform`` attribute; a task whose platform matches no
    bucket is left out of the result.

    :param tasks: iterable of objects exposing a ``platform`` attribute
    :return: list of task lists, one per platform value (a list may be empty)
    """
    buckets = {member.value: [] for member in Platform.__members__.values()}
    for item in tasks:
        bucket = buckets.get(item.platform)
        if bucket is not None:
            bucket.append(item)
    return [*buckets.values()]
async def do_get_task_job():
    """Fetch today's task information and hand it to the scheduler.

    Awaits ``db.init()``, then loads the tasks returned by
    ``monitor_task_model.get_today_task()``. When nothing comes back a log
    line is written and the coroutine ends; otherwise the tasks are passed
    to ``SchedulerManager().add_tasks``.

    :return: None
    """
    await db.init()
    todays_tasks = await monitor_task_model.get_today_task()
    if todays_tasks:
        # Grouping the tasks with task_group() and shuffling the groups is
        # currently disabled; the tasks are scheduled as fetched.
        scheduler_manager = SchedulerManager()
        task_count = len(todays_tasks)
        logger.info(F"============================== 获取到{task_count}条任务信息 ==============================")
        # NOTE(review): the meaning of the second positional argument (True)
        # is defined by SchedulerManager.add_tasks - confirm there.
        scheduler_manager.add_tasks(todays_tasks, True)
    else:
        logger.info(F"没有获取到任务信息")
def restart():
    """Restart the program by replacing the current process image.

    Re-executes the running Python interpreter (``sys.executable``) with the
    original command line (``sys.argv``). On success this call does not
    return.

    :return: never returns when the exec succeeds
    :raises OSError: if the interpreter cannot be executed
    """
    # exec* replaces the process immediately and does not flush buffered
    # file objects, so pending console output would otherwise be lost.
    for stream in (sys.stdout, sys.stderr):
        if stream is None:
            continue
        try:
            stream.flush()
        except (OSError, ValueError):
            # Best effort only: a closed or broken stream must not prevent
            # the restart itself.
            pass
    os.execl(sys.executable, sys.executable, *sys.argv)
def load_arg_parse():
    """Parse the command-line arguments the script was started with.

    A single option is recognised: ``-a`` / ``--active``, a string that
    defaults to ``'false'``. The parsed namespace is logged before it is
    returned.

    :return: the namespace produced by ``ArgumentParser.parse_args()``
    """
    parser = argparse.ArgumentParser(description="抓取社媒新闻数据")
    parser.add_argument(
        "-a",
        "--active",
        default='false',
        help="启动脚本时 立即进行一次任务拉取",
    )
    namespace = parser.parse_args()
    logger.info(F"启动参数: {namespace}")
    return namespace
def clear_system_proxy():
    """Remove proxy-related variables from this process's environment.

    Clears the ``http``, ``https``, ``ftp``, ``all`` and ``no`` proxy
    variables in both their lowercase (``http_proxy``) and uppercase
    (``HTTP_PROXY``) spellings. Variables that are not set are skipped, so
    the function can be called repeatedly.

    :return: None
    """
    for scheme in ('http', 'https', 'ftp', 'all', 'no'):
        lower_name = f'{scheme}_proxy'
        # Environment names are case-sensitive on POSIX systems, so both
        # spellings have to be removed explicitly.
        for variable in (lower_name, lower_name.upper()):
            os.environ.pop(variable, None)
if __name__ == '__main__':
    # Script entry point: clear proxy settings, start the scheduler, register
    # the daily jobs and then keep the asyncio event loop running.
    try:
        clear_system_proxy()
        # Parse the arguments before anything is started: argparse exits the
        # process on --help or an invalid option, and that should happen
        # before the scheduler is running and "started" has been logged.
        args = load_arg_parse()
        logger.info(F'启动成功 将在每天的{config.GET_TASK_TIME}拉取任务信息')
        # Only the .hour and .minute attributes of the result are used below.
        get_task_time = date_format.gen_job_datetime(config.GET_TASK_TIME)
        manager = SchedulerManager()
        # Start the scheduler.
        manager.start()
        # Register the daily job that pulls the task information.
        manager.scheduler.add_job(do_get_task_job, 'cron', hour=get_task_time.hour, minute=get_task_time.minute)
        # Register the daily restart at minute 0 of the same hour.
        # NOTE(review): when the configured minute is 0 this coincides with
        # the pull job above - confirm that this cannot drop the day's pull.
        manager.scheduler.add_job(restart, 'cron', hour=get_task_time.hour, minute=0)
        # NOTE(review): asyncio.get_event_loop() without a running loop is
        # deprecated in recent Python versions; confirm how SchedulerManager
        # obtains its loop before replacing this call.
        loop = asyncio.get_event_loop()
        # Optionally pull the tasks once right away.
        if args.active and args.active.lower() == 'true':
            logger.info(F"立即执行一次任务拉取...")
            loop.run_until_complete(do_get_task_job())
        # Keep the event loop alive.
        loop.run_forever()
    except KeyboardInterrupt:
        sys.exit()