测试其他系统/协议

Locust 默认只支持 HTTP/HTTPS,但可以通过扩展支持几乎任何协议。这通常是通过封装协议库并在每个调用完成后触发请求事件来实现的,这样 Locust 就可以知道请求的执行情况。

使用的协议库必须能够被 gevent monkey-patch

几乎所有纯 Python 的库(使用 Python 的 socket 模块或其他标准库函数,如 subprocess)应该可以开箱即用——但如果它们的 I/O 调用是通过编译的 C 代码实现的,gevent 就无法对其进行补丁修复。这会阻塞整个 Locust/Python 进程(实际上这会限制每个 worker 进程只能运行一个用户)。

一些 C 库提供了其他解决方法。例如,如果您想使用 psycopg2 来进行 PostgreSQL 的性能测试,可以使用 psycogreen。如果您愿意深入操作,也许可以自己为某些库打补丁,但这超出了本文档的范围。

XML-RPC

假设我们有一个 XML-RPC 服务器,想要进行负载测试。

import random
import time
from xmlrpc.server import SimpleXMLRPCServer

def get_time():
    time.sleep(random.random())
    return time.time()

def get_random_number(low, high):
    time.sleep(random.random())
    return random.randint(low, high)

server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
server.register_function(get_time, "get_time")
server.register_function(get_random_number, "get_random_number")
server.serve_forever()

我们可以通过封装 xmlrpc.client.ServerProxy 创建一个通用的 XML-RPC 客户端:

from locust import User, task
import time
from xmlrpc.client import Fault, ServerProxy

class XmlRpcClient(ServerProxy):
    """
    XmlRpcClient 是对标准库中 ServerProxy 的封装。
    它代理任何函数调用,并在调用完成后触发 *request* 事件,
    使得这些调用能被 Locust 记录。
    """

    def __init__(self, host, request_event):
        super().__init__(host)
        self._request_event = request_event

    def __getattr__(self, name):
        func = ServerProxy.__getattr__(self, name)

        def wrapper(*args, **kwargs):
            request_meta = {
                "request_type": "xmlrpc",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,  # 计算 xmlrpc.client 响应的长度较困难
                "response": None,
                "context": {},  # 如果您希望实现上下文,请参阅 HttpUser
                "exception": None,
            }
            start_perf_counter = time.perf_counter()
            try:
                request_meta["response"] = func(*args, **kwargs)
            except Fault as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
            self._request_event.fire(**request_meta)  # 这会让请求在 Locust 中被记录
            return request_meta["response"]

        return wrapper


class XmlRpcUser(User):
    """
    一个简化的 Locust 用户类,为其子类提供 XmlRpcClient
    """

    abstract = True  # 不要在运行 Locust 时实例化这个类

    def __init__(self, environment):
        super().__init__(environment)
        self.client = XmlRpcClient(self.host, request_event=environment.events.request)


# 实际运行 Locust 的用户类
# 这是唯一与我们正在测试的服务特定的类
class MyUser(XmlRpcUser):
    host = "http://127.0.0.1:8877/"

    @task
    def get_time(self):
        self.client.get_time()

    @task
    def get_random_number(self):
        self.client.get_random_number(0, 100)

gRPC

假设我们有一个 gRPC 服务器,想要进行负载测试:

import logging
import time
from concurrent import futures
import grpc
import hello_pb2
import hello_pb2_grpc

logger = logging.getLogger(__name__)

class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
    def SayHello(self, request, context):
        name = request.name
        time.sleep(1)
        return hello_pb2.HelloResponse(message=f"Hello from Locust, {name}!")

def start_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_pb2_grpc.add_HelloServiceServicer_to_server(HelloServiceServicer(), server)
    server.add_insecure_port("localhost:50051")
    server.start()
    logger.info("gRPC server started")
    server.wait_for_termination()

if __name__ == "__main__":
    start_server()

使用 拦截器 来将事件发送到 Locust:

from locust import User
from locust.exception import LocustError
import time
from typing import Any, Callable
import grpc
import grpc.experimental.gevent as grpc_gevent
from grpc_interceptor import ClientInterceptor

# 将 gRPC 修改为使用 gevent 而非 asyncio
grpc_gevent.init_gevent()

class LocustInterceptor(ClientInterceptor):
    def __init__(self, environment, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.env = environment

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.ClientCallDetails,
    ):
        response = None
        exception = None
        start_perf_counter = time.perf_counter()
        response_length = 0
        try:
            response = method(request_or_iterator, call_details)
            response_length = response.result().ByteSize()
        except grpc.RpcError as e:
            exception = e

        self.env.events.request.fire(
            request_type="grpc",
            name=call_details.method,
            response_time=(time.perf_counter() - start_perf_counter) * 1000,
            response_length=response_length,
            response=response,
            context=None,
            exception=exception,
        )
        return response


class GrpcUser(User):
    abstract = True
    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")
        self._channel = grpc.insecure_channel(self.host)
        interceptor = LocustInterceptor(environment=environment)
        self._channel = grpc.intercept_channel(self._channel, interceptor)
        self.stub = self.stub_class(self._channel)

下面是一个使用上述 gRPC 示例的 Locustfile:

from locust import events, task
import gevent
import grpc_user
import hello_pb2
import hello_pb2_grpc
from hello_server import start_server

# 启动伪服务器。真实测试中你不会这么做。
@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    gevent.spawn(start_server)

class HelloGrpcUser(grpc_user.GrpcUser):
    host = "localhost:50051"
    stub_class = hello_pb2_grpc.HelloServiceStub

    @task
    def sayHello(self):
        self.stub.SayHello(hello_pb2.HelloRequest(name="Test"))

基于请求的库/SDK

如果您想使用一个在底层使用 requests.Session 对象的库,通常可以跳过上述复杂性。例如,Zeep 提供的 SOAP 客户端就允许您显式地传递一个 Session。在这种情况下,只需将 HttpUserclient 传递给该库即可,任何使用该库发出的请求都将被 Locust 记录。

即使您的库没有显式地暴露该 Session,您也可以通过覆盖其内部使用的 Session 来使其工作。例如,以下是如何为 Archivist 客户端做这个操作:

import locust
from locust.user import task
from archivist.archivist import Archivist  # 需要测试的示例库

class ArchivistUser(locust.HttpUser):
    def on_start(self):
        AUTH_TOKEN = None

        with open("auth.text") as f:
            AUTH_TOKEN = f.read()

        # 启动库提供的客户端实例
        self.arch: Archivist = Archivist(url=self.host, auth=AUTH_TOKEN)
        # 用 locust 的 session 覆盖库的内部 _session
        self.arch._session = self.client

    @task
    def Create_assets(self):
        """用户以最快的速度创建资产"""

        while True:
            self.arch.assets.create(behaviours=["Builtin", "RecordEvidence", "Attachments"], attrs={"foo": "bar"})

REST 示例

FastHttpUser


其他示例

请参阅 locust-plugins,它提供了用于 WebSocket/SocketIO、Kafka、Selenium/WebDriver、Playwright 等的用户类。