You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

275 lines
9.4 KiB

# coding=utf-8
import datetime
import json
import os
import httpx
import config
from utils import create_logger
# Module-level logger, named after this module, used by AiSeoApis to report API failures.
logger = create_logger(__name__)
class AiSeoApis:
    """Static helpers wrapping the AI SEO backend HTTP endpoints.

    Every coroutine opens a short-lived ``httpx.AsyncClient``, sends a single
    request and returns the ``data`` member of the JSON reply.  A reply whose
    ``code`` is not ``0`` is treated as a failure and logged; whether the
    method then returns ``None`` or the reply's ``data`` is stated per method.

    Network errors and non-JSON bodies are not caught here; the exceptions
    raised by ``httpx`` / ``response.json()`` propagate to the caller.
    """

    # Timeout in seconds applied to a request unless a method overrides it.
    _DEFAULT_TIMEOUT = 60

    @staticmethod
    def build_full_url(uri):
        """Return ``uri`` appended to ``config.AI_SEO_BASE_URL``."""
        return f"{config.AI_SEO_BASE_URL}{uri}"

    @staticmethod
    def _extract_data(json_result, error_prefix, none_on_failure=False):
        """Check the reply's ``code`` and return its ``data``.

        :param json_result: decoded JSON reply (a dict with ``code``, and
            usually ``msg`` and ``data``).
        :param error_prefix: text logged in front of the reply's ``msg`` when
            ``code`` is not ``0``.
        :param none_on_failure: when true, a failed reply yields ``None``
            instead of the reply's ``data``.
        :return: ``json_result['data']`` if present, otherwise ``None``.
        """
        if json_result.get('code') != 0:
            # .get() keeps a failure reply that lacks 'msg' from raising KeyError.
            logger.error(f"{error_prefix}: {json_result.get('msg')}")
            if none_on_failure:
                return None
        return json_result.get('data')

    @staticmethod
    async def _request_json(method, uri, timeout=None, **request_kwargs):
        """Send one request to ``uri`` and return the decoded JSON body.

        :param method: HTTP method name, e.g. ``'GET'`` or ``'POST'``.
        :param uri: path appended to the configured base URL.
        :param timeout: timeout in seconds; ``None`` selects
            ``_DEFAULT_TIMEOUT``.
        :param request_kwargs: forwarded to ``httpx.AsyncClient.request``
            (``params``, ``json``, ``files``).
        """
        url = AiSeoApis.build_full_url(uri)
        if timeout is None:
            timeout = AiSeoApis._DEFAULT_TIMEOUT
        async with httpx.AsyncClient() as client:
            response = await client.request(method, url, timeout=timeout, **request_kwargs)
            return response.json()

    @staticmethod
    async def _upload_file(uri, file_path, oss_path, content_type, error_prefix):
        """Upload ``file_path`` as multipart field ``file`` and return the reply's ``data``."""
        params = {
            **config.AI_SEO_API_AUTH,
            'oss_path': oss_path
        }
        # The file must stay open until the request has been sent.
        with open(file_path, 'rb') as file:
            files = {'file': (file_path, file, content_type)}
            json_result = await AiSeoApis._request_json('POST', uri, params=params, files=files)
        return AiSeoApis._extract_data(json_result, error_prefix)

    @staticmethod
    async def get_one_task(date='', platform_ids=''):
        """
        Fetch one task.

        :param date: optional ``date`` query value; omitted when empty.
        :param platform_ids: optional ``platform_ids`` query value; omitted when empty.
        :return: the reply's ``data``; on a non-zero ``code`` the failure is
            logged and whatever ``data`` the reply carries is still returned.
        """
        params = {**config.AI_SEO_API_AUTH}
        if date:
            params['date'] = date
        if platform_ids:
            params['platform_ids'] = platform_ids
        json_result = await AiSeoApis._request_json('GET', '/api/third/getTask', params=params)
        return AiSeoApis._extract_data(json_result, '获取任务失败')

    @staticmethod
    async def get_urgent_task_count():
        """
        Fetch the number of urgent tasks.

        :return: the reply's ``data``; a non-zero ``code`` is logged and the
            reply's ``data`` (if any) is still returned.
        """
        params = {**config.AI_SEO_API_AUTH}
        json_result = await AiSeoApis._request_json(
            'GET', '/api/frontend/thirdParty/getUrgentTaskCount', params=params)
        return AiSeoApis._extract_data(json_result, '获取紧急任务数量失败')

    @staticmethod
    async def upload_screenshot_file(file_path):
        """
        Upload a screenshot file (sent as ``image/jpeg``) under ``ai_seo/screenshot``.

        :param file_path: path of the local file to upload.
        :return: the reply's ``data``; a non-zero ``code`` is logged and the
            reply's ``data`` (if any) is still returned.
        """
        return await AiSeoApis._upload_file(
            '/api/third/oss/upload', file_path, 'ai_seo/screenshot', 'image/jpeg', '上传截图文件失败')

    @staticmethod
    async def submit_task(json_data):
        """
        Submit a task result.

        :param json_data: JSON-serialisable payload, sent as-is.  Unlike the
            other methods, ``config.AI_SEO_API_AUTH`` is not merged in here
            (presumably the caller already includes it - confirm).
        :return: the reply's ``data``; a non-zero ``code`` is logged and the
            reply's ``data`` (if any) is still returned.
        """
        logger.debug(f"json_data {json.dumps(json_data, ensure_ascii=False)}")
        json_result = await AiSeoApis._request_json(
            'POST', '/api/third/submitProjectTask', timeout=120, json=json_data)
        return AiSeoApis._extract_data(json_result, '提交任务失败')

    @staticmethod
    async def get_task_result_list(project_id):
        """
        Fetch the task result list of a project.

        :param project_id: sent as the ``project_id`` query value.
        :return: the reply's ``data``; a non-zero ``code`` is logged and the
            reply's ``data`` (if any) is still returned.
        """
        params = {**config.AI_SEO_API_AUTH, 'project_id': project_id}
        json_result = await AiSeoApis._request_json(
            'GET', '/api/frontend/thirdParty/projectResult/list', params=params)
        return AiSeoApis._extract_data(json_result, '获取任务结果列表失败')

    @staticmethod
    async def update_result_rank(result_id, rank, read_rank_status):
        """
        Update the rank of a task result.

        :param result_id: sent as ``id``.
        :param rank: sent as ``rank``.
        :param read_rank_status: sent as ``read_rank_status``.
        :return: the reply's ``data``; a non-zero ``code`` is logged and the
            reply's ``data`` (if any) is still returned.
        """
        json_data = {
            **config.AI_SEO_API_AUTH,
            'id': result_id,
            'rank': rank,
            'read_rank_status': read_rank_status
        }
        json_result = await AiSeoApis._request_json(
            'POST', '/api/frontend/thirdParty/projectResult/updateRank', json=json_data)
        return AiSeoApis._extract_data(json_result, '更新任务结果排名失败')

    @staticmethod
    async def update_task_status(task_id, status):
        """
        Update the status of a task.

        :param task_id: sent as ``task_id``.
        :param status: sent as ``status``.
        :return: the reply's ``data``, or ``None`` when ``code`` is not ``0``.
        """
        json_data = {**config.AI_SEO_API_AUTH, 'task_id': task_id, 'status': status}
        json_result = await AiSeoApis._request_json('POST', '/api/third/updateTask', json=json_data)
        return AiSeoApis._extract_data(json_result, '更新任务失败', none_on_failure=True)

    @staticmethod
    async def heartbeat(dc_id, load_count=0):
        """
        Send a heartbeat.

        The payload carries ``send_time``, the current naive local time
        formatted as ``YYYY-MM-DD HH:MM:SS``.

        :param dc_id: sent as ``dc_id``.
        :param load_count: sent as ``load_count``.
        :return: the reply's ``data``, or ``None`` when ``code`` is not ``0``.
        """
        send_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        json_data = {
            **config.AI_SEO_API_AUTH,
            'dc_id': dc_id,
            'load_count': load_count,
            'send_time': send_time
        }
        json_result = await AiSeoApis._request_json(
            'POST', '/api/frontend/thirdParty/spider/heartbeat', json=json_data)
        return AiSeoApis._extract_data(json_result, '心跳失败', none_on_failure=True)

    @staticmethod
    async def get_spider_session(platform_id):
        """
        Fetch one spider session for a platform.

        :param platform_id: sent as the ``platform_id`` query value.
        :return: the reply's ``data``, or ``None`` when ``code`` is not ``0``.
        """
        params = {**config.AI_SEO_API_AUTH, 'platform_id': platform_id}
        json_result = await AiSeoApis._request_json(
            'GET', '/api/third/getOneSpiderSession', params=params)
        return AiSeoApis._extract_data(json_result, '获取爬虫session失败', none_on_failure=True)

    @staticmethod
    async def download_spider_session_file(url, path):
        """
        Download a spider session file and write it to ``path``.

        :param url: absolute URL of the file (redirects are followed).
        :param path: destination file path; its parent directory is created
            when missing.
        :return: None
        :raises httpx.HTTPStatusError: when the final response is an error
            status, so an error page is never saved as a session file.
        """
        # os.makedirs('') raises FileNotFoundError, so skip it for a bare file name.
        dir_path = os.path.dirname(path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        # NOTE(review): verify=False disables TLS certificate verification for this
        # download; kept from the original - confirm it is still required.
        async with httpx.AsyncClient(verify=False) as client:
            response = await client.get(
                url, follow_redirects=True, timeout=AiSeoApis._DEFAULT_TIMEOUT)
        response.raise_for_status()
        with open(path, 'wb') as file:
            file.write(response.content)

    @staticmethod
    async def update_spider_session(session_id, status=1):
        """
        Update the status of a spider session.

        :param session_id: sent as ``id``.
        :param status: sent as ``status``.
        :return: the reply's ``data``, or ``None`` when ``code`` is not ``0``.
        """
        json_data = {**config.AI_SEO_API_AUTH, 'id': session_id, 'status': status}
        json_result = await AiSeoApis._request_json(
            'POST', '/api/third/updateSpiderSession', json=json_data)
        return AiSeoApis._extract_data(json_result, '更新爬虫session失败', none_on_failure=True)

    @staticmethod
    async def upload_session_file(file_path):
        """
        Upload a session file (sent as ``application/json``) under ``ai_seo/session``.

        :param file_path: path of the local file to upload.
        :return: the reply's ``data``; a non-zero ``code`` is logged and the
            reply's ``data`` (if any) is still returned.
        """
        return await AiSeoApis._upload_file(
            '/api/frontend/thirdParty/oss/upload', file_path, 'ai_seo/session',
            'application/json', '上传session文件失败')

    @staticmethod
    async def save_spider_session(platform_id, file_url, file_hash, account=''):
        """
        Register a new spider session.

        :param platform_id: sent as ``platform_id``.
        :param file_url: sent as ``url``.
        :param file_hash: sent as ``hash``.
        :param account: sent as ``account``.
        :return: the reply's ``data``; a non-zero ``code`` is logged and the
            reply's ``data`` (if any) is still returned.
        """
        json_data = {
            **config.AI_SEO_API_AUTH,
            'platform_id': platform_id,
            'account': account,
            'url': file_url,
            'hash': file_hash
        }
        json_result = await AiSeoApis._request_json(
            'POST', '/api/frontend/thirdParty/spider/session/save', timeout=120, json=json_data)
        return AiSeoApis._extract_data(json_result, '保存session失败')