"""Worker that pulls Wenxin Yiyan (yiyan.baidu.com) prompts from a task API,
runs each prompt through the chat endpoint, and submits the answer back.

The module is meant to be run as a script; it starts several polling threads.
"""
import os
import json
import time
import threading
from datetime import datetime
from typing import Dict, List

import requests
from loguru import logger

from utlit.encrpty import encrypt_payload
from utlit.retry import retry

# Configure logging: one file next to this module, rotated at midnight,
# kept for three days and zipped on rotation.
cwd = os.path.dirname(os.path.abspath(__file__))
logger.add(f"{cwd}/wxyy.log", level="DEBUG", rotation="00:00", retention="3 days", compression="zip", backtrace=True)

BASE = 'https://api.granking.com/api/third'
HOST = 'api.granking.com'

# Task-API credentials. They were previously duplicated inline in the fetch
# URL and in the submitted payload; keeping them here guarantees both sides
# stay in sync. The environment variables are an optional override; when
# unset (or empty) the original built-in values are used.
# NOTE(review): secrets committed to source should be rotated and moved to
# configuration.
APP_ID = os.environ.get("GRANKING_APP_ID") or "aa65700299848d6f21b969dbc9f6cf7c"
SECRET = os.environ.get("GRANKING_SECRET") or "5588071d36f0bc61af849c311a03f2c4"
PLATFORM_IDS = "6"

# Browser User-Agent sent to yiyan.baidu.com by both the index and the
# conversation requests.
BROWSER_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
)

# Substrings in a stream event that mean the account is throttled or blocked.
LIMIT_MARKERS = ("已达文心大模型使用上限", "用户访问被限制")


class ToolsLoad:
    """HTTP helpers: fetch tasks, submit results, and talk to Wenxin Yiyan.

    Every public method is wrapped by the project's ``retry`` decorator.
    NOTE(review): the decorator's behaviour (what it returns when retries are
    exhausted, whether a falsy return triggers a retry) is defined elsewhere
    and is not visible from this file.
    """

    @retry('获取文心一言任务', 5, time_sleep=10)
    def get_task(self):
        """Fetch the next task from the task API and return the decoded JSON.

        Raises:
            requests.HTTPError: on a non-2xx response (before the retry
                wrapper handles it).
        """
        resp = requests.get(
            f"{BASE}/getTask",
            params={
                "app_id": APP_ID,
                "secret": SECRET,
                "platform_ids": PLATFORM_IDS,
            },
            timeout=(5, 20),
        )
        resp.raise_for_status()
        return resp.json()

    @retry('提交文心一言结果', 5, time_sleep=5)
    def post_task(self, data):
        """Submit a finished task result.

        Args:
            data: JSON-serialisable result payload.

        Returns:
            The decoded JSON body of the API response.
        """
        url = f"{BASE}/submitProjectTask"
        headers = {
            "User-Agent": "Apifox/1.0.0",
            "Content-Type": "application/json",
            "Host": HOST,
        }
        # Long read timeout: the submit endpoint is given up to 300 s.
        resp = requests.post(url, headers=headers, json=data, timeout=(5, 300))
        resp.raise_for_status()
        return resp.json()

    # NOTE(review): the retry count here is 0, unlike the other methods —
    # confirm that is intentional.
    @retry('获取文心一言首页cookie', 0, time_sleep=5)
    def index(self):
        """Load the Wenxin Yiyan home page to obtain session cookies.

        Returns:
            A 2-tuple ``(cookie_str, set_cookie_header)`` where ``cookie_str``
            is the response cookies joined as ``k=v; k=v`` and
            ``set_cookie_header`` is the raw ``Set-Cookie`` response header
            (empty string when absent).
        """
        url = "https://yiyan.baidu.com/"
        headers = {
            "User-Agent": BROWSER_USER_AGENT,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        # SECURITY NOTE(review): verify=False disables TLS certificate
        # verification for this request. Left as-is because it may be
        # deliberate (e.g. an intercepting proxy); confirm and remove if not.
        resp = requests.get(url, headers=headers, timeout=15, verify=False)
        cookie_str = '; '.join(f'{k}={v}' for k, v in resp.cookies.items())
        return cookie_str, resp.headers.get("Set-Cookie", "")

    @staticmethod
    def _citations_from_event(ev):
        """Return the normalised search citations carried by one stream event."""
        citations = ev.get("searchCitations")
        if not isinstance(citations, dict):
            return []
        # ``list`` may be missing or null; treat both as "no citations".
        return [
            {
                "title": c.get("title", ""),
                "url": c.get("url", ""),
                "name": c.get("site", ""),
                "body": c.get("wild_abstract", ""),
                "publishTime": c.get("date", ""),
            }
            for c in citations.get("list") or []
            if isinstance(c, dict)
        ]

    @retry('文心一言对话接口', 3, time_sleep=5)
    def conversation(self, Token, text, cookie_str):
        """Send one prompt to the chat endpoint and collect the streamed reply.

        Args:
            Token: value used both as the ``Acs-Token`` header and the
                ``sign`` field of the request body.
            text: the prompt; also used as the session name.
            cookie_str: ``Cookie`` header value.

        Returns:
            ``(answer, references)``. ``answer`` is the concatenation of every
            ``data.content`` string in the stream; ``references`` is a list of
            citation dicts. ``("", [])`` is returned as soon as the stream
            reports a usage limit or access restriction.

        Raises:
            requests.HTTPError: on a non-2xx response.
        """
        url = "https://yiyan.baidu.com/eb/chat/conversation/v2"
        headers = {
            "Host": "yiyan.baidu.com",
            "Device-Type": "pc",
            "User-Agent": BROWSER_USER_AGENT,
            "Accept": "text/event-stream,application/json",
            "Acs-Token": Token,
            "Content-Type": "application/json",
            "Referer": "https://yiyan.baidu.com/chat/",
            "Cookie": cookie_str,
        }
        post_data = {
            "sign": Token,
            "timestamp": int(time.time() * 1000),
            "deviceType": "pc",
            "text": text,
            "sessionId": "",
            "sessionName": text,
            "type": 10,
            "deepThoughtStatus": 2,
            "model": "EB45T",
            "parentChatId": "0",
            "isNewYiyan": True,
        }

        answer_parts: List[str] = []
        refs_raw: List[Dict[str, str]] = []

        with requests.post(url, headers=headers, json=post_data, stream=True, timeout=300) as resp:
            resp.raise_for_status()
            resp.encoding = "utf-8"
            for raw in resp.iter_lines(decode_unicode=True):
                # Only server-sent-event payload lines are of interest.
                if not raw or not raw.startswith("data:"):
                    continue
                data_str = raw[5:].strip()

                if any(marker in data_str for marker in LIMIT_MARKERS):
                    logger.warning(f"文心一言返回限额或访问受限提示: {text}")
                    return "", []

                try:
                    ev = json.loads(data_str)
                except ValueError:
                    # Non-JSON payload lines are skipped.
                    continue
                # A payload that decodes to a list/scalar has no fields to read.
                if not isinstance(ev, dict):
                    continue

                refs_raw.extend(self._citations_from_event(ev))

                data = ev.get("data")
                if isinstance(data, dict):
                    content = data.get("content")
                    if isinstance(content, str) and content:
                        answer_parts.append(content)

        return "".join(answer_parts), refs_raw


class WenxinChatClient:
    """Wenxin Yiyan chat client: builds the token and runs one conversation."""

    def __init__(self):
        self.tools = ToolsLoad()

    def generate_token(self, baiduid):
        """Build the token string sent with a conversation request.

        Args:
            baiduid: value placed in the ``baiduid`` field of the signed
                payload.

        Returns:
            ``"1769396406334_<now-ms>_<signature>"`` where the signature comes
            from ``encrypt_payload``.
        """
        t = int(time.time() * 1000)
        payload = {
            "d0": "ka0oitptemc1jfshcdsx",
            "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "baiduid": baiduid,
            "platform": "Win32",
            "clientTs": t,
            "version": "1.4.0.3",
        }
        token_sign = encrypt_payload(payload)
        # NOTE(review): the leading number is a fixed constant in the original
        # code; its meaning is not visible here.
        return f"1769396406334_{t}_{token_sign}"

    def chat(self, platform_id, keyword, brand, task_id):
        """Run one prompt end to end and build the submission payload.

        Args:
            platform_id: copied into the result as ``platform_id``.
            keyword: the prompt text sent to the model.
            brand: copied into the result as ``keyword``.
            task_id: copied into the result as ``task_id``.

        Returns:
            The result dict ready for submission, or ``False`` when no cookie
            or no answer could be obtained.
        """
        # The retry wrapper may hand back a falsy non-tuple when it gives up;
        # fall back to empty values so the "failed" branch below is taken
        # instead of crashing on tuple unpacking.
        # NOTE(review): the second element is the raw Set-Cookie header, yet it
        # is used as ``baiduid`` — confirm encrypt_payload expects that.
        cookie_str, baiduid = self.tools.index() or ("", "")
        if not cookie_str:
            logger.error("获取cookie失败")
            return False

        Token = self.generate_token(baiduid)

        answer, refs = self.tools.conversation(Token, keyword, cookie_str) or ("", [])
        if not answer:
            logger.warning(f"未获取到回答: {keyword}")
            return False

        # start_time and end_time are both stamped after the conversation ends.
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {
            "app_id": APP_ID,
            "secret": SECRET,
            "platform_id": platform_id,
            "platform_name": "文心一言",
            "prompt": keyword,
            "keyword": brand,
            "answer": answer,
            "search_result": refs,
            "run_status": True,
            "task_id": task_id,
            "rank": 0,
            "start_time": now,
            "end_time": now,
            "screenshot_url": "",
            "words": [],
        }


class Start:
    """Polling worker: fetch a task, process it, submit the result, repeat."""

    def __init__(self):
        self.tools = ToolsLoad()
        self.client = WenxinChatClient()

    @retry('处理文心一言任务', for_work=10, time_sleep=10)
    def process_task(self, task):
        """Process a single task dict and submit its result.

        Args:
            task: mapping read for ``id``, ``keyword``, ``platform_id`` and
                ``brand`` (each defaults to an empty string).

        Returns:
            The submitted result dict, or ``False`` when the chat produced no
            result.
        """
        task_id = task.get("id", "")
        keyword = task.get("keyword", "")
        platform_id = task.get("platform_id", "")
        brand = task.get("brand", "")

        logger.info(f"开始处理任务: {keyword} - {task_id}")

        result = self.client.chat(
            platform_id=platform_id,
            keyword=keyword,
            brand=brand,
            task_id=task_id,
        )
        if not result:
            logger.warning(f"任务结果为空,重新处理: {keyword}")
            return False

        # Route the payload dump through the logger rather than stdout so
        # output from concurrent workers does not interleave.
        logger.debug(result)

        post_resp = self.tools.post_task(result)
        logger.info(f"任务 {task_id} 提交返回: {post_resp}")
        return result

    @retry('主运行窗口', for_work=3, time_sleep=5)
    def start_task_msg(self):
        """Fetch one task and process it.

        Returns:
            ``True`` after waiting when there was nothing to do, otherwise
            whatever ``process_task`` returns.
        """
        task_resp = self.tools.get_task()
        if not task_resp:
            logger.info("get_task 未返回有效数据,等待后重试")
            time.sleep(5)
            return True

        tasks = task_resp.get("data", False)
        if not tasks:
            logger.info("没有任务数据,等待下一轮")
            time.sleep(30)
            return True

        logger.info(f"获取到任务: {tasks}")
        # NOTE(review): ``data`` is passed on as a single task mapping; if the
        # API can return a list here, process_task will fail — confirm.
        return self.process_task(tasks)

    def run(self):
        """Poll forever; log any escaped exception and back off for 10 s."""
        logger.info("文心一言爬虫启动...")
        while True:
            try:
                self.start_task_msg()
            except Exception as e:
                # logger.exception records the traceback alongside the message.
                logger.exception(f"主循环异常: {e}")
                time.sleep(10)


def main(worker_count=5, stagger_seconds=2):
    """Start ``worker_count`` polling threads and wait for them.

    Args:
        worker_count: number of ``Start`` workers to launch.
        stagger_seconds: pause after each thread start so workers do not all
            hit the remote services at the same moment.
    """
    workers = []
    for _ in range(worker_count):
        worker = threading.Thread(target=Start().run)
        workers.append(worker)
        worker.start()
        time.sleep(stagger_seconds)

    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()