提高性能,使用更快的 HTTP 客户端
Locust 的默认 HTTP 客户端使用的是 python-requests。它提供了一个非常友好的 API,许多 Python 开发者都熟悉,并且维护得非常好。但是,如果你打算进行非常高吞吐量的测试,并且在运行 Locust 时硬件有限,requests 有时可能效率不足。
因此,Locust 还提供了 FastHttpUser,它使用 geventhttpclient 代替。它提供了非常相似的 API,并且使用的 CPU 时间显著较少,在某些情况下,能够使特定硬件上每秒最大请求数提高 5 倍到 6 倍。
无法精确说出在你的特定硬件和特定测试计划下,Locust 可以处理多少请求,因此你需要进行测试。检查 Locust 的控制台输出,如果它受到 CPU 限制,它会发出警告。
在最佳情况下(在一个无限循环中做小请求),使用 FastHttpUser 的单个 Locust 进程(限制为一个 CPU 核心)可以达到每秒约 16000 个请求,而使用 HttpUser 则为 4000(在 2021 年的 M1 MacBook Pro 和 Python 3.11 上测试)。
相对提升可能在请求负载较大的情况下更为明显,但如果你的测试涉及到与请求无关的 CPU 密集型操作,提升也可能较小。
当然,实际上,你应该 为每个 CPU 核心运行一个 Locust 进程。
只要你的负载生成器 CPU 没有过载,FastHttpUser 的响应时间应该与 HttpUser 几乎相同。它并不会使单个请求更快。 |
如何使用 FastHttpUser
只需继承 FastHttpUser,而不是 HttpUser:
from locust import task, FastHttpUser
class MyUser(FastHttpUser):
@task
def index(self):
response = self.client.get("/")
并发
单个 FastHttpUser/geventhttpclient 会话可以运行并发请求,你只需要为每个请求启动 greenlet:
@task
def t(self):
def concurrent_request(url):
self.client.get(url)
pool = gevent.pool.Pool()
urls = ["/url1", "/url2", "/url3"]
for url in urls:
pool.spawn(concurrent_request, url)
pool.join()
FastHttpUser/geventhttpclient 与 HttpUser/python-requests 非常相似,但有时会有一些微妙的差别。特别是当你与客户端库的内部机制打交道时,例如在手动管理 cookies 时。 |
REST
FastHttpUser 提供了一个用于测试 REST/JSON HTTP 接口的 rest
方法。它是 self.client.request
的一个封装:
-
将 JSON 响应解析为响应对象中的字典
js
。如果响应不是有效的 JSON,它会将请求标记为失败。 -
默认将
Content-Type
和Accept
请求头设置为application/json
。 -
设置
catch_response=True
(因此总是使用with
块)。 -
捕获
with
块中抛出的任何未处理的异常,将样本标记为失败(而不是立即退出任务,且没有触发请求事件)。
from locust import task, FastHttpUser
class MyUser(FastHttpUser):
@task
def t(self):
with self.rest("POST", "/", json={"foo": 1}) as resp:
if resp.js is None:
pass # 无需做任何操作,已标记为失败
elif "bar" not in resp.js:
resp.failure(f"'bar' missing from response {resp.text}")
elif resp.js["bar"] != 42:
resp.failure(f"'bar' had an unexpected value: {resp.js['bar']}")
有关完整示例,请参见 rest.py
。该示例还展示了如何使用继承来提供特定于 REST API 的行为,这些行为适用于多个请求/测试计划。
此功能是新的,接口/实现的细节可能会在 Locust 的新版本中发生变化。 |
连接处理
默认情况下,用户会重用相同的 TCP/HTTP 连接(除非连接被断开)。为了更真实地模拟新浏览器连接到应用程序,你可以手动关闭该连接。
@task
def t(self):
self.client.client.clientpool.close() # self.client.client 不是错别字
self.client.get("/") # 此时会创建一个新的连接