Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GRPC example - rewrite using interceptor #2272

Merged
merged 2 commits into from Dec 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/testing-other-systems.rst
Expand Up @@ -42,9 +42,10 @@ Dummy server to test:

.. literalinclude:: ../examples/grpc/hello_server.py

gRPC client, base User and example usage:
gRPC client, base GrpcUser, interceptor for sending events to locust and example usage:

.. literalinclude:: ../examples/grpc/locustfile.py

As base class for interceptor is used `grpc-interceptor <https://pypi.org/project/grpc-interceptor/>` library.

For more examples of user types, see `locust-plugins <https://github.com/SvenskaSpel/locust-plugins#users>`_ (it has users for WebSocket/SocketIO, Kafka, Selenium/WebDriver and more).
4 changes: 4 additions & 0 deletions examples/grpc/hello_server.py
Expand Up @@ -22,3 +22,7 @@ def start_server():
server.start()
logger.info("gRPC server started")
server.wait_for_termination()


if __name__ == "__main__":
start_server()
88 changes: 47 additions & 41 deletions examples/grpc/locustfile.py
@@ -1,18 +1,21 @@
# make sure you use grpc version 1.39.0 or later,
# because of https://github.com/grpc/grpc/issues/15880 that affected earlier versions
from typing import Callable, Any
import time

import grpc
import hello_pb2_grpc
import hello_pb2
import grpc.experimental.gevent as grpc_gevent
import gevent
from locust import events, User, task
from locust.exception import LocustError
from locust.user.task import LOCUST_STATE_STOPPING
from grpc_interceptor import ClientInterceptor

import hello_pb2_grpc
import hello_pb2

from hello_server import start_server
import gevent
import time

# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent

grpc_gevent.init_gevent()


Expand All @@ -22,36 +25,38 @@ def run_grpc_server(environment, **_kwargs):
gevent.spawn(start_server)


class GrpcClient:
def __init__(self, environment, stub):
class LocustInterceptor(ClientInterceptor):
def __init__(self, environment, *args, **kwargs):
super().__init__(*args, **kwargs)

self.env = environment
self._stub_class = stub.__class__
self._stub = stub

def __getattr__(self, name):
func = self._stub_class.__getattribute__(self._stub, name)

def wrapper(*args, **kwargs):
request_meta = {
"request_type": "grpc",
"name": name,
"start_time": time.time(),
"response_length": 0,
"exception": None,
"context": None,
"response": None,
}
start_perf_counter = time.perf_counter()
try:
request_meta["response"] = func(*args, **kwargs)
request_meta["response_length"] = len(request_meta["response"].message)
except grpc.RpcError as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
self.env.events.request.fire(**request_meta)
return request_meta["response"]

return wrapper

def intercept(
self,
method: Callable,
request_or_iterator: Any,
call_details: grpc.ClientCallDetails,
):
response = None
exception = None
start_perf_counter = time.perf_counter()
response_length = 0
try:
response = method(request_or_iterator, call_details)
response_length = response.result().ByteSize()
except grpc.RpcError as e:
exception = e

self.env.events.request.fire(
request_type="grpc",
name=call_details.method,
response_time=(time.perf_counter() - start_perf_counter) * 1000,
response_length=response_length,
response=response,
context=None,
exception=exception,
)
return response


class GrpcUser(User):
Expand All @@ -64,10 +69,12 @@ def __init__(self, environment):
for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
if attr_value is None:
raise LocustError(f"You must specify the {attr_name}.")

self._channel = grpc.insecure_channel(self.host)
self._channel_closed = False
stub = self.stub_class(self._channel)
self.client = GrpcClient(environment, stub)
interceptor = LocustInterceptor(environment=environment)
self._channel = grpc.intercept_channel(self._channel, interceptor)

self.stub = self.stub_class(self._channel)


class HelloGrpcUser(GrpcUser):
Expand All @@ -76,6 +83,5 @@ class HelloGrpcUser(GrpcUser):

@task
def sayHello(self):
if not self._channel_closed:
self.client.SayHello(hello_pb2.HelloRequest(name="Test"))
self.stub.SayHello(hello_pb2.HelloRequest(name="Test"))
time.sleep(1)