# -*- coding: utf-8 -*-
from playwright.async_api import async_playwright, Page, BrowserType, BrowserContext
from base.base_spider import AbstractSpider
from typing import Dict, List, Optional, Tuple
from .client import RenMinClient
from utils.utils import logger, is_blank
from models.monitor_task_model import get_task, running, complete, fail
from models.monitor_result_model import gen_result, save
from base.enums import Platform
import utils.date_format as date_format
import os
import config
import uuid
from .exception import DataFetchError
import utils.mail as mail
import asyncio
from tortoise.transactions import in_transaction


class RenMinSpider(AbstractSpider):
    """
    Spider for people.com.cn (人民网).
    """
    client: RenMinClient  # API client
    context_page: Page  # browser page
    browser_context: BrowserContext  # browser context
    image_path: Optional[str]  # screenshot directory, resolved lazily

    def __init__(self):
        self.index_url = "http://www.people.com.cn/"
        self.platform = Platform.REN_MIN
        self.image_path = None
        self.retry = 0  # retry counter

    def init_config(self):
        super().init_config()

    async def start(self, task_id):
        try:
            async with in_transaction():
                await self.do_spider(task_id)
        except DataFetchError as e:
            logger.error(f"[人民网] Task {task_id}: failed to fetch data")
            logger.error(f"[人民网] Task {task_id}: {str(e)}")
            await self._retry_or_fail(task_id, e)
        except Exception as e:
            logger.error(f"[人民网] Task {task_id}: spider error")
            logger.error(f"[人民网] Task {task_id}: {str(e)}")
            # TODO: switch proxy IP before retrying
            await self._retry_or_fail(task_id, e)

    async def _retry_or_fail(self, task_id, e: Exception):
        """
        Retry the task up to 3 times; after that, mark it failed and send an alert mail.
        """
        self.retry += 1
        if self.retry > 3:
            await fail(task_id)
            logger.error(f"[人民网] Task {task_id}: retry limit reached, sending alert mail")
            await mail.send_post_mail(task_id, "人民网", str(e))
        else:
            logger.info(f"[人民网] Task {task_id}: retry #{self.retry} in 20 seconds")
            await asyncio.sleep(20)
            # Re-enter start() so a failing retry is caught and counted as
            # well, and runs inside its own transaction
            await self.start(task_id)

    async def create_client(self) -> RenMinClient:
        return RenMinClient(playwright_page=None, cookie_dict={})

    async def launch_browser(self,
                             chromium: BrowserType,
                             playwright_proxy: Optional[Dict],
                             user_agent: Optional[str],
                             headless: bool = True) -> BrowserContext:
        """
        Launch a browser and return a new browser context.
        :param chromium: browser type to launch
        :param playwright_proxy: proxy settings, or None
        :param user_agent: user agent string, or None for the default
        :param headless: run the browser without a visible window
        :return: a configured browser context
        """
        # Browser instance
        browser = await chromium.launch(proxy=playwright_proxy, headless=headless)
        # Browser context
        browser_context = await browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent=user_agent
        )
        return browser_context
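
    # For reference, the proxy dict accepted by chromium.launch() follows
    # Playwright's documented shape; the values below are placeholders, not
    # part of this project's configuration:
    #
    #     playwright_proxy = {
    #         "server": "http://proxy.example.com:8080",
    #         "username": "user",      # optional
    #         "password": "secret",    # optional
    #     }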

    async def do_search(self, task):
        """
        Search today's results for the task keyword, page by page.
        :return: list of search result dicts
        """
        start, end = date_format.today_timestamp_long()  # today's start/end timestamps
        results = []
        cur_page = 1
        logger.info(f"[人民网] Task {task.id} started, keyword: {task.keyword}, language: {task.lang}")
        self.client = await self.create_client()
        while True:
            logger.info(f"[人民网] Fetching search results, keyword: {task.keyword}, page: {cur_page}")
            search_datas = await self.client.search(task.keyword, cur_page)
            logger.info(f"[人民网] Got {len(search_datas)} search results")
            if not search_datas:
                logger.info(f"[人民网] Keyword: {task.keyword}, page {cur_page}: no results")
                break
            index = -1
            for i, data in enumerate(search_datas):
                # Stop at the first result that is not from today
                if not date_format.is_today(date_format.timestamp2date(data.get("displayTime")).strftime("%Y-%m-%d")):
                    index = i
                    break
            if index == -1:
                # Even the last result on this page is from today: keep the whole page
                results = results + search_datas
                # and move on to the next page
                cur_page = cur_page + 1
            else:
                # A non-today result was found: keep only the slice before it
                results = results + search_datas[:index]
                # and stop searching
                break
        logger.info(f"[人民网] Keyword: {task.keyword}: search finished, pages: {cur_page}, total results: {len(results)}")
        return results
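
    # Note: the cutoff logic above assumes the search API returns results
    # ordered by displayTime descending (newest first), so the first non-today
    # hit marks the end of today's data. displayTime is taken to be a
    # millisecond epoch timestamp, matching the `/ 1000` conversion used in
    # do_spider() below.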

    async def cut_screen(self, url):
        """
        Take a full-page screenshot of a web page.
        :param url: page address
        :return: path of the saved image
        """
        if not self.image_path:
            image_path = config.IMAGE_PATH
            # Fall back to ./data when no screenshot path is configured
            self.image_path = "./data" if is_blank(image_path) else image_path
        if not os.path.exists(self.image_path):
            os.makedirs(self.image_path)
        save_path = f"{self.image_path}/{uuid.uuid4()}.png"
        # Navigate to the page and capture it
        await self.context_page.goto(url)
        await self.context_page.screenshot(path=save_path, full_page=True)
        return save_path

    async def do_spider(self, task_id):
        # Load the task
        task = await get_task(task_id)
        if not task:
            logger.error(f"[人民网] Task {task_id} does not exist, aborting")
            return
        logger.info(f"[人民网] Task {task_id} started")
        await running(task_id)
        # Fetch data from the search API
        search_datas = await self.do_search(task)
        if not search_datas:
            logger.info(f"[人民网] Task {task_id}, keyword: {task.keyword}: no results, task finished")
            await complete(task_id)
            return
        # Result entities to persist
        results = []
        # Launch the browser
        async with async_playwright() as playwright:
            chromium = playwright.chromium
            self.browser_context = await self.launch_browser(chromium, None, None, headless=True)
            # Anti-bot-detection stealth script
            await self.browser_context.add_init_script(path="lib/stealth.min.js")
            self.context_page: Page = await self.browser_context.new_page()
            # Build result entities (page screenshots are currently disabled)
            for data in search_datas:
                result = gen_result(task, data.get("title"), data.get("url"), int(data.get("displayTime") / 1000))
                # img_path = await self.cut_screen(data.get("url"))
                # result.image = img_path
                results.append(result)
                # logger.info(f"[人民网] Title: {data.get('title')}, screenshot: {img_path}")
        # Persist the results
        await save(results)
        logger.info(f"[人民网] Task {task_id}, keyword: {task.keyword}: saved {len(results)} records, task finished")
        await complete(task_id)
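

if __name__ == "__main__":
    # Minimal manual-run sketch. Assumptions not shown in this file: the
    # Tortoise ORM connection is initialised by the project's own bootstrap
    # before tasks run, and a monitor task with the given id already exists.
    async def _demo(task_id: int):
        spider = RenMinSpider()
        spider.init_config()
        await spider.start(task_id)

    asyncio.run(_demo(1))  # hypothetical task id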