You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
462 lines
17 KiB
462 lines
17 KiB
import json
|
|
import random
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
import requests
|
|
from loguru import logger
|
|
|
|
from utlit.retry import retry
|
|
|
|
# Logging configuration
import os

# Directory that contains this script; the log file is created next to it.
cwd = os.path.dirname(os.path.abspath(__file__))

# Options for the file sink, kept in one mapping so they are easy to review.
_LOG_SINK_OPTIONS = {
    "level": "DEBUG",
    "rotation": "00:00",
    "retention": "3 days",
    "compression": "zip",
    "backtrace": True,
}

logger.add(f"{cwd}/yuanbao.log", **_LOG_SINK_OPTIONS)
|
|
|
|
|
|
class YuanbaoConfig:
    """Static configuration constants for the Yuanbao crawler.

    ``APP_ID`` and ``SECRET`` may be supplied through the ``YUANBAO_APP_ID``
    and ``YUANBAO_SECRET`` environment variables. The previous literals are
    kept as defaults so existing deployments behave exactly as before.

    NOTE(review): the default credentials are still committed to source
    control; they should be rotated and the defaults removed once every
    deployment provides the environment variables.
    """

    # Base URLs of the backend APIs
    API_BASE = 'http://granking-api.neicela.com'
    TASK_BASE = 'https://api.granking.com'

    # Application credentials (environment overrides take precedence)
    APP_ID = os.environ.get('YUANBAO_APP_ID', 'aa65700299848d6f21b969dbc9f6cf7c')
    SECRET = os.environ.get('YUANBAO_SECRET', '5588071d36f0bc61af849c311a03f2c4')

    # URL of the remote signing service
    SIGN_URL = 'http://yuanbao-sign.granking-spider.neicela.com:45000/eval_js/get_sign'

    # Agent identifier sent with chat requests
    AGENT_ID = 'naQivTmsDa'

    # Platform identification reported with task results
    PLATFORM_ID = '3'
    PLATFORM_NAME = '腾讯元宝'
|
|
|
|
|
|
class ToolsLoad:
    """Helper for the backend HTTP endpoints: cookie sessions and crawl tasks."""

    # User-Agent header sent to the session endpoints.
    _USER_AGENT = 'Apifox/1.0.0 (http://apifox.com)'

    @staticmethod
    def _credentials() -> Dict[str, str]:
        """Return the ``app_id``/``secret`` pair sent as query parameters."""
        return {'app_id': YuanbaoConfig.APP_ID, 'secret': YuanbaoConfig.SECRET}

    @retry('获取元宝 cookie', 0, time_sleep=30)
    def get_cookie(self, platform_id: str = "3", category: str = "1") -> Optional[Dict]:
        """Fetch one crawler session (cookie) from the backend.

        A hard-coded debug cookie used to be returned unconditionally here,
        which made the API call pointless and the code below unreachable, and
        left a live session token in the source. For local debugging, set the
        ``YUANBAO_DEBUG_COOKIE`` environment variable instead; when present it
        is returned with the placeholder session id ``'0'`` and no request is
        made.

        Args:
            platform_id: Value of the ``platform_id`` query parameter.
            category: Value of the ``category`` query parameter.

        Returns:
            The ``data`` object of the API response (the caller reads its
            ``cookie`` and ``id`` keys), or ``False`` when the response has no
            data.
        """
        debug_cookie = os.environ.get('YUANBAO_DEBUG_COOKIE')
        if debug_cookie:
            logger.warning('使用环境变量 YUANBAO_DEBUG_COOKIE 提供的调试 cookie')
            return {'cookie': debug_cookie, 'id': '0'}

        url = f'{YuanbaoConfig.API_BASE}/api/third/getOneSpiderSession'
        # Same parameter order as the previously hand-built query string.
        params = {
            'platform_id': platform_id,
            **self._credentials(),
            'category': category,
        }
        headers = {
            'Authorization': 'Bearer ',
            'User-Agent': self._USER_AGENT,
        }

        response = requests.get(url, params=params, headers=headers, timeout=15).json()
        data = response.get("data")

        if data is None or data == []:
            logger.warning(f'没有获取到cookie: {response}')
            # NOTE(review): ``False`` (not ``None``) is kept because the
            # project-local ``retry`` decorator may key on it — confirm.
            return False

        logger.info(f'成功获取到cookie: {response.get("data", {}).get("id", "")}')
        return data

    @retry('上传cookie状态', 5)
    def update_session(self, session_id: str, reload_time: str, status: str = "4") -> str:
        """Report a session's status to the backend.

        Args:
            session_id: Identifier of the session to update.
            reload_time: Value sent as ``reload_time`` in the JSON payload.
            status: Value sent as ``status``; defaults to ``"4"``.

        Returns:
            The raw response body text.
        """
        url = f'{YuanbaoConfig.API_BASE}/api/third/updateSpiderSession'
        payload = {
            "id": session_id,
            "status": status,
            "reload_time": reload_time,
        }
        headers = {
            'Authorization': 'Bearer ',
            'User-Agent': self._USER_AGENT,
            'Content-Type': 'application/json',
        }

        response = requests.post(url, params=self._credentials(), json=payload,
                                 headers=headers, timeout=15)
        logger.debug(f'更新session响应: {response.text}')
        return response.text

    @retry('提交结果', 5)
    def post_task(self, data: Dict) -> Dict:
        """Submit a task result and return the decoded JSON response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        url = f"{YuanbaoConfig.TASK_BASE}/api/third/submitProjectTask"
        # (connect timeout, read timeout): submissions may take a long time.
        resp = requests.post(url, json=data, timeout=(5, 300))
        resp.raise_for_status()
        return resp.json()

    @retry('获取task消息', 5)
    def get_task(self) -> Dict:
        """Fetch the next task for platform ``3`` and return the decoded JSON.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        url = f"{YuanbaoConfig.TASK_BASE}/api/third/getTask"
        params = {**self._credentials(), 'platform_ids': '3'}
        resp = requests.get(url, params=params, timeout=(5, 20))
        resp.raise_for_status()
        return resp.json()

    @retry('更新任务状态', 5)
    def update_task_status(self, task_id: str, status: str) -> Dict:
        """Set the status of a task and return the decoded JSON response."""
        url = f"{YuanbaoConfig.TASK_BASE}/api/third/updateTask"
        resp = requests.post(
            url,
            params=self._credentials(),
            json={'task_id': task_id, 'status': status},
            headers={'Content-Type': 'application/json'},
            timeout=15,
        )
        return resp.json()
|
|
|
|
|
|
class YuanbaoSignClient:
    """Client for the remote signing service.

    NOTE(review): ``_cache``, ``_cache_ts`` and ``cache_ttl`` are written but
    never read, and ``get_sign`` ignores its ``force`` argument, so every call
    hits the service. Confirm whether caching was disabled on purpose.
    """

    # Keys that must be present in the ``data`` object of a sign response.
    _REQUIRED_FIELDS = ('X-Uskey', 'X-Bus-Params-Md5', 'X-Timestamp', 'hy92', 'hy93')

    def __init__(self):
        self.sign_url = YuanbaoConfig.SIGN_URL
        self.cache_ttl = 50  # intended cache lifetime in seconds
        self._cache = None
        self._cache_ts = 0

    @retry('获取签名', 3, time_sleep=5)
    def get_sign(self, force: bool = False) -> Dict:
        """Request a fresh signature from the signing service.

        Args:
            force: Accepted for interface compatibility; currently unused.

        Returns:
            The ``data`` object of the response, containing at least the
            required signature fields.

        Raises:
            requests.HTTPError: If the service answers with an error status.
            RuntimeError: If the response ``code`` is not ``0`` or a required
                field is missing.
        """
        logger.info('正在获取新的签名...')
        response = requests.post(self.sign_url, timeout=30)
        response.raise_for_status()

        payload = response.json()
        if payload.get('code') != 0:
            raise RuntimeError(f'签名服务返回错误: {payload}')

        sign_data = payload.get('data', {})

        # Report the first required field that is absent, if any.
        missing = next(
            (name for name in self._REQUIRED_FIELDS if name not in sign_data),
            None,
        )
        if missing is not None:
            raise RuntimeError(f'签名数据缺少字段: {missing}')

        # Remember the latest signature and when it was obtained.
        self._cache = sign_data
        self._cache_ts = time.time()

        logger.info(f'签名获取成功: X-Uskey={sign_data["X-Uskey"][:30]}...')
        return sign_data
|
|
|
|
|
|
class YuanbaoChatClient:
    """HTTP client for the Yuanbao web chat endpoints."""

    def __init__(self, cookie: str, sign_data: Dict, h38: str):
        """Store the credentials used to build request headers.

        Args:
            cookie: Raw ``Cookie`` header value of the logged-in session.
            sign_data: Signature fields; ``X-Uskey``, ``X-Bus-Params-Md5`` and
                ``X-Timestamp`` are read when headers are built.
            h38: Value sent in the ``X-HY92`` header.
        """
        self.cookie = cookie
        self.sign_data = sign_data
        self.h38 = h38
        self.agent_id = YuanbaoConfig.AGENT_ID

    def _base_headers(self) -> Dict:
        """Build the headers shared by every request to yuanbao.tencent.com."""
        # The device id is taken from the ``_qimei_uuid42`` cookie.
        devid = YuanbaoTaskProcessor.parse_cookies(self.cookie, '_qimei_uuid42')
        return {
            'Host': 'yuanbao.tencent.com',
            'Connection': 'keep-alive',
            'X-device-id': devid,
            'X-Instance-ID': '5',
            'sec-ch-ua-mobile': '?0',
            'X-Language': 'zh-CN',
            'X-Requested-With': 'XMLHttpRequest',
            'X-AgentID': self.agent_id,
            'X-Platform': 'win',
            'X-Uskey': self.sign_data['X-Uskey'],
            'X-Bus-Params-Md5': self.sign_data['X-Bus-Params-Md5'],
            'X-Timestamp': str(self.sign_data['X-Timestamp']),
            'X-os_version': 'Windows(10)-Blink',
            'X-Source': 'web',
            'X-ybuitest': '0',
            'X-HY92': self.h38,
            'X-HY93': devid,
            'X-webdriver': '0',
            'X-HY106': '',
            "x-webversion": "2.67.1",
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/144.0.0.0 Safari/537.36'),
            'Origin': 'https://yuanbao.tencent.com',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Dest': 'empty',
            'Accept-Encoding': 'gzip, deflate, br, zstd',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cookie': self.cookie,
        }

    @retry('创建会话', 3, time_sleep=5)
    def create_conversation(self) -> str:
        """Create a conversation and return its id.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            RuntimeError: If the response carries no ``id``.
        """
        url = 'https://yuanbao.tencent.com/api/user/agent/conversation/create'
        user_id = YuanbaoTaskProcessor.parse_cookies(self.cookie, 'hy_user')

        headers = self._base_headers()
        headers.update({
            'Accept': 'application/json, text/plain, */*',
            'Content-Type': 'application/json',
            'X-ID': user_id,
            'T-UserID': user_id,
        })

        resp = requests.post(url, json={'agentId': self.agent_id},
                             headers=headers, timeout=30)
        resp.raise_for_status()

        result = resp.json()
        conv_id = result.get('id', '')
        if not conv_id:
            raise RuntimeError(f'创建会话失败: {result}')

        logger.info(f'创建会话成功: {conv_id}')
        return conv_id

    def _chat_payload(self, keyword: str) -> str:
        """Serialise the chat request body for ``keyword`` as compact JSON."""
        body = {
            "model": "gpt_175B_0404",
            "prompt": keyword,
            "plugin": "Adaptive",
            "displayPrompt": keyword,
            "displayPromptType": 1,
            # Previously a duplicated literal; keep it in sync with the agent
            # id used for the headers and for conversation creation.
            "agentId": self.agent_id,
            "isTemporary": False,
            "projectId": "",
            "chatModelId": "hunyuan_gpt_175B_0404",
            "supportFunctions": [
                "openAutoSearchSwitch",
                "autoInternetSearch",
            ],
            "docOpenid": "",
            "options": {
                "imageIntention": {
                    "needIntentionModel": True,
                    "backendUpdateFlag": 2,
                    "intentionStatus": True,
                },
            },
            "multimedia": [],
            "supportHint": 1,
            "chatModelExtInfo": ('{"modelId":"hunyuan_gpt_175B_0404","subModelId":"",'
                                 '"supportFunctions":{"internetSearch":""},'
                                 '"internetSearch":"autoInternetSearch"}'),
            "applicationIdList": [],
            "version": "v2",
            "extReportParams": None,
            "isAtomInput": False,
            "offsetOfHour": 8,
            "offsetOfMinute": 0,
        }
        return json.dumps(body, ensure_ascii=False, separators=(',', ':'))

    @staticmethod
    def _parse_event(line: str) -> Optional[Dict]:
        """Decode one ``data:`` line of the stream, or return ``None`` to skip it."""
        if not line or not line.startswith('data:'):
            return None
        raw = line[5:].strip()
        if not raw.startswith('{'):
            return None
        try:
            return json.loads(raw)
        except ValueError:
            # json.JSONDecodeError is a ValueError; malformed events are skipped.
            return None

    @retry('流式对话', 3, time_sleep=5)
    def stream_chat(self, conv_id: str, keyword: str) -> Tuple[str, List[Dict]]:
        """Send ``keyword`` to a conversation and collect the streamed reply.

        Args:
            conv_id: Conversation id returned by ``create_conversation``.
            keyword: Prompt text to send.

        Returns:
            ``(answer, citations)`` where ``answer`` is the concatenation of
            all ``text`` events and ``citations`` is the ``docs`` list of the
            last ``searchGuid`` event (empty if none was received).

        Raises:
            RuntimeError: On a non-200 response or when no answer text arrives.
        """
        url = f'https://yuanbao.tencent.com/api/chat/{conv_id}'

        headers = self._base_headers()
        # NOTE(review): 'x-agentid' coexists with 'X-AgentID' from the base
        # headers; which value is sent depends on requests' case-insensitive
        # header handling — confirm this duplication is intended.
        headers.update({
            "x-agentid": f"{self.agent_id}/{conv_id}",
            'Content-Type': 'text/plain;charset=UTF-8',
            'Accept': '*/*',
            'X-Trid-Channel': 'undefined',
            'chat_version': 'v1',
            'x-web-ch-id': 'null',
            'Referer': f'https://yuanbao.tencent.com/chat/{self.agent_id}',
        })

        body = self._chat_payload(keyword)
        answer_parts = []
        citations = []

        with requests.post(url, data=body.encode('utf-8'), headers=headers,
                           stream=True, timeout=60) as r:
            r.encoding = 'utf-8'
            if r.status_code != 200:
                raise RuntimeError(f'HTTP {r.status_code}: {r.text[:200]}')

            r.raw.decode_content = True
            for line in r.iter_lines(decode_unicode=True):
                event = self._parse_event(line)
                if event is None:
                    continue

                event_type = event.get('type')
                if event_type == 'text':
                    answer_parts.append(event.get('msg', ''))
                elif event_type == 'searchGuid':
                    # A null ``docs`` must not leak out as None: the caller
                    # iterates over the citations.
                    citations = event.get('docs') or []

        answer = ''.join(answer_parts)
        if not answer:
            raise RuntimeError('未获取到答案')

        logger.info(f'对话完成,答案长度: {len(answer)}')
        return answer, citations
|
|
|
|
|
|
class YuanbaoTaskProcessor:
    """Fetches crawl tasks, runs them against Yuanbao and submits the results."""

    # Matches citation markers such as ``citation:3`` inside an answer.
    _CITATION_RE = re.compile(r'citation:(\d+)')

    def __init__(self):
        self.tools = ToolsLoad()
        self.sign_client = YuanbaoSignClient()

    def _parse_h38(self, cookie: str) -> str:
        """Return the ``_qimei_h38`` cookie value, or ``''`` if absent."""
        return self.parse_cookies(cookie, '_qimei_h38')

    @staticmethod
    def parse_cookies(cookie: str, name: str) -> str:
        """Return the value of cookie ``name`` from a raw ``Cookie`` header.

        Args:
            cookie: Header value of the form ``"k1=v1; k2=v2"``.
            name: Cookie name to look up.

        Returns:
            The cookie value, or ``''`` when the name is not present. Only the
            first ``=`` of each item separates name from value, so values may
            themselves contain ``=``.
        """
        cookie_dict = dict(item.strip().split('=', 1)
                           for item in cookie.split(';') if '=' in item)
        return cookie_dict.get(name, '')

    def _build_result(self, keyword: str, brand: str, platform_id: str,
                      task_id: str, answer: str, citations: List[Dict]) -> Dict:
        """Assemble the payload submitted for a finished task.

        Each citation is flagged ``is_referenced='1'`` when its 1-based
        position appears in the answer as ``citation:<n>``.
        """
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        cited = set(self._CITATION_RE.findall(answer))

        search_results = [
            {
                'title': doc.get('title', ''),
                'url': doc.get('url', ''),
                'host_name': doc.get('web_site_name', ''),
                'body': doc.get('quote', ''),
                'publish_time': doc.get('publish_time', 0),
                'is_referenced': '1' if str(position) in cited else '0',
            }
            for position, doc in enumerate(citations, start=1)
        ]

        return {
            'app_id': YuanbaoConfig.APP_ID,
            'secret': YuanbaoConfig.SECRET,
            'platform_id': platform_id,
            'platform_name': YuanbaoConfig.PLATFORM_NAME,
            'prompt': keyword,
            'keyword': brand,
            'answer': answer,
            'search_result': search_results,
            'screenshot_file': '',
            'run_status': True,
            'task_id': task_id,
            'rank': 0,
            # NOTE(review): both timestamps are taken when the result is
            # built, so start_time does not reflect when the task began.
            'start_time': now,
            'end_time': now,
            'screenshot_url': '',
            'words': [],
        }

    @retry('处理元宝任务', for_work=10)
    def process_task(self, task: Dict) -> bool:
        """Run one task end to end.

        Returns:
            ``True`` once the result has been submitted, ``False`` when no
            cookie could be obtained.

        Raises:
            Exception: Any failure is logged, the task (and session, if known)
                is marked with status ``'4'``, and the exception is re-raised.
        """
        task_id = task.get("id", "")
        keyword = task.get("keyword", "")
        platform_id = task.get("platform_id", "3")
        brand = task.get("brand", "")

        logger.info(f"开始处理任务: {keyword} - {task_id}")

        session_id = ""
        try:
            # 1. Obtain a cookie. get_cookie may return a falsy value when the
            #    backend has no session; treat that like an empty cookie
            #    instead of failing on ``.get``.
            session = self.tools.get_cookie(platform_id='3', category='1')
            if not session:
                logger.error('Cookie 获取失败')
                return False

            cookie = session.get('cookie', '')
            session_id = session.get('id', '')
            if not cookie:
                logger.error('Cookie 获取失败')
                return False

            # 2. Extract h38 from the cookie
            h38 = self._parse_h38(cookie)
            logger.debug(f'h38: {h38}')

            # 3. Fetch a signature
            sign_data = self.sign_client.get_sign()

            # 4. Build the chat client
            chat_client = YuanbaoChatClient(cookie, sign_data, h38)

            # 5. Create a conversation
            conv_id = chat_client.create_conversation()

            # 6. Run the streamed chat
            answer, citations = chat_client.stream_chat(conv_id, keyword)
            logger.info(f'答案预览: {answer[:100]}...')

            # 7. Build and submit the result
            result = self._build_result(keyword, brand, platform_id, task_id,
                                        answer, citations)
            logger.debug(f'提交结果: {json.dumps(result, ensure_ascii=False)[:200]}')

            post_resp = self.tools.post_task(result)
            logger.info(f'任务 {task_id} 提交返回: {post_resp}')
            return True

        except Exception as e:
            logger.error(f'任务处理异常: {e}')
            if task_id:
                self.tools.update_task_status(task_id, '4')
            if session_id:
                self.tools.update_session(session_id, '', status='4')
            raise

    @retry('主运行窗口', for_work=3)
    def start_task_msg(self) -> bool:
        """Fetch one task and process it.

        Returns:
            ``True`` after waiting when there was nothing to do, otherwise the
            result of ``process_task``.
        """
        task_resp = self.tools.get_task()
        logger.info(f'获取任务响应: {task_resp}')

        if not task_resp:
            logger.info("get_task 未返回有效数据,等待后重试")
            time.sleep(5)
            return True

        task_data = task_resp.get("data")
        if not task_data:
            logger.info("没有任务数据,等待下一轮")
            # Randomised back-off between polls when the queue is empty.
            time.sleep(random.uniform(30, 60))
            return True

        return self.process_task(task_data)

    def run(self):
        """Poll for tasks forever, pausing 10 seconds after any failure."""
        logger.info('元宝爬虫启动...')
        while True:
            try:
                self.start_task_msg()
            except Exception as e:
                logger.error(f'主循环异常: {e}')
                time.sleep(10)
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the processor and start its polling loop.
    processor = YuanbaoTaskProcessor()
    processor.run()
|