# -*- coding: utf-8 -*-
import asyncio
import json
from typing import Dict, List
from urllib.parse import urlencode

import httpx
from playwright.async_api import Page, async_playwright

import utils.date_format as date_format
from utils.utils import count_characters, logger

from .exception import DataFetchError


class YangShiClient:
    """Client for the CCTV search site (``search.cctv.com``).

    Search results are scraped from the rendered page through a Playwright
    ``Page``; ``request``/``get``/``post`` are plain httpx helpers for JSON
    endpoints under the same host.
    """

    def __init__(self, timeout=60, proxies=None, *, playwright_page: Page, cookie_dict: Dict[str, str]):
        """Store connection settings and the Playwright page used for scraping.

        :param timeout: timeout in seconds passed to every httpx request
        :param proxies: proxy configuration forwarded to ``httpx.AsyncClient``
        :param playwright_page: page that ``search`` navigates and queries
        :param cookie_dict: cookies supplied by the caller (stored only; the
            methods in this class do not read it)
        """
        self.proxies = proxies
        self.timeout = timeout
        # "Host" and "Content-Length" are deliberately NOT set here: httpx
        # derives both from the actual URL and body, and hard-coded values
        # (a foreign host, a fixed length of 163) corrupt real requests.
        # NOTE(review): Cookie / Origin / Referer below still point at
        # search.people.cn and look copied from another client - confirm
        # whether they should target search.cctv.com instead.
        self.headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Encoding": "gzip, deflate",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Cookie": "__jsluid_h=103d2323e283c476b59b2fdd3b9a5371; sso_c=0; sfr=1",
            "Content-Type": "application/json",
            "Origin": "http://search.people.cn",
            "Pragma": "no-cache",
            "Referer": "http://search.people.cn/s?keyword=%E4%B9%A1%E6%9D%91%E6%8C%AF%E5%85%B4&st=0&_=1710919073824",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        }
        self._host = "https://search.cctv.com/"
        self.playwright_page = playwright_page
        self.cookie_dict = cookie_dict

    def _build_url(self, uri: str) -> str:
        """Join ``self._host`` and *uri* with exactly one ``/`` between them."""
        return f"{self._host.rstrip('/')}/{uri.lstrip('/')}"

    @staticmethod
    def _parse_publish_time(tim_text: str) -> str:
        """Return the text after the first ``:`` in *tim_text*, stripped.

        Yields ``""`` when there is no colon, and keeps any later colons
        (e.g. an ``HH:MM:SS`` part) instead of cutting the value short.
        """
        return tim_text.partition(":")[2].strip()

    async def request(self, method, url, **kwargs):
        """Send an HTTP request and unwrap the JSON envelope.

        :param method: HTTP method name
        :param url: absolute URL
        :param kwargs: extra arguments forwarded to ``httpx.AsyncClient.request``
        :return: the ``data`` field of the JSON response (``{}`` if absent)
        :raises DataFetchError: when the response ``code`` field is not ``"0"``
        """
        # NOTE(review): recent httpx releases replace the `proxies` argument
        # with `proxy`/`mounts` - confirm against the pinned httpx version.
        async with httpx.AsyncClient(proxies=self.proxies) as client:
            response = await client.request(
                method, url, timeout=self.timeout,
                **kwargs
            )
        data: Dict = response.json()
        if data.get("code") != "0":
            raise DataFetchError(data.get("message", "未知错误"))
        return data.get("data", {})

    async def get(self, uri: str, params=None) -> Dict:
        """Issue a GET request against the client host.

        :param uri: path relative to the host
        :param params: optional dict that is URL-encoded into the query string
        :return: the unwrapped ``data`` payload from ``request``
        """
        final_uri = uri
        if isinstance(params, dict):
            final_uri = f"{uri}?{urlencode(params)}"
        return await self.request(method="GET", url=self._build_url(final_uri), headers=self.headers)

    async def post(self, uri: str, data: dict) -> Dict:
        """Issue a POST request with *data* serialized as compact JSON.

        :param uri: path relative to the host
        :param data: payload to serialize
        :return: the unwrapped ``data`` payload from ``request``
        """
        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
        # A pre-serialized body belongs in `content=`; passing a string via
        # `data=` is deprecated in httpx.
        return await self.request(method="POST", url=self._build_url(uri),
                                  content=json_str, headers=self.headers)

    async def _extract_result(self, element, keyword) -> Dict[str, str]:
        """Read title, link and publish time from one ``div.tright`` element."""
        title = ""
        url = ""
        publish_time = ""

        tit = await element.query_selector(".tit")
        if tit:
            # The link is stored on the span's "lanmu1" attribute; the title
            # text lives in the <a> nested inside that span.
            span = await tit.query_selector("span")
            if span:
                url = await span.get_attribute("lanmu1") or ""
                tit_a = await span.query_selector("a")
                if tit_a:
                    title = await tit_a.inner_text()

        tim = await element.query_selector(".src-tim .tim")
        if tim:
            publish_time = self._parse_publish_time(await tim.inner_text())

        return {
            "keyword": keyword,
            "title": title,
            "url": url,
            "publish_time": publish_time
        }

    async def search(self, keyword, cur_page):
        """Load one page of search results and scrape it.

        :param keyword: search keyword
        :param cur_page: page number
        :return: list of dicts with ``keyword``, ``title``, ``url`` and
            ``publish_time`` keys, one per ``div.tright`` element
        :raises DataFetchError: when navigation or scraping fails
        """
        # urlencode escapes the keyword so characters such as "&", "#" or
        # spaces cannot break the query string.
        query = urlencode({
            "qtext": keyword,
            "page": cur_page,
            "type": "web",
            "sort": "date",
            "datepid": 1,
            "channel": "",
            "vtime": -1,
            "is_search": 1,
        })
        full_url = self._build_url(f"search.php?{query}")
        try:
            await self.playwright_page.goto(full_url)
            results: List[Dict[str, str]] = []
            elements = await self.playwright_page.query_selector_all("div.tright")
            for element in elements:
                results.append(await self._extract_result(element, keyword))
            return results
        except Exception as e:
            logger.error(F"[央视网]搜索方法异常: 关键词: {keyword} 页码: {cur_page} {full_url}")
            logger.error(F"[央视网]错误信息: {str(e)}")
            # Chain the cause so the original traceback is preserved.
            raise DataFetchError(str(e), full_url) from e


async def run():
    """Manual smoke test: open a browser, run one search and print the result."""
    # A single Playwright context is enough; nesting a second one only
    # starts a redundant driver process.
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=False)
        try:
            browser_context = await browser.new_context(
                viewport={"width": 1920, "height": 1080},
                user_agent=""
            )
            # Anti-bot-detection script injected before any page script runs.
            await browser_context.add_init_script(path="../../lib/stealth.min.js")
            context_page: Page = await browser_context.new_page()

            client = YangShiClient(playwright_page=context_page, cookie_dict={})
            result = await client.search("医保", 1)
            print(result)
        finally:
            # Always release the browser, even when the search raises.
            await browser.close()


if __name__ == '__main__':
    asyncio.run(run())