RabbitMQ的使用
在爬取数据的过程中,可能需要一些进程间的通信机制,例如下面三个。
-
一个进程负责构造爬取请求,另—个进程负责执行爬取请求。
-
某个数据爬取进程执行完毕,通知另外一个负责数据处理的进程开始处理数据。
-
某个进程新建了一个爬取任务,通知另外一个负责数据爬取的进程开始爬取数据。
为了降低这些进程的耦合度,需要—个类似消息队列的中间件来存储和转发消息,实现进程间的通信。有了消息队列中间件之后,以上各机制中的两个进程就可以独立执行,它们之间的通信则由消息队列实现。
-
一个进程根据需要爬取的任务,构造请求对象并放入消息队列,另一个进程从队列中取出请求对象并执行爬取。
-
某个数据爬取进程执行完毕,就向消息队列发送消息,当另一个负责数据处理的进程监听到这类消息时,就开始处理数据。
-
某个进程新建了一个爬取任务后,就向消息队列发送消息,当另一个负责数据爬取的进程监听到这类消息时,就开始爬取数据。
那这个消息队列怎么实现呢?业界比较流行的实现有 RabbitMQ、RocketMQ、Kafka 等,其中 RabbitMQ 作为一个开源、可靠、灵活的消息队列中间件备受青睬,本节我们也来了解一下它的用法。
|
我们在前几节了解了一些数据存储库的用法,它们几乎都用于持久化存储数据。本节介绍的是一个消息队列中间件,它虽然主要应用于数据消息通信,但由于它也具备存储信息的能力,所以将其放在本章介绍。 |
RabbitMQ的介绍
RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议实现。AMQP 的全称是 Advanced Message Queue Protocol,即高级消息队列协议,其主要特点有面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全性。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储和转发消息,在易用性、扩展性、高可用性等方面均表现不俗,具体特点有以下这些。
-
可靠性(Reliability):RabbitMQ 通过一些机制保证可靠性,如持久化、传输确认、发布确认。
-
灵活的路由(FlexibleRouting):由 Exchange 将消息路由至消息队列。RabbitMQ 已经提供了一些内置的 Exchange 来实现典型的路由功能;对于较复杂的路由功能,则将多个 Exchange 绑定在一起,或者通过插件机制实现自己的 Exchange。
-
消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker。
-
高可用(HighlyAvailableQueues):消息队列可以在集群中的机器上镜像存储,使得队列在部分节点出问题的情况下仍然可用。
-
多种协议支持(multi-protocol):RabbitMQ 支持多种消息队列协议,例如 STOMP、MQTT。
-
多语言客户端(ManyClients):RabbitMQ 几乎支持所有常用语言,例如 Java、.NET、Ruby。
-
管理界面(ManagementUI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的多个方面。
-
跟踪机制(Tracing):RabbitMQ 提供了消息跟踪机制,如果消息异常,使用者就可以找出发生了什么。
-
插件机制(PluginSystem):RabbitMQ 提供了许多插件,实现了多方面的扩展,用户也可以编写自己的插件。
准备工作
在本节开始之前,请确保已经正确安装好了 RabbitMQ,安装方式可以参考 https://setup.scrape.center/rabbitmq ,需要确保其可以在本地正常运行。
除了安装 RabbitMQ,还需要安装一个操作 RabbitMQ 的 Python 库,叫作 pika,使用 pip3 工具安装即可:
pip3 install pika
更详细的安装说明可以参考 https://setup.scrape.center/pika 。
以上二者都安装好之后,开启本节的学习。
基本使用
首先,RabbitMQ 就是一个消息队列,我们要实现的进程间通信,从本质上讲是一个生产者-消费者模型,即一个进程作为生产者往消息队列放入消息,另一个进程作为消费者监听并处理消息队列中的消息,主要有 3 个关键点需要关注。
-
声明队列:通过指定一些参数,创建消息队列。
-
生产内容:生产者根据队列的连接信息连接队列,往队列中放入消息。
-
消费内容:消费者根据队列的连接信息连接队列,从队列中取出消息。
下面我们先来声明一个队列,相关代码如下:
import pika
QUEUE_NAME = 'scrape'
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME)
这里先连接 RabbitMQ 服务,由于 RabbitMQ 运行在本地,因此直接使用 localhost 即可,将得到的连接对象赋值为 connection。然后声明了一个频道对象,即 channel,利用它我们可以操作队列内消息的生产和消费。之后我们调用 channel 的 queue_declare 方法声明了一个队列,队列名称叫作 scrape。
下面我们尝试往队列中添加消息:
channel.basic_publish(exchange="",
routing_key=QUEUENAME,
body='HelloWorld!')
这里我们调用 channel 的 basic_publish 方法往队列放入了消息,其中 routing_key 是队列的名称,body 是放入的真实消息。
将以上代码写入一个名为 producer.py 的文件,即生产者。
现在,前两点一一声明队列和生产内容其实已经完成了,接下来就是消费内容了。
其实也很简单。消费者用同样的方式连接到 RabbitMQ 服务,代码如下:
import pika
QUEUE_NAME = 'scrape'
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME)
然后从队列中获取数据,代码如下
def callback(ch, method, properties, body):
print(f"Get {body}")
channel.basic_consume(queue='scrape',
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
这里我们调用 channel 的 basic_consume 方法从队列中取出消息,实现了消费。同时指定回调方法 on_message_callback 的名称为 callback。另外,还将 auto_ack 设置为了 True, 代表消费者获取消息之后会自动通知消息队列当前消息已经被处理,可以移除这个消息。
最后,将以上代码保存为 consumer.py 文件 (消费者) 并运行,它会监听 scrape 队列的变动,如果有消息进入,就获取并消费,回调 callback 方法,打印输出结果。
现在运行 producer.py 文件,运行之后会连接刚才的队列,同时往该队列中放入一条消息,消息内容为 Hello World!。
这时再返回 consumer.py 文件,可以发现输出结果如下:
Get Hello World!
这说明生产者成功地把消息放入了消息队列,然后消费者收到并输出了这条消息。
可以继续运行 producer.py, 每运行一次,生产者都会向队列中放入一个消息,消费者会收到该消息并输出。
以上便是最基本的 RabbitMQ 的用法。
随用随取
上面的案例是基于 RabbitMQ 实现的最简单的生产者和消费者之间的通信,但如果把这种实现用在爬虫上是不太现实的,因为我们把消费者实现为了“订阅”模式,也就是说,消费者会一直监听队列的变化,一旦监听到队列中添加了消息,便要立马处理,它无法主动控制取用消息的时机。应用到爬虫中,消费者其实就是执行爬取请求的进程,生产者往队列中放置请求对象,消费者从中获取请求对象,然后执行这个请求(向服务器发起 HTTP 请求以获取响应)。但问题是,消费者是无法控制从发起请求到获取响应所消耗的时间的,因为什么时候获取到响应内容取决于服务器响应时间的长短,所以这意味着消费者不一定能很快地将消息处理完。如果生产者往队列中放置过多的请求,消费者处理不过来,那就会出现问题。因此消费者也应该有权控制取用消息的频率,这就是随用随取。
我们可以对前面的代码稍做改写,使生产者可以自行控制向队列放入请求对象的频率,消费者可以根据自已的处理能力控制从队列中取出请求对象的频率。如果生产者的放置速度比消费者的获取速度更快,那么队列中就缓存一些请求对象,反之队列有时候会处于闲置状态。
总的来说,消息队列起到了缓冲的作用,使生产者和消费者可以按照自已的节奏工作。
好,下面先实现下刚才所述的随用随取机制,队列中的消息可以暂且先用字符串表示,后面再将其更换为请求对象。
可以将生产者实现如下:
import pika
QUEUE_NAME = 'scrape'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME)
while True:
data = input()
channel.basic_publish(exchange='',
routing_key=QUEUE_NAME,
body=data)
print(f'Put {data}')
这里我们还是使用 input 方法来获取生产者的数据,输入的内容就是字符串,输入之后该内容会直接被放置到队列中,然后打印到控制台。
先运行一下生产者代码,然后回车输入几项内容:
foo Put foo bar Put bar baz Put baz
这里我们输入了 foo、bar、baz 三项内容,每次输入后控制台都会输出对应的结果。
然后将消费者实现如下:
import pika
QUEUE_NAME = 'scrape'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
while True:
input()
method_frame, header, body = channel.basic_get(
queue=QUEUE_NAME, auto_ack=True)
if body:
print(f'Get {body}')
我们这里也是通过 input 方法控制消费者何时获取下一个数据,获取方法是 basic_get,这个方法会返回一个元组,其中的 body 就是真正的数据。
运行消费者代码,然后按几下回车,每次按回车后都可以看到控制台输出一个从消息队列中获取的新数据:
Get b'foo' Get b'bar' Get b'baz'
这样就实现了消费者的随用随取。
优先级队列
刚才我们仅仅是了解了最基本的队列用法,RabbitMQ 还有一些高级功能。例如,生产者发送的消息具有优先级,队列会优先接收优先级高的消息,这要怎么实现呢?
其实很简单,只需要在声明队列的时候增加一个属性即可:
MAX_PRIORITY=100
channel.queue_declare(queue=QUEE_NAME,arguments={
'x-max-priority': MAX_PRIORITY
})
这里在声明队列的时候,增加了一个名为 x-max-priority 的参数,用来指定最大优先级,这样整个队列就能支持优先级了。
下面改写一下生产者代码,在其向队列发送消息的时候指定 properties 参数为 BasicProperties 对象,在 BasicProperties 对象里通过 priority 参数指定对应消息的优先级,实现如下:
import pika
MAX_PRIORITY = 100
QUEUE_NAME = 'scrape'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, arguments={
'x-max-priority': MAX_PRIORITY
})
while True:
data, priority = input().split()
channel.basic_publish(exchange='',
routing_key=QUEUE_NAME,
properties=pika.BasicProperties(
priority=int(priority)),
body=data)
print(f'Put {data}')
这里的优先级我们也可以手动输入,需要将输入的内容分为两部分,这两部分用空格隔开,运行结果如下:
foo 40 Put foo bar 20 Put bar baz 50 Put baz
这里我们输入了三次内容,第一次输入的是 foo 40,代表 foo 这个消息的优先级是 40;第二次输入 bar 20,代表 bar 这个消息的优先级是 20;第三次输入 baz 50,代表 baz 这个消息的优先级是 50。
然后重新运行消费者代码,并按几次回车,可以看到如下输出结果:
Get b'baz' Get b'foo' Get b'bar'
从输出结果我们可以看到,消息按照优先级被取出来了。baz 的优先级是最高的,所以被最先取出来。bar 的优先级是最低的,所以被最后取出来。
队列持久化
除了设置优先级,还可以将队列持久化存储,如果不设置持久化存储,那么数据在 RabbitMQ 重启之后就没有了。
在声明队列时指定 durable 为 True,即可开启持久化存储,实现如下:
channel.queue_declare(queue=QUEUENAME,
arguments={'x-max-priority': MAX_PRIORITY}, durable=True)
同时在添加消息的时候需要指定 BasicProperties 对象的 delivery_mode 为 2,实现如下:
properties = pika.BasicProperties(priority=int(priority),delivery_mode=2)
所以,这时的生产者代码改写如下:
import pika
MAX_PRIORITY = 100
QUEUE_NAME = 'scrape'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, arguments={
'x-max-priority': MAX_PRIORITY
}, durable=True)
while True:
data, priority = input().split()
channel.basic_publish(exchange='',
routing_key=QUEUE_NAME,
properties=pika.BasicProperties(
priority=int(priority),
delivery_mode=2,
),
body=data)
print(f'Put {data}')
这样就可以持久化存储队列了。
实战
最后,我们将字符串消息改写成请求对象,这里需要借助 requests 库中的 Request 类来表示一个请求对象。
构造请求对象时,传入请求方法和请求 URL 即可,代码如下:
request = requests.Request(GET', url)
这样就构造了一个 GET 请求,然后可以通过 pickle 工具进行序列化,最后发送到 RabbitMQ 中。生产者代码实现如下:
import pika
import requests
import pickle
MAX_PRIORITY = 100
TOTAL = 100
QUEUE_NAME = 'scrape_queue'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
for i in range(1, TOTAL + 1):
url = f'https://ssr1.scrape.center/detail/{i}'
request = requests.Request('GET', url)
channel.basic_publish(exchange='',
routing_key=QUEUE_NAME,
properties=pika.BasicProperties(
delivery_mode=2,
),
body=pickle.dumps(request))
print(f'Put request of {url}')
运行这段生产者代码,就构造出了 100 个请求对象并发送到了 RabbitMQ 中。
对于消费者,可以编写一个循环,让它不断地从队列中取出请求对象,取出一个就执行一次爬取任务,实现如下:
import pika
import pickle
import requests
MAX_PRIORITY = 100
QUEUE_NAME = 'scrape_queue'
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
session = requests.Session()
def scrape(request):
try:
response = session.send(request.prepare())
print(f'success scraped {response.url}')
except requests.RequestException:
print(f'f\'error occurred when scraping {request.url}\'')
while True:
method_frame, header, body = channel.basic_get(
queue=QUEUE_NAME, auto_ack=True)
if body:
request = pickle.loads(body)
print(f'Get {request}')
scrape(request)
这里消费者调用 basic_get 方法获取了消息,然后通过 pickle 工具把消息反序列化还原成一个请求对象,之后使用 session 的 send 方法执行该请求,爬取了数据,如果爬取成功就打印爬取成功的消息。
运行结果如下:
Get <Request [GET]> success scraped https://ssr1.scrape.center/detail/1 Get <Request [GET]> success scraped https://ssr1.scrape.center/detail/2 ... Get <Request [GET]> success scraped https://ssr1.scrape.center/detail/100
可以看到,消费者依次取出了请求对象,然后成功完成了一个个爬取任务。