# yuanbao_sms_auto.py """ Playwright 自动登录(sms_auto 完整实现模板) 流程: 1. 向接码平台下单获取一个手机号(得到 order_id / phone / phone_id 等) 2. 将手机号填写到页面并触发发送验证码 3. 轮询接码平台查询该订单的短信列表并抽取验证码 4. 把验证码填入页面并完成登录 5. 登录成功后导出 cookie 并上传到 /cookies/add 注意:需要你把 PROVIDER_* 常量替换为真实接码平台的 API 信息或实现 provider adapter。 """ import os import re import requests from typing import Optional, Dict, Any, List from playwright.sync_api import sync_playwright, TimeoutError as PWTimeoutError from utlit.update_db import upsert_token # ========== 配置区(按照你要对接的接码平台修改) ========== COOKIES_ADD_URL = os.getenv("COOKIES_ADD_URL", "http://127.0.0.1:5001/cookies/add") API_KEY = os.getenv("COOKIES_ADD_API_KEY", "144defc3b314ef9f37d45651e6b1228f") TARGET = os.getenv("TARGET", "https://metaso.cn/") HEADLESS = False # 接码平台通用配置(下面是示例占位) LUBAN_API_BASE = os.getenv("LUBAN_API_BASE", "https://lubansms.com/v2/api") LUBAN_API_KEY = os.getenv("LUBAN_API_KEY", "5ebc10c8e8c35797de15c9af46063b36") DEFAULT_TIMEOUT = 15 # seconds DEFAULT_CARD_TYPE = os.getenv("LUBAN_CARDTYPE", "1") # 根据平台说明填写默认卡种 # 超时与轮询参数 ORDER_TIMEOUT = 60 # 等待平台分配号码的超时(秒) SMS_POLL_TIMEOUT = 180 # 等待短信到达的超时(秒) SMS_POLL_INTERVAL = 3 # 轮询间隔(秒) CODE_REGEX = re.compile(r"(\d{4,8})") # 匹配 4~8 位数字验证码 # ========== provider adapter 模式(必须实现这些函数) ========== # 这些函数为示例实现,你需要根据真实接码平台 API 调整请求及解析逻辑。 # Helper to build URL def _url(path: str) -> str: return LUBAN_API_BASE.rstrip("/") + "/" + path.lstrip("/") # ---------- provider_request_number ---------- def provider_request_number(card_type: Optional[str] = None, **kwargs) -> Dict[str, Any]: """ 请求/租用一个带关键字的号码(getKeywordNumber) 返回示例: {"ok": True, "order_id": "", "phone": "13800138000", "raw": } 失败返回: {"ok": False, "error": "...", "raw": } 参数: - country: 国家码(字符串),lubansms 的 API 若无 country 字段可忽略 - card_type: 卡种/套餐类型(平台文档里的 cardType 值) """ card_type = card_type or DEFAULT_CARD_TYPE apikey = LUBAN_API_KEY if not apikey or apikey == "YOUR_APIKEY": return {"ok": False, "error": "no_api_key_configured"} params = { "apikey": apikey, "phone": "", # 有些接口要求 phone param 空或占位,保留 "cardType": card_type } try: resp = requests.get(_url("getKeywordNumber"), params=params, timeout=DEFAULT_TIMEOUT) except Exception as e: return {"ok": False, "error": f"request_failed: {e}"} # 尝试解析 JSON try: j = resp.json() if j.get("code") == 0: return j except Exception: return {"ok": False, "error": "invalid_json", "raw": resp.text} # ---------- provider_get_messages ---------- def provider_get_messages(phone: str, keyword: Optional[str] = None, **kwargs) -> Dict[str, Any]: """ 拉取短信(getKeywordSms)。 返回: 收到短信: {"ok": True, "messages": [{"text": "...", "ts": 1234567890}], "raw": ...} 等待短信: {"ok": False, "error": "waiting_sms", "raw": ...} API 错误: {"ok": False, "error": "api_error", "raw": ...} """ apikey = LUBAN_API_KEY if not apikey or apikey == "YOUR_APIKEY": return {"ok": False, "error": "no_api_key_configured"} params = {"apikey": apikey, "phone": phone} if keyword: params["keyword"] = keyword try: resp = requests.get(_url("getKeywordSms"), params=params, timeout=DEFAULT_TIMEOUT) resp.raise_for_status() except Exception as e: return {"ok": False, "error": f"request_failed: {e}"} try: j = resp.json() except Exception: return {"ok": False, "error": "invalid_json", "raw": resp.text} code = j.get("code") msg = j.get("msg", "") if code == 0: # 收到短信 return {"ok": True, "messages": j.get("msg", ""), "raw": j} elif code == 400 and "尚未收到短信" in msg: # 等待短信 return {"ok": False, "error": "waiting_sms", "raw": j} else: # 其他 API 错误 return {"ok": False, "error": "api_error", "raw": j} # ---------- provider_cancel_order ---------- def provider_cancel_order( phone: str = None, **kwargs) -> Dict[str, Any]: """ 释放/取消号码(delKeywordNumber)。 返回 {"ok": True} 或 {"ok": False, "error": "..."} """ apikey = LUBAN_API_KEY if not apikey or apikey == "YOUR_APIKEY": return {"ok": False, "error": "no_api_key_configured"} params = {"apikey": apikey, "phone": phone} try: resp = requests.get(_url("delKeywordNumber"), params=params, timeout=DEFAULT_TIMEOUT) except Exception as e: return {"ok": False, "error": f"request_failed: {e}"} try: j = resp.json() except Exception: return {"ok": False, "error": "invalid_json", "raw": resp.text} # 成功判断:根据平台文档调整条件(常见 code==0 或 success==True) if isinstance(j, dict) and (j.get("code") in (0, "0") or j.get("success") in (True, "true")): return {"ok": True, "raw": j} # 仍返回原始响应以便调试 return {"ok": False, "error": "provider_error", "raw": j} # ========== 工具函数 ========== def extract_code_from_text(text: str) -> Optional[str]: m = CODE_REGEX.search(text or "") return m.group(1) if m else None def cookies_to_string(cookies): return "; ".join(f"{c['name']}={c['value']}" for c in cookies) def upload_cookie(cookie_str): headers = {"X-API-Key": API_KEY, "Content-Type": "application/json"} resp = requests.post(COOKIES_ADD_URL, headers=headers, json={"cookie": cookie_str}, timeout=30) resp.raise_for_status() return resp.json() from playwright.sync_api import Page, TimeoutError import time def ensure_logged_in_ui(page: Page, wait_for_login_selector: str = None, timeout: int = 30): """ 检查页面是否显示未登录(基于 .nick-info-name 包含 '未登录')。 若未登录则点击触发器(.t-trigger 或 avatar 容器),并等待登录页面/弹窗出现。 参数: page: Playwright Page 对象 wait_for_login_selector: 登录弹窗/页面可见后用于等待的选择器(可选) timeout: 等待登录出现的超时(秒) 返回: True if we clicked/opened login (or already logged in False if already logged?) """ try: # 方法一:用按钮文本(最简单、最稳) btn = page.get_by_role("button", name="登录/注册") btn.wait_for(state="visible", timeout=5000) btn.click() return True except PWTimeoutError: # 备选:更精确的 CSS 选择器(你贴的 HTML 中有很多 class) try: btn2 = page.locator("button.css-veatkv", has_text="登录/注册") btn2.wait_for(state="visible", timeout=3000) btn2.click() except Exception as e: print("点击登录按钮失败:", e) page.screenshot(path="fail_login.png") return False from playwright.sync_api import Page, TimeoutError import time def click_phone_tab(page: Page, timeout: int = 10) -> bool: """ 尝试点击“手机”选项(返回 True 表示已点击并选中) """ try: btn = page.locator("p.css-141qk0f", has_text="手机验证") btn.wait_for(state="visible", timeout=5000) btn.click() print("已点击:手机验证") return True except Exception as e: print("点击手机验证失败:", e) # page.screenshot(path="fail_login.png") return False import re from playwright.sync_api import Page, TimeoutError import time def click_agree_span(page: Page, timeout: float = 5.0) -> bool: """ 尝试点击页面中显示文本 '我已阅读并同意 ' 的 元素。 返回 True 表示已点击(或已勾选),False 表示失败。 """ try: # 首选:直接 check input(安全且会跳过已选状态) checkbox = page.locator("#desktop-login-policy") checkbox.wait_for(state="attached", timeout=3000) checkbox.check(timeout=3000) # 如果已经选中不会抛错 print("使用 input#desktop-login-policy.check() 成功") except Exception as e: print("直接 check 失败,尝试点击包含的 label/span ->", e) try: # 备选:点击包含 checkbox 的 label(对自定义样式友好) page.locator("label:has(#desktop-login-policy)").first.click(timeout=3000) print("通过点击 label 成功") except Exception as e2: print("label 点击失败,尝试强制点击渲染的 checkbox span ->", e2) try: page.locator("input#desktop-login-policy").first.click(force=True) print("force click 成功") except Exception as e3: print("所有方法失败:", e3) page.screenshot(path="checkbox_click_fail.png") # 验证:检查是否真的被选中 try: is_checked = page.locator("#desktop-login-policy").is_checked() print("当前 checked 状态:", is_checked) except Exception: print("无法读取 checked 状态,可能是自定义组件(需检查 aria- 属性或 class)") def fill_phone_and_send(page: Page, phone: str, wait_for_sent: float = 10.0) -> bool: """ 填手机号并点击“获取验证码”。 phone: e.g. "13800138000" or "+8613800138000" or "8613800138000" wait_for_sent: 点击后等待发送按钮变为 loading 或不可见的超时时间(秒) 返回 True 表示已触发发送;False 表示未触发(或出错) """ # 规范化号码:去掉空白,保留前导 + 或数字 phone = (phone or "").strip() m = re.match(r'^\+?(\d+)$', phone) if not m: raise ValueError("phone 格式不合法: " + repr(phone)) num = m.group(1) # 如果传入带国家码且是中国 86,可以把前缀去掉用于填写(页面通常期望输入国内号) # 你可按目标页面需求修改:这里默认若以86开头则去掉 if num.startswith("86") and len(num) > 8: num_to_fill = num[2:] else: num_to_fill = num # 1) 先确保手机选项已选(若你在页面切换到手机登录后再调用可跳过) # (可选) 点击国家码下拉以切换国家 # try: # # 点击区号标签以打开可能的面板(若不需要可跳过) # if page.query_selector(".yuanbao-oversea-input__wrap__formitem .yuanbao-oversea-input__wrap__formitem__areaCode"): # try: # page.click(".yuanbao-oversea-input__wrap__formitem .yuanbao-oversea-input__wrap__formitem__areaCode") # except Exception: # pass # except Exception: # pass # 2) 填手机号输入框 phone_selectors = [ "#desktop-login-phone input[type='tel']", "input[placeholder*='手机号']", ] phone_filled = False for sel in phone_selectors: try: el = page.query_selector(sel) if el: el.fill(num_to_fill) phone_filled = True break except Exception: pass if not phone_filled: raise RuntimeError("无法找到手机号输入框,检查选择器或页面是否加载完成") # 3) 点击“获取验证码”按钮(优先点击可见的 .hyc-phone-login__send-code) send_selectors = [ "button:has-text('发送验证码')", "text=发送验证码" ] clicked = False for s in send_selectors: try: el = page.query_selector(s) if not el: continue # 如果元素存在但看起来是禁用/灰色(可能用 style 或 aria-disabled 标示),尝试等待变为可点 # 尝试点击 try: el.click() clicked = True break except Exception: # fallback: use page.click(selector) try: page.click(s) clicked = True break except Exception: # 可能被禁用,继续检测 pass except Exception: pass if not clicked: return False return True def click_login_and_wait(page: Page, timeout: float = 10.0) -> bool: """ 点击登录按钮并等待登录完成或超时。 - page: Playwright Page - timeout: 等待登录成功的最长时间(秒) 返回 True 表示检测到登录成功(或提交已触发并可能成功),False 表示超时或失败。 """ btn2 = page.locator("button.css-l7o7jf", has_text="登录/注册") btn2.wait_for(state="visible", timeout=3000) btn2.click() return True # 用法示例(在你的 Playwright 脚本里) # page = context.new_page() # page.goto(TARGET) # opened = ensure_logged_in_ui(page, wait_for_login_selector="css=div.login-modal, form#loginForm", timeout=20) # if opened: # print("尝试触发登录弹窗,等待用户或自动化完成登录") # else: # print("未触发登录:可能已经登录或页面结构不同") # ========== 主流程:下单→填写→轮询→登录 ========== def run_sms_auto_login(): # 1) 向接码平台请求号码 print("[*] 向接码平台请求号码...") order = provider_request_number() if not order.get("phone"): raise RuntimeError("Request number failed: " + str(order.get("error"))) phone = order["phone"] print(f"[+] 获取到手机号: {phone} ") # phone = "18751835986" with sync_playwright() as p: browser = p.chromium.launch(headless=HEADLESS,executable_path= r'C:\Program Files\Google\Chrome\Application\chrome.exe') context = browser.new_context() page = context.new_page() page.goto(TARGET) print("[*] 已打开目标页面,请等待页面加载并自动填写手机号...") ensure_logged_in_ui(page) time.sleep(2) click_phone_tab(page) click_agree_span(page) # === 将号码输入到页面的手机号输入框 === # 请根据目标页面调整选择器 fill_phone_and_send(page,phone) # 2) 轮询接码平台获取短信 print("[*] 轮询接码平台获取短信,超时 %s s ..." % SMS_POLL_TIMEOUT) start = time.time() last_err = None code = None while True: if time.time() - start > SMS_POLL_TIMEOUT: last_err = "sms_poll_timeout" break try: resp = provider_get_messages(phone, "秘塔") print(f"获取短信响应{resp}") except Exception as e: last_err = str(e) resp = {"ok": False, "error": last_err} if not resp.get("ok"): # 可打印 provider 返回信息,便于排错 print("[...] provider no messages yet:", resp.get("error")) else: if "已被当前号码提供方屏蔽" in resp.get("messages",""): code = None break #{"ok": True, "messages": [{"text": "12345", "ts": 12345}, ...]} msgs = resp.get("messages") or [] # 按时间顺序遍历最新消息,尝试抽码 code = extract_code_from_text(msgs) if code: print("[+] 抽取到验证码:", code) break if code: break time.sleep(SMS_POLL_INTERVAL) if not code: print("[-] 未收到验证码或匹配失败:", last_err) try: data = provider_cancel_order(phone) print(f"释放号码响应{data}") except Exception: pass browser.close() raise RuntimeError("Failed to obtain SMS code: " + str(last_err)) # 3) 把验证码填入页面并提交 sms_input_candidates = [ "input[placeholder*='短信验证码']" ] filled_sms = False for sel in sms_input_candidates: try: el = page.query_selector(sel) if el: el.fill(code) filled_sms = True print("[+] 已把验证码填入选择器:", sel) break except Exception: pass if not filled_sms: # 允许用户手工粘贴 print("[!] 脚本无法定位验证码输入框,请在浏览器手动粘贴验证码后提交") print("验证码:", code) input("粘贴并提交后按回车继续...") click_login_and_wait(page) # 4) 等待登录成功并导出 cookie # 等待一定时间观察页面或 cookie # input("[*] 等待登录完成(若页面需手动点登录请操作)...") time.sleep(5) cookies = context.cookies() cookie_str = cookies_to_string(cookies) DB = "./kimi_tokens.db" TABLE = "tokens" # 单条 upsert ok = upsert_token(DB, TABLE, phone=phone, platform_name="mita", token_value=cookie_str, status="active") print("upsert single:", ok) browser.close() return cookie_str # ========== CLI ========== if __name__ == "__main__": while 1: try: c = run_sms_auto_login() print("[DONE] cookie length:", len(c or "")) except Exception as e: print("[ERROR]", e)