You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

113 lines
2.9 KiB

# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import httpx
import retry
from dateutil.parser import parse
import config
from utils.utils import logger
# Module-level list of ProxyIp objects: add_ip() appends to it, del_ip() removes
# entries by index, and get_ip() scans it for a proxy that has not expired.
ip_pool = []
# Proxy pool
class ProxyIp:
    """One proxy endpoint plus its expiry time and origin labels."""

    ip: str
    port: int
    expire: datetime
    city: str
    isp: str

    def __init__(self, ip, port, expire, city='未知', isp='未知'):
        """
        Store the endpoint and parse its expiry timestamp.

        :param ip: proxy host address
        :param port: proxy port
        :param expire: expiry timestamp in a textual form accepted by ``dateutil.parser.parse``
        :param city: location label; the default '未知' means "unknown"
        :param isp: carrier label; the default '未知' means "unknown"
        """
        self.ip = ip
        self.port = port
        self.expire = parse(expire)
        self.city = city
        self.isp = isp

    def __str__(self):
        """Return a one-line description: (city-isp)ip:port followed by the expiry time."""
        return f"({self.city}-{self.isp}){self.ip}:{self.port} 过期时间:{self.expire}"

    def is_expire(self):
        """
        Report whether the proxy should be treated as expired.

        A 20-second safety margin is subtracted from the expiry time, so the proxy
        counts as expired slightly before the recorded expiry.

        :return: True if the expiry (minus the margin) lies in the past
        """
        # Take "now" in the same tzinfo as self.expire: for a naive expiry this is
        # plain local time, for an aware expiry it is an aware datetime, so the
        # comparison never mixes naive and aware values (which raises TypeError).
        now = datetime.now(self.expire.tzinfo)
        return self.expire - timedelta(seconds=20) < now

    def to_httpx_proxies(self):
        """
        Build the proxy mapping handed to httpx.

        :return: dict with a single "http://" key pointing at this proxy
        """
        # NOTE(review): only the "http://" key is mapped; presumably https:// traffic
        # is not meant to go through this proxy — confirm against the callers.
        return {"http://": f"http://{self.ip}:{self.port}"}
class ProxyError(Exception):
    """Exception that carries a message together with a numeric error code."""

    def __init__(self, message, code=-1000):
        """
        :param message: description of the failure
        :param code: numeric error code, -1000 when none is supplied
        """
        self.message = message
        self.code = code

    def __str__(self):
        """Render the error as its code followed by its message."""
        code, message = self.code, self.message
        return f"错误码: {code} 错误消息: {message}"
@retry.retry(exceptions=ProxyError, tries=3, delay=2, backoff=2)
def add_ip(count=1) -> ProxyIp:
    """
    Request proxy IPs from the provider API, append them to ``ip_pool`` and
    return the first one obtained by this call.

    :param count: number of IPs to request, default 1
    :return: the first ProxyIp added by this call
    :raises ProxyError: if the API answers with a code other than 1000, or if it
        reports success but returns no entries; the ``retry`` decorator re-invokes
        the function on this exception (tries=3, delay=2, backoff=2)
    """
    url = "http://api.tianqiip.com/getip"
    params = {
        "secret": config.PROXY_SECRET,  # secret key
        "sign": config.PROXY_SIGN,  # signature
        "num": count,  # how many IPs to fetch
        "type": "json",  # response format
        "port": 1,  # protocol
        "time": 3,  # lifetime: three minutes
        "ts": 1,  # include the expiry time
        "mr": 1,  # de-duplicate
        "cs": 1,  # include the location
        "ys": 1,  # include the carrier
    }
    # NOTE(review): proxies={} presumably keeps this API call itself off any
    # proxy — confirm against the httpx version in use.
    result: dict = httpx.get(url, params=params, proxies={}).json()
    if result['code'] != 1000:
        logger.error("[IP池]API获取代理IP失败")
        # ProxyError takes (message, code), in that order.
        raise ProxyError(result['msg'], result['code'])

    ips = []
    for data in result.get("data") or []:
        ip = ProxyIp(data['ip'], data['port'], data['expire'], city=data['city'], isp=data['isp'])
        ip_pool.append(ip)
        ips.append(ip)
        logger.info(f"[IP池]新增代理IP {str(ip)}")

    if not ips:
        # A successful code with no entries would otherwise surface as an
        # IndexError below, which the retry decorator does not retry.
        raise ProxyError("API返回的代理IP列表为空")
    return ips[0]
def del_ip(index):
    """
    Remove the proxy at position ``index`` from ``ip_pool``.

    An out-of-range index, positive or negative, is ignored. A negative index
    within range counts from the end of the pool, as with normal list indexing.

    :param index: position in ``ip_pool`` of the entry to remove
    :return: None
    """
    # Check both bounds: an index below -len(ip_pool) (e.g. -1 on an empty pool)
    # would otherwise raise IndexError instead of being ignored.
    if not -len(ip_pool) <= index < len(ip_pool):
        return
    logger.error(f"[IP池]代理IP被删除: {ip_pool[index]}")
    del ip_pool[index]
def get_ip(cache=True) -> ProxyIp:
    """
    Return a proxy IP object, reusing a pooled one when allowed.

    :param cache: when True, return the first pooled proxy that has not expired;
        when False, always request a new proxy through ``add_ip``
    :return: a ProxyIp, either taken from ``ip_pool`` or newly added to it
    """
    if cache:
        # First pooled entry that is still valid, or None when the pool is
        # empty or every entry has expired.
        usable = next((candidate for candidate in ip_pool if not candidate.is_expire()), None)
        if usable is not None:
            logger.info(f"[IP池]从IP池中获取到代理IP: {usable}")
            return usable
    # Caching disabled, or nothing usable in the pool: request a new proxy.
    return add_ip()