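# ai项目
# NOTE: the lines below are residue copied from the repository web page; they are kept as comments so the module can be parsed.
# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
#
# 462 lines
# 17 KiB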

import json
import random
import re
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import requests
from loguru import logger
from utlit.retry import retry
# Logging configuration: add a file sink named yuanbao.log next to this module.
import os
# Absolute directory that contains this source file.
cwd = os.path.dirname(os.path.abspath(__file__))
# The keyword arguments below are passed straight to loguru's ``logger.add``:
# DEBUG level and above, rotation at "00:00", retention of "3 days",
# zip compression of rotated files, and extended backtraces enabled.
logger.add(f"{cwd}/yuanbao.log",
           level="DEBUG",
           rotation="00:00",
           retention="3 days",
           compression="zip",
           backtrace=True)
class YuanbaoConfig:
    """Constants for the Yuanbao crawler: endpoints, credentials and platform identifiers."""
    # Base URLs of the backend APIs: API_BASE serves the crawler-session
    # endpoints, TASK_BASE serves the task endpoints.
    API_BASE = 'http://granking-api.neicela.com'
    TASK_BASE = 'https://api.granking.com'
    # Application credentials sent with every backend request.
    # NOTE(review): these are hard-coded in source control; consider loading
    # them from the environment or a secrets store.
    APP_ID = 'aa65700299848d6f21b969dbc9f6cf7c'
    SECRET = '5588071d36f0bc61af849c311a03f2c4'
    # URL of the remote signing service.
    SIGN_URL = 'http://yuanbao-sign.granking-spider.neicela.com:45000/eval_js/get_sign'
    # Agent identifier sent to the chat API as ``agentId`` / ``X-AgentID``
    # (the original comment labels this section "device information").
    AGENT_ID = 'naQivTmsDa'
    # Platform settings.
    PLATFORM_ID = '3'
    PLATFORM_NAME = '腾讯元宝'
class ToolsLoad:
    """Thin HTTP helper for the backend APIs: crawler sessions (cookies) and tasks.

    Every public method is wrapped by the project-level ``retry`` decorator,
    whose exact semantics are defined outside this file.
    """

    # Headers shared by the session endpoints. The empty bearer token is
    # reproduced exactly as the original requests sent it.
    _SESSION_HEADERS = {
        'Authorization': 'Bearer ',
        'User-Agent': 'Apifox/1.0.0 (http://apifox.com)',
    }

    @staticmethod
    def _auth_params() -> Dict[str, str]:
        """Return the credential query parameters required by every endpoint."""
        return {
            'app_id': YuanbaoConfig.APP_ID,
            'secret': YuanbaoConfig.SECRET,
        }

    @retry('获取元宝 cookie', 0, time_sleep=30)
    def get_cookie(self, platform_id: str = "3", category: str = "1") -> Optional[Dict]:
        """Fetch one crawler session (cookie) from the backend.

        Args:
            platform_id: Platform identifier sent as ``platform_id``.
            category: Session category sent as ``category``.

        Returns:
            The ``data`` value of the JSON response. When ``data`` is missing,
            ``None`` or an empty list, ``False`` is returned instead.

        NOTE(review): the ``False`` return does not match the annotation; it is
        kept because the ``retry`` decorator may rely on it — confirm.
        """
        url = f'{YuanbaoConfig.API_BASE}/api/third/getOneSpiderSession'
        # Let requests build (and percent-encode) the query string instead of
        # concatenating caller-supplied values into the URL by hand.
        params = {
            'platform_id': platform_id,
            'app_id': YuanbaoConfig.APP_ID,
            'secret': YuanbaoConfig.SECRET,
            'category': category,
        }
        response = requests.get(url, params=params,
                                headers=dict(self._SESSION_HEADERS),
                                timeout=15).json()
        data = response.get("data")
        if data is None or data == []:
            logger.warning(f'没有获取到cookie: {response}')
            return False
        logger.info(f'成功获取到cookie: {data.get("id", "")}')
        return data

    @retry('上传cookie状态', 5)
    def update_session(self, session_id: str, reload_time: str, status: str = "4") -> str:
        """Report a session's status to the backend.

        Args:
            session_id: Sent as ``id`` in the JSON payload.
            reload_time: Sent as ``reload_time`` in the JSON payload.
            status: Sent as ``status`` in the JSON payload (default ``"4"``).

        Returns:
            The raw response body text.
        """
        url = f'{YuanbaoConfig.API_BASE}/api/third/updateSpiderSession'
        payload = {
            "id": session_id,
            "status": status,
            "reload_time": reload_time,
        }
        headers = {**self._SESSION_HEADERS, 'Content-Type': 'application/json'}
        response = requests.post(url, params=self._auth_params(), json=payload,
                                 headers=headers, timeout=15)
        logger.debug(f'更新session响应: {response.text}')
        return response.text

    @retry('提交结果', 5)
    def post_task(self, data: Dict) -> Dict:
        """Submit a task result and return the parsed JSON response.

        Raises:
            requests.HTTPError: If the backend answers with an error status.
        """
        url = f"{YuanbaoConfig.TASK_BASE}/api/third/submitProjectTask"
        # (connect timeout, read timeout) in seconds.
        resp = requests.post(url, json=data, timeout=(5, 300))
        resp.raise_for_status()
        return resp.json()

    @retry('获取task消息', 5)
    def get_task(self) -> Dict:
        """Fetch the next task for this platform and return the parsed JSON response.

        Raises:
            requests.HTTPError: If the backend answers with an error status.
        """
        url = f"{YuanbaoConfig.TASK_BASE}/api/third/getTask"
        params = {
            **self._auth_params(),
            # Use the shared constant instead of a second hard-coded "3".
            'platform_ids': YuanbaoConfig.PLATFORM_ID,
        }
        resp = requests.get(url, params=params, timeout=(5, 20))
        resp.raise_for_status()
        return resp.json()

    @retry('更新任务状态', 5)
    def update_task_status(self, task_id: str, status: str) -> Dict:
        """Update a task's status and return the parsed JSON response.

        The HTTP status code is not checked here; only JSON decoding can fail.
        """
        url = f"{YuanbaoConfig.TASK_BASE}/api/third/updateTask"
        resp = requests.post(url, params=self._auth_params(),
                             json={'task_id': task_id, 'status': status},
                             headers={'Content-Type': 'application/json'},
                             timeout=15)
        return resp.json()
class YuanbaoSignClient:
    """Client for the remote signing service, with a short-lived in-memory cache."""

    # Keys that must be present in the service's ``data`` object.
    REQUIRED_FIELDS = ('X-Uskey', 'X-Bus-Params-Md5', 'X-Timestamp', 'hy92', 'hy93')

    def __init__(self):
        self.sign_url = YuanbaoConfig.SIGN_URL
        self._cache = None      # last signature dict returned by the service
        self._cache_ts = 0      # time.time() at which the cache was filled
        self.cache_ttl = 50     # cache lifetime in seconds

    def _cached_sign(self) -> Optional[Dict]:
        """Return the cached signature if it is still within ``cache_ttl``, else None."""
        if self._cache is None:
            return None
        if time.time() - self._cache_ts >= self.cache_ttl:
            return None
        return self._cache

    @retry('获取签名', 3, time_sleep=5)
    def get_sign(self, force: bool = False) -> Dict:
        """Return signature data, reusing a cached value while it is fresh.

        The original populated the cache but never read it, and ignored
        ``force``; the cache is now consulted.

        Args:
            force: When True, bypass the cache and always call the service.

        Returns:
            The signature dict containing at least ``REQUIRED_FIELDS``.

        Raises:
            requests.HTTPError: If the service answers with an error status.
            RuntimeError: If the service reports a non-zero ``code`` or the
                payload lacks a required field.
        """
        if not force:
            cached = self._cached_sign()
            if cached is not None:
                logger.debug('使用缓存的签名')
                return cached

        logger.info('正在获取新的签名...')
        response = requests.post(self.sign_url, timeout=30)
        response.raise_for_status()
        result = response.json()
        if result.get('code') != 0:
            raise RuntimeError(f'签名服务返回错误: {result}')

        # ``data`` may be present but null; treat that like an empty payload so
        # the field check below reports it instead of raising a TypeError.
        sign_data = result.get('data') or {}
        for field in self.REQUIRED_FIELDS:
            if field not in sign_data:
                raise RuntimeError(f'签名数据缺少字段: {field}')

        self._cache = sign_data
        self._cache_ts = time.time()
        logger.info(f'签名获取成功: X-Uskey={sign_data["X-Uskey"][:30]}...')
        return sign_data
class YuanbaoChatClient:
    """Client for the Yuanbao web chat API (conversation creation and streamed chat)."""

    def __init__(self, cookie: str, sign_data: Dict, h38: str):
        """Store the request credentials.

        Args:
            cookie: Raw ``Cookie`` header value of the logged-in session.
            sign_data: Signature dict; ``X-Uskey``, ``X-Bus-Params-Md5`` and
                ``X-Timestamp`` are read from it.
            h38: Value sent in the ``X-HY92`` header.
        """
        self.cookie = cookie
        self.sign_data = sign_data
        self.h38 = h38
        self.agent_id = YuanbaoConfig.AGENT_ID

    def _base_headers(self) -> Dict:
        """Build the headers common to every request to yuanbao.tencent.com."""
        devid = YuanbaoTaskProcessor.parse_cookies(self.cookie, '_qimei_uuid42')
        return {
            'Host': 'yuanbao.tencent.com',
            'Connection': 'keep-alive',
            'X-device-id': devid,
            'X-Instance-ID': '5',
            'sec-ch-ua-mobile': '?0',
            'X-Language': 'zh-CN',
            'X-Requested-With': 'XMLHttpRequest',
            'X-AgentID': self.agent_id,
            # 'x-commit-tag': 'd6af7421',  (disabled in the original)
            'X-Platform': 'win',
            'X-Uskey': self.sign_data['X-Uskey'],
            'X-Bus-Params-Md5': self.sign_data['X-Bus-Params-Md5'],
            'X-Timestamp': str(self.sign_data['X-Timestamp']),
            'X-os_version': 'Windows(10)-Blink',
            'X-Source': 'web',
            'X-ybuitest': '0',
            # NOTE(review): these two come from the cookie, not from the
            # ``hy92``/``hy93`` fields of the signature data — confirm intended.
            'X-HY92': self.h38,
            'X-HY93': devid,
            'X-webdriver': '0',
            'X-HY106': '',
            'x-webversion': '2.67.1',
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/144.0.0.0 Safari/537.36'),
            'Origin': 'https://yuanbao.tencent.com',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Dest': 'empty',
            'Accept-Encoding': 'gzip, deflate, br, zstd',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cookie': self.cookie,
        }

    @retry('创建会话', 3, time_sleep=5)
    def create_conversation(self) -> str:
        """Create a conversation and return its id.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            RuntimeError: If the response JSON carries no ``id``.
        """
        url = 'https://yuanbao.tencent.com/api/user/agent/conversation/create'
        user_id = YuanbaoTaskProcessor.parse_cookies(self.cookie, 'hy_user')
        headers = self._base_headers()
        headers.update({
            'Accept': 'application/json, text/plain, */*',
            'Content-Type': 'application/json',
            'X-ID': user_id,
            'T-UserID': user_id,
        })
        resp = requests.post(url, json={'agentId': self.agent_id},
                             headers=headers, timeout=30)
        resp.raise_for_status()
        result = resp.json()
        conv_id = result.get('id', '')
        if not conv_id:
            raise RuntimeError(f'创建会话失败: {result}')
        logger.info(f'创建会话成功: {conv_id}')
        return conv_id

    def _build_chat_body(self, keyword: str) -> bytes:
        """Serialize the chat request body for *keyword* as compact UTF-8 JSON."""
        payload = {
            "model": "gpt_175B_0404",
            "prompt": keyword,
            "plugin": "Adaptive",
            "displayPrompt": keyword,
            "displayPromptType": 1,
            # Use the instance's agent id rather than a second hard-coded copy.
            "agentId": self.agent_id,
            "isTemporary": False,
            "projectId": "",
            "chatModelId": "hunyuan_gpt_175B_0404",
            "supportFunctions": [
                "openAutoSearchSwitch",
                "autoInternetSearch",
            ],
            "docOpenid": "",
            "options": {
                "imageIntention": {
                    "needIntentionModel": True,
                    "backendUpdateFlag": 2,
                    "intentionStatus": True,
                }
            },
            "multimedia": [],
            "supportHint": 1,
            # This field is itself a JSON document encoded as a string.
            "chatModelExtInfo": '{"modelId":"hunyuan_gpt_175B_0404","subModelId":"","supportFunctions":{"internetSearch":""},"internetSearch":"autoInternetSearch"}',
            "applicationIdList": [],
            "version": "v2",
            "extReportParams": None,
            "isAtomInput": False,
            "offsetOfHour": 8,
            "offsetOfMinute": 0,
        }
        text = json.dumps(payload, ensure_ascii=False, separators=(',', ':'))
        return text.encode('utf-8')

    @retry('流式对话', 3, time_sleep=5)
    def stream_chat(self, conv_id: str, keyword: str) -> Tuple[str, List[Dict]]:
        """Send *keyword* to conversation *conv_id* and collect the streamed reply.

        Lines starting with ``data:`` that carry a JSON object are parsed;
        ``type == 'text'`` events contribute their ``msg`` to the answer and
        ``type == 'searchGuid'`` events replace the citation list with ``docs``.

        Returns:
            ``(answer, citations)``.

        Raises:
            RuntimeError: On a non-200 status or when no answer text arrived.
        """
        url = f'https://yuanbao.tencent.com/api/chat/{conv_id}'
        headers = self._base_headers()
        # NOTE(review): 'x-agentid' differs from the base 'X-AgentID' only in
        # case; presumably the later entry wins when requests merges headers
        # case-insensitively — confirm.
        headers.update({
            'x-agentid': f'{self.agent_id}/{conv_id}',
            'Content-Type': 'text/plain;charset=UTF-8',
            'Accept': '*/*',
            'X-Trid-Channel': 'undefined',
            'chat_version': 'v1',
            'x-web-ch-id': 'null',
            'Referer': f'https://yuanbao.tencent.com/chat/{self.agent_id}',
        })

        answer_parts = []
        citations = []
        with requests.post(url, data=self._build_chat_body(keyword), headers=headers,
                           stream=True, timeout=60) as r:
            r.encoding = 'utf-8'
            if r.status_code != 200:
                raise RuntimeError(f'HTTP {r.status_code}: {r.text[:200]}')
            r.raw.decode_content = True
            for line in r.iter_lines(decode_unicode=True):
                if not line or not line.startswith('data:'):
                    continue
                raw = line[5:].strip()
                if not raw.startswith('{'):
                    continue
                try:
                    obj = json.loads(raw)
                except ValueError:
                    # Malformed or truncated event: skip it. JSONDecodeError
                    # is a subclass of ValueError.
                    continue
                event_type = obj.get('type')
                if event_type == 'text':
                    # Guard against an explicit null so ''.join() cannot fail.
                    answer_parts.append(obj.get('msg') or '')
                elif event_type == 'searchGuid':
                    # Guard against an explicit null so callers can iterate.
                    citations = obj.get('docs') or []

        answer = ''.join(answer_parts)
        if not answer:
            raise RuntimeError('未获取到答案')
        logger.info(f'对话完成,答案长度: {len(answer)}')
        return answer, citations
class YuanbaoTaskProcessor:
    """Polls the backend for tasks, runs each one through Yuanbao chat and submits the result."""

    # Matches inline citation markers such as ``citation:3`` inside an answer.
    _CITATION_RE = re.compile(r'citation:(\d+)')
    # Timestamp format used in the submitted result.
    _TIME_FMT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        self.tools = ToolsLoad()
        self.sign_client = YuanbaoSignClient()

    def _parse_h38(self, cookie: str) -> str:
        """Return the ``_qimei_h38`` value from a Cookie header, or '' if absent."""
        return self.parse_cookies(cookie, '_qimei_h38')

    @staticmethod
    def parse_cookies(cookie: str, name) -> str:
        """Return the value of cookie *name* from a raw Cookie header string.

        Items are split on ';' and on the first '='; items without '=' are
        ignored. Returns '' when *name* is not present.
        """
        cookie_dict = dict(item.strip().split('=', 1)
                           for item in cookie.split(';') if '=' in item)
        return cookie_dict.get(name, '')

    def _build_result(self, keyword: str, brand: str, platform_id: str,
                      task_id: str, answer: str, citations: List[Dict],
                      started_at: Optional[datetime] = None) -> Dict:
        """Build the payload submitted to the backend for a finished task.

        Args:
            keyword: Prompt that was sent to the chat; submitted as ``prompt``.
            brand: Submitted as ``keyword``.
            platform_id: Submitted as ``platform_id``.
            task_id: Submitted as ``task_id``.
            answer: Full answer text; also scanned for ``citation:N`` markers.
            citations: Search documents returned by the chat stream.
            started_at: When processing began. If omitted, the current time is
                used, so ``start_time`` equals ``end_time`` as before.

        Returns:
            The result dict, with each citation marked ``is_referenced`` '1'
            when its 1-based position appears as ``citation:N`` in *answer*.
        """
        finished_at = datetime.now()
        if started_at is None:
            started_at = finished_at

        cited = set(self._CITATION_RE.findall(answer))
        search_results = [
            {
                'title': doc.get('title', ''),
                'url': doc.get('url', ''),
                'host_name': doc.get('web_site_name', ''),
                'body': doc.get('quote', ''),
                'publish_time': doc.get('publish_time', 0),
                'is_referenced': '1' if str(position) in cited else '0',
            }
            for position, doc in enumerate(citations or [], start=1)
        ]
        return {
            'app_id': YuanbaoConfig.APP_ID,
            'secret': YuanbaoConfig.SECRET,
            'platform_id': platform_id,
            'platform_name': YuanbaoConfig.PLATFORM_NAME,
            'prompt': keyword,
            'keyword': brand,
            'answer': answer,
            'search_result': search_results,
            'screenshot_file': '',
            'run_status': True,
            'task_id': task_id,
            'rank': 0,
            'start_time': started_at.strftime(self._TIME_FMT),
            'end_time': finished_at.strftime(self._TIME_FMT),
            'screenshot_url': '',
            'words': [],
        }

    @retry('处理元宝任务', for_work=10)
    def process_task(self, task: Dict) -> bool:
        """Process one task end to end.

        Steps: fetch a cookie session, obtain a signature, create a
        conversation, stream the chat, then build and submit the result.

        Returns:
            True when the result was submitted; False when the session carried
            no cookie.

        Raises:
            Exception: Any failure is logged, the task (and the session, if
                known) is marked with status '4', and the exception is
                re-raised.
        """
        task_id = task.get("id", "")
        keyword = task.get("keyword", "")
        platform_id = task.get("platform_id", "3")
        brand = task.get("brand", "")
        logger.info(f"开始处理任务: {keyword} - {task_id}")
        # Captured up front so the submitted start_time reflects the real start.
        started_at = datetime.now()
        session_id = ""
        try:
            # 1. Fetch a cookie session.
            session = self.tools.get_cookie(platform_id='3', category='1')
            if not isinstance(session, dict):
                # get_cookie returns False when no session is available; fail
                # explicitly rather than with an AttributeError on ``.get``.
                raise RuntimeError(f'Cookie 获取失败: {session!r}')
            cookie = session.get('cookie', '')
            session_id = session.get('id', '')
            if not cookie:
                logger.error('Cookie 获取失败')
                return False
            # 2. Extract h38 from the cookie.
            h38 = self._parse_h38(cookie)
            logger.debug(f'h38: {h38}')
            # 3. Obtain the request signature.
            sign_data = self.sign_client.get_sign()
            # 4. Build the chat client.
            chat_client = YuanbaoChatClient(cookie, sign_data, h38)
            # 5. Create a conversation.
            conv_id = chat_client.create_conversation()
            # 6. Run the streamed chat.
            answer, citations = chat_client.stream_chat(conv_id, keyword)
            logger.info(f'答案预览: {answer[:100]}...')
            # 7. Build and submit the result.
            result = self._build_result(keyword, brand, platform_id, task_id,
                                        answer, citations, started_at=started_at)
            logger.debug(f'提交结果: {json.dumps(result, ensure_ascii=False)[:200]}')
            post_resp = self.tools.post_task(result)
            logger.info(f'任务 {task_id} 提交返回: {post_resp}')
            return True
        except Exception as e:
            logger.error(f'任务处理异常: {e}')
            if task_id:
                self.tools.update_task_status(task_id, '4')
            if session_id:
                self.tools.update_session(session_id, '', status='4')
            raise

    @retry('主运行窗口', for_work=3)
    def start_task_msg(self) -> bool:
        """Fetch one task and process it.

        Returns True after sleeping when there is nothing to do; otherwise
        returns whatever ``process_task`` returns.
        """
        task_resp = self.tools.get_task()
        logger.info(f'获取任务响应: {task_resp}')
        if not task_resp:
            logger.info("get_task 未返回有效数据,等待后重试")
            time.sleep(5)
            return True
        task_data = task_resp.get("data")
        if not task_data:
            logger.info("没有任务数据,等待下一轮")
            # Randomized idle wait between polls.
            time.sleep(random.uniform(30, 60))
            return True
        return self.process_task(task_data)

    def run(self):
        """Run the polling loop forever; log any exception and wait 10 s before continuing."""
        logger.info('元宝爬虫启动...')
        while True:
            try:
                self.start_task_msg()
            except Exception as e:
                logger.error(f'主循环异常: {e}')
                time.sleep(10)
if __name__ == '__main__':
    # Script entry point: build the processor and enter its endless polling loop.
    processor = YuanbaoTaskProcessor()
    processor.run()