"""Worker that pulls keyword tasks, asks Kimi chat for an answer and submits it.

The module is made of three parts:

* ``ToolsLoad``       - thin HTTP helpers for the task/cookie backend.
* ``KimiChatClient``  - sends one chat request and parses the framed stream.
* ``Start``           - the polling loop gluing the two together.
"""
import os
import random
import struct
import time
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional, Tuple

import jwt
import gzip
import requests
import json

from utlit.retry import retry
from loguru import logger

cwd = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(cwd)
logger.add(
    f"{cwd}/my.log",    # log file location
    level="DEBUG",      # lowest level that is recorded
    rotation="00:00",   # start a new log file every day at midnight
    retention="3 days",  # how long rotated files are kept
    compression="zip",  # rotated files are zipped
    backtrace=True,
    # alternative: rotation="1 MB" to roll over by size instead of time
)

# Backend credentials. The defaults are the values this file always used; the
# environment variables allow overriding them without editing the source.
APP_ID = os.environ.get("GRANKING_APP_ID", "aa65700299848d6f21b969dbc9f6cf7c")
APP_SECRET = os.environ.get("GRANKING_APP_SECRET", "5588071d36f0bc61af849c311a03f2c4")

# Browser User-Agent shared by the Kimi request and the tobid request.
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/145.0.0.0 Safari/537.36"
)

# (connect, read) timeout for backend calls that previously had none and could
# therefore block the worker forever on a stalled connection.
_SESSION_API_TIMEOUT = (5, 30)


class ToolsLoad:
    """HTTP helpers for the cookie/session and task backend.

    Every method is wrapped by the project-local ``retry`` decorator.
    NOTE(review): the decorator's exact semantics (what counts as a failure,
    what is returned once retries are exhausted) are defined in
    ``utlit.retry`` and are not visible from this file.
    """

    @retry('获取kimi cookie', 0, time_sleep=30)
    def get_cookie(self, platform_id="4", category="1"):
        """Fetch one spider session (cookie) from the backend.

        Args:
            platform_id: Platform identifier sent as a query parameter.
            category: Session category sent as a query parameter.

        Returns:
            The ``data`` member of the JSON response, or ``False`` when the
            response carries no usable data.
        """
        url = "http://granking-api.neicela.com/api/third/getOneSpiderSession"
        params = {
            "platform_id": platform_id,
            "app_id": APP_ID,
            "secret": APP_SECRET,
            "category": category,
        }
        headers = {
            'Authorization': 'Bearer ',
            'User-Agent': 'Apifox/1.0.0 (http://apifox.com)'
        }
        response = requests.request(
            "GET", url, params=params, headers=headers, timeout=_SESSION_API_TIMEOUT
        ).json()
        data = response.get("data")
        # Treat every empty shape (missing, None, [], {}) as "no cookie"; the
        # caller calls .get() on the result and must not receive None.
        if not data:
            print('没有获取到cookie', response)
            return False
        print('成功获取到cookie', response)
        return data

    @retry('上传cookie状态 cookie', 5)
    def update_session(self, id, reload_time, status="4"):
        """Report the state of a session (cookie) back to the backend.

        Args:
            id: Session identifier as returned by ``get_cookie``. (The name
                shadows the builtin but is kept for caller compatibility.)
            reload_time: Value forwarded as ``reload_time``.
            status: Status code forwarded as ``status``.

        Returns:
            The raw response body text.
        """
        url = "http://granking-api.neicela.com/api/third/updateSpiderSession"
        params = {"app_id": APP_ID, "secret": APP_SECRET}
        payload = json.dumps({
            "id": id,
            "status": status,
            "reload_time": reload_time
        })
        headers = {
            # NOTE(review): '{{lang}}' looks like an unrendered API-client
            # template placeholder; it is sent literally. Confirm with backend.
            'lang': '{{lang}}',
            'Authorization': 'Bearer ',
            'User-Agent': 'Apifox/1.0.0 (http://apifox.com)',
            'Content-Type': 'application/json'
        }
        response = requests.request(
            "POST", url, params=params, headers=headers, data=payload,
            timeout=_SESSION_API_TIMEOUT,
        )
        print('提交不能用的cookie:', response.text)
        return response.text

    @retry('提交结果', 5)
    def post_task(self, data):
        """Submit a finished task result and return the decoded JSON reply.

        Raises:
            requests.HTTPError: On a non-2xx response (via ``raise_for_status``).
        """
        url = "https://api.granking.com/api/third/submitProjectTask"
        resp = requests.post(url, json=data, timeout=(5, 300))
        resp.raise_for_status()
        return resp.json()

    @retry('获取task消息', 5)
    def get_task(self, ):
        """Fetch the next task and return the decoded JSON reply.

        Raises:
            requests.HTTPError: On a non-2xx response (via ``raise_for_status``).
        """
        url = "https://api.granking.com/api/third/getTask"
        params = {
            "app_id": APP_ID,
            "secret": APP_SECRET,
            # NOTE(review): fixed date kept from the original URL; confirm
            # whether the backend actually uses it.
            "date": "2025-09-20",
            "platform_ids": "4",
        }
        resp = requests.get(url, params=params, timeout=(5, 20))
        resp.raise_for_status()
        return resp.json()

    @retry('获取消息id', 5)
    def request_tobid(self,
                      app_id: int,
                      user_unique_id: str,
                      web_id: str,
                      user_agent: Optional[str] = None
                      ):
        """POST to the ``tobid`` endpoint and return the decoded JSON reply.

        Args:
            app_id: Forwarded as ``app_id`` in the JSON body.
            user_unique_id: Forwarded as ``user_unique_id``.
            web_id: Forwarded as ``web_id``.
            user_agent: Optional User-Agent; a desktop Chrome string is used
                when omitted or empty.

        Raises:
            requests.HTTPError: On a non-2xx response (via ``raise_for_status``).
        """
        url = "https://gator.volces.com/tobid"
        headers = {
            "Content-Type": "application/json",
            "User-Agent": user_agent or DEFAULT_USER_AGENT
        }
        payload = {
            "app_id": app_id,
            "user_unique_id": user_unique_id,
            "web_id": web_id
        }
        response = requests.post(
            url,
            headers=headers,
            json=payload,
            timeout=10
        )
        response.raise_for_status()
        return response.json()


class KimiChatClient:
    """Client for a single Kimi chat request over the framed streaming endpoint."""

    # Substrings that, when found in a streamed frame, mean the token cannot be
    # used for this request.
    _STREAM_TOKEN_ERROR_MARKERS = (
        "当前模型对话次数已达上限",
        "模型对话次数或参数错误",
        "REASON_INVALID_AUTH_TOKEN",
        '请登录后继续使用',
    )
    # Marker meaning the session is logged out (reported with status "2").
    _LOGIN_REQUIRED_MARKER = '请登录后继续使用'
    # Substrings of an exception message that are handled as token failures.
    _HANDLED_TOKEN_ERROR_MARKERS = (
        '请登录后继续使用',
        "模型对话次数或参数错误",
        "当前模型对话次数已达上限",
    )

    def __init__(
            self,
            token: str,
            device_id: str,
            session_id: str,
            traffic_id: str,
            platform: str = "web",
            version: str = "1.0.0",
            language: str = "zh-CN",
            timezone: str = "Asia/Shanghai",
            scenario: str = "SCENARIO_K2D5",
            use_search: bool = True,
            thinking: bool = False,
    ):
        """Store the per-request identity and options.

        Args:
            token: Bearer token placed in the ``authorization`` header.
            device_id: Sent as ``x-msh-device-id``.
            session_id: Sent as ``x-msh-session-id``.
            traffic_id: Sent as ``x-traffic-id``.
            platform: Sent as ``x-msh-platform``.
            version: Sent as ``x-msh-version``.
            language: Sent as ``x-language``.
            timezone: Sent as ``r-timezone``.
            scenario: Scenario name placed in the request body.
            use_search: When true, the search tool is added to the body.
            thinking: Forwarded as ``options.thinking`` in the body.
        """
        # ===== configurable parameters =====
        self.MAX_TOKEN_ROTATE_TRIES = 3
        self.url = "https://www.kimi.com/apiv2/kimi.gateway.chat.v1.ChatService/Chat"
        self.token = token
        self.device_id = device_id
        self.session_id = session_id
        self.traffic_id = traffic_id
        self.platform = platform
        self.version = version
        self.language = language
        self.timezone = timezone
        self.scenario = scenario
        self.use_search = use_search
        self.thinking = thinking

    # ==============================
    # Stream framing / parsing
    # ==============================
    def try_parse_json_from_bytes(self, b: bytes) -> Optional[Dict[str, Any]]:
        """Best-effort JSON extraction from a byte string.

        First tries to parse from the first ``{`` of ``b``; if that fails,
        gunzips ``b`` and tries again on the decompressed bytes.

        Returns:
            The parsed object, or ``None`` when neither attempt succeeds.
        """
        # Both attempts are deliberate probes: any failure simply means
        # "this is not JSON in that form", so errors are swallowed on purpose.
        try:
            start = b.find(b"{")
            if start != -1:
                return json.loads(b[start:].decode("utf-8", errors="replace"))
        except Exception:
            pass
        try:
            inflated = gzip.decompress(b)
            start = inflated.find(b"{")
            if start != -1:
                return json.loads(inflated[start:].decode("utf-8", errors="replace"))
        except Exception:
            pass
        return None

    def read_stream_and_parse(self, response: requests.Response, max_frames: Optional[int] = None,
                              read_chunk: int = 8192) -> \
            Generator[Dict[str, Any], None, None]:
        """Read the raw response body and yield one dict per decoded frame.

        A frame is a 1-byte flag (0x00, 0x01 or 0x02), a 4-byte big-endian
        length and that many payload bytes. When the buffer does not start
        with such a header, the code falls back to parsing everything from
        the first ``{`` as a single JSON document.

        Args:
            response: Streaming response; its raw stream is read directly.
            max_frames: Stop after this many frames (``None``/0 = no limit).
            read_chunk: Number of bytes requested per raw read.

        Yields:
            Dicts with ``flag`` and ``raw_len`` plus one of ``payload``
            (parsed JSON), ``payload_text_prefix`` (first 512 bytes as text)
            or ``payload_raw_tail_text`` (unparsed leftover); or
            ``{"error": "read_error", "detail": ...}`` when reading fails.
        """
        buf = bytearray()
        frames = 0
        # Read the body bytes exactly as sent; urllib3 must not decode them.
        response.raw.decode_content = False

        while True:
            try:
                chunk = response.raw.read(read_chunk)
            except Exception as e:
                # Surface the read failure as an item; leftovers are still
                # flushed below.
                yield {"error": "read_error", "detail": str(e)}
                break
            if not chunk:
                break
            buf += chunk

            # Drain as many complete frames as the buffer currently holds.
            while buf:
                flag = buf[0]
                if flag in (0x00, 0x01, 0x02) and len(buf) >= 5:
                    length = int.from_bytes(buf[1:5], "big")
                    total_len = 5 + length
                    if len(buf) < total_len:
                        break  # frame incomplete, wait for more bytes
                    payload = bytes(buf[5:total_len])
                    del buf[:total_len]
                    frames += 1

                    parsed = self.try_parse_json_from_bytes(payload)
                    if parsed is not None:
                        yield {"flag": flag, "payload": parsed, "raw_len": len(payload)}
                    else:
                        # errors="replace" never raises, so no further
                        # fallback is needed here.
                        txt = payload[:512].decode("utf-8", errors="replace")
                        yield {"flag": flag, "payload_text_prefix": txt, "raw_len": len(payload)}

                    if max_frames and frames >= max_frames:
                        return
                    continue

                # No usable frame header: try to treat the rest as bare JSON.
                idx = buf.find(b"{")
                if idx == -1:
                    # Cap memory if nothing parseable ever arrives.
                    if len(buf) > 10 * 1024 * 1024:
                        buf = buf[-1024:]
                    break
                try_part = bytes(buf[idx:])
                parsed = self.try_parse_json_from_bytes(try_part)
                if parsed is None:
                    break  # possibly truncated JSON, wait for more bytes
                frames += 1
                yield {"flag": None, "payload": parsed, "raw_len": len(try_part)}
                buf.clear()
                if max_frames and frames >= max_frames:
                    return

        # Flush whatever is left once the stream has ended.
        if buf:
            parsed = self.try_parse_json_from_bytes(bytes(buf))
            if parsed is not None:
                yield {"flag": None, "payload": parsed, "raw_len": len(buf)}
            else:
                tail_txt = bytes(buf).decode("utf-8", errors="replace")
                yield {"flag": None, "payload_raw_tail_text": tail_txt, "raw_len": len(buf)}

    def _encode_connect(self, payload: dict) -> bytes:
        """Frame ``payload`` as flag 0x00 + 4-byte big-endian length + compact JSON."""
        json_bytes = json.dumps(payload, separators=(",", ":")).encode("utf-8")
        return b"\x00" + struct.pack(">I", len(json_bytes)) + json_bytes

    def _decode_connect(self, resp_bytes: bytes) -> dict:
        """Decode a single frame produced in the ``_encode_connect`` layout.

        Raises:
            ValueError: If fewer than 5 header bytes are present, or the body
                is not valid JSON.
        """
        if len(resp_bytes) < 5:
            raise ValueError("Invalid Connect frame")
        length = struct.unpack(">I", resp_bytes[1:5])[0]
        body = resp_bytes[5:5 + length]
        return json.loads(body.decode("utf-8"))

    # ==============================
    # Request headers (all values come from instance attributes)
    # ==============================
    def _build_headers(self):
        """Return the HTTP headers for the chat request."""
        return {
            "Host": "www.kimi.com",
            "Connection": "keep-alive",
            "x-msh-session-id": self.session_id,
            "authorization": f"Bearer {self.token}",
            "x-msh-platform": self.platform,
            "x-msh-device-id": self.device_id,
            "sec-ch-ua-mobile": "?0",
            "connect-protocol-version": "1",
            "x-msh-version": self.version,
            "x-language": self.language,
            "r-timezone": self.timezone,
            "User-Agent": DEFAULT_USER_AGENT,
            "content-type": "application/connect+json",
            "x-traffic-id": self.traffic_id,
            "Accept": "*/*",
            "Origin": "https://www.kimi.com",
            "Sec-Fetch-Site": "same-origin",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Dest": "empty",
            "Referer": "https://www.kimi.com/",
            # NOTE(review): the stream reader disables content decoding and
            # only attempts gzip per frame; confirm the server never applies
            # br/zstd to this response.
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }

    # ==============================
    # Request body (all values come from instance attributes)
    # ==============================
    def _build_payload(self, content: str):
        """Return the chat request body for the user message ``content``."""
        tools = []
        if self.use_search:
            tools.append({
                "type": "TOOL_TYPE_SEARCH",
                "search": {}
            })

        return {
            "scenario": self.scenario,
            "tools": tools,
            "message": {
                "role": "user",
                "blocks": [
                    {
                        "message_id": "",
                        "text": {
                            "content": content
                        }
                    }
                ],
                "scenario": self.scenario
            },
            "options": {
                "thinking": self.thinking
            }
        }

    # ==============================
    # Frame content extraction helpers
    # ==============================
    @staticmethod
    def _block_text(block: Any) -> str:
        """Return ``block["text"]["content"]`` or ``""`` if the shape differs."""
        if not isinstance(block, dict):
            return ""
        text = block.get("text")
        if not isinstance(text, dict):
            return ""
        content = text.get("content", "")
        return content if isinstance(content, str) else ""

    def _collect_frame(self, item: Dict[str, Any], answer_parts: List[str],
                       search_result: List[Dict[str, Any]]) -> None:
        """Append answer text and search references found in one frame."""
        payload = item.get("payload")
        if not isinstance(payload, dict):
            return

        # Text carried directly in a top-level "block".
        text = self._block_text(payload.get("block"))
        if text:
            answer_parts.append(text)

        message = payload.get("message")
        if not isinstance(message, dict):
            return

        # Text carried in "message.blocks".
        blocks = message.get("blocks", [])
        if isinstance(blocks, list):
            for entry in blocks:
                text = self._block_text(entry)
                if text:
                    answer_parts.append(text)

        # Search references carried in "message.refs.searchChunks".
        refs = message.get("refs")
        chunks = refs.get("searchChunks", []) if isinstance(refs, dict) else []
        if not isinstance(chunks, list):
            return
        for page in chunks:
            base = page.get("base", {}) if isinstance(page, dict) else {}
            if not isinstance(base, dict):
                base = {}
            search_result.append({
                "title": base.get("title", ""),
                "url": base.get("url", ""),
                "host_name": base.get("siteName", ""),
                "publish_time": base.get("publishTime", ""),
                "body": base.get("snippet", ""),
                "is_referenced": "1"
            })

    # ==============================
    # Send the request
    # ==============================
    def chat(self, platform_id: str, keyword: str, brand: str, task_id: str, cookie_id: str, platform_name="kimi"):
        """Ask Kimi ``keyword`` and build the submission record.

        Args:
            platform_id: Copied into the result as ``platform_id``.
            keyword: The prompt sent to Kimi (result key ``prompt``).
            brand: Copied into the result as ``keyword``.
            task_id: Copied into the result as ``task_id``.
            cookie_id: Session id reported via ``update_session`` when the
                token is rejected.
            platform_name: Copied into the result as ``platform_name``.

        Returns:
            The result dict on success; ``False`` when the answer is empty or
            the token was rejected (the session is reported in that case).

        Raises:
            Exception: Any failure that is not recognised as a token problem
                is re-raised unchanged.
        """
        framed = self._encode_connect(self._build_payload(keyword))
        token_tries = 0
        print(f'正在获取: {keyword}')

        # NOTE(review): every path in the body returns or raises, so the loop
        # runs at most once; retrying is left to the caller.
        while token_tries < self.MAX_TOKEN_ROTATE_TRIES:
            token_tries += 1
            try:
                search_result: List[Dict[str, Any]] = []
                answer_parts: List[str] = []
                # Captured before the request so start_time/end_time bracket
                # the actual work instead of being identical.
                started_at = datetime.now()

                with requests.post(self.url, headers=self._build_headers(), data=framed, stream=True,
                                   timeout=60) as resp:
                    if resp.status_code != 200:
                        text = None
                        try:
                            text = resp.text[:500]
                        except Exception:
                            # The body is only diagnostic detail; ignore
                            # failures while reading it.
                            pass
                        # Non-200 may be a token problem (e.g. 401); raise so
                        # the handler below can classify it.
                        raise RuntimeError(f"http_status_{resp.status_code}: {text}")

                    got_any = False
                    for item in self.read_stream_and_parse(resp, max_frames=None):
                        got_any = True
                        payload_text_total = json.dumps(item, ensure_ascii=False)

                        # Detect token-related errors anywhere in the frame.
                        if any(marker in payload_text_total
                               for marker in self._STREAM_TOKEN_ERROR_MARKERS):
                            raise RuntimeError("模型对话次数或参数错误: " + payload_text_total)

                        self._collect_frame(item, answer_parts, search_result)

                    if not got_any:
                        print("未解析到任何 frames(可能 body 为空或连接被关闭)")

                    full_answer = "".join(answer_parts).strip()
                    if full_answer == '':
                        print('未获取到数据重试...')
                        return False

                    time_format = "%Y-%m-%d %H:%M:%S"
                    result = {
                        "app_id": APP_ID,
                        "secret": APP_SECRET,
                        "platform_id": platform_id,
                        "platform_name": platform_name,
                        "prompt": keyword,
                        "keyword": brand,
                        "answer": full_answer,
                        "search_result": search_result,
                        "screenshot_file": "",
                        "run_status": True,
                        "task_id": task_id,
                        "rank": 0,
                        "start_time": started_at.strftime(time_format),
                        "end_time": datetime.now().strftime(time_format),
                        "screenshot_url": "",
                        "words": []
                    }
                    return result

            except Exception as e:
                msg = str(e)
                print('异常:', msg)
                time.sleep(5)
                # Token invalid / quota exhausted: report the session and let
                # the caller fetch another one.
                is_token_error = (
                    any(marker in msg for marker in self._HANDLED_TOKEN_ERROR_MARKERS)
                    or msg.startswith("http_status_401")
                )
                if not is_token_error:
                    # Not a token problem: propagate unchanged.
                    raise
                status = "2" if self._LOGIN_REQUIRED_MARKER in msg else "3"
                ToolsLoad().update_session(cookie_id, "", status)
                return False

        raise RuntimeError("达到最大 token 切换尝试仍未成功")


# ====================================
# Worker entry point (all parameters supplied externally)
# ====================================
class Start:
    """Polling worker: fetch a task, run it through Kimi, submit the result."""

    def __init__(self):
        self.tools = ToolsLoad()

    @retry('处理消息任务', for_work=10)
    def process_task(self, task):
        """Process one task dict.

        Reads ``id``, ``keyword``, ``platform_id`` and ``brand`` from ``task``,
        obtains a session cookie, runs the chat and posts the result.

        Returns:
            The result dict on success, ``False`` when no cookie was obtained
            or the chat produced no usable result.
        """
        task_id = task.get("id", "")
        keyword = task.get("keyword", "")
        platform_id = task.get("platform_id", "")
        brand = task.get("brand", "")

        logger.info(f"开始处理任务:{keyword} {task_id}")
        session = self.tools.get_cookie()
        # get_cookie may hand back False (or another non-dict) when no
        # session is available; guard before calling .get().
        token = session.get("cookie", None) if isinstance(session, dict) else None
        if token is None:
            logger.info(f'cookie获取失败: {token}')
            return False
        cookie_id = session.get("id")

        # The token is a JWT; its claims are read without verifying the
        # signature because only the embedded ids are needed.
        claims = jwt.decode(token, options={"verify_signature": False})
        device_id = claims.get("device_id")
        traffic_id = claims.get("sub")

        session_data = self.tools.request_tobid(20001731, traffic_id, device_id)
        session_id = session_data.get("tobid")

        client = KimiChatClient(
            token=token,
            device_id=device_id,
            session_id=session_id,
            traffic_id=traffic_id,
            scenario="SCENARIO_K2D5",
            use_search=True,
            thinking=False,
        )

        # Run the chat task.
        result = client.chat(platform_id=platform_id, keyword=keyword, brand=brand, task_id=task_id,
                             cookie_id=cookie_id)
        logger.info(f"任务生成结果: {str(result)[:500]}")
        if not result:
            logger.info(f"任务生成结果异常重新获取: {keyword}")
            return False

        post_resp = self.tools.post_task(result)
        logger.info(f"KIMI 任务 {task_id} 提交返回: {post_resp}")
        print('任务处理完成')
        return result

    @retry('主运行窗口', for_work=3)
    def start_task_msg(self):
        """Fetch one task and process it.

        Returns:
            ``True`` after waiting when there is nothing to do, otherwise the
            return value of ``process_task``.
        """
        task_resp = self.tools.get_task()
        print('KIMI消息内容:', task_resp)
        if not task_resp:
            logger.info("get_task 未返回有效数据,等待后重试")
            time.sleep(5)
            return True

        # The task itself is carried in the "data" member of the reply.
        tasks = task_resp.get("data", False)
        if not tasks:
            logger.info("KIMI 没有任务数据,等待下一轮")
            time.sleep(30)
            return True
        return self.process_task(tasks)

    def run(self):
        """Poll for tasks forever."""
        while True:
            # Fetch and handle the next task.
            self.start_task_msg()


if __name__ == "__main__":
    Start().run()