测试其他系统/协议
Locust 默认只支持 HTTP/HTTPS,但可以通过扩展支持几乎任何协议。这通常是通过封装协议库并在每个调用完成后触发请求事件来实现的,这样 Locust 就可以知道请求的执行情况。
使用的协议库必须能够被 gevent monkey-patch。 几乎所有纯 Python 的库(使用 Python 的 一些 C 库提供了其他解决方法。例如,如果您想使用 psycopg2 来进行 PostgreSQL 的性能测试,可以使用 psycogreen。如果您愿意深入操作,也许可以自己为某些库打补丁,但这超出了本文档的范围。 |
XML-RPC
假设我们有一个 XML-RPC 服务器,想要进行负载测试。
import random
import time
from xmlrpc.server import SimpleXMLRPCServer
def get_time():
time.sleep(random.random())
return time.time()
def get_random_number(low, high):
time.sleep(random.random())
return random.randint(low, high)
server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
server.register_function(get_time, "get_time")
server.register_function(get_random_number, "get_random_number")
server.serve_forever()
我们可以通过封装 xmlrpc.client.ServerProxy
创建一个通用的 XML-RPC 客户端:
from locust import User, task
import time
from xmlrpc.client import Fault, ServerProxy
class XmlRpcClient(ServerProxy):
"""
XmlRpcClient 是对标准库中 ServerProxy 的封装。
它代理任何函数调用,并在调用完成后触发 *request* 事件,
使得这些调用能被 Locust 记录。
"""
def __init__(self, host, request_event):
super().__init__(host)
self._request_event = request_event
def __getattr__(self, name):
func = ServerProxy.__getattr__(self, name)
def wrapper(*args, **kwargs):
request_meta = {
"request_type": "xmlrpc",
"name": name,
"start_time": time.time(),
"response_length": 0, # 计算 xmlrpc.client 响应的长度较困难
"response": None,
"context": {}, # 如果您希望实现上下文,请参阅 HttpUser
"exception": None,
}
start_perf_counter = time.perf_counter()
try:
request_meta["response"] = func(*args, **kwargs)
except Fault as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
self._request_event.fire(**request_meta) # 这会让请求在 Locust 中被记录
return request_meta["response"]
return wrapper
class XmlRpcUser(User):
"""
一个简化的 Locust 用户类,为其子类提供 XmlRpcClient
"""
abstract = True # 不要在运行 Locust 时实例化这个类
def __init__(self, environment):
super().__init__(environment)
self.client = XmlRpcClient(self.host, request_event=environment.events.request)
# 实际运行 Locust 的用户类
# 这是唯一与我们正在测试的服务特定的类
class MyUser(XmlRpcUser):
host = "http://127.0.0.1:8877/"
@task
def get_time(self):
self.client.get_time()
@task
def get_random_number(self):
self.client.get_random_number(0, 100)
gRPC
假设我们有一个 gRPC 服务器,想要进行负载测试:
import logging
import time
from concurrent import futures
import grpc
import hello_pb2
import hello_pb2_grpc
logger = logging.getLogger(__name__)
class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
def SayHello(self, request, context):
name = request.name
time.sleep(1)
return hello_pb2.HelloResponse(message=f"Hello from Locust, {name}!")
def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
hello_pb2_grpc.add_HelloServiceServicer_to_server(HelloServiceServicer(), server)
server.add_insecure_port("localhost:50051")
server.start()
logger.info("gRPC server started")
server.wait_for_termination()
if __name__ == "__main__":
start_server()
使用 拦截器 来将事件发送到 Locust:
from locust import User
from locust.exception import LocustError
import time
from typing import Any, Callable
import grpc
import grpc.experimental.gevent as grpc_gevent
from grpc_interceptor import ClientInterceptor
# 将 gRPC 修改为使用 gevent 而非 asyncio
grpc_gevent.init_gevent()
class LocustInterceptor(ClientInterceptor):
def __init__(self, environment, *args, **kwargs):
super().__init__(*args, **kwargs)
self.env = environment
def intercept(
self,
method: Callable,
request_or_iterator: Any,
call_details: grpc.ClientCallDetails,
):
response = None
exception = None
start_perf_counter = time.perf_counter()
response_length = 0
try:
response = method(request_or_iterator, call_details)
response_length = response.result().ByteSize()
except grpc.RpcError as e:
exception = e
self.env.events.request.fire(
request_type="grpc",
name=call_details.method,
response_time=(time.perf_counter() - start_perf_counter) * 1000,
response_length=response_length,
response=response,
context=None,
exception=exception,
)
return response
class GrpcUser(User):
abstract = True
stub_class = None
def __init__(self, environment):
super().__init__(environment)
for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
if attr_value is None:
raise LocustError(f"You must specify the {attr_name}.")
self._channel = grpc.insecure_channel(self.host)
interceptor = LocustInterceptor(environment=environment)
self._channel = grpc.intercept_channel(self._channel, interceptor)
self.stub = self.stub_class(self._channel)
下面是一个使用上述 gRPC 示例的 Locustfile:
from locust import events, task
import gevent
import grpc_user
import hello_pb2
import hello_pb2_grpc
from hello_server import start_server
# 启动伪服务器。真实测试中你不会这么做。
@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
gevent.spawn(start_server)
class HelloGrpcUser(grpc_user.GrpcUser):
host = "localhost:50051"
stub_class = hello_pb2_grpc.HelloServiceStub
@task
def sayHello(self):
self.stub.SayHello(hello_pb2.HelloRequest(name="Test"))
基于请求的库/SDK
如果您想使用一个在底层使用 requests.Session
对象的库,通常可以跳过上述复杂性。例如,Zeep 提供的 SOAP 客户端就允许您显式地传递一个 Session。在这种情况下,只需将 HttpUser
的 client
传递给该库即可,任何使用该库发出的请求都将被 Locust 记录。
即使您的库没有显式地暴露该 Session
,您也可以通过覆盖其内部使用的 Session 来使其工作。例如,以下是如何为 Archivist 客户端做这个操作:
import locust
from locust.user import task
from archivist.archivist import Archivist # 需要测试的示例库
class ArchivistUser(locust.HttpUser):
def on_start(self):
AUTH_TOKEN = None
with open("auth.text") as f:
AUTH_TOKEN = f.read()
# 启动库提供的客户端实例
self.arch: Archivist = Archivist(url=self.host, auth=AUTH_TOKEN)
# 用 locust 的 session 覆盖库的内部 _session
self.arch._session = self.client
@task
def Create_assets(self):
"""用户以最快的速度创建资产"""
while True:
self.arch.assets.create(behaviours=["Builtin", "RecordEvidence", "Attachments"], attrs={"foo": "bar"})
REST 示例
见 FastHttpUser。
其他示例
请参阅 locust-plugins,它提供了用于 WebSocket/SocketIO、Kafka、Selenium/WebDriver、Playwright 等的用户类。