You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

114 lines
4.2 KiB

# -*- coding: utf-8 -*-
import json
from typing import Dict
from urllib.parse import urlencode
from .exception import DataFetchError
import httpx
from playwright.async_api import Page
from httpx._exceptions import HTTPError
from utils.utils import logger
import asyncio
import utils.proxy as proxy
import config
class XinHuaClient:
    """Async HTTP client for the Xinhua search API hosted at so.news.cn.

    Each call opens a short-lived ``httpx.AsyncClient`` (optionally routed
    through a proxy when ``config.API_PROXY`` is truthy), checks both the
    HTTP status and the ``code`` field of the JSON envelope, and returns
    the envelope's ``content`` field.
    """

    def __init__(self,
                 timeout=10,
                 *,
                 headers: Dict[str, str],
                 playwright_page: "Page",
                 cookie_dict: Dict[str, str]):
        """
        Store the request configuration.

        :param timeout: timeout handed to every httpx request
        :param headers: headers sent with every GET / POST
        :param playwright_page: Playwright page kept on the instance
        :param cookie_dict: cookies kept on the instance
        """
        self.timeout = timeout
        self.headers = headers
        # Base address; URLs are assembled by _build_url.
        self._host = "https://so.news.cn/"
        self.playwright_page = playwright_page
        self.cookie_dict = cookie_dict

    def _build_url(self, uri: str, params=None) -> str:
        """Join the host and ``uri`` with exactly one slash, plus an optional query string."""
        # The host ends with "/" and callers pass "/getNews"; plain
        # concatenation would produce "https://so.news.cn//getNews".
        url = f"{self._host.rstrip('/')}/{uri.lstrip('/')}"
        if isinstance(params, dict):
            url = f"{url}?{urlencode(params)}"
        return url

    @staticmethod
    def _unwrap_payload(data: Dict, url, method, kwargs):
        """Return the ``content`` of a decoded API envelope, or raise DataFetchError."""
        if data.get("code") == 200:
            return data.get("content", {})
        # Special case: a sensitive keyword is answered with this exact
        # "no articles found" message instead of a result set.
        if data.get("content") == '没有找到相关稿件':
            logger.warning(F"[新华网]触发敏感词 跳过请求 参数: {kwargs}")
            return {}
        raise DataFetchError(data.get("content", "API未知错误"), url, method, kwargs)

    async def request(self, method, url, **kwargs):
        """
        Send one HTTP request and unwrap the API envelope.

        :param method: HTTP method
        :param url: absolute URL
        :param kwargs: extra keyword arguments forwarded to ``httpx.AsyncClient.request``
        :return: the ``content`` field of the JSON response, or ``{}`` when
                 the API reports that no articles were found
        :raises DataFetchError: on a non-200 HTTP status, a non-200 API
                 ``code``, or an httpx transport error
        :raises Exception: on any other unexpected failure
        """
        # Route API calls through a proxy only when enabled in config.
        # NOTE(review): httpx deprecated the `proxies=` argument in 0.26 in
        # favour of `proxy=` / `mounts=`; confirm the pinned httpx version.
        proxies = proxy.get_ip().to_httpx_proxies() if config.API_PROXY else None
        try:
            async with httpx.AsyncClient(proxies=proxies) as client:
                response = await client.request(
                    method, url, timeout=self.timeout,
                    **kwargs
                )
            # Unexpected HTTP status code.
            if response.status_code != 200:
                logger.error(F"[新华网]httpx异常[{response.status_code}]: [{method}]{url} 参数: {kwargs}")
                raise DataFetchError("httpx异常", url, method, kwargs)
            return self._unwrap_payload(response.json(), url, method, kwargs)
        except DataFetchError:
            # Raised deliberately above: let it through untouched instead of
            # logging it again and re-wrapping it in the handlers below.
            raise
        except HTTPError as e:
            logger.error(F"[新华网]httpx异常: [{method}]{url} 参数: {kwargs}")
            logger.error(F"[新华网]错误信息{str(e)}")
            raise DataFetchError(str(e), url) from e
        except Exception as e:
            logger.error(F"[新华网]未知的请求方法异常: [{method}]{url} 参数: {kwargs}")
            logger.error(F"[新华网]错误信息{str(e)}")
            raise Exception(str(e)) from e

    async def get(self, uri: str, params=None) -> Dict:
        """
        Send a GET request.

        :param uri: path relative to the host
        :param params: optional dict that is URL-encoded into the query string
        :return: the unwrapped response content
        """
        return await self.request(method="GET", url=self._build_url(uri, params),
                                  headers=self.headers)

    async def post(self, uri: str, data: dict) -> Dict:
        """
        Send a POST request whose body is ``data`` serialised as compact JSON.

        :param uri: path relative to the host
        :param data: payload to serialise
        :return: the unwrapped response content
        """
        json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
        # A pre-serialised body belongs in `content=`; httpx keeps `data=`
        # for form fields and emits a DeprecationWarning for raw strings.
        return await self.request(method="POST", url=self._build_url(uri),
                                  content=json_str, headers=self.headers)

    async def search(self, keyword, cur_page, lang='cn', sort_field=0, search_fields=0):
        """
        Search for news articles.

        :param keyword: search keyword
        :param cur_page: page number
        :param lang: value sent as the ``lang`` query parameter
        :param sort_field: ordering, 0: relevance, 1: time
        :param search_fields: search scope, 0: full text, 1: title
        :return: the ``results`` entry of the response, or ``[]`` when the
                 response is empty or has no results
        """
        # API endpoint.
        uri = '/getNews'
        get_param = {
            'keyword': keyword,
            'curPage': cur_page,
            'sortField': sort_field,
            'searchFields': search_fields,
            'lang': lang,
        }
        content = await self.get(uri, get_param)
        if not content:
            return []
        return content.get('results') or []