如何构建一个自己的代理ip池
本文介绍了一个代理IP池的构建与管理方案,旨在为爬虫提供有效的代理IP支持,避免被反爬机制封 blocking。代理IP池的组件包括:
获取代理IP的脚本:使用Flask框架提供API接口,支持从站大爷等代理服务获取和补充代理IP。
代理IP池的验证机制:通过IP138接口验证代理IP的有效性,确保代理IP的可靠性。
代理IP池的生命周期:设置代理IP的过期时间,确保代理IP池的动态管理。
代理IP池的维护脚本:定期清理过期IP,确保代理池的有效性。
代理IP池的调度器:启动和管理各个组件,确保代理IP池的稳定运行。
该方案通过子进程启动和管理各个组件,实现代理IP池的高效维护和管理,同时支持多种代理IP服务的接入与切换。
前言
在爬虫领域,当爬虫的访问频率达到目标网站的预警值时,可能会触发该网站的反爬机制。而封禁访问者的IP地址是常见的反爬技术之一。
当ip被封禁后,所有来自该ip的请求将无法获得正确的响应。此时,我们通常会采用代理ip池来解决问题。
什么是代理ip池?
简单来说,可以想象成一个池子,里面塞满了代理IP。它具有以下特征:
-
- 池子里的IP具有生命周期特征,定期 undergo verification,失效的IP会被移除。
-
- 池子里的IP有补充来源,会有新的代理IP通过补充渠道被加入池子。
-
- 池子中的代理IP可以被随机抽取使用。
这样,代理池中始终保持着多个不断更替的优质代理IP,我们可以通过随机从代理池中选取代理IP,然后指导爬虫程序通过该代理IP访问目标网站,从而有效规避被网络爬虫封禁的风险。
今天,我们来聊聊如何构建自己的代理IP池。此外,我们设计一个灵活的代理池,它提供两种代理方式:
-
- 每次都会从http接口获取一个随机的代理IP,在爬虫中使用该代理IP(普遍采用这种方式)
-
- 通过squid3代理进行请求转发,配置squid3代理服务器地址,通过squid3代理自动转发给代理池中的代理
项目已开源至github,如需应用,可直接访问该github仓库:open_proxy_pool
地址:https://github.com/AaronJny/open_proxy_pool
原理请往下看。
转载请注明出处:<>
代理池结构
代理池的组件可以大致描述如下:
-
- 收集并补充代理IP的渠道,定期将收集到的代理IP纳入代理池
-
- 实施代理IP的有效性验证机制,定期对代理池中的IP进行有效性验证并淘汰失效的IP
-
- 开发一个提供获取随机代理IP的API服务
-
- 编写squid3维护脚本,该脚本定期从代理池中获取可用IP并更新squid的转发代理列表
-
- 设计一个调度器程序,作为整体系统的入口,负责协调各组件的运行
如果不是很理解,没关系,请往下看,我会细说。
环境说明
采用的软件环境,其主要功能是实现代理IP池。
- 1.redis服务器,用以存放代理池相关数据
- 2.flask,用以实现提取单个随机代理的api
- 3.squid3,用以实现代理转发
组件1-获取代理ip的渠道
获取代理IP的途径多种多样。从宏观上讲,大致可分为两大类:一类是免费代理,另一类是收费代理。
免费代理,不言而喻,其最大优势就是免费,无需投入任何成本,通过简单的搜索即可轻松获取。缺点也十分明显,由于免费代理的本质,其质量无法得到保障,绝大多数无法正常使用,能使用的那些,运行速度却极其缓慢。
相比而言,收费代理的质量更好。不同平台之间的代理质量存在差异,价格方面也有所差别,建议根据自身需求进行对比分析。
在个人学习中,如果资金优先,可以考虑使用免费代理;如果资金充裕,可以购买一到七天的代理,价格也不高。我更倾向于推荐付费代理,因为免费代理的质量确实让人难以恭维。
企业商用的话,优先考虑收费代理吧,会稳定很多。
我选择了站大爷作为代理服务商,声明一下,我绝对不收取任何广告费用。不得不承认,站大爷的代理服务质量一般,但能满足基本需求。其他几家的代理服务略胜一筹,但优质选择较为有限。选择站大爷的主要原因在于,其支持账号密码访问的便捷模式。
未使用过收费代理的朋友可能 unaware of the convenience of free alternatives, but those who have experience with paid proxy services understand that when using a paid proxy platform to obtain IP addresses, it is mandatory to bind your device's IP address to the proxy account. For instance, if your device's IP address is 123.123.123.123, you must first register this IP with the proxy platform under the account, enabling only devices with the same IP address to access the obtained IPs. This limitation becomes particularly inconvenient when dealing with multiple devices, as purchasing a proxy for each machine would be both time-consuming and costly.
在某些平台或论坛上,除了通过绑定IP地址的方法外,另一种方法是通过账号和密码获取代理信息。采用这种方法后,从而可以不受IP地址的限制。说实话,感觉挺不错的。
针对的编码,比如使用站大爷作为示例,可以自行编写相关脚本用于其他代理服务。其运行原理和基本逻辑是一致的,针对处理的细节部分则无需过多调整。
购买事宜我已经说明过了。如需购买,请直接访问官网并选择短效优质代理。
先放出这部分的完整代码,附有注释。
# -*- coding: utf-8 -*-
# @File : get_ip.py
# @Author: AaronJny
# @Date : 18-12-14 上午10:44
# @Desc : 从指定网站上获取代理ip,
# 我目前在使用站大爷,就以站大爷为例
import requests
import time
import utils
import settings
from gevent.pool import Pool
from gevent import monkey
monkey.patch_all()
class ZdyIpGetter:
"""
从`站大爷`上提取代理ip的脚本,使用其他代理服务的可自行编写相关脚本,
原理和逻辑都是相通的,部分细节上需要针对处理
"""
def __init__(self):
# 购买服务时,网站给出的提取ip的api,替换成自己的
self.api_url = 'http://xxxxxxxxxxxxxxxxxxxxxxxxxx'
self.logger = utils.get_logger(getattr(self.__class__, '__name__'))
self.proxy_list = []
self.good_proxy_list = []
self.pool = Pool(5)
self.server = utils.get_redis_client()
def check_proxy(self, proxy):
"""
检查代理是否可用,
并将可用代理加入到指定列表中
:param proxy:
:return:
"""
if settings.USE_PASSWORD:
tmp_proxy = '{}:{}@{}'.format(settings.USERNAME, settings.PASSWORD, proxy)
else:
tmp_proxy = '{}'.format(proxy)
proxies = {
'http': 'http://' + tmp_proxy,
'https': 'https://' + tmp_proxy,
}
try:
# 验证代理是否可用时,访问的是ip138的服务
resp = requests.get('http://2019.ip138.com/ic.asp', proxies=proxies, timeout=10)
# self.logger.info(resp.content.decode('gb2312'))
# 判断是否成功使用代理ip进行访问
assert proxy.split(':')[0] in resp.content.decode('gb2312')
self.logger.info('[GOOD] - {}'.format(proxy))
self.good_proxy_list.append(proxy)
except Exception as e:
self.logger.info('[BAD] - {} , {}'.format(proxy, e.args))
def get_proxy_list(self):
"""
提取一批ip,筛选出可用的部分
注:当可用ip小于两个时,则保留全部ip(不论测试成功与否)
:return:
"""
while True:
try:
res = requests.get(self.api_url, timeout=10).content.decode('utf8')
break
except Exception as e:
self.logger.error('获取代理列表失败!重试!{}'.format(e))
time.sleep(1)
if len(res) == 0:
self.logger.error('未获取到数据!')
elif 'bad' in res:
self.logger.error('请求失败!')
# 检测未考虑到的异常情况
elif res.count('.') != 15:
self.logger.error(res)
else:
self.logger.info('开始读取代理列表!')
for line in res.split():
if ':' in line:
self.proxy_list.append(line.strip())
self.pool.map(self.check_proxy, self.proxy_list)
self.pool.join()
# 当本次检测可用代理数量小于2个时,则认为检测失败,代理全部可用
if len(self.good_proxy_list) < 2:
self.good_proxy_list = self.proxy_list.copy()
self.logger.info('>>>> 完成! <<<<')
def save_to_redis(self):
"""
将提取到的有效ip保存到redis中,
供其他组件访问
:return:
"""
for proxy in self.good_proxy_list:
self.server.zadd(settings.IP_POOL_KEY, int(time.time()) + settings.PROXY_IP_TTL, proxy)
def fetch_new_ip(self):
"""
获取一次新ip的整体流程控制
:return:
"""
self.proxy_list.clear()
self.good_proxy_list.clear()
self.get_proxy_list()
self.save_to_redis()
def main(self):
"""
周期获取新ip
:return:
"""
start = time.time()
while True:
# 每 settings.FETCH_INTERVAL 秒获取一批新IP
if time.time() - start >= settings.FETCH_INTERVAL:
self.fetch_new_ip()
start = time.time()
time.sleep(2)
if __name__ == '__main__':
ZdyIpGetter().main()
说一下这里面的关键部分:
-
1.如何有效管理代理数据池?
-
平台获取的IP具有短暂的有效期,因此建议采用类似字典的数据结构来存储代理IP及其失效时间。
-
为了提高系统的容错能力,建议将从平台获取的IP的生命周期统一设置为
settings.PROXY_IP_TTL,而代理的有效时间通常设置为该值或更长(例如,我将默认设置为60秒)。 -
为了提升处理效率,建议使用Redis的有序集合(类似于Python的字典)来存储代理信息,其中每个元素的分数代表其过期时间。具体来说,代理IP将作为zset中的元素,其过期时间由元素的分数决定。例如,
save_to_redis(self)函数的实现细节可参考相关代码。 -
2.如何验证提取到的ip是否可用?
-
提取到的ip地址可能不可用,因此我进行了验证操作,仅将有效的ip地址加入到代理队列中
-
通过调用ip138接口来验证代理的连接状态
-
校验之后,如果可用的ip数量极少或全部无法通过验证,我认为此次测试的手段出现了问题,因此认为此次测试的ip均为正常的,将其加入到代理队列中
组件2-检验并清理过期ip
由于我为每个加入代理池的ip配置了过期时间,该操作实际上并非旨在检查ip的有效性,而是检查其过期时间。
我们需要清除掉过期时间<当前时间的ip,而zset可以快速实现此操作。
# -*- coding: utf-8 -*-
# @File : delele_ip.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:15
# @Desc : 过期ip清理器
import utils
import settings
import time
class ExpireIpCleaner:
def __init__(self):
self.logger = utils.get_logger(getattr(self.__class__, '__name__'))
self.server = utils.get_redis_client()
def clean(self):
"""
清理代理池中的过期ip
:return:
"""
self.logger.info('开始清理过期ip')
# 计算清理前代理池的大小
total_before = int(self.server.zcard(settings.IP_POOL_KEY))
# 清理
self.server.zremrangebyscore(settings.IP_POOL_KEY, 0, int(time.time()))
# 计算清理后代理池的大小
total_after = int(self.server.zcard(settings.IP_POOL_KEY))
self.logger.info('完毕!清理前可用ip {},清理后可用ip {}'.format(total_before, total_after))
def main(self):
"""
周期性的清理过期ip
:return:
"""
while True:
self.clean()
self.logger.info('*' * 40)
time.sleep(settings.CLEAN_INTERVAL)
if __name__ == '__main__':
ExpireIpCleaner().main()
定期进行检测和清理,很简单,没有什么需要说的。
组件3-获取随机ip的web接口
这个web服务支持两大核心功能:
- 1.获取一个随机的可用代理ip
- 2.查看当前代理池中可用的代理ip的数量
# -*- coding: utf-8 -*-
# @File : web_api.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:22
# @Desc : 提供http接口的web程序
import utils
import settings
import flask
import random
import time
redis_client = utils.get_redis_client()
ip_pool_key = settings.IP_POOL_KEY
app = flask.Flask(__name__)
@app.route('/random/')
def random_ip():
"""
获取一个随机ip
:return:
"""
# 获取redis中仍可用的全部ip
proxy_ips = redis_client.zrangebyscore(ip_pool_key, int(time.time()),
int(time.time()) + settings.PROXY_IP_TTL * 10)
if proxy_ips:
ip = random.choice(proxy_ips)
# 如果ip需要密码访问,则添加
if settings.USE_PASSWORD:
ip = '{}:{}@{}'.format(settings.USERNAME, settings.PASSWORD, ip.decode('utf8'))
return ip
else:
return ''
@app.route('/total/')
def total_ip():
"""
统计池中可用代理的数量
:return:
"""
total = redis_client.zcard(ip_pool_key)
if total:
return str(total)
else:
return '0'
def main():
"""
程序运行入口
:return:
"""
app.run('0.0.0.0', port=settings.API_WEB_PORT)
if __name__ == '__main__':
app.run('0.0.0.0', port=settings.API_WEB_PORT)
都很简单,就不细说了。
组件4-squid的维持、更新脚本
处理HTTP接口之外,我们还可以通过squid实现代理转发。这样,在爬虫程序中,就不必频繁更换代理IP地址,直接填入squid的地址,后者会自动转发给其他代理IP地址。
这个脚本提供如下功能:
首先,从代理池中获取所有可用的代理IP地址,并将这些可连接的代理列表配置到squid服务器的配置文件中。随后,通过命令重新加载squid的配置文件,确保其始终采用最新可用的代理列表。当squid服务出现异常时,首先使用命令终止所有squid进程,然后重新启动squid服务,以恢复其正常运行状态。
在当前代码中,采用了名为squid.conf的文件,该文件托管于GitHub,主要包含squid的相关配置信息。若希望对squid进行深入定制,必须自行调整该文件内容。
# -*- coding: utf-8 -*-
# @File : squid_keeper.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:27
# @Desc : 维持squid3使用可用ip的脚本
import utils
import settings
import time
import os
import subprocess
class SquidKeeper:
def __init__(self):
self.logger = utils.get_logger(getattr(self.__class__, '__name__'))
self.server = utils.get_redis_client()
self.ip_pool_key = settings.IP_POOL_KEY
# 区别对待使用密码和不使用密码的配置模板
if settings.USE_PASSWORD:
self.peer_conf = "cache_peer %s parent %s 0 no-query proxy-only login={}:{} never_direct allow all round-robin weight=1 connect-fail-limit=2 allow-miss max-conn=5\n".format(
settings.USERNAME, settings.PASSWORD)
else:
self.peer_conf = "cache_peer %s parent %s 0 no-query proxy-only never_direct allow all round-robin weight=1 connect-fail-limit=2 allow-miss max-conn=5\n"
def read_new_ip(self):
"""
从redis中读取全部有效ip
:return:
"""
self.logger.info('读取代理池中可用ip')
proxy_ips = self.server.zrangebyscore(settings.IP_POOL_KEY, int(time.time()),
int(time.time()) + settings.PROXY_IP_TTL * 10)
return proxy_ips
def update_conf(self, proxy_list):
"""
根据读取到的代理ip,和现有配置文件模板,
生成新的squid配置文件并重新加载,让squid使用最新的ip。
:param proxy_list:
:return:
"""
self.logger.info('准备加载到squid中')
with open('squid.conf', 'r') as f:
squid_conf = f.readlines()
squid_conf.append('\n# Cache peer config\n')
for proxy in proxy_list:
ip, port = proxy.decode('utf8').split(':')
squid_conf.append(self.peer_conf % (ip, port))
with open('/etc/squid/squid.conf', 'w') as f:
f.writelines(squid_conf)
failed = os.system('squid -k reconfigure')
# 这是一个容错措施
# 当重新加载配置文件失败时,会杀死全部相关进行并重试
if failed:
self.logger.info('squid进程出现问题,查找当前启动的squid相关进程...')
p = subprocess.Popen("ps -ef | grep squid | grep -v grep | awk '{print $2}'", shell=True,
stdout=subprocess.PIPE, universal_newlines=True)
p.wait()
result_lines = [int(x.strip()) for x in p.stdout.readlines()]
self.logger.info('找到如下进程:{}'.format(result_lines))
if len(result_lines):
for proc_id in result_lines:
self.logger.info('开始杀死进程 {}...'.format(proc_id))
os.system('kill -s 9 {}'.format(proc_id))
self.logger.info('全部squid已被杀死,开启新squid进程...')
os.system('service squid restart')
time.sleep(3)
self.logger.info('重新加载ip...')
os.system('squid -k reconfigure')
self.logger.info('当前可用IP数量 {}'.format(len(proxy_list)))
def main(self):
"""
周期性地更新squid的配置文件,
使其使用最新的代理ip
:return:
"""
while True:
proxy_list = self.read_new_ip()
self.update_conf(proxy_list)
self.logger.info('*' * 40)
time.sleep(settings.SQUID_KEEPER_INTERVAL)
if __name__ == '__main__':
SquidKeeper().main()
组件5-调度器
调度器是程序的入口,也是对以上各个组件的控制和整合。
它的主要功能是:
- 1.通过子进程分别启动各个组件
- 2.当某个组件发生异常退出时,重新启动该组件
- 3.当接收到终止信号时,先关闭所有存活的组件进程,然后退出
# -*- coding: utf-8 -*-
# @File : scheduler.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:41
# @Desc : 调度中心,所有组件在这里被统一启动和调度
import utils
import settings
from get_ip import ZdyIpGetter
from delele_ip import ExpireIpCleaner
from web_api import app
from squid_keeper import SquidKeeper
from multiprocessing import Process
import time
class Scheduler:
logger = utils.get_logger('Scheduler')
@staticmethod
def fetch_ip():
"""
获取新ip的进程
:return:
"""
while True:
try:
ZdyIpGetter().main()
except Exception as e:
print(e.args)
@staticmethod
def clean_ip():
"""
定期清理过期ip的进程
:return:
"""
while True:
try:
ExpireIpCleaner().main()
except Exception as e:
print(e.args)
@staticmethod
def squid_keep():
"""
维持squid使用最新ip的进程
:return:
"""
while True:
try:
SquidKeeper().main()
except Exception as e:
print(e.args)
@staticmethod
def api():
"""
提供web接口的进程
:return:
"""
app.run('0.0.0.0', settings.API_WEB_PORT)
def run(self):
process_list = []
try:
# 只启动打开了开关的组件
if settings.IP_GETTER_OPENED:
# 创建进程对象
fetch_ip_process = Process(target=Scheduler.fetch_ip)
# 并将组件进程加入到列表中,方便在手动退出的时候杀死
process_list.append(fetch_ip_process)
# 开启进程
fetch_ip_process.start()
if settings.EXPIRE_IP_CLEANER_OPENED:
clean_ip_process = Process(target=Scheduler.clean_ip)
process_list.append(clean_ip_process)
clean_ip_process.start()
if settings.SQUID_KEEPER_OPENED:
squid_keep_process = Process(target=Scheduler.squid_keep)
process_list.append(squid_keep_process)
squid_keep_process.start()
if settings.WEB_API_OPENED:
api_process = Process(target=Scheduler.api)
process_list.append(api_process)
api_process.start()
# 一直执行,直到收到终止信号
while True:
time.sleep(1)
except KeyboardInterrupt:
# 收到终止信号时,关闭所有进程后再退出
self.logger.info('收到终止信号,正在关闭所有进程...')
for process in process_list:
if process.is_alive():
process.terminate()
self.logger.info('关闭完成!结束程序!')
if __name__ == '__main__':
Scheduler().run()
公用方法和配置
将各组件公用的方法和配置抽取出来,做了集中。
# -*- coding: utf-8 -*-
# @File : utils.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:07
# @Desc :
from redis import StrictRedis, ConnectionPool
import settings
import logging
def get_redis_client():
"""
获取一个redis连接
:return:
"""
server_url = settings.REDIS_SERVER_URL
return StrictRedis(connection_pool=ConnectionPool.from_url(server_url))
def get_logger(name=__name__):
"""
获取一个logger,用以格式化输出信息
:param name:
:return:
"""
logger = logging.getLogger(name)
logger.handlers.clear()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s: - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# 使用StreamHandler输出到屏幕
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
涉及到的所有配置,可以根据情况进行修改:
# -*- coding: utf-8 -*-
# @File : settings.py
# @Author: AaronJny
# @Date : 18-12-14 上午11:13
# @Desc :
# 代理池redis键名
IP_POOL_KEY = 'open_proxy_pool'
# redis连接,根据实际情况进行配置
REDIS_SERVER_URL = 'redis://:your_password@your_host:port/db_name'
# api对外端口
API_WEB_PORT = 9102
# 代理是否需要通过密码访问,当此项为False时可无视USERNAME和PASSWORD的配置
USE_PASSWORD = True
# 用户名
# 注意:用户名密码是指代理服务方提供给你,用以验证访问授权的凭证。
# 无密码限制时可无视此项,并将USE_PASSWORD改为False
USERNAME = 'your_username'
# 密码
PASSWORD = 'your_password'
# ***********功能组件开关************
# 打开web api功能,不使用web api的话可以关闭
WEB_API_OPENED = True
# 打开squid代理转发服务的维持脚本,不使用squid的话可以关闭
SQUID_KEEPER_OPENED = True
# 打开清理过期ip的脚本,如果池内的代理ip永远不会失效的话可以关闭
EXPIRE_IP_CLEANER_OPENED = True
# 打开定时获取ip并检查的脚本,如果不需要获取新ip的话可以关闭
IP_GETTER_OPENED = True
# ***********************************
# 清理代理ip的频率,如下配置代表每两次之间间隔6秒
CLEAN_INTERVAL = 6
# 获取代理ip的频率,根据api的请求频率限制进行设置
# 比如`站大爷`的频率限制是10秒一次,我就设置成了12秒
FETCH_INTERVAL = 12
# squid从redis中加载新ip的频率
SQUID_KEEPER_INTERVAL = 12
# 代理ip的生命周期,即一个新ip在多久后将被删除,单位:秒
PROXY_IP_TTL = 60
运行
到此为止,编码已经完成。在终端中切换至项目根目录后,只需运行python3 scheduler.py即可完成任务。若需后台运行,建议使用screen。
给出一个运行的截图(有机器在调用接口,我把ip隐藏了):

结尾
感谢支持,有问题欢迎拍砖~
