如何使用 AWS SQS 构建强大的 ETL 管道

抓取大量网站和数据可能是一个复杂且缓慢的过程。 但它可以充分利用并行处理,可以在本地使用多个处理器线程,也可以使用消息队列系统分发抓取请求以报告抓取器。 类似于提取、转换和加载管道 (ETL) 的过程中可能还需要多个步骤。 这些管道还可以使用消息队列架构结合抓取来轻松构建。

使用消息队列架构为我们的管道带来了两个优势:

  • 鲁棒性

  • 可扩展性

处理变得鲁棒,就像单个消息的处理失败一样,然后可以重新排队该消息以进行再次处理。 因此,如果抓取器发生故障,我们可以重新启动它,而不会丢失抓取页面的请求,否则消息队列系统会将请求传递给另一个抓取器。

它提供了可扩展性,因为同一或不同系统上的多个抓取器可以侦听队列。 然后可以在不同的核心或更重要的是不同的系统上同时处理多个消息。 在基于云的抓取器中,您可以按需扩展抓取器实例的数量以处理更大的负载。

可以使用的常见消息队列系统包括:Kafka、RabbitMQ 和 Amazon SQS。 我们的示例将使用 Amazon SQS,尽管 Kafka 和 RabbitMQ 都非常好用(我们将在本书后面看到 RabbitMQ 的使用)。 我们使用 SQS 来保持使用基于 AWS 云的服务的模型,就像我们在本章前面使用 S3 所做的那样。

准备工作

作为示例,我们将构建一个非常简单的 ETL 流程,该流程将读取行星主页面并将行星数据存储在 MySQL 中。 它还会将页面中每个更多信息链接的单个消息传递到队列,其中 0 个或多个进程可以接收这些请求并对这些链接执行进一步处理。

要从 Python 访问 SQS,我们将重新使用 boto3 库。

如何操作 - 将消息发布到 AWS 队列

03/create_messages.py 文件包含读取行星数据并将 MoreInfo 属性中的 URL 发布到 SQS 队列的代码:

from urllib.request import urlopen
from bs4 import BeautifulSoup

import boto3
import botocore

# declare our keys (normally, don't hard code this)
access_key = "AKIAIXFTCYO7FEL55TCQ"
access_secret_key = "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"

# create sqs client
sqs = boto3.client('sqs', "us-west-2", aws_access_key_id=access_key, aws_secret_access_key=access_secret_key)

# create / open the SQS queue
queue = sqs.create_queue(QueueName="PlanetMoreInfo")
print(queue)

# read and parse the planets HTML
html = urlopen("http://127.0.0.1:8080/pages/planets.min.html")
bsobj = BeautifulSoup(html, "lxml")

planets = []
planet_rows = bsobj.html.body.div.table.findAll("tr", {"class": "planet"})

for i in planet_rows:
    tds = i.findAll("td")

    # get the URL
    more_info_url = tds[5].findAll("a")[0]["href"].strip()

    # send the URL to the queue
    sqs.send_message(QueueUrl=queue["QueueUrl"],
                     MessageBody=more_info_url)
    print("Sent %s to %s" % (more_info_url, queue["QueueUrl"]))

在终端中运行代码,您将看到类似于以下内容的输出:

{'QueueUrl':
'https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo',
'ResponseMetadata': {'RequestId': '2aad7964-292a-5bf6-b838-2b7a5007af22',
'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Mon, 28
Aug 2017 20:02:53 GMT', 'content-type': 'text/xml', 'content-length':
'336', 'connection': 'keep-alive', 'x-amzn-requestid': '2aad7964-292a-5bf6-
b838-2b7a5007af22'}, 'RetryAttempts': 0}}
Sent https://en.wikipedia.org/wiki/Mercury_(planet) to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Venus to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Earth to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Mars to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Jupiter to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Saturn to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Uranus to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Neptune to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Sent https://en.wikipedia.org/wiki/Pluto to
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo

现在进入 AWS SQS 控制台。 您应该看到队列已创建并且包含 9 条消息:

image 2024 01 29 13 59 43 790

工作原理

该代码连接到给定账户和 AWS 的 us-west-2 区域。 如果队列不存在,则创建一个队列。 然后,对于源内容中的每个行星,程序都会发送一条消息,其中包含该行星的更多信息 URL。

此时,没有人在监听队列,因此消息将坐在那里,直到最终被读取或过期。 每条消息的默认有效期为 4 天。

如何做到这一点 - 读取和处理消息

要处理消息,请运行 03/process_messages.py 程序:

import boto3
import botocore
import requests
from bs4 import BeautifulSoup

print("Starting")

# declare our keys (normally, don't hard code this)
access_key = "AKIAIXFTCYO7FEL55TCQ"
access_secret_key = "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"

# create sqs client
sqs = boto3.client('sqs', "us-west-2", aws_access_key_id=access_key, aws_secret_access_key=access_secret_key)

print("Created client")

# create / open the SQS queue
queue = sqs.create_queue(QueueName="PlanetMoreInfo")
queue_url = queue["QueueUrl"]
print("Opened queue: %s" % queue_url)

while True:
    print("Attempting to receive messages")
    response = sqs.receive_message(QueueUrl=queue_url,
                                   MaxNumberOfMessages=1,
                                   WaitTimeSeconds=1)
    if not 'Messages' in response:
        print("No messages")
        continue

    message = response['Messages'][0]
    receipt_handle = message['ReceiptHandle']
    url = message['Body']

    # parse the page
    html = requests.get(url)
    bsobj = BeautifulSoup(html.text, "lxml")

    # now find the planet name and albedo info
    planet = bsobj.findAll("h1", {"id": "firstHeading"})[0].text
    albedo_node = bsobj.findAll("a", {"href": "/wiki/Geometric_albedo"})[0]
    root_albedo = albedo_node.parent
    albedo = root_albedo.text.strip()

    # delete the message from the queue
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle
    )

    # print the planets name and albedo info
    print("%s: %s" % (planet, albedo))

使用 python process_messages.py 运行脚本。 您将看到类似于以下内容的输出:

Starting
Created client
Opened queue:
https://us-west-2.queue.amazonaws.com/414704166289/PlanetMoreInfo
Attempting to receive messages
Jupiter: 0.343 (Bond)
0.52 (geom.)[3]
Attempting to receive messages
Mercury (planet): 0.142 (geom.)[10]
Attempting to receive messages
Uranus: 0.300 (Bond)
0.51 (geom.)[5]
Attempting to receive messages
Neptune: 0.290 (bond)
0.41 (geom.)[4]
Attempting to receive messages
Pluto: 0.49 to 0.66 (geometric, varies by 35%)[1][7]
Attempting to receive messages
Venus: 0.689 (geometric)[2]
Attempting to receive messages
Earth: 0.367 geometric[3]
Attempting to receive messages
Mars: 0.170 (geometric)[8]
0.25 (Bond)[7]
Attempting to receive messages
Saturn: 0.499 (geometric)[4]
Attempting to receive messages
No messages

工作原理

该程序连接到 SQS 并打开队列。 打开队列进行读取也是使用 sqs.create_queue 完成的,如果队列已经存在,它将简单地返回队列。

然后,它进入一个调用 sqs.receive_message 的循环,指定队列的 URL、每次读取中要接收的消息数以及如果没有可用消息则等待的最长时间(以秒为单位)。

如果读取消息,则会检索消息中的 URL,并使用抓取技术读取 URL 处的页面并提取行星的名称和有关其反照率的信息。

请注意,我们检索消息的接收句柄。 这是从队列中删除消息所必需的。 如果我们不删除该消息,一段时间后它将在队列中可用。 因此,如果我们的抓取工具崩溃并且没有执行此确认,则 SQS 将再次提供消息以供另一个抓取工具(或备份时的同一个抓取工具)进行处理。

还有更多

您可以在以下位置找到有关 S3 的更多信息: https://aws.amazon.com/s3/ 。 有关 API 详细信息的具体信息,请访问: https://aws.amazon.com/documentation/s3/