# coding=utf-8
"""Automate Kimi phone-number logins using numbers leased from the Luban SMS API.

The script drives a Chromium browser through DrissionPage, leases a phone
number from the SMS platform, submits the texted verification code and
uploads the bearer token captured from the login traffic.
"""
import os
import re
import time
import json
import requests
from DrissionPage import ChromiumPage, ChromiumOptions, Chromium
from psutil import users

# Verification code inside the SMS body: the keyword, an ASCII or full-width
# colon, optional whitespace, then 4-6 digits.
_SMS_CODE_RE = re.compile(r'验证码[::]\s*(\d{4,6})')


class LubanSMS:
    """Thin wrapper around the Luban SMS platform HTTP API."""

    def __init__(self, username, password, sid='97827', timeout=15):
        """Store the credentials and log in once to obtain an API token.

        :param username: account name sent as ``user`` to the login endpoint.
        :param password: account password sent as ``pass``.
        :param sid: service/project id sent with every number-related call.
        :param timeout: seconds to wait for each HTTP response.
        """
        self.sid = sid
        self.username = username
        self.password = password
        self.base_url = "https://api.haozhuma.com"
        self.timeout = timeout
        # A single login is enough; the token is reused by every other call.
        self.token = self.get_token()

    def _request(self, api, params=None):
        """GET ``<base_url>/sms/`` for endpoint ``api`` and return the decoded JSON.

        Passing the query through ``params`` lets requests URL-encode the
        values instead of splicing them into the URL by hand.
        """
        query = {'api': api}
        query.update(params or {})
        response = requests.get(f'{self.base_url}/sms/', params=query, timeout=self.timeout)
        return response.json()

    def get_token(self):
        """Log in and return the token from the response.

        :raises KeyError: if the login response carries no ``token`` field.
        """
        res = self._request('login', {'user': self.username, 'pass': self.password})
        return res['token']

    def get_balance(self):
        """Query the account summary, print it and return the decoded response."""
        response = self._request('getSummary', {'token': self.token})
        print('账号信息:', response)
        return response

    def get_number(self):
        """Lease a phone number for ``self.sid`` and return the decoded response."""
        return self._request('getPhone', {'token': self.token, 'sid': self.sid})

    def get_sms(self, phone):
        """Fetch the SMS received by ``phone`` and return the decoded response."""
        return self._request('getMessage', {'token': self.token, 'sid': self.sid, 'phone': phone})

    def set_status(self, phone):
        """Add ``phone`` to the blacklist, print and return the decoded response."""
        response = self._request('addBlacklist', {'token': self.token, 'sid': self.sid, 'phone': phone})
        print(response)
        return response


def save_spider_session(platform_id='4', file_url='', account='test', cookie=''):
    """Upload a captured session to the spider-session service.

    :param platform_id: value sent as ``platform_id``.
    :param file_url: value sent as ``url``.
    :param account: account label stored with the session.
    :param cookie: session credential (the login token) to store.
    :return: decoded JSON response of the service.
    """
    url = 'http://granking-api.neicela.com/api/third/saveSpiderSession'
    # NOTE(review): app_id/secret are hard-coded credentials; consider moving
    # them to configuration.
    json_data = {
        'app_id': 'aa65700299848d6f21b969dbc9f6cf7c',
        'secret': '5588071d36f0bc61af849c311a03f2c4',
        'platform_id': platform_id,
        'account': account,
        'url': file_url,
        # The current time in milliseconds is used as the "hash" value.
        'hash': str(int(time.time() * 1000)),
        'cookie': cookie,
    }
    res = requests.post(url, json=json_data, timeout=15).json()
    print(res)
    return res


def kimi_auto_login(sms_client=None):
    """Log in to Kimi with a leased phone number and return the bearer token.

    :param sms_client: ``LubanSMS`` instance to use. When omitted, the
        module-level ``luban`` object created by the script entry point is
        used, so existing ``kimi_auto_login()`` calls keep working.
    :return: the token string on success; ``''`` when no login request with an
        ``authorization`` header was captured; ``False`` when no number could
        be leased or a captcha appeared; ``None`` on any other failure.
    :raises RuntimeError: if no SMS client is available.
    """
    if sms_client is None:
        sms_client = globals().get('luban')
    if sms_client is None:
        raise RuntimeError("kimi_auto_login needs an SMS client: pass sms_client or define a global 'luban'")

    # Check the balance first; a non-zero code is reported as a bad API key.
    balance_info = sms_client.get_balance()
    if str(balance_info.get("code")) != '0':
        print(f"API密钥错误: {balance_info.get('msg')}")
        return None
    print(f"当前余额: {balance_info.get('money')}元")

    co = ChromiumOptions()
    co.incognito(True)  # incognito mode
    co.auto_port()

    br = Chromium(co)
    page = br.new_tab()

    # Bound before the try block so the cleanup in ``finally`` can always read it.
    phone_number = None
    try:
        print("正在打开Kimi网站...")
        page.get('https://www.kimi.ai', timeout=5)

        # Open the login dialog; fall back to the avatar area if that fails.
        try:
            login_btn = page.ele('text=登录', timeout=10)
            if login_btn:
                login_btn.click()
                print("已点击登录按钮")
        except Exception:
            print("未找到登录按钮,尝试其他方式...")
            try:
                user_info = page.ele('.user-info', timeout=10)
                if user_info:
                    user_info.click()
                    print("已点击用户头像")
            except Exception:
                pass

        time.sleep(2)
        page.ele('xpath:/html/body/div[3]/div/div/div/label/span').click()

        # Lease a phone number, retrying up to 30 times.
        number_info = {}
        for _ in range(30):
            print("正在获取手机号码...")
            number_info = sms_client.get_number()
            print(number_info)
            if str(number_info.get("code")) == '0':
                break
            print(f"获取手机号码失败: {number_info.get('msg')}")

        phone_number = number_info.get('phone')
        if not phone_number:
            return False
        print(f"获取到手机号码: {phone_number}")

        # Type the phone number into the login form.
        try:
            phone_input = page.ele('.phone-login-mobile-number', timeout=10)
            if phone_input:
                phone_input.clear()
                phone_input.input(phone_number)
                print("已输入手机号码")
        except Exception as e:
            print(f"输入手机号码失败: {e}")
            return None

        # Request the verification SMS; give up if a captcha widget shows up.
        try:
            send_code_btn = page.ele('text:发送验证码', timeout=10)
            if send_code_btn:
                send_code_btn.click()
                print("已点击发送验证码")
                if page.ele('.yidun_tips', timeout=5):
                    print('出现验证码换一个')
                    time.sleep(5)
                    return False
        except Exception as e:
            print(f"点击发送验证码按钮失败: {e}")
            return None

        # Poll the SMS API for the message.
        print("等待验证码...")
        sms_code = None
        max_attempts = 60  # up to 60 polls, 2 seconds apart
        for i in range(max_attempts):
            sms_info = sms_client.get_sms(phone_number)
            print(sms_info)
            if str(sms_info.get("code")) == '0':
                sms_code = sms_info.get("sms")
                print(f"收到验证码: {sms_code}")
                break
            print(f"等待验证码中...({i + 1}/{max_attempts})")
            time.sleep(2)

        if not sms_code:
            print("未收到验证码,超时")
            # The number is blacklisted by the cleanup in ``finally``.
            return None

        # Extract the digits from the SMS text and type them in.
        try:
            code_input = page.ele('css:input[placeholder="请输入验证码"]')
            if code_input:
                code_input.clear()
                match = _SMS_CODE_RE.search(sms_code)
                if match is None:
                    raise ValueError(f"no verification code found in SMS: {sms_code!r}")
                code_input.input(match.group(1))
                print("已输入验证码")
        except Exception as e:
            print(f"输入验证码失败: {e}")
            return None

        # Start capturing the login traffic before submitting the form.
        try:
            page.listen.start(['/api/user/wx/register_login/', '/api/device/register', '/api/user'])
            login_submit_btn = page.ele('text:登录', timeout=10)
            if login_submit_btn:
                login_submit_btn.click()
                print("已点击登录按钮")
        except Exception as e:
            # Deliberately best-effort: still try to read captured packets.
            print(f"点击登录按钮失败: {e}")

        print("等待...")
        time.sleep(5)

        # Scan captured requests until one carries an authorization header.
        token = ''
        while True:
            req = page.listen.wait(timeout=3)
            if not req:
                print('没有找到登录接口')
                time.sleep(3)
                break
            headers = req.request.headers
            print(headers)
            if 'authorization' in headers:
                # Keep only the last space-separated part of the header value.
                token = headers['authorization'].split(' ')[-1]
                print(token)
                save_spider_session(cookie=token, account=str(phone_number))
                break
        return token

    except Exception as e:
        print(f"发生错误: {e}")
        return None
    finally:
        # Run every cleanup step independently so that one failure cannot
        # skip the others and leave the browser running.
        if phone_number:
            try:
                sms_client.set_status(phone_number)
            except Exception as e:
                print(e)
        for closer in (page.close, br.quit):
            try:
                closer()
            except Exception as e:
                print(e)


# A captured token looks like: eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.<payload>.<signature>
# (the full sample value is intentionally not kept in the source).

if __name__ == '__main__':
    # Credentials may be overridden through the environment; the literals
    # below remain the defaults.
    luban = LubanSMS(
        os.environ.get('LUBAN_USER', 'ef7f3e7dcafdcfcbe7aa4afda8efbabed39432e897032fbc6d43958513acaf83'),
        os.environ.get('LUBAN_PASS', 'c9d369d3fa0f3481158aa2b320b8067f61ae4f9d3801c132eae1c9e48e350791'),
        sid=os.environ.get('LUBAN_SID', '97827'))
    a = 0  # number of successful logins so far
    for u in range(100):
        try:
            result = kimi_auto_login(luban)
            if result:
                a += 1
                print(f"自动登录完成,cookies文件: {result}")
                print('成功:', a)
            else:
                print("自动登录失败")
            time.sleep(5)
        except Exception as e:
            print(e)