代理反爬案例爬取实战
9.2 节、9.3节和9.4节我们了解了代理池的维护和付费代理的相关使用方法,通过这些方法可以获得不少可用的代理,方便我们在爬取数据的时候伪造 IP,绕过一些通过 IP 实现反爬的网站。
本章我们就分析一个实例,看一下如何使用代理池绕过某些网站的反爬机制。
本节目标
我们会以一个 IP 反爬网站为例进行这一次实战演练,该网站限制单个 IP 每5分钟最多访问10次,访问次数超过10,网站便会封锁该IP,并返回403状态码,10分钟后才解除封锁。
所以,要想在短时间内快速有效地爬取这个网站的所有数据,就得使用代理了。我们会先便用9.2 节讲解的代理池获取一些可用代理,再利用这些代理爬取数据,本节会介绍整个爬取流程的实现。
准备工作
首先需要准备并正常运行代理池。还需要安装好一些 Python 库——requests、redis-py、environs pyquery 和 loguru,安装命令如下:
pip3 install requests redis environs pyquery loguru
安装完毕后,就可以往下走了。
爬取分析
本节要爬取的网站是 https://antispider5.scrape.center/ ,首页如图 9-18 所示。
图9-18 爬取的目标网站
页面看上去和之前没什么不同,但这里网站增加了IP反爬机制,限制单个IP每5分钟最多访问10 次,超过10次就封锁IP,并返回403状态码。例如我连续刷新10次网页,页面就变成了下面这样,如图9-19所示。
图9-19 连续刷新 10 次后的页面
但如果此时切换一个网络环境,例如使用全局代理或者由Wi-Fi切换到手机热点,总之让访问目标网站所用的IP地址发生改变,就又可以看到页面正常显示了。也就是说,要想在短时间内爬取这个网站的所有数据,得更换多个IP进行爬取,怎么更换呢?自然就是使用代理了。
由于我们无法预知某个代理是否能完成一次正常的爬取,因此可能请求成功也可能请求失败,失败原因可能是网站封锁了该代理,或者代理本身失效了。为了保证正常爬取,我们需要添加重试机制,以确保请求失败的时候可以再次爬取,直到成功。
那怎么实现失败后的重试呢?至少要把失败的请求记录下来吧,那记录下来后又保存到哪里呢?如果有很多个请求都失败了,又该记录呢?一个简单的解决方案就是使用队列,当请求失败时,把对应的请求加入队列里,等待下次被调度。队列的实现方式有很多,本节我们选用Redis实现,简单高效。
综上所述,本节实现了如下儿个功能:
-
构造 Redis 爬取队列,用队列存取请求;
-
实现异常处理,把失败的请求重新加入队列;
-
解析列表页的数据,将爬取详情页和下一页的请求加入队列;
-
提取详情页的信息。
下面几节我们用代码实现一下这些功能。
构造请求对象
既然要用队列存储请求,那就肯定要实现一个请求的数据结构,这个请求需要包含一些必要信息,例如请求链接、请求头、请求方式和超时时间。另外,对于一个请求,需要实现对应的方法来处理它的响应,所以需要加一个回调函数 callback。如果一个请求的失败次数太多,就不会再重新请求了,所以还需要增加失败次数的记录。用这些内容组成一个完整的请求对象并放入队列等待被调度,从队列获取出这个对象的时候直接执行就好了。
我们可以采用继承 requests 库中的 Request 对象的方式实现这个数据结构。requests 库中已经存在 Request 对象,它将请求作为一个整体对象去执行,得到响应后返回。其实 requests 库里的 get、post 等方法都是通过执行 Request 对象实现的。
先看看 Request 对象的源代码:
class Request(RequestHooksMixin):
def __init__(self,
method=None, url=None, headers=None, files=None, data=None,
data = [] if data is None else data
files = {} if files is None else files
headers = {} if headers is None else headers
params = {} if params is None else params
hooks = {} if hooks is None else hooks
self.hooks = default_hooks()
for (k, v) in list(hooks.items()):
self.register_hook(event=k, hook=v)
self.method = method
self.url = url
self.headers = headers
self.files = files
self.data = data
self.json = json
self.params = params
self.auth = auth
self.cookies = cookies
这是 requests 库中 Request 对象的构造方法。其中已经包含了请求方式、请求链接和请求头这几个属性,但和我们需要相比还差几个。因此需要实现一个特定的数据结构,在原先 Request 对象的基础上加入上文额外所提的几个属性。这里需要继承 Request 对象重新实现一个请求对象,将其定义为 MovieRequest,实现如下:
TIMEOUT = 10
from requests import Request
class MovieRequest(Request):
def __init__(self, _self, url, callback, method='GET', headers=None, need_proxy=False, fail_time=0, timeout=TIMEOUT):
Request.__init__(self, method, url, headers)
self.callback = callback
self.fail_time = fail_time
self.timeout = timeout
这里我们实现了 MovieRequest 类,代码文件保存为 request.py,在构造方法中先调用了 Request 类的构造方法,然后加入了几个额外的参数,分别定义为 callback、fail_time 和 timeout,代表回调函数、失败次数和超时时间。
之后就可以将 MovieReguest 作为一个整体来执行,各个 MovieRequest 对象都是独立的,每个请求都有自已的属性。例如,调用请求的 callback 属性就可以知道应该用什么方法处理这个请求的响应,调用 failtime 就可以知道这个请求失败了多少次,继而判断失败次数是否到达國值,该不该丢弃这个请求。这单我们采用了面向对象的一些思想。
实现请求队列
现在要构造请求队列,实现请求的存取。存取无非是分为两个操作——放和取,所以这里利用 Redis 的 rpush 方法和 lpop 方法即可。
另外还需要注意,存取时不能直接使用 Request 对象。因为 Redis 数据库里存储的是字符串,所以在存 Request 对象之前要先把它序列化,取出来的时候要将其反序列化,这两个过程可以利用 pickle 模块实现:
from pickle import dumps, loads
from request import WeixinRequest
class RedisQueue():
def __init__(self):
self.db = StrictRedis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD)
def add(self, request):
if isinstance(request, MovieRequest):
return self.db.rpush(REDIS_KEY, dumps(request))
return False
def pop(self):
if self.db.llen(REDIS_KEY):
return loads(self.db.lpop(REDIS_KEY))
return False
def empty(self):
return self.db.llen(REDIS_KEY) == 0
这里实现了一个 RedisQueue 类,代码保存为 db.py,先在构造方法里初始化了一个 StrictRedis 对象。随后实现了 add 方法,方法中首先判断了请求对象的类型,如果是 MovieRequest,程序就会使用 pickle 模块的 dumps 方法把它序列化,再调用 rpush 方法把它加入队列。pop 方法则相反,先调用 lpop 方法从队列取出请求,再使用 pickle 模块的 loads 方法将其转为 MovieRequest 对象。另外,empty 方法会返回队列是否为空,通过判断队列长度是否为 0 即可知道。
在调度的时候,我们只需要新建一个 RedisQueue 对象,然后调用 add 方法,传入 WeixinRequest 对象,即可将 WeixinRequest 加入队列,调用 pop 方法,即可取出下一个 MovieRequest 对象,非常简单易用。
修改代理池
现在要找一些可用代理,此处直接使用 9.4 节的代理池即可。根据 9.4 节的操作启动代理池,等待一定时间,可以观察到 RedisHash 表中多了一些100分的可用代理,如图9-20所示。
图9-20 生成的可用代理
代理接口设置为 5555,因此访问 http://127.0.0.1:5555/random 即可获取随机的可用代理,如图9-21 所示。
图9-21 获取的一个可用代理
再定义一个用来获取可用代理的方法:
PROXY_POOL_URL = 'http://127.0.0.1:5555/random'
from loguru import logger
@logger.catch
def get_proxy():
response = requests.get(PROXY_POOL_URL)
if response.status_code == 200:
logger.debug(f'get proxy {response.text}')
return response.text
这里有个小技巧,我们使用 loguru 日志库里的 catch 方法作为 get_proxy 方法的装饰器,这样可以在请求代理池失败的时候输出具体的报错信息,同时又不会中断程序运行,也避免了编写 try except 语句的麻烦,使得代码看起来更简洁。
第一个请求
一切准备工作都做好了,现在我们就可以构造第一个请求并放到队列里以供调度了。代码如下:
from requests import Session
from db import RedisQueue
from request import MovieRequest
BASE_URL = 'https://antispider5.scrape.center/'
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'
}
class Spider():
session = Session()
queue = RedisQueue()
def start(self):
self.session.headers.update(HEADERS)
start_url = BASE_URL
request = MovieRequest(url=start_url, callback=self.parse_index)
self.queue.add(request)
这里首先定义了2个全局变量,BASE_URL 代表目标网站的 URL, HEADERS 代表请求头。然后定义了 Spider 类,代码文件保存为 spider.py。
在 Spider 类中,先初始化了 Session 对象和 RedisQueue 对象,分别用来执行请求和存储请求。然后定义了 start 方法,该方法第一步全局更新了 headers, 使得所有请求都能应用全局变量 HEADERS; 第二步构造了一个起始 URL, 并将 BASE_URL 赋值给它; 第三步用起始 URL 构造了一个 MovieRequest 对象,回调函数是 Spider 类的 parse_index 方法,也就是说请求成功后就用 parse_index 方法来处理和解析返回结果; 第四步调用了 RedisQueue 对象的 add 方法,用于将请求加入队列,以供调度。
调度请求
把第一个请求加入队列之后,就可以开始调度执行了。首先从队列中取出这个请求,将它的结果解析出来,生成新的请求加入队列,然后拿出新的请求,将结果解析,再生成新的请求加入队列,这样循环执行,直到队列中没有请求,代表爬取结束。
我们在 Spider 类中添加 scheduler 方法,实现如下:
from loguru import logger
VALID_STATUSES = [200]
def schedule(self):
while not self.queue.empty():
request = self.queue.pop()
callback = request.callback
logger.debug(f'executing request {request.url}')
response = self.request(request)
logger.debug(f'response status {response} of {request.url}')
if not response or not response.status_code in VALID_STATUSES:
self.error(request)
continue
results = list(callback(response))
if not results:
self.error(request)
continue
for result in results:
if isinstance(result, MovieRequest):
logger.debug(f'generated new request {result}')
self.queue.add(result)
if isinstance(result, dict):
logger.debug(f'scraped new data {result}')
scheduler 方法的内部是一个 while 循环,该循环的判断条件是队列不为空。当队列不为空时,调用 pop 方法取出一个请求,然后调用 request 方法执行这个请求,request 方法的实现如下:
@logger.catch
def request(self, request):
proxy = get_proxy()
proxies = {
'http': 'http://' + proxy,
'https': 'https://' + proxy
} if proxy else None
return self.session.send(request.prepare(),
timeout=request.timeout,
proxies=proxies)
request 方法中,首先调用 get_proxy 方法获取代理,然后将代理赋值给 proxies 变量以备使用。接着调用 session 变量的 send 方法执行这个请求。这里调用 prepare 方法将请求转化为了 Prepared Request 对象 (这在本书 2.2 节有相关介绍),具体的用法可以参考 https://docs.requests.org/en/master/user/advanced/#prepared-requests,timeout 属性是该请求的超时时间,proxies 属性是刚才声明的。
执行 request 方法之后会得到两种结果:一种是 False,即请求失败、连接错误;另一种是 Response 对象,即请求成功后服务器返回的结果,需要判断其中的状态码,如果状态码合法,就返回结果进行解析,否则将请求重新放入队列。
判断状态码合法,及其返回结果进行解析时会调用 MovieRequest 类的回调函数。例如这里回调函数是 parse_index,其实现如下:
from pyquery import PyQuery as pq
from urllib.parse import urljoin
def parse_index(self, response):
doc = pq(response.text)
# 请求详情页
items = doc('.item .name').items()
for item in items:
detail_url = urljoin(BASE_URL, item.attr('href'))
request = MovieRequest(
url=detail_url, callback=self.parse_detail)
yield request
# 请求下一页
next_href = doc('.next').attr('href')
if next_href:
next_url = urljoin(BASE_URL, next_href)
request = MovieRequest(
url=next_url, callback=self.parse_index
)
yield request
这里定义了一个生成器,它做了两件事:一件事是获取列表页中所有电影对应的详情页链接,另一件事是获取下一页的链接,再构造 MovieRequest 对象,之后 yield 返回。
然后,schedule 方法会对返回的结果进行遍历,利用 isinstance 方法判断返回结果是否为 MovieRequest 对象,如果是,就将其重新加入队列。
至此,第一次循环执行结束。
这时 while 循环会继续执行。如果第一次请求成功,那么这时的队列里会新增爬取第一个列表页中 10 部电影的详情页的请求和爬取下一页的请求,即队列中又多了 11 个新请求。程序会从队列中获取下一个请求,然后重新调用 request 方法获取其响应,再调用对应的回调函数对响应进行解析。如果爬取的是详情页,那么回调方法就不一样了,是 parse_detail 方法。此方法的实现如下:
import re
from pyquery import PyQuery as pq
from urllib.parse import urljoin
def parse_detail(self, response):
doc = pq(response.text)
cover = doc('.img.cover').attr('src')
name = doc('a > h2').text()
categories = [item.text() for item in doc('.categories button span').items()]
published_at = doc('.info:contains(上映)').text()
published_at = re.search('\d{4}-\d{2}-\d{2}', published_at).group(1) \
if published_at and re.search('\d{4}-\d{2}-\d{2}', published_at) else None
drama = doc('.drama p').text()
score = doc('p.score').text()
score = float(score) if score else None
yield {
'cover': cover,
'name': name,
'categories': categories,
'published_at': published_at,
'drama': drama,
'score': score
}
这个方法解析了详情页的内容,提取出了电影的名称、类别、上映时间、简介和评分等信息,然后将这些信息组合成一个字典返回。
之后程序会接着调用用后续的请求,然后接着执行第三次循环、第四次循环,这样往复下去。每个请求都有自己的回调函数,列表页解析完毕后,会继续生成后续请求,而详情页解析完毕后,会返回结果,直到爬取完毕。
现在,整个调度就完成了。
最后,整合一个入口方法:
def run(self):
self.start()
self.schedule()
if __name__ == '__main__':
spider = Spider()
spider.run()
run 方法中先调用 start 方法添加了第一个请求,然后调用 schedule 方法开始调度和爬取。
现在,对 IP 反爬网站的爬取就算完成了。
运行
部分运行结果如下:
2021-02-31 02:28:55.227 | DEBUG | core.spider:schedule:133 - executing request
https://antispider5.scrape.center/
2021-02-31 02:28:55.232 | DEBUG | core.spider:get_proxy:30 - get proxy 118.99.127.62:8080
2021-02-31 02:28:55.232 | DEBUG | core.spider:request:102 - get proxy 118.99.127.62:8080
2021-02-31 02:28:56.838 | DEBUG | core.spider:schedule:135 - response status 200 of
从结果可以看到,爬虫首先爬取了首页,也就是第一个列表页,爬取时通过 get_proxy 方法获取一个代理,然后执行爬取,爬取成功,接着顺次产生了后续的 11 个请求,即 10 个爬取详情页的请求和 1 个爬取下一页的请求。之后调度队列单的下一个请求,爬取第一个详情页,爬取时获取了一个新的代理,爬取成功,输出了提取结果。然后接着往下执行,直到爬取结束。