import sqlite3
import os
from typing import List, Dict, Optional

# DB_PATH and DB_TABLE are assumed to be defined at the top of the script:
# DB_PATH = "./kimi_tokens.db"
# DB_TABLE = "tokens"
#
# NOTE: every function below interpolates ``table`` directly into the SQL text
# (SQLite cannot bind identifiers as parameters), so ``table`` must always be
# a trusted identifier and never user-supplied input.


def _update_or_insert(
    cur: sqlite3.Cursor,
    table: str,
    phone: Optional[str],
    platform_name: str,
    token_value: str,
    status: str,
) -> None:
    """Update the matching token row, or insert a new one if none matches.

    Matching rule:
      * ``phone`` given   -> the row with the same (phone, platform_name).
      * ``phone`` is None -> the most recently created row of that platform,
        whatever its phone is (deliberate; use with caution).

    The statements run on ``cur`` without committing; the caller owns the
    transaction.
    """
    if phone is not None:
        cur.execute(
            f"SELECT id FROM {table} WHERE phone = ? AND platform_name = ?",
            (phone, platform_name),
        )
    else:
        cur.execute(
            f"SELECT id FROM {table} WHERE platform_name = ? "
            "ORDER BY created_at DESC LIMIT 1",
            (platform_name,),
        )
    existing = cur.fetchone()

    if existing is not None:
        cur.execute(
            f"UPDATE {table} SET token = ?, status = ?, "
            "updated_at = CURRENT_TIMESTAMP WHERE id = ?",
            (token_value, status, existing[0]),
        )
        return

    # Nothing to update: insert a new row.
    cur.execute(
        f"INSERT INTO {table} (phone, platform_name, token, status) "
        "VALUES (?, ?, ?, ?)",
        (phone, platform_name, token_value, status),
    )


def upsert_token(
    db_path: str,
    table: str,
    phone: Optional[str],
    platform_name: str,
    token_value: str,
    status: str = "active",
) -> bool:
    """Insert or update a single token.

    An existing row is looked up by (phone, platform_name) and updated in
    place; when ``phone`` is None the newest row of ``platform_name`` is
    updated instead. If no row matches, a new one is inserted.

    If the update/insert violates a constraint (for example a UNIQUE
    constraint on the table), the row is written with ``INSERT OR REPLACE``,
    which replaces the conflicting record.

    The table is expected to exist already (see ensure_tokens_table());
    ``sqlite3.connect`` itself creates the database file when it is missing.

    Returns:
        True when the write was committed.

    Raises:
        sqlite3.Error: if the fallback write fails or any other database
            error occurs.
    """
    conn = sqlite3.connect(db_path, timeout=5)
    cur = conn.cursor()
    try:
        try:
            _update_or_insert(cur, table, phone, platform_name, token_value, status)
        except sqlite3.IntegrityError:
            # Constraint conflict: fall back to replacing the existing record.
            cur.execute(
                f"INSERT OR REPLACE INTO {table} (phone, platform_name, token, status) "
                "VALUES (?, ?, ?, ?)",
                (phone, platform_name, token_value, status),
            )
        conn.commit()
        return True
    finally:
        cur.close()
        conn.close()


def bulk_upsert_tokens(
    db_path: str, table: str, rows: List[Dict[str, Optional[str]]]
) -> int:
    """Insert or update many tokens inside one transaction.

    Each item of ``rows`` is a dict with the keys ``platform_name`` and
    ``token`` (required) plus ``phone`` and ``status`` (optional; ``status``
    defaults to "active"). Rows are matched exactly as in ``upsert_token``.

    Example rows:
        [{"phone": "138...", "platform_name": "kimi", "token": "xxx"}, ...]

    The operation is all-or-nothing: if any row fails, everything is rolled
    back and the exception is re-raised.

    Returns:
        The number of rows processed (0 for an empty ``rows``).

    Raises:
        KeyError: if a row lacks ``platform_name`` or ``token``.
        sqlite3.Error: on any database error.
    """
    if not rows:
        return 0

    conn = sqlite3.connect(db_path, timeout=10)
    cur = conn.cursor()
    try:
        # One explicit transaction keeps the batch fast and atomic.
        cur.execute("BEGIN")
        processed = 0
        for item in rows:
            _update_or_insert(
                cur,
                table,
                item.get("phone"),
                item["platform_name"],
                item["token"],
                item.get("status", "active"),
            )
            processed += 1
        conn.commit()
        return processed
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()


def set_all_status_active(db_path: str = "./kimi_tokens.db", table: str = "tokens") -> int:
    """Set ``status`` to "active" on every record that is not already active.

    Returns:
        The number of rows that were updated.

    Raises:
        Exception: any error raised while updating is reported on stdout,
            the transaction is rolled back, and the error is re-raised.
    """
    conn = sqlite3.connect(db_path, timeout=10)
    cur = conn.cursor()
    try:
        # ``IS NOT`` instead of ``!=``: in SQL, ``NULL != 'active'`` evaluates
        # to NULL, so rows whose status is NULL would otherwise be skipped.
        cur.execute(
            f"UPDATE {table} SET status = ?, updated_at = CURRENT_TIMESTAMP "
            "WHERE status IS NOT ?",
            ("active", "active"),
        )
        updated_count = cur.rowcount
        conn.commit()
        print(f"成功将 {updated_count} 条记录的 status 设置为 'active'")
        return updated_count
    except Exception as e:
        conn.rollback()
        print(f"设置 status='active' 时出错: {e}")
        raise
    finally:
        cur.close()
        conn.close()


def query_tokens(
    platform_name: Optional[str] = None,
    status: Optional[str] = None,
    db_path: str = "./kimi_tokens.db",
    table: str = "tokens",
) -> List[tuple]:
    """Print and return token rows, optionally filtered.

    Args:
        platform_name: if truthy, keep only rows of this platform.
        status: if truthy, keep only rows with this status.
        db_path: path of the SQLite database file.
        table: name of the tokens table.

    Returns:
        The matching rows ordered by ``created_at`` ascending, each as
        ``(id, phone, platform_name, token, status, created_at, updated_at)``.
        Every row is also printed to stdout.
    """
    sql = (
        "SELECT id, phone, platform_name, token, status, created_at, updated_at "
        f"FROM {table} WHERE 1=1"
    )
    params = []
    if platform_name:
        sql += " AND platform_name=?"
        params.append(platform_name)
    if status:
        sql += " AND status=?"
        params.append(status)
    sql += " ORDER BY created_at ASC"

    conn = sqlite3.connect(db_path)
    cur = conn.cursor()
    try:
        cur.execute(sql, params)
        rows = cur.fetchall()
    finally:
        # Close even when the query fails so the connection is never leaked.
        cur.close()
        conn.close()

    for row in rows:
        print(row)
    return rows


# Example:
# query_tokens(platform_name="kimi", status="active")

# -------------------------
# Example usage
# -------------------------
if __name__ == "__main__":
    # Assumes ensure_tokens_table() has already been called.
    DB = "./kimi_tokens.db"
    TABLE = "tokens"

    # Mark every record as active in one go.
    print("正在执行一键设置所有记录 status='active'...")
    updated_count = set_all_status_active(DB, TABLE)
    print(f"操作完成,共更新 {updated_count} 条记录")

    # Verify the result against the same database and table.
    print("\n验证结果:")
    query_tokens(db_path=DB, table=TABLE)

    # Single upsert (never commit real phone numbers or session cookies):
    # ok = upsert_token(DB, TABLE, phone="<phone>", platform_name="mita",
    #                   token_value="<session-cookie-string>", status="active")
    # print("upsert single:", ok)
    # query_tokens(platform_name="mita", status="active")

    # Bulk upsert:
    # rows = [
    #     {"phone": "13800138000", "platform_name": "kimi", "token": "token_a"},
    #     {"phone": "13800138001", "platform_name": "kimi", "token": "token_b"},
    #     {"phone": None, "platform_name": "other", "token": "token_c"},
    # ]
    # n = bulk_upsert_tokens(DB, TABLE, rows)
    # print("bulk processed:", n)