# -*- coding: utf-8 -*-
import asyncio
import os
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from playwright.async_api import async_playwright, Page, BrowserType, BrowserContext
from tortoise.transactions import in_transaction

import config
import utils.date_format as date_format
import utils.mail as mail
from base.base_spider import AbstractSpider
from base.enums import Platform
from models.monitor_result_model import gen_result, save
from models.monitor_task_model import get_task, running, complete, fail
from utils.utils import logger, is_blank

from .client import XinHuaClient
from .exception import DataFetchError
class XinHuaSpider(AbstractSpider):
    """
    Xinhua News (新华网) spider.

    Searches so.news.cn for articles published today that match a monitor
    task's keyword, persists the results, and can screenshot article pages.
    """

    client: XinHuaClient          # search API client (created in do_search)
    context_page: Page            # Playwright page used for navigation/screenshots
    browser_context: BrowserContext  # Playwright browser context (created in do_spider)
    image_path: Optional[str]     # screenshot directory; resolved lazily in cut_screen

    def __init__(self):
        # Site entry point (informational; searches go through so.news.cn).
        self.index_url = "http://www.xinhuanet.com/"
        self.platform = Platform.XIN_HUA
        # FIX: annotated Optional[str] above — it starts as None and is only
        # resolved on first screenshot (config.IMAGE_PATH or "./data").
        self.image_path = None
        self.retry = 0  # retries performed so far; start() gives up after 3
        self.context_page = None
def init_config(self):
|
|
super().init_config()
|
|
|
|
async def start(self, task_id):
|
|
try:
|
|
async with in_transaction():
|
|
await self.do_spider(task_id)
|
|
except DataFetchError as e:
|
|
logger.error(F"[新华网]任务ID: {task_id} 获取数据异常")
|
|
logger.error(F"[新华网]任务ID: {task_id} 异常信息: {str(e)}")
|
|
# 尝试自旋
|
|
self.retry = self.retry + 1
|
|
if self.retry > 3:
|
|
await fail(task_id)
|
|
logger.error(F"[新华网]任务ID: {task_id} 重试达到最大次数 即将发送告警邮件")
|
|
await mail.send_post_mail(task_id, "新华网", str(e))
|
|
else:
|
|
logger.info(F"[新华网]任务ID: {task_id} 20秒后进行第{self.retry}次重试")
|
|
await asyncio.sleep(20)
|
|
await self.do_spider(task_id)
|
|
except Exception as e:
|
|
logger.error(F"[新华网]任务ID: {task_id} 爬虫异常")
|
|
logger.error(F"[新华网]任务ID: {task_id} 异常信息: {str(e)}")
|
|
# 尝试自旋
|
|
self.retry = self.retry + 1
|
|
await fail(task_id)
|
|
if self.retry > 3:
|
|
logger.error(F"[新华网]任务ID: {task_id} 重试达到最大次数 即将发送告警邮件")
|
|
await mail.send_post_mail(task_id, "新华网", str(e))
|
|
else:
|
|
logger.info(F"[新华网]任务ID: {task_id} 20秒后进行第{self.retry}次重试")
|
|
await asyncio.sleep(20)
|
|
await self.do_spider(task_id)
|
|
|
|
async def create_xinhua_client(self, httpx_proxy: Optional[str]) -> XinHuaClient:
|
|
# 请求头
|
|
headers = {
|
|
"Accept": "application/json, text/javascript, */*; q=0.01", "Accept-Encoding": "gzip, deflate, br, zstd",
|
|
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Cache-Control": "no-cache", "Connection": "keep-alive",
|
|
"Cookie": "org.springframework.web.servlet.i18n.CookieLocaleResolver.LOCALE=zh_CN; wdcid=7af5eba7b2f8b44b; arialoadData=false; acw_tc=2760778017108394678246790e1403779a009cc2c5fe412f126407bf171637",
|
|
"Host": "so.news.cn", "Pragma": "no-cache", "Referer": "https://so.news.cn/", "Sec-Fetch-Dest": "empty",
|
|
"Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin",
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
|
"X-Requested-With": "XMLHttpRequest",
|
|
"sec-ch-ua": "\"Chromium\";v=\"122\", \"Not(A:Brand\";v=\"24\", \"Google Chrome\";v=\"122\"",
|
|
"sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": "\"Windows\""}
|
|
client = XinHuaClient(headers=headers, cookie_dict=None, playwright_page=self.context_page)
|
|
return client
|
|
|
|
async def launch_browser(self,
|
|
chromium: BrowserType,
|
|
playwright_proxy: Optional[Dict],
|
|
user_agent: Optional[Dict],
|
|
headless: bool = True):
|
|
"""
|
|
启动一个浏览器上下文
|
|
:param chromium:
|
|
:param headless:
|
|
:param self: 类型
|
|
:param playwright_proxy: 代理
|
|
:param user_agent: 用户标识
|
|
:return:
|
|
"""
|
|
# 浏览器对象
|
|
browser = await chromium.launch(proxy=playwright_proxy, headless=headless)
|
|
|
|
# 浏览器上下文
|
|
browser_context = await browser.new_context(
|
|
viewport={"width": 1920, "height": 1080},
|
|
user_agent=user_agent
|
|
)
|
|
return browser_context
|
|
|
|
async def do_search(self, task):
|
|
"""
|
|
获取任务信息
|
|
:return:
|
|
"""
|
|
results = []
|
|
cur_page = 1
|
|
logger.info(F"[新华网]开始执行任务 ID: {task.id} 关键词: {task.keyword} 语言: {task.lang}")
|
|
self.client = await self.create_xinhua_client(None)
|
|
while True:
|
|
logger.info(F"[新华网]开始获取搜索结果 关键词: {task.keyword} 页码: {cur_page}")
|
|
search_datas = await self.client.search(keyword=task.keyword, cur_page=cur_page, lang=task.lang)
|
|
logger.info(F"[新华网]获取到{len(search_datas)}条搜索结果")
|
|
if not search_datas:
|
|
logger.info(F"[新华网]关键词: {task.keyword} 页码: {cur_page}没有搜索到数据")
|
|
break
|
|
index = -1
|
|
for i, data in enumerate(search_datas):
|
|
# 找到一个不是今天的数据就结束
|
|
if not date_format.is_today(data.get("pubtime")):
|
|
index = i
|
|
break
|
|
# 如果全都是今天的 就翻页
|
|
if index == -1:
|
|
# 搜索结果的最后一个依然是今天的 整个添加
|
|
results = results + search_datas
|
|
# 翻到下一页 继续找
|
|
cur_page = cur_page + 1
|
|
else:
|
|
# 搜索结果中有不是今天的 切割一部分添加
|
|
results = results + search_datas[:index]
|
|
# 结束本次搜索
|
|
break
|
|
logger.info(F"[新华网]关键词: {task.keyword} 搜索结束 总页码: {cur_page} 总条数: {len(results)}")
|
|
return results
|
|
|
|
async def cut_screen(self, url):
|
|
"""
|
|
网页截图
|
|
:param url: 地址
|
|
:return:
|
|
"""
|
|
if not self.image_path:
|
|
image_path = config.IMAGE_PATH
|
|
if is_blank(image_path):
|
|
self.image_path = "./data"
|
|
if not os.path.exists(self.image_path):
|
|
os.makedirs(self.image_path)
|
|
save_path = F"{self.image_path}/{uuid.uuid4()}.png"
|
|
# 开始截图
|
|
await self.context_page.goto(url)
|
|
await self.context_page.screenshot(path=save_path, full_page=True)
|
|
return save_path
|
|
|
|
async def do_spider(self, task_id):
|
|
# 获取任务信息
|
|
task = await get_task(task_id)
|
|
if not task:
|
|
logger.error(F"[新华网]任务ID: {task_id}不存在 任务结束")
|
|
return
|
|
logger.info(F"[新华网]任务ID: {task_id} 任务开始")
|
|
await running(task_id)
|
|
# 从api中获取数据
|
|
search_datas = await self.do_search(task)
|
|
if not search_datas:
|
|
logger.info(F"[新华网]任务ID: {task_id} 关键词:{task.keyword} 未搜索到结果 任务结束")
|
|
await complete(task_id)
|
|
return
|
|
# 保存result实体
|
|
results = []
|
|
# 启动浏览器
|
|
async with async_playwright() as playwright:
|
|
chromium = playwright.chromium
|
|
self.browser_context = await self.launch_browser(chromium, None, None, headless=True)
|
|
# 反反爬脚本
|
|
await self.browser_context.add_init_script(path="lib/stealth.min.js")
|
|
self.context_page: Page = await self.browser_context.new_page()
|
|
|
|
# 构建结果实体 截图
|
|
for data in search_datas:
|
|
result = gen_result(task, data.get("title"), data.get("url"), data.get("pubtime"))
|
|
# img_path = await self.cut_screen(data.get("url"))
|
|
# result.image = img_path
|
|
results.append(result)
|
|
# logger.info(F"[新华网]标题: {data.get('title')} 截图文件名: {img_path}")
|
|
|
|
# 结果落库
|
|
await save(results)
|
|
logger.info(F"[新华网]任务ID: {task_id} 关键词: {task.keyword} 保存{len(results)}条数据 任务结束")
|
|
await complete(task_id)
|
|
|