代理池的维护

我们在 9.1 节了解了给各个请求库设置代理的方法,如何实时高效地获取大量可用代理变成了新的问题。

首先,互联网上有大量公开的免费代理,当然我们也可以购买付费代理但无论是免费代理还是付费代理,都不能保证是可用的,因为自己选用的 IP,可能其他人也在用,爬取的还是同样的目标网站,从而被封禁,或者代理服务器突然发生故障、网络繁忙。一旦选用的是一个不可用的代理,势必就会影响爬虫的工作效率。所以要提前做筛选,删除掉不可用的代理,只保留可用代理。

那么怎么实现呢?这就需要借助一个叫代理池的东西了。本节就来介绍一下如何搭建一个高效易用的代理池。

准备工作

存储代理池需要借助于 Redis 数据库,因此需要额外安装 Redis 数据库。整体来讲,本节需要的环境如下。

  • 安装并成功运行和连接一个 Redis 数据库,它运行在本地或者远端服务器都可以,只要能正常连接就行,安装方式可以参考 https://setup.scrape.center/redis

  • 安装好一些必要的库,包括 aiohttp、requests、redis-py、pyquery、Flask、loguru 等,安装命令如下:

    pip3 install aiohttp requests redis pyquery flask loguru

代理池的目标

我们需要实现下面几个目标来构建一个易用高效的代理池。

代理池分为 4 个基本模块:存储模块、获取模块、检测模块和接口模块。各模块的功能如下。

  • 存储模块:负责存储爬取下来的代理。首先要保证代理不重复,标识代理的可用情况,其次要动态实时地处理每个代理,一种比较高效和方便的存储方式就是 Redis 的 SortedSet,即有序集合。

  • 获取模块:负责定时在各大代理网站爬取代理。代理既可以是免费公开的,也可以是付费的形式都是 IP 加端口。此模块尽量从不同来源爬取,并且尽量爬取高匿代理,爬取成功后将可用代理存储到存储模块中。

  • 检测模块:负责定时检测存储模块中的代理是否可用。这里需要设置一个检测链接,最好是设置为要爬取的那个网站,这样更具有针对性。对于一个通用型的代理,可以设置为百度等链接。另外,需要标识每一个代理的状态,例如设置分数标识,100分代表可用,分数越少代表越不可用。经检测,如果代理可用,可以将分数标识立即设置为满分100,也可以在原分数基础上加1:如果代理不可用,就将分数标识减1,当分数减到一定值后,直接从存储模块中删除此代理。这样就可以标识代理的可用情况,在选用的时候也会更有针对性。

  • 接口模块:用API提供对外服务的接口。其实我们可以直接连接数据库来获取对应的数据,但这样需要知道数据库的连接信息,并且要配置连接。比较安全和方便的方式是提供一个WebAPI 接口,访问这个接口即可拿到可用代理。另外,由于可用代理可能有多个,所以可以设置一个随机返回某个可用代理的接口,这样就能保证每个可用代理都有机会被获取,实现负载均衡。

以上内容是设计代理池的一些基本思路。接下来,我们设计整体的架构,然后用代码实现代理池。

代理池的整体架构

结合上文的描述,代理池的整体架构如图 9-1 所示。

图9-1 代理池的整体架构

结合这张图,再简述一下 4 个模块的功能。

  • 存储模块使用 Redis 的有序集合,负责代理的去重和状态标识,同时它是中心模块和基础模块,用于将其他模块串联起来。

  • 获取模块定时从代理网站爬取代理,将爬取的代理传递给存储模块,并保存到数据库。

  • 检测模块定时通过存储模块获取所有代理,并对代理进行检测,根据不同的检测结果对代理设置不同的标识。

  • 接口模块通过 Web API 提供服务接口,接口通过连接数据库并通过Web形式返回可用的代理。

代理池的实现

接下来我们分别用代码实现代理池的 4 个模块。

完整的代码,代码量较大,因此本节我们不会详细编写,大家了解源码即可,源码地址为 https://github.com/Python3WebSpider/ProxyPool

存储模块

存储模块使用 Redis 的有序集合,集合中的每个元素都不重复,对于代理池,集合中的元素就是代理,是 IP 地址和端口号的组合,如 60.207.237.111:8888。另外,有序集合中的每个元素都有一个分数字段,分数可以重复,既可以是浮点数,也可以是整数。集合会根据每个元素的分数对元素进行排序,分数值小的元素排在前面,大的排在后面,这样就实现了有序。

具体到代理池,分数可以作为判断一个代理是否可用的标志:100 为最高分,代表最可用:0为最低分,代表最不可用。如果要获取可用代理,可以从代理池中随机获取分数最高的代理。注意这里是随机,能够保证每个可用代理都有机会被调用。

分数的设置细节是新获取的代理的分数为10,如果经测试是可用的,立即将分数置为100。检测器会定时循环检测每个代理的可用情况,一日检测到可用的代理,就立即将分数置为100:如果检测到某个代理不可用,就将其分数减1,分数减至0后,删除代理。

这只是一种解决方案,当然还可能有更合理的方案。之所以按此方案设置,有如下几个原因。

  • 当检测到代理可用时,立即将分数置为 100,这样能够保证所有可用代理都有更大的机会被获取。你可能会问,为什么不将分数加1而是直接设为最高值100呢?设想一下,有的代理是从各大免费公开代理网站获取的,一个代理通常并没有那么稳定,可能平均每5次请求中有2次成功,3次失败,如果按照这种方式设置分数,那么这个代理几乎不可能获得高分数,意味着即便它有时是可用的,但是因为筛选的依据是最高分,也几乎不可能被调用。如果想追求代理调用的稳定性,就要使用上述方法,这种方法可确保分数最高的代理一定是最稳定可用的。所以,这里我们采取“可用即设置100”的方法,确保代理只要可用就有机会被调用。

  • 当检测到代理不可用时,把分数减 1,分数减至 0 后,删除代理。按此规则,要删除一个有效代理,需要连续不断失败 100 次。也就是说,当使用一个可用代理尝试了 100 次都失败后,才将此代理删除,一旦有一次是成功的,就重新置回 100。尝试机会越多,这个代理被拯救回来的机会就越多,这样不会使一个曾经的可用代理轻易被丢弃,因为代理不可用的原因很可能是网络繁忙或者其他人用此代理请求得太过频繁。

  • 将新获取的代理的分数设置为 10,如果它不可用,就把分数减1,减到0的话就删除:如果可用,则把分数置为 100。由于很多代理是从免费网站获取的,所以新获取的代理无效的概率非常大,可用的代理可能不足 10%。这里将分数设置为 10,到弃用最多检测 10 次,没有可用代理的100次那么多,可以适当减小开销。

上述设置思路不一定是最优的,但据个人实测,实用性还是比较强的。这里首先给出存储模块的源代码,见 https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/storages ,建议直接对照源代码阅读。

代码中,定义了一个类 RedisClient 来操作 Redis 的有序集合,其中定义了一些方法来设置分数、获取代理等。核心实现代码如下:

import redis
from proxypool.exceptions import PoolEmptyException
from proxypool.schemas.proxy import Proxy
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, \
    PROXY_SCORE_MIN, PROXY_SCORE_INIT
from random import choice
from typing import List
from loguru import logger
from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies

REDIS_CLIENT_VERSION = redis.__version__
IS_REDIS_VERSION_2 = REDIS_CLIENT_VERSION.startswith('2.')

class RedisClient(object):

    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, **kwargs):
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True, **kwargs)

    def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:
        if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
            logger.info(f'invalid proxy {proxy}, throw it')
            return
        if not self.exists(proxy):
            if IS_REDIS_VERSION_2:
                return self.db.zadd(REDIS_KEY, score, proxy.string())
            return self.db.zadd(REDIS_KEY, {proxy.string(): score})

    def random(self) -> Proxy:
        # 尝试获取最大值的代理
        proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX)
        if len(proxies):
            return convert_proxy_or_proxies(choice(proxies))
        # 否则根据分数排序
        proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX)
        if len(proxies):
            return convert_proxy_or_proxies(choice(proxies))
        # 否则报错
        raise PoolEmptyException

    def decrease(self, proxy: Proxy) -> int:
        score = self.db.zscore(REDIS_KEY, proxy.string())
        # 当前分数比 PROXY_SCORE_MIN 大
        if score and score > PROXY_SCORE_MIN:
            logger.info(f'{proxy.string()} current score {score}, decrease 1')
            if IS_REDIS_VERSION_2:
                return self.db.zincrby(REDIS_KEY, proxy.string(), -1)
            return self.db.zincrby(REDIS_KEY, -1, proxy.string())
        # 否则删除代理
        else:
            logger.info(f'{proxy.string()} current score {score}, remove')
            return self.db.zrem(REDIS_KEY, proxy.string())

    def exists(self, proxy: Proxy) -> bool:
        return not self.db.zscore(REDIS_KEY, proxy.string()) is None

    def max(self, proxy: Proxy) -> int:
        logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')
        if IS_REDIS_VERSION_2:
            return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())
        return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})

    def count(self) -> int:
        return self.db.zcard(REDIS_KEY)

    def all(self) -> List[Proxy]:
        return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))

    def batch(self, start, end) -> List[Proxy]:
        return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))

if __name__ == '__main__':
    conn = RedisClient()
    result = conn.random()
    print(result)

这里首先定义了一些常量,如 PROXY_SCORE_MAX、PROXY_SCORE_MIN、PROXY_SCORE_INIT 分别代表最大分数、最小分数、初始分数;REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 代表 Redis 的连接信息,即 IP 地址、端口和密码;REDIS_KEY 是有序集合的键名,我们可以通过它获取存储代理使用的有序集合。

然后在 RedisClient 这个类中定义了一些用来对集合中的元素进行处理的方法,这些方法如下。

  • __init__ 方法用于初始化,其参数是 Redis 的连接信息,默认的连接信息已经定义为常量。我们 在 __init__ 方法中初始化了 StrictRedis 类,建立了 Redis 连接。

  • add 方法用于往有序集合中添加代理并设置分数,分数默认取 PROXY_SCORE_INIT 的值,也就是 10,返回值是添加的结果。

  • random 方法用于随机获取代理。首先获取所有分数为 100 的代理,然后从中随机选择一个返回。如果不存在 100 分的代理,则按照排名,获取排在前 100 位的代理,然后从中随机选择一个返回,否则抛出异常。

  • decrease 方法用于在代理检测无效时,将其分数减 1。

  • exists 方法用于判断代理是否存在于集合中。

  • max 方法用于将代理的分数设置为 PROXY_SCORE_MAX,即 100,在代理检测有效时用到。

  • count 方法用于返回当前集合的元素个数。

  • all 方法用于返回所有代理组成的列表,供检测使用。

定义好这些方法后,就可以在后续的模块中调用 RedisClient 类来连接和操作数据库。如果想要获取随机可用的代理,只需要调用 random 方法即可,得到的就是随机且可用的代理。

获取模块

获取模块主要负责从各大网站爬取代理并将代理保存到存储模块,代码实现见 https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/crawlers

这个模块的代码逻辑相对简单,例如可以定义一些爬取代理的方法,示例如下:

from proxypool.crawlers.base import BaseCrawler
from proxypool.schemas.proxy import Proxy
import re

MAX_PAGE = 5
BASE_URL = 'http://www.ip3366.net/free/?stype=1&page={page}'

class IP3366Crawler(BaseCrawler):
    """
    ip3366 爬虫, http://www.ip3366.net/
    """
    urls = [BASE_URL.format(page=i) for i in range(1, 8)]

    def parse(self, html):
        ip_address = re.compile('<tr\\s*><td.*?>(.*?)</td>\\s*<td>(.*?)</td>')
        # \s* 匹配空格,起到换行作用
        re_ip_address = ip_address.findall(html)
        for address, port in re_ip_address:
            proxy = Proxy(host=address.strip(), port=int(port.strip()))
            yield proxy

这里定义了一个代理类 IP3366Crawler,用来爬取 IP3366 网站的公开代理,通过 parse 方法解析页面的源代码,然后构造一个个 Proxy 对象并返回。

我们在其父类 BaseCrawler 里定义了通用的页面爬取方法 fetch,代码实现如下:

from retrying import retry
import requests
from loguru import logger

class BaseCrawler(object):
    urls = []

    @retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None)
    def fetch(self, url, **kwargs):
        try:
            response = requests.get(url, **kwargs)
            if response.status_code == 200:
                return response.text
        except requests.ConnectionError:
            return

    @logger.catch
    def crawl(self):
        for url in self.urls:
            logger.info(f'fetching {url}')
            html = self.fetch(url)
            for proxy in self.parse(html):
                logger.info(f'fetched proxy {proxy.string()} from {url}')
                yield proxy

如果要扩展一个代理类 Crawler,只需要继承 BaseCrawler 并实现 parse 方法即可,扩展性较好。fetch 方法可以读取 Crawler 里定义的全局变量 urls 并对其中的页面进行爬取,Crawler 再调用 parse 方法解析页面即可。

这样,就可以让一个个 Crawler 从各个不同的代理网站爬取代理,最后统一将所有 Crawler 汇总起来,遍历调用即可。如何汇总呢?这里是通过检测代码,只要检测到 BaseCrawler 的子类,就将其算作一个有效的 Crawler,可以直接遍历 Python 文件包。代码实现如下:

import pkgutil
from .base import BaseCrawler
import inspect

classes = []
for loader, name, is_pkg in pkgutil.walk_packages(__path__):
    module = loader.find_module(name).load_module(name)
    for name, value in inspect.getmembers(module):
        globals()[name] = value
        if inspect.isclass(value) and issubclass(value, BaseCrawler) and value is not BaseCrawler:
            classes.append(value)

__all__ = classes

这里我们调用了 walk_packages 方法,遍历了整个 crawlers 模块下的类,并判断每个类是否为 BaseCrawler 的子类,如果是就将其添加到 classes 中并返回。最后只要遍历 classes 里里面的类并依次实例化、调用各自的 crawl 方法即可完成代理的爬取和提取,代码实现见 https://github.com/Python3WebSpider/ProxyPool/blob/master/proxypool/processors/getter.py

检测模块

我们已经成功获取了各个网站的代理,现在需要一个检测模块对所有代理进行多轮检测。如果检测代理可用,就把其分数置为 100,检测不可用,就把分数减 1,这样可以实时改变每个代理的可用情况。要获取有效代理时,从分数高的代理中选择即可。

由于代理非常多,为了提高检测效率,这里使用异步请求库 aiohttp 来检测。

requests 是一个同步请求库,在使用 requests 发出一个请求后,程序需要等待网页加载完才能继续执行。也就是网页加载的过程会导致我们的程序阻塞,如果服务器响应得非常慢,例如十几秒才加载出来,那我们就需要先等待十几秒的时间,这期间程序不会继续往下执行,但完全可以去做其他的事情,例如调度其他请求或者解析网贞等。

如果服务器响应得比较快,那么使用 requestsaiohttp 的效果差距就没那么大。可检测一个代理一般需要十多秒甚至几十秒的时间,这时候使用 aiohttp 库的优势就大大体现出来了,效率可能会提高几十倍不止。

检测模块的实现示例如下:

import asyncio
import aiohttp
from loguru import logger
from proxypool.schemas.proxy import Proxy
from proxypool.storages.redis import RedisClient
from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS
from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
from asyncio import TimeoutError

EXCEPTIONS = (
    ClientProxyConnectionError,
    ConnectionRefusedError,
    TimeoutError,
    ServerDisconnectedError,
    ClientOSError,
    ClientHttpProxyError
)

class Tester(object):

    def __init__(self):
        self.redis = RedisClient()
        self.loop = asyncio.get_event_loop()

    async def test(self, proxy: Proxy):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
            try:
                logger.debug(f'testing {proxy.string()}')
                async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
                                       allow_redirects=False) as response:
                    if response.status in TEST_VALID_STATUS:
                        self.redis.max(proxy)
                        logger.debug(f'proxy {proxy.string()} is valid, set max score')
                    else:
                        self.redis.decrease(proxy)
                        logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
            except EXCEPTIONS:
                self.redis.decrease(proxy)
                logger.debug(f'proxy {proxy.string()} is invalid, decrease score')

    @logger.catch
    def run(self):
        logger.info('stating tester...')
        count = self.redis.count()
        logger.debug(f'{count} proxies to test')
        for i in range(0, count, TEST_BATCH):
            start, end = i, min(i + TEST_BATCH, count)
            logger.debug(f'testing proxies from {start} to {end} indices')
            proxies = self.redis.batch(start, end)
            tasks = [self.test(proxy) for proxy in proxies]
            self.loop.run_until_complete(asyncio.wait(tasks))

if __name__ == '__main__':
    tester = Tester()
    tester.run()

这里定义了一个类 Tester。首先在其构造方法中建立了一个 RedisClient 对象,供类中的其他方法使用。然后定义了一个 test 方法,用来检测单个代理的可用情况,参数就是被检测的代理。注意,test 方法前面加了 async 关键词,代表这个方法是异步的。test 方法的内部首先创建了 aiohttp 的 ClientSession 对象,可以直接调用该对象的 get 方法来访问页面。

测试链接在这里被定义为常量 TEST_URL,建议将其值设置为目标网站的地址,因为在爬取过程中,可能代理本身是可用的,而该代理的 IP 已经被目标网站封禁了。例如,某些代理可以正常访问百度等页面,但知乎已经把它们封住了,所以如果对知乎的某个页面有爬取需求,可以直接将 TEST_URL 的值设置为知乎这个页面的链接,当请求失败,代理被封后,代理的分数自然会减下来,等到失效时就不会被获取了。

如果实现的是一个通用的代理池,则不需要专门设置 TEST_URL 的取值,既可以将其设置为一个不会封 IP 的网站,也可以设置为百度这类响应稳定的网站。

我们还定义了 TEST_VALID_STATUS 变量,这个变量的类型是列表,由正常的状态码构成,例如 [200]。当然,某些目标网站还可能会出现其他状态码,可以自行配置。程序在获取响应信息后需要判断其状态,如果状态码在 TEST_VALID_STATUS 列表中,就代表代理可用,需要调用 RedisClient 对象的 max 方法将该代理的分数设置为 100,否则调用 decrease 方法将代理分数减 1,如果出现异常,也同样将代理分数减 1。

另外,我们设置了批量测试的最大值 TEST_BATCH,意思是一批最多测试 TEST_BATCH 个,这样可以避免在代理池过大时,一次性测试全部代理导致内存开销过大的问题。当然,也可以用信号量机制实现并发控制。

test 方法之后,定义了 run 方法用于获取所有的代理列表,然后使用 aiohttp 分配任务,启动运行。在不断运行的过程中,代理池中无效代理的分数会一直减 1,直至代理被册删除,有效的代理则一直保持 100 分供随时取用。

至此,测试模块的逻辑就完成了。

接口模块

通过前面 3 个模块,我们已经可以实现代理的获取、检测和更新,Redis 数据库会以有序集合的形式存储各个代理及代理对应的分数,分数 100 代表可用,分数越小代表越不可用。 但是我们怎样方便地获取可用代理呢?可以用 RedisClient 类直接连接 Redis,然后调用 random 方法。这样做没问题,效率很高,但也会有几个弊端。

  • 使用这个代理池需要知道 Redis 连接的用户名和密码信息,如果其他人使用,会很不安全。

  • 如果代理池需要部署在远程服务器上运行,而远程服务器的 Redis 只允许本地连接,那么就不能通过远程直连 Redis 来获取代理。

  • 如果爬虫所在的主机没有连接 Redis 模块,或者爬虫不是由 Python 语言编写的,我们就无法使用 RedisClient 来获取代理。

  • 如果 RedisClient 或者数据库结构有更新,那么爬虫端必须同步这些更新,这样非常麻烦。综上考虑,为了使代理池可以作为一个独立服务运行,我们最好增加一个接口模块,并以 Web API 的形式暴露可用代理。这样一来,获取代理只需要请求接口即可,以上的几个弊端也可以避免。

我们使用一个比较轻量级的库 Flask 来实现这个接口模块,实现示例如下:

from flask import Flask, g
from proxypool.storages.redis import RedisClient
from proxypool.setting import API_HOST, API_PORT, API_THREADED

__all__ = ['app']

app = Flask(__name__)

def get_conn():
    if not hasattr(g, 'redis'):
        g.redis = RedisClient()
    return g.redis

@app.route('/')
def index():
    return '<h2>Welcome to Proxy Pool System</h2>'

@app.route('/random')
def get_proxy():
    conn = get_conn()
    return conn.random().string()

@app.route('/count')
def get_count():
    conn = get_conn()
    return str(conn.count())

if __name__ == '__main__':
    app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

这里我们声明了一个 Flask 对象,以及定义了 3 个接口,分别用于获取首页、随机代理页和数量页。运行代码后,Flask 会启动一个 Web 服务,我们只需要访问对应的接口即可获取可用代理。

调度模块

调度模块用于调用上面定义的 4 个模块,通过多进程的方式把它们运行起来,示例如下:

import time
import multiprocessing
from proxypool.processors.server import app
from proxypool.processors.getter import Getter
from proxypool.processors.tester import Tester
from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \
    ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
from loguru import logger

if IS_WINDOWS:
    multiprocessing.freeze_support()

tester_process, getter_process, server_process = None, None, None

class Scheduler():
    def run_tester(self, cycle=CYCLE_TESTER):
        if not ENABLE_TESTER:
            logger.info('tester not enabled, exit')
            return

    tester = Tester()
    loop = 0
    while True:
        logger.debug(f'tester loop {loop} start...')
        tester.run()
        loop += 1
        time.sleep(cycle)

    def run_getter(self, cycle=CYCLE_GETTER):
        if not ENABLE_GETTER:
            logger.info('getter not enabled, exit')
            return
        getter = Getter()
        loop = 0
        while True:
            logger.debug(f'getter loop {loop} start...')
            getter.run()
            loop += 1
            time.sleep(cycle)

    def run_server(self):
        if not ENABLE_SERVER:
            logger.info('server not enabled, exit')
            return
        app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

    def run(self):
        global tester_process, getter_process, server_process
        try:
            logger.info('starting proxypool...')
            if ENABLE_TESTER:
                tester_process = multiprocessing.Process(target=self.run_tester)
                logger.info(f'starting tester, pid {tester_process.pid}...')
                tester_process.start()

            if ENABLE_GETTER:
                getter_process = multiprocessing.Process(target=self.run_getter)
                logger.info(f'starting getter, pid {getter_process.pid}...')
                getter_process.start()

            if ENABLE_SERVER:
                server_process = multiprocessing.Process(target=self.run_server)
                logger.info(f'starting server, pid {server_process.pid}...')
                server_process.start()

            tester_process.join()
            getter_process.join()
            server_process.join()
        except KeyboardInterrupt:
            logger.info('received keyboard interrupt signal')
            tester_process.terminate()
            getter_process.terminate()
            server_process.terminate()
        finally:
            tester_process.join()
            getter_process.join()
            server_process.join()
            logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
            logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
            logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
            logger.info('proxy terminated')

if __name__ == '__main__':
    scheduler = Scheduler()
    scheduler.run()

这里首先定义了 3 个常量 ENABLE_TESTER、ENABLE_GETTER 和 ENABLE_SERVER,都是布尔类型,分别表示测试模块、获取模块和接口模块的开关,如果都取 True,代表 3 个模块都开启了。

这个模块的启动入口是 run 方法,这个方法会判断模块的开关是否开启,如果开启,就新建一个 Process 进程设置好启动目标,然后调用 start 方法运行该进程,对 3 个模块都如此操作,之后 3 个进程并行执行,互不干扰。

3 个调度方法的结构也非常清楚。例如,run_tester 方法用于调度检测模块,方法中首先声明一个 Tester 对象,然后进入死循环,不断地调用其 run 方法,执行完一轮后就休眠一段时间,休眠结束后再重新执行。这里把休眠时间定义为一个常量,如 20 秒,即每隔 20 秒进行一次代理检测。

最后,只需要调用 Scheduler 类的 run 方法即可启动整个代理池。

以上内容是整个代理池的架构和各个模块对应的实现逻辑。

运行

现在,我们将代码整合在一起,并运行,运行之后的输出结果如下:

2020-04-13 02:52:06.510 | INFO | proxypool.storages.redis:decrease:73 - 60.186.146.193:9000 current
score 10.0, decrease 1
2020-04-13 02:52:06.517 | DEBUG | proxypool.processors.tester:52 - proxy 60.186.146.193:9000
invalid, decrease score
2020-04-13 02:52:06.524 | INFO | proxypool.storages.redis:decrease:73 - 60.186.151.147:9000 current
score 10.0, decrease 1
2020-04-13 02:52:06.532 | DEBUG | proxypool.processors.tester:52 - proxy 60.186.151.147:9000 is
invalid, decrease score
2020-04-13 02:52:07.159 | INFO | proxypool.storages.redis:max:96 - 60.191.11.246:3128 is valid,
set to 100
2020-04-13 02:52:07.167 | DEBUG | proxypool.processors.tester:46 - proxy 60.191.11.246:3128 is
valid, set max score
2020-04-13 02:52:17.271 | INFO | proxypool.storages.redis:decrease:73 - 59.62.7.130:9000 current
score 10.0, decrease 1
2020-04-13 02:52:17.280 | DEBUG | proxypool.processors.tester:52 - proxy 59.62.7.130:9000 is
invalid, decrease score
2020-04-13 02:52:17.288 | INFO | proxypool.storages.redis:decrease:73 - 60.167.103.74:1133 current
score 10.0, decrease 1
2020-04-13 02:52:17.295 | DEBUG | proxypool.processors.tester:52 - proxy 60.167.103.74:1133 is
invalid, decrease score
2020-04-13 02:52:17.302 | INFO | proxypool.storages.redis:decrease:73 - 60.162.71.113:9000 current
score 10.0, decrease 1
2020-04-13 02:52:17.309 | DEBUG | proxypool.processors.tester:52 - proxy 60.162.71.113:9000 is
invalid, decrease score

以上是代理池的控制台输出,可以看到可用代理的分数被设置为 100,不可用代理分数被减 1。

现在打开浏览器,当前配置运行在 5555 端口,所以打开 [可疑链接已删除] 即可看到代理池系统的首页,如图 9-2 所示。

再打开 http://127.0.0.1:5555/random ,即可获取随机的可用代理,非常方便,这里获取一个如图 9-3 所示。

图9-2 代理池系统的首页 图9-3 获取随机的可用代理

获取代理的代码如下:

import requests

PROXY_POOL_URL = 'http://localhost:5555/random'

def get_proxy():
    try:
        response = requests.get(PROXY_POOL_URL)
        if response.status_code == 200:
            return response.text
    except ConnectionError:
        return None

运行这段代码便可以获取一个随机可用代理了,代理是字符串类型的数据。可以按照 9.1 节的方法设置此代理,例如为 requests 设置代理:

import requests

proxy = get_proxy()
proxies = {
    'http': 'http://' + proxy,
    'https': 'https://' + proxy,
}
try:
    response = requests.get('http://www.httpbin.org/get', proxies=proxies)
    print(response.text)
except requests.exceptions.ConnectionError as e:
    print('Error', e.args)

有了代理池,即可从中取出代理使用,有效防止我们的 IP 被封。

总结

本节中我们学习了代理池的设计思路和实现方案,有了这个代理池,我们就可以实时获取一些可用的代理了。相对之前的实战案例,整个代理池的代码量多了很多,逻辑复杂度也比较高,建议好好理解和消化一下。

本节的代码见 https://github.com/Python3WebSpider/ProxyPool ,代码库中还提供了基于 Docker 和 Kubernetes 的运行和部署操作,可以帮助我们更快捷地运行代理池。