ai项目
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

568 lines
23 KiB

import os
import json
import time
import sqlite3
import threading
from typing import Any, Dict, List, Optional, Tuple
import requests
from utlit.retry import retry
from loguru import logger
# Directory two levels above this file's absolute path; the log file is placed here.
cwd = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(cwd)

# File sink for loguru: DEBUG and above, rotated at 00:00, rotated files
# kept for 3 days and zip-compressed, with extended tracebacks enabled.
logger.add(
    f"{cwd}/my.log",
    level="DEBUG",
    rotation="00:00",
    retention="3 days",
    compression="zip",
    backtrace=True,
)
class TokenExpiredError(RuntimeError):
    """Raised when the upstream explicitly reports the token as rate-limited or invalid."""
class ToolsLoad:
    """Helper bundling the SQLite-backed token pool and the task-queue HTTP API.

    Token rows carry a ``status`` of ``active``, ``reserved`` or ``expired``.
    Every database operation opens its own short-lived connection, and that
    connection is closed on every exit path.
    """

    # NOTE(review): the API credentials are hard-coded in source; consider
    # loading them from configuration instead.
    _API_BASE = "https://api.granking.com/api/third"
    _APP_ID = "aa65700299848d6f21b969dbc9f6cf7c"
    _SECRET = "5588071d36f0bc61af849c311a03f2c4"

    def __init__(self, db_path: str = "./kimi_tokens.db", db_table: str = "tokens"):
        """Remember the database location and make sure the token table exists.

        Args:
            db_path: Path of the SQLite database file.
            db_table: Name of the token table. It is interpolated into SQL
                text, so it must be a trusted identifier.
        """
        self.db_path = db_path
        self.db_table = db_table
        # Serialises this instance's release/expire writes across threads.
        self.db_write_lock = threading.Lock()
        self._ensure_tokens_table()

    @staticmethod
    def _quiet_rollback(conn) -> None:
        """Roll back ``conn`` if it exists, ignoring SQLite errors."""
        if conn is None:
            return
        try:
            conn.rollback()
        except sqlite3.Error:
            pass

    @staticmethod
    def _quiet_close(conn) -> None:
        """Close ``conn`` if it exists, ignoring SQLite errors."""
        if conn is None:
            return
        try:
            conn.close()
        except sqlite3.Error:
            pass

    def _ensure_tokens_table(self):
        """Create the token table when it is missing.

        Also switches the database to WAL journaling with NORMAL synchronous
        mode. Errors are logged and swallowed, as before.
        """
        create_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.db_table} (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        phone TEXT,
        platform_name TEXT NOT NULL,
        token TEXT NOT NULL,
        status TEXT DEFAULT 'active',
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        reserved_at DATETIME,
        last_used_at DATETIME,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=30)
            cur = conn.cursor()
            cur.execute("PRAGMA journal_mode=WAL;")
            cur.execute("PRAGMA synchronous=NORMAL;")
            cur.execute("PRAGMA busy_timeout=5000;")
            cur.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
                (self.db_table,),
            )
            if not cur.fetchone():
                logger.info("tokens 表不存在,创建:{}", self.db_table)
                cur.executescript(create_sql)
            conn.commit()
        except Exception as e:
            logger.exception("ensure_tokens_table 错误: {}", e)
        finally:
            # Previously the connection leaked whenever a statement raised.
            self._quiet_close(conn)

    @retry('获取token', 3)
    def get_and_reserve_token(self, platform_name: str = "mita",
                              exclude_tokens: Optional[List[str]] = None,
                              reserve_timeout: int = 60) -> Optional[Tuple[int, str, Optional[str]]]:
        """Reserve the oldest active token for ``platform_name``.

        Reservations older than ``reserve_timeout`` seconds are first returned
        to ``active``. The reservation itself is a conditional UPDATE
        (``WHERE status = 'active'``), so only one contender can win a row.

        Args:
            platform_name: Platform whose tokens are considered.
            exclude_tokens: Token strings that must not be handed out.
            reserve_timeout: Age in seconds after which a reservation is reclaimed.

        Returns:
            ``(rowid, token, phone)`` on success, or ``None`` when no eligible
            token exists, on an unexpected error, or after six lost attempts.
        """
        excluded = list(exclude_tokens or [])
        # The exclusion is applied in SQL so a run of excluded rows at the head
        # of the ordering can no longer hide usable rows behind a LIMIT.
        # NOTE(review): very long exclusion lists could exceed SQLite's bound
        # parameter limit; callers here pass only a handful.
        exclude_clause = ""
        if excluded:
            placeholders = ",".join("?" * len(excluded))
            exclude_clause = f"AND token NOT IN ({placeholders})"

        for _ in range(6):
            conn = None
            try:
                conn = sqlite3.connect(self.db_path, timeout=30)
                cur = conn.cursor()
                cur.execute("PRAGMA busy_timeout=5000;")
                # Reclaim reservations that have timed out.
                cur.execute(f"""
                UPDATE {self.db_table}
                SET status='active', reserved_at=NULL, updated_at=CURRENT_TIMESTAMP
                WHERE platform_name = ?
                AND status = 'reserved'
                AND reserved_at IS NOT NULL
                AND (strftime('%s','now') - strftime('%s', reserved_at)) > ?
                """, (platform_name, reserve_timeout))
                if cur.rowcount:
                    conn.commit()
                # Pick the oldest eligible candidate.
                cur.execute(f"""
                SELECT rowid, token, phone FROM {self.db_table}
                WHERE platform_name = ? AND status = 'active'
                {exclude_clause}
                ORDER BY created_at ASC, id ASC
                LIMIT 1
                """, (platform_name, *excluded))
                candidate = cur.fetchone()
                if candidate is None:
                    return None
                rid, token, phone = candidate
                cur.execute(f"""
                UPDATE {self.db_table}
                SET status = 'reserved', reserved_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                WHERE rowid = ? AND status = 'active'
                """, (rid,))
                if cur.rowcount == 1:
                    conn.commit()
                    return (rid, token, phone)
                # Someone else reserved the row first; undo and try again.
                conn.rollback()
            except sqlite3.OperationalError as e:
                logger.debug("get_and_reserve_token sqlite busy: {}", e)
                self._quiet_rollback(conn)
            except Exception as e:
                logger.exception("get_and_reserve_token 异常: {}", e)
                self._quiet_rollback(conn)
                return None
            finally:
                # Runs on every path, including the retry path that used to
                # leave the connection open.
                self._quiet_close(conn)
            time.sleep(0.05)
        return None

    def _with_sqlite_write_retry(self, op_fn, max_attempts: int = 6, base_delay: float = 0.05) -> bool:
        """Run ``op_fn(conn, cur)`` and commit, retrying on ``OperationalError``.

        Retries use exponential backoff with jitter, capped at 5 seconds per
        sleep. Any other exception aborts immediately.

        Args:
            op_fn: Callable receiving a fresh connection and cursor.
            max_attempts: Maximum number of attempts.
            base_delay: Base backoff delay in seconds.

        Returns:
            ``True`` once an attempt commits, otherwise ``False``.
        """
        import random

        for attempt in range(1, max_attempts + 1):
            conn = None
            try:
                conn = sqlite3.connect(self.db_path, timeout=30)
                cur = conn.cursor()
                cur.execute("PRAGMA busy_timeout=5000;")
                op_fn(conn, cur)
                conn.commit()
                return True
            except sqlite3.OperationalError as e:
                logger.debug("sqlite write attempt {} failed: {}", attempt, e)
                self._quiet_rollback(conn)
            except Exception as e:
                logger.exception("sqlite write unexpected error: {}", e)
                self._quiet_rollback(conn)
                return False
            finally:
                self._quiet_close(conn)
            # No point in backing off when there is no attempt left.
            if attempt < max_attempts:
                sleep_time = base_delay * (2 ** (attempt - 1)) + random.uniform(0, base_delay)
                time.sleep(min(sleep_time, 5.0))
        logger.error("sqlite write failed after {} attempts", max_attempts)
        return False

    def release_token_back_to_active_by_rowid(self, rowid: int):
        """Set the row back to ``active``, clear its reservation and stamp ``last_used_at``.

        Failures are logged; nothing is raised to the caller.
        """
        def op(conn, cur):
            cur.execute(f"""
            UPDATE {self.db_table}
            SET status='active', reserved_at = NULL, last_used_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
            WHERE rowid = ?
            """, (rowid,))

        try:
            with self.db_write_lock:
                ok = self._with_sqlite_write_retry(op)
            if ok:
                logger.debug("release token rowid={} -> active", rowid)
            else:
                logger.error("release token rowid={} 失败(重试耗尽)", rowid)
        except Exception as e:
            logger.exception("release_token_back_to_active_by_rowid 错误: {}", e)

    def mark_token_expired_and_release_by_rowid(self, rowid: int):
        """Set the row to ``expired`` and clear its reservation.

        Failures are logged; nothing is raised to the caller.
        """
        def op(conn, cur):
            cur.execute(f"""
            UPDATE {self.db_table}
            SET status='expired', reserved_at = NULL, updated_at = CURRENT_TIMESTAMP
            WHERE rowid = ?
            """, (rowid,))

        try:
            with self.db_write_lock:
                ok = self._with_sqlite_write_retry(op)
            if ok:
                logger.info("mark token expired rowid={}", rowid)
            else:
                logger.error("mark token expired rowid={} 失败(重试耗尽)", rowid)
        except Exception as e:
            logger.exception("mark_token_expired_and_release_by_rowid 错误: {}", e)

    @retry('获取task消息', 5)
    def get_task(self):
        """Fetch the next pending task from the task queue and return the decoded JSON.

        Raises:
            requests.exceptions.RequestException: On transport errors or a
                non-success HTTP status (subject to the ``retry`` decorator).
        """
        # NOTE(review): the ``date`` query parameter is a fixed literal —
        # confirm this is intended rather than "today".
        url = (
            f"{self._API_BASE}/getTask?app_id={self._APP_ID}"
            f"&secret={self._SECRET}&date=2025-09-20&platform_ids=13"
        )
        resp = requests.get(url, timeout=(5, 20))
        resp.raise_for_status()
        return resp.json()

    @retry('提交结果', 5)
    def post_task(self, data):
        """Submit a task result to the server and return the decoded JSON reply.

        Args:
            data: JSON-serialisable result payload.
        """
        url = f"{self._API_BASE}/submitProjectTask"
        resp = requests.post(url, json=data, timeout=(5, 300))
        resp.raise_for_status()
        return resp.json()

    @retry('更新任务状态', 5)
    def update_task_status(self, task_id: str, status: str):
        """Update the status of a task and return the decoded JSON reply.

        Args:
            task_id: Identifier of the task.
            status: New status value, sent verbatim.
        """
        url = f"{self._API_BASE}/updateTask?app_id={self._APP_ID}&secret={self._SECRET}"
        payload = {"task_id": task_id, "status": status}
        resp = requests.post(url, json=payload, timeout=(5, 20))
        resp.raise_for_status()
        return resp.json()
class MitaChatClient:
    """Client for the Metaso (秘塔) streaming chat endpoint."""

    def __init__(self):
        """Set the endpoint URL and the base request headers."""
        self.api_url = "https://metaso.cn/api/search/chat"
        self.headers = {
            "Host": "metaso.cn",
            "Connection": "keep-alive",
            "metaso-pc": "pc",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
            "accept": "text/event-stream",
            "Content-Type": "application/json",
            "sec-ch-ua-mobile": "?0",
            "Origin": "https://metaso.cn",
            "Sec-Fetch-Site": "same-origin",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Dest": "empty",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def _build_payload(self, keyword: str, token: str) -> str:
        """Return the JSON request body for ``keyword`` as a string.

        Non-ASCII characters are kept as-is (``ensure_ascii=False``). The
        conversation id is derived from the current time in milliseconds.
        """
        post_obj = {
            "model": "fast_thinking",
            "stream": True,
            "messages": [{
                "id": "gbt0t89g3qi08crvopt3rm2",
                "key": "4u43disi3vqb7vonb37f",
                "conversationId": "temp-" + str(int(time.time() * 1000)),
                "role": "user",
                "content": keyword,
                "markdownContent": keyword,
                "engineType": "",
                "filter": "all",
                "contentType": 0,
                "outputHtml": False,
                "mode": "detail",
                "model": "fast_thinking",
                "outputStyle": "正常"
            }],
            "engineType": "",
            "mode": "detail",
            "filter": "all",
            "outputHtml": False,
            "outputStyle": "正常",
            "darkMode": False,
            "outputLanguage": "中文",
            "htmlNoDisplayEnable": True,
            "metaso-pc": "pc",
            "token": token
        }
        return json.dumps(post_obj, ensure_ascii=False)

    @staticmethod
    def _citation_to_ref(c: Dict[str, Any], url: Any) -> Dict[str, Any]:
        """Map one upstream citation dict to the reference record that is returned."""
        return {
            "title": c.get("title", ""),
            "url": url,
            "author": (c.get("author") or c.get("authors_ori") or c.get("institution") or ""),
            "displaySource": (c.get("snippet") or c.get("matched_snippet") or ""),
            "publish_time": (c.get("date") or c.get("publish_date_str") or ""),
        }

    def _resolve_knowledge_url(self, c: Dict[str, Any]) -> str:
        """Build the subject-page URL for a citation whose ``link`` is empty.

        The knowledge-detail endpoint is queried for the citation's
        ``root_id``. A failed lookup is logged and yields ``""`` so that one
        bad citation does not abort the whole answer.
        """
        root_id = c.get("root_id")
        if not root_id:
            logger.warning("citation has an empty link and no root_id; url left empty")
            return ""
        try:
            # A timeout is required: without one a stalled lookup blocks forever.
            resp = requests.get(
                "https://metaso.cn/api/knowledge/detail/" + str(root_id),
                timeout=(5, 20),
            )
            detail = resp.json().get("data") or {}
            subject_id = detail.get("id")
        except (requests.exceptions.RequestException, ValueError, AttributeError) as e:
            logger.warning("knowledge detail lookup failed for root_id={}: {}", root_id, e)
            return ""
        if not subject_id:
            logger.warning("knowledge detail for root_id={} has no id; url left empty", root_id)
            return ""
        preview_url = (c.get("file_meta") or {}).get("previewUrl") or ""
        return "https://metaso.cn/subject-v2/" + str(subject_id) + "?displayUrl=" + str(preview_url)

    def chat(self, keyword: str, token: str, cookie: str) -> Tuple[str, List[Dict[str, str]]]:
        """Send ``keyword`` to the chat endpoint and collect the streamed answer.

        Args:
            keyword: The user prompt.
            token: Value placed in the payload's ``token`` field.
            cookie: Value sent as the ``Cookie`` request header.

        Returns:
            ``(answer_text, references)``: the concatenated ``delta.content``
            fragments and one reference record per streamed citation.

        Raises:
            TokenExpiredError: A streamed line contains "搜索次数超出限制".
            RuntimeError: The upstream answered with a non-200 status.
            requests.exceptions.RequestException: On transport failures.
        """
        headers = dict(self.headers)
        headers['Cookie'] = cookie
        # Encode explicitly: the body contains non-ASCII text, and leaving the
        # str -> bytes conversion to the HTTP stack depends on its version.
        body = self._build_payload(keyword, token).encode("utf-8")
        refs_raw: List[Dict[str, str]] = []
        answer_parts: List[str] = []

        with requests.post(self.api_url, data=body, headers=headers, stream=True, timeout=(5, 60)) as r:
            r.encoding = "utf-8"
            if r.status_code != 200:
                text = r.text if hasattr(r, "text") else ""
                raise RuntimeError(f"Upstream status {r.status_code}: {text[:200]}")
            r.raw.decode_content = True
            for ln in r.iter_lines(decode_unicode=True):
                if not ln or not isinstance(ln, str):
                    continue
                # Detect the quota message on any line, not only on data lines.
                if "搜索次数超出限制" in ln:
                    raise TokenExpiredError("搜索次数超出限制")
                if not ln.startswith("data:"):
                    continue
                try:
                    ev = json.loads(ln[5:].lstrip())
                except ValueError:
                    continue
                # Valid JSON that is not an object carries nothing we can read.
                if not isinstance(ev, dict):
                    continue
                if ev.get("type") == "heartbeat":
                    continue
                if ev.get("object") != "chat.completion.qa_with_agent":
                    continue
                for ch in ev.get("choices") or []:
                    delta = ch.get("delta") or {}
                    content = delta.get("content")
                    if isinstance(content, str) and content:
                        answer_parts.append(content)
                    for c in delta.get("citations") or []:
                        if c.get("link") == "":
                            url = self._resolve_knowledge_url(c)
                        else:
                            url = c.get("link", "")
                        refs_raw.append(self._citation_to_ref(c, url))
        return "".join(answer_parts), refs_raw
class Start:
    """Main worker for Metaso (秘塔) tasks: fetch a task, answer it, submit the result."""

    # Value sent in the chat payload's ``token`` field. The token reserved
    # from the database is sent as the Cookie header instead.
    CHAT_TOKEN = "wr8+pHu3KYryzz0O2MaBSNUZbVLjLUYC1FR4sKqSW0r7cpL+iG/2N5cdq9x0z7Bq8yrjvk/Mn0n+M7QhbW+SLnTf1mYlCDjkvCg10L2wwrJzhdhzA38AvbPtjbzdPvpeBjSxLo7TFAba1lm8N3q7qA=="

    # Upstream message indicating that the token's search quota is exhausted.
    _LIMIT_MARKER = "搜索次数超出限制"

    def __init__(self):
        """Create the tool helper and the chat client."""
        self.tools = ToolsLoad()
        self.client = MitaChatClient()
        # Maximum number of different tokens tried for a single task.
        self.max_token_rotate_tries = 2

    def _report_task_error(self, task_id) -> None:
        """Report status "4" for the task; a failure to report is only logged.

        NOTE(review): "4" is used on every error path, so it is presumably the
        failure status — confirm against the task API.
        """
        try:
            self.tools.update_task_status(task_id, "4")
        except Exception:
            logger.exception("更新任务状态失败")

    def _expire_token(self, rowid) -> None:
        """Mark the token row as expired; a failure to do so is only logged."""
        try:
            self.tools.mark_token_expired_and_release_by_rowid(rowid)
        except Exception:
            logger.exception("标记 token expired 失败(忽略并继续)")

    @staticmethod
    def _build_result(task_id, keyword, brand, platform_id, answer_acc, refs_raw) -> Dict[str, Any]:
        """Assemble the submission payload for a finished chat."""
        ts = time.time()
        dt = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts))
        # NOTE(review): start_time and end_time carry the same timestamp.
        return {
            'app_id': 'aa65700299848d6f21b969dbc9f6cf7c',
            'secret': '5588071d36f0bc61af849c311a03f2c4',
            'platform_id': platform_id,
            'platform_name': '秘塔',
            'prompt': keyword,
            'keyword': brand,
            'answer': answer_acc,
            'search_result': refs_raw,
            'screenshot_file': '',
            'run_status': True,
            'task_id': task_id,
            'rank': 0,
            'start_time': dt,
            'end_time': dt,
            'screenshot_url': '',
            'words': []
        }

    @retry('处理消息任务', for_work=10)
    def process_task(self, task: Dict[str, Any], platform_name: str = "mita") -> Dict[str, Any]:
        """Process one task, rotating tokens when one turns out to be unusable.

        Args:
            task: Task record; ``id``, ``keyword``, ``brand`` and
                ``platform_id`` are read from it.
            platform_name: Platform whose token pool is used.

        Returns:
            A dict with ``task_id``, ``status`` (``"ok"`` or ``"error"``) and
            ``detail``. ``False`` is returned when the answer came back empty;
            how the ``retry`` decorator treats that value is defined outside
            this block.

        Raises:
            RuntimeError: No token could be reserved.
        """
        task_id = task.get("id", "")
        keyword = task.get("keyword", "")
        brand = task.get("brand", "")
        platform_id = task.get("platform_id", "")
        logger.info("开始处理任务 {} (keyword={})", task_id, keyword)

        exclude_tokens: List[str] = []
        for _ in range(self.max_token_rotate_tries):
            reserve = self.tools.get_and_reserve_token(platform_name=platform_name, exclude_tokens=exclude_tokens)
            if not reserve:
                logger.error("没有可用 token(platform={})", platform_name)
                raise RuntimeError("未找到可用 TOKEN")
            rowid, token, phone = reserve
            logger.info("抢到 token rowid={} phone={} token={}...", rowid, phone or "N/A",
                        token[:20] if token else "N/A")

            # Once the token has been handed back, another worker may already
            # hold it, so the error handlers below must not release it again.
            released = False
            try:
                # Run the request with the reserved token as the cookie.
                answer_acc, refs_raw = self.client.chat(
                    keyword=keyword,
                    token=self.CHAT_TOKEN,
                    cookie=token
                )
                # Success: hand the token back to the pool.
                self.tools.release_token_back_to_active_by_rowid(rowid)
                released = True

                result = self._build_result(task_id, keyword, brand, platform_id, answer_acc, refs_raw)
                logger.info("任务 {} 结果返回: {}", task_id, result)
                if answer_acc == '':
                    logger.error('获取结果异常重新获取')
                    return False
                post_resp = self.tools.post_task(result)
                logger.info("任务 {} 提交返回: {}", task_id, post_resp)
                return {"task_id": task_id, "status": "ok", "detail": post_resp}

            except TokenExpiredError as e:
                self._report_task_error(task_id)
                msg = str(e)
                logger.warning("TokenExpiredError: 标记 token expired (rowid={}) due to: {}", rowid, msg[:200])
                self._expire_token(rowid)
                exclude_tokens.append(token)
                continue

            except RuntimeError as e:
                # Status is reported once here; it used to be posted a second
                # time further down, together with a misleading success print.
                self._report_task_error(task_id)
                msg = str(e)
                logger.exception("任务 {} 捕获运行时错误: {}", task_id, msg)
                if self._LIMIT_MARKER in msg:
                    logger.warning("识别到 runtime error 表示 token 问题,标记 expired (rowid={}): {}", rowid, msg[:200])
                    self._expire_token(rowid)
                    exclude_tokens.append(token)
                    continue
                if not released:
                    self.tools.release_token_back_to_active_by_rowid(rowid)
                return {"task_id": task_id, "status": "error", "detail": msg}

            except requests.exceptions.RequestException as e:
                logger.exception("网络异常:{}", e)
                if not released:
                    self.tools.release_token_back_to_active_by_rowid(rowid)
                exclude_tokens.append(token)
                time.sleep(1.0)
                continue

            except Exception as e:
                msg = str(e)
                logger.exception("未知异常:{}", msg)
                if self._LIMIT_MARKER in msg:
                    logger.warning("未知异常文本指示 token 失效,标记 expired (rowid={}): {}", rowid, msg[:200])
                    self._expire_token(rowid)
                    exclude_tokens.append(token)
                    continue
                if not released:
                    try:
                        self.tools.release_token_back_to_active_by_rowid(rowid)
                    except Exception:
                        logger.exception("密塔 释放 token 失败(忽略)")
                self._report_task_error(task_id)
                return {"task_id": task_id, "status": "error", "detail": msg}

        # Every token attempt was used up without success.
        logger.error("密塔 任务 {} 达到最大 token 重试次数 ({}) 仍失败", task_id, self.max_token_rotate_tries)
        self._report_task_error(task_id)
        return {"task_id": task_id, "status": "error", "detail": "max token rotate tries exceeded"}

    @retry('主运行窗口', for_work=1)
    def start_task_msg(self):
        """Fetch one task and process it.

        Returns:
            ``True`` after waiting when there was nothing to do, otherwise the
            value returned by ``process_task``.
        """
        task_resp = self.tools.get_task()
        if not task_resp:
            logger.info("密塔 get_task 未返回有效数据,等待后重试")
            time.sleep(5)
            return True
        data = task_resp.get("data", False)
        if not data:
            logger.info("密塔 没有任务数据,等待下一轮")
            time.sleep(30)
            return True
        return self.process_task(data)

    def run(self):
        """Run the fetch-and-process loop forever."""
        logger.info("启动秘塔任务处理器")
        while True:
            self.start_task_msg()
# Script entry point: start the worker loop only when executed directly.
if __name__ == "__main__":
    worker = Start()
    worker.run()