"""Mita (metaso.cn) task worker.

Fetches tasks from the task API, answers each one through the metaso.cn chat
endpoint using a cookie leased from a local SQLite table, and submits the
result back to the task API.
"""
import contextlib
import json
import os
import random
import sqlite3
import threading
import time
from typing import Any, Dict, List, Optional, Tuple

import requests
from loguru import logger

from utlit.retry import retry

cwd = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(cwd)
logger.add(f"{cwd}/my.log", level="DEBUG", rotation="00:00", retention="3 days",
           compression="zip", backtrace=True)

# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a config file instead.
API_BASE = "https://api.granking.com/api/third"
APP_ID = "aa65700299848d6f21b969dbc9f6cf7c"
APP_SECRET = "5588071d36f0bc61af849c311a03f2c4"

# Text emitted by the chat endpoint when a cookie has used up its search quota.
TOKEN_LIMIT_MARKER = "搜索次数超出限制"

# Fixed value sent in the "token" field of the chat payload. The per-account
# credential leased from the database is sent as the Cookie header instead.
# NOTE(review): presumably a page-level token captured from the web client —
# confirm whether it ever needs refreshing.
MITA_PAYLOAD_TOKEN = (
    "wr8+pHu3KYryzz0O2MaBSNUZbVLjLUYC1FR4sKqSW0r7cpL+iG/2N5cdq9x0z7Bq8yrjvk/"
    "Mn0n+M7QhbW+SLnTf1mYlCDjkvCg10L2wwrJzhdhzA38AvbPtjbzdPvpeBjSxLo7TFAba1lm8N3q7qA=="
)


class TokenExpiredError(RuntimeError):
    """Raised when upstream explicitly reports the token as rate-limited or invalid."""


class ToolsLoad:
    """Helper bundling the SQLite token store and the task HTTP API calls."""

    def __init__(self, db_path: str = "./kimi_tokens.db", db_table: str = "tokens"):
        """Remember the database location and make sure the tokens table exists.

        Args:
            db_path: Path of the SQLite database file.
            db_table: Name of the table holding tokens. It is interpolated
                into SQL text, so it must come from trusted configuration.
        """
        self.db_path = db_path
        self.db_table = db_table
        self.db_write_lock = threading.Lock()
        self._ensure_tokens_table()

    @staticmethod
    def _rollback_quietly(conn: Optional[sqlite3.Connection]) -> None:
        """Best-effort rollback; cleanup must never mask the original error."""
        if conn is not None:
            with contextlib.suppress(sqlite3.Error):
                conn.rollback()

    @staticmethod
    def _close_quietly(conn: Optional[sqlite3.Connection]) -> None:
        """Best-effort close; cleanup must never mask the original error."""
        if conn is not None:
            with contextlib.suppress(sqlite3.Error):
                conn.close()

    def _ensure_tokens_table(self):
        """Create the tokens table if it is missing.

        Also switches the database to WAL journaling. Any failure is logged
        and swallowed so that constructing the helper never raises.
        """
        create_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.db_table} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            phone TEXT,
            platform_name TEXT NOT NULL,
            token TEXT NOT NULL,
            status TEXT DEFAULT 'active',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            reserved_at DATETIME,
            last_used_at DATETIME,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cur = conn.cursor()
            cur.execute("PRAGMA journal_mode=WAL;")
            cur.execute("PRAGMA synchronous=NORMAL;")
            cur.execute("PRAGMA busy_timeout=5000;")
            cur.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
                (self.db_table,),
            )
            if not cur.fetchone():
                logger.info("tokens 表不存在,创建:{}", self.db_table)
                cur.executescript(create_sql)
            conn.commit()
        except Exception as e:
            logger.exception("ensure_tokens_table 错误: {}", e)
        finally:
            # Close on every path; previously an exception leaked the connection.
            self._close_quietly(conn)

    @retry('获取token', 3)
    def get_and_reserve_token(self, platform_name: str = "mita",
                              exclude_tokens: Optional[List[str]] = None,
                              reserve_timeout: int = 60) -> Optional[Tuple[int, str, Optional[str]]]:
        """Atomically lease one active token for a platform.

        Reservations older than ``reserve_timeout`` seconds are first returned
        to ``active``. The oldest active token not listed in
        ``exclude_tokens`` is then flipped to ``reserved`` with a conditional
        UPDATE, so only one worker can win a given row.

        Args:
            platform_name: Value matched against the ``platform_name`` column.
            exclude_tokens: Token strings that must not be handed out again.
            reserve_timeout: Age in seconds after which a reservation is
                considered abandoned.

        Returns:
            ``(rowid, token, phone)`` on success, or ``None`` when no
            candidate exists, an unexpected error occurs, or all attempts
            lose the race / hit a busy database.
        """
        excluded = set(exclude_tokens or [])
        for _ in range(6):
            conn = None
            try:
                conn = sqlite3.connect(self.db_path, timeout=30)
                cur = conn.cursor()
                cur.execute("PRAGMA busy_timeout=5000;")

                # Recycle reservations that have timed out.
                cur.execute(f"""
                    UPDATE {self.db_table}
                    SET status='active', reserved_at=NULL, updated_at=CURRENT_TIMESTAMP
                    WHERE platform_name = ? AND status = 'reserved' AND reserved_at IS NOT NULL
                      AND (strftime('%s','now') - strftime('%s', reserved_at)) > ?
                """, (platform_name, reserve_timeout))
                if cur.rowcount:
                    conn.commit()

                # Pick the first candidate that is not excluded.
                cur.execute(f"""
                    SELECT rowid, token, phone FROM {self.db_table}
                    WHERE platform_name = ? AND status = 'active'
                    ORDER BY created_at ASC, id ASC
                    LIMIT 10
                """, (platform_name,))
                candidate = next(
                    (row for row in cur.fetchall() if row[1] not in excluded),
                    None,
                )
                if not candidate:
                    return None

                rid, token, phone = candidate
                # The "AND status = 'active'" guard makes the claim atomic:
                # a concurrent winner leaves rowcount at 0 for everyone else.
                cur.execute(f"""
                    UPDATE {self.db_table}
                    SET status = 'reserved', reserved_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                    WHERE rowid = ? AND status = 'active'
                """, (rid,))
                if cur.rowcount == 1:
                    conn.commit()
                    return (rid, token, phone)
                conn.rollback()
            except sqlite3.OperationalError as e:
                logger.debug("get_and_reserve_token sqlite busy: {}", e)
                self._rollback_quietly(conn)
            except Exception as e:
                logger.exception("get_and_reserve_token 异常: {}", e)
                self._rollback_quietly(conn)
                return None
            finally:
                # Previously the busy path rolled back without closing,
                # leaking one connection per retry.
                self._close_quietly(conn)
            time.sleep(0.05)
        return None

    def _with_sqlite_write_retry(self, op_fn, max_attempts: int = 6, base_delay: float = 0.05) -> bool:
        """Run a write operation with retries on ``sqlite3.OperationalError``.

        Args:
            op_fn: Callable invoked as ``op_fn(conn, cur)``; it performs the
                writes, and the commit is done here afterwards.
            max_attempts: Maximum number of attempts.
            base_delay: Base of the exponential backoff, in seconds.

        Returns:
            ``True`` once an attempt commits; ``False`` on an unexpected
            error or when every attempt failed.
        """
        for attempt in range(1, max_attempts + 1):
            conn = None
            try:
                conn = sqlite3.connect(self.db_path, timeout=30)
                cur = conn.cursor()
                cur.execute("PRAGMA busy_timeout=5000;")
                op_fn(conn, cur)
                conn.commit()
                return True
            except sqlite3.OperationalError as e:
                logger.debug("sqlite write attempt {} failed: {}", attempt, e)
                self._rollback_quietly(conn)
            except Exception as e:
                logger.exception("sqlite write unexpected error: {}", e)
                self._rollback_quietly(conn)
                return False
            finally:
                self._close_quietly(conn)
            # Exponential backoff with jitter, capped at five seconds.
            sleep_time = base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay)
            time.sleep(min(sleep_time, 5.0))
        logger.error("sqlite write failed after {} attempts", max_attempts)
        return False

    def release_token_back_to_active_by_rowid(self, rowid: int):
        """Return a leased token to ``active`` and stamp ``last_used_at``.

        Failures are logged; nothing is raised to the caller.
        """
        def op(conn, cur):
            cur.execute(f"""
                UPDATE {self.db_table}
                SET status='active', reserved_at = NULL, last_used_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                WHERE rowid = ?
            """, (rowid,))

        try:
            with self.db_write_lock:
                ok = self._with_sqlite_write_retry(op)
            if ok:
                logger.debug("release token rowid={} -> active", rowid)
            else:
                logger.error("release token rowid={} 失败(重试耗尽)", rowid)
        except Exception as e:
            logger.exception("release_token_back_to_active_by_rowid 错误: {}", e)

    def mark_token_expired_and_release_by_rowid(self, rowid: int):
        """Mark a token as ``expired`` and clear its reservation.

        Failures are logged; nothing is raised to the caller.
        """
        def op(conn, cur):
            cur.execute(f"""
                UPDATE {self.db_table}
                SET status='expired', reserved_at = NULL, updated_at = CURRENT_TIMESTAMP
                WHERE rowid = ?
            """, (rowid,))

        try:
            with self.db_write_lock:
                ok = self._with_sqlite_write_retry(op)
            if ok:
                logger.info("mark token expired rowid={}", rowid)
            else:
                logger.error("mark token expired rowid={} 失败(重试耗尽)", rowid)
        except Exception as e:
            logger.exception("mark_token_expired_and_release_by_rowid 错误: {}", e)

    @retry('获取task消息', 5)
    def get_task(self):
        """Fetch a pending task from the task API and return the decoded JSON.

        Raises:
            requests.exceptions.RequestException: On network or HTTP errors.
        """
        # NOTE(review): the date and platform_ids query values are hard-coded
        # — confirm that a fixed date is intended.
        url = (f"{API_BASE}/getTask?app_id={APP_ID}&secret={APP_SECRET}"
               "&date=2025-09-20&platform_ids=13")
        resp = requests.get(url, timeout=(5, 20))
        resp.raise_for_status()
        return resp.json()

    @retry('提交结果', 5)
    def post_task(self, data):
        """Submit a task result to the task API and return the decoded JSON.

        Raises:
            requests.exceptions.RequestException: On network or HTTP errors.
        """
        url = f"{API_BASE}/submitProjectTask"
        resp = requests.post(url, json=data, timeout=(5, 300))
        resp.raise_for_status()
        return resp.json()

    @retry('更新任务状态', 5)
    def update_task_status(self, task_id: str, status: str):
        """Update the status of a task and return the decoded JSON.

        Raises:
            requests.exceptions.RequestException: On network or HTTP errors.
        """
        url = f"{API_BASE}/updateTask?app_id={APP_ID}&secret={APP_SECRET}"
        payload = {"task_id": task_id, "status": status}
        resp = requests.post(url, json=payload, timeout=(5, 20))
        resp.raise_for_status()
        return resp.json()


class MitaChatClient:
    """Client for the metaso.cn streaming chat endpoint."""

    def __init__(self):
        """Set the endpoint URL and the browser-like default headers."""
        self.api_url = "https://metaso.cn/api/search/chat"
        self.headers = {
            "Host": "metaso.cn",
            "Connection": "keep-alive",
            "metaso-pc": "pc",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
            "accept": "text/event-stream",
            "Content-Type": "application/json",
            "sec-ch-ua-mobile": "?0",
            "Origin": "https://metaso.cn",
            "Sec-Fetch-Site": "same-origin",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Dest": "empty",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def _build_payload(self, keyword: str, token: str) -> str:
        """Serialize the chat request body for ``keyword`` as a JSON string.

        Non-ASCII characters are kept as-is (``ensure_ascii=False``), so the
        caller must encode the result as UTF-8 before sending it.
        """
        post_obj = {
            "model": "fast_thinking",
            "stream": True,
            "messages": [{
                "id": "gbt0t89g3qi08crvopt3rm2",
                "key": "4u43disi3vqb7vonb37f",
                "conversationId": "temp-" + str(int(time.time() * 1000)),
                "role": "user",
                "content": keyword,
                "markdownContent": keyword,
                "engineType": "",
                "filter": "all",
                "contentType": 0,
                "outputHtml": False,
                "mode": "detail",
                "model": "fast_thinking",
                "outputStyle": "正常"
            }],
            "engineType": "",
            "mode": "detail",
            "filter": "all",
            "outputHtml": False,
            "outputStyle": "正常",
            "darkMode": False,
            "outputLanguage": "中文",
            "htmlNoDisplayEnable": True,
            "metaso-pc": "pc",
            "token": token
        }
        return json.dumps(post_obj, ensure_ascii=False)

    @staticmethod
    def _resolve_citation_url(citation: Dict[str, Any]) -> str:
        """Build the viewer URL for a citation that carries no direct link.

        Looks up the knowledge-base entry for ``root_id`` and combines its id
        with the citation's ``file_meta.previewUrl``. Returns ``""`` when any
        piece is missing or the lookup fails, so one bad citation does not
        discard the whole answer.
        """
        root_id = citation.get("root_id")
        file_meta = citation.get("file_meta")
        preview_url = file_meta.get("previewUrl") if isinstance(file_meta, dict) else None
        if not root_id or not preview_url:
            return ""
        try:
            resp = requests.get(
                "https://metaso.cn/api/knowledge/detail/" + str(root_id),
                timeout=(5, 20),
            )
            detail = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.warning("citation detail lookup failed for root_id={}: {}", root_id, e)
            return ""
        data = detail.get("data") if isinstance(detail, dict) else None
        doc_id = data.get("id") if isinstance(data, dict) else None
        if not doc_id:
            return ""
        return "https://metaso.cn/subject-v2/" + str(doc_id) + "?displayUrl=" + str(preview_url)

    def chat(self, keyword: str, token: str, cookie: str) -> Tuple[str, List[Dict[str, str]]]:
        """Send ``keyword`` to the chat endpoint and collect the streamed reply.

        Args:
            keyword: The user prompt.
            token: Value placed in the payload's ``token`` field.
            cookie: Value sent as the ``Cookie`` header.

        Returns:
            ``(answer, references)`` where ``answer`` is the concatenated
            streamed content and ``references`` is a list of citation dicts
            with keys ``title``, ``url``, ``author``, ``displaySource`` and
            ``publish_time``.

        Raises:
            TokenExpiredError: A streamed line reports the search quota as
                exceeded.
            RuntimeError: The endpoint answered with a non-200 status.
            requests.exceptions.RequestException: On network errors.
        """
        headers = dict(self.headers)
        headers['Cookie'] = cookie
        # Encode explicitly: a str body with non-ASCII text may otherwise be
        # encoded as latin-1 by the HTTP stack and fail.
        body = self._build_payload(keyword, token).encode("utf-8")
        refs_raw: List[Dict[str, str]] = []
        answer_parts: List[str] = []

        with requests.post(self.api_url, data=body, headers=headers,
                           stream=True, timeout=(5, 60)) as r:
            r.encoding = "utf-8"
            if r.status_code != 200:
                raise RuntimeError(f"Upstream status {r.status_code}: {r.text[:200]}")
            r.raw.decode_content = True

            for ln in r.iter_lines(decode_unicode=True):
                if not ln or not isinstance(ln, str):
                    continue
                # Quota exhaustion is reported inside the stream itself.
                if TOKEN_LIMIT_MARKER in ln:
                    raise TokenExpiredError("搜索次数超出限制")
                if not ln.startswith("data:"):
                    continue
                try:
                    ev = json.loads(ln[5:].lstrip())
                except ValueError:
                    continue
                if not isinstance(ev, dict) or ev.get("type") == "heartbeat":
                    continue
                if ev.get("object") != "chat.completion.qa_with_agent" or "choices" not in ev:
                    continue

                for ch in ev["choices"]:
                    delta = ch.get("delta") or {}
                    content = delta.get("content")
                    if isinstance(content, str) and content:
                        answer_parts.append(content)
                    for c in delta.get("citations") or []:
                        # An explicitly empty link means the source is an
                        # uploaded document that needs a viewer URL.
                        if c.get("link") == "":
                            url = self._resolve_citation_url(c)
                        else:
                            url = c.get("link", "")
                        refs_raw.append({
                            "title": c.get("title", ""),
                            "url": url,
                            "author": (c.get("author") or c.get("authors_ori")
                                       or c.get("institution") or ""),
                            "displaySource": (c.get("snippet") or c.get("matched_snippet") or ""),
                            "publish_time": (c.get("date") or c.get("publish_date_str") or ""),
                        })

        return "".join(answer_parts), refs_raw


class Start:
    """Main task processor: fetches tasks, answers them and submits results."""

    def __init__(self):
        """Create the helpers and set how many tokens may be tried per task."""
        self.tools = ToolsLoad()
        self.client = MitaChatClient()
        self.max_token_rotate_tries = 2

    def _report_task_failed(self, task_id: str) -> None:
        """Set the task status to "4"; log instead of raising on failure."""
        try:
            self.tools.update_task_status(task_id, "4")
        except Exception:
            logger.exception("更新任务状态失败")

    def _expire_token(self, rowid: int) -> None:
        """Mark the leased token as expired; log instead of raising on failure."""
        try:
            self.tools.mark_token_expired_and_release_by_rowid(rowid)
        except Exception:
            logger.exception("标记 token expired 失败(忽略并继续)")

    @retry('处理消息任务', for_work=10)
    def process_task(self, task: Dict[str, Any], platform_name: str = "mita") -> Dict[str, Any]:
        """Answer one task and submit the result, rotating tokens on failure.

        Args:
            task: Task dict; the keys ``id``, ``keyword``, ``brand`` and
                ``platform_id`` are read.
            platform_name: Platform whose tokens are leased.

        Returns:
            A dict with ``task_id``, ``status`` (``"ok"`` or ``"error"``) and
            ``detail``. Returns ``False`` when the chat produced an empty
            answer — presumably a signal for the ``retry`` decorator to run
            the task again; confirm against ``utlit.retry``.

        Raises:
            RuntimeError: No token could be leased.
        """
        task_id = task.get("id", "")
        keyword = task.get("keyword", "")
        brand = task.get("brand", "")
        platform_id = task.get("platform_id", "")

        logger.info("开始处理任务 {} (keyword={})", task_id, keyword)

        exclude_tokens: List[str] = []
        token_tries = 0

        while token_tries < self.max_token_rotate_tries:
            token_tries += 1

            reserve = self.tools.get_and_reserve_token(platform_name=platform_name,
                                                       exclude_tokens=exclude_tokens)
            if not reserve:
                logger.error("没有可用 token(platform={})", platform_name)
                raise RuntimeError("未找到可用 TOKEN")

            rowid, token, phone = reserve
            logger.info("抢到 token rowid={} phone={} token={}...", rowid, phone or "N/A",
                        token[:20] if token else "N/A")

            try:
                # The leased value is the account cookie; the payload token
                # is a fixed constant.
                answer_acc, refs_raw = self.client.chat(
                    keyword=keyword,
                    token=MITA_PAYLOAD_TOKEN,
                    cookie=token
                )

                # Success: hand the token back to the pool.
                self.tools.release_token_back_to_active_by_rowid(rowid)

                # Assemble the submission payload.
                dt = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
                result = {
                    'app_id': APP_ID,
                    'secret': APP_SECRET,
                    'platform_id': platform_id,
                    'platform_name': '秘塔',
                    'prompt': keyword,
                    'keyword': brand,
                    'answer': answer_acc,
                    'search_result': refs_raw,
                    'screenshot_file': '',
                    'run_status': True,
                    'task_id': task_id,
                    'rank': 0,
                    'start_time': dt,
                    'end_time': dt,
                    'screenshot_url': '',
                    'words': []
                }
                logger.info("任务 {} 结果返回: {}", task_id, result)
                if answer_acc == '':
                    logger.error('获取结果异常重新获取')
                    return False
                post_resp = self.tools.post_task(result)
                logger.info("任务 {} 提交返回: {}", task_id, post_resp)
                return {"task_id": task_id, "status": "ok", "detail": post_resp}

            except TokenExpiredError as e:
                self._report_task_failed(task_id)
                msg = str(e)
                logger.warning("TokenExpiredError: 标记 token expired (rowid={}) due to: {}",
                               rowid, msg[:200])
                self._expire_token(rowid)
                exclude_tokens.append(token)
                continue

            except RuntimeError as e:
                self._report_task_failed(task_id)
                msg = str(e)
                logger.exception("任务 {} 捕获运行时错误: {}", task_id, msg)
                if TOKEN_LIMIT_MARKER in msg:
                    logger.warning("识别到 runtime error 表示 token 问题,标记 expired (rowid={}): {}",
                                   rowid, msg[:200])
                    self._expire_token(rowid)
                    exclude_tokens.append(token)
                    continue
                # The status was already reported above; a second update and
                # a "success" print on this failure path were removed.
                self.tools.release_token_back_to_active_by_rowid(rowid)
                return {"task_id": task_id, "status": "error", "detail": msg}

            except requests.exceptions.RequestException as e:
                logger.exception("网络异常:{}", e)
                self.tools.release_token_back_to_active_by_rowid(rowid)
                exclude_tokens.append(token)
                time.sleep(1.0)
                continue

            except Exception as e:
                msg = str(e)
                logger.exception("未知异常:{}", msg)
                if TOKEN_LIMIT_MARKER in msg:
                    logger.warning("未知异常文本指示 token 失效,标记 expired (rowid={}): {}",
                                   rowid, msg[:200])
                    self._expire_token(rowid)
                    exclude_tokens.append(token)
                    continue
                try:
                    self.tools.release_token_back_to_active_by_rowid(rowid)
                except Exception:
                    logger.exception("密塔 释放 token 失败(忽略)")
                self._report_task_failed(task_id)
                return {"task_id": task_id, "status": "error", "detail": msg}

        # Every allowed token rotation failed.
        logger.error("密塔 任务 {} 达到最大 token 重试次数 ({}) 仍失败", task_id,
                     self.max_token_rotate_tries)
        self._report_task_failed(task_id)
        return {"task_id": task_id, "status": "error", "detail": "max token rotate tries exceeded"}

    @retry('主运行窗口', for_work=1)
    def start_task_msg(self):
        """Fetch one task and process it.

        Returns ``True`` after sleeping when there is nothing to do;
        otherwise returns whatever ``process_task`` returns.
        """
        task_resp = self.tools.get_task()
        if not task_resp:
            logger.info("密塔 get_task 未返回有效数据,等待后重试")
            time.sleep(5)
            return True

        # "data" holds a single task dict (with keys such as id, keyword,
        # brand and platform_id) or a falsy value when the queue is empty.
        data = task_resp.get("data", False)
        if not data:
            logger.info("密塔 没有任务数据,等待下一轮")
            time.sleep(30)
            return True

        return self.process_task(data)

    def run(self):
        """Run the fetch-and-process loop forever."""
        logger.info("启动秘塔任务处理器")
        while True:
            self.start_task_msg()


if __name__ == "__main__":
    from update_db import query_tokens

    # Token rows live in the "tokens" table of ./kimi_tokens.db; the stored
    # token value is the account's cookie string.
    query_tokens(platform_name="mita", status="active")
    Start().run()