Skip to content

Commit

Permalink
Merge pull request #2272 from zifter/feature/grpc-interceptor-example
Browse files Browse the repository at this point in the history
GRPC example - rewrite using interceptor
  • Loading branch information
cyberw committed Dec 11, 2022
2 parents cbe8180 + d12623d commit e60c1f5
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 42 deletions.
3 changes: 2 additions & 1 deletion docs/testing-other-systems.rst
Expand Up @@ -42,9 +42,10 @@ Dummy server to test:

.. literalinclude:: ../examples/grpc/hello_server.py

gRPC client, base User and example usage:
gRPC client, base GrpcUser, interceptor for sending events to locust and example usage:

.. literalinclude:: ../examples/grpc/locustfile.py

As base class for interceptor is used `grpc-interceptor <https://pypi.org/project/grpc-interceptor/>` library.

For more examples of user types, see `locust-plugins <https://github.com/SvenskaSpel/locust-plugins#users>`_ (it has users for WebSocket/SocketIO, Kafka, Selenium/WebDriver and more).
4 changes: 4 additions & 0 deletions examples/grpc/hello_server.py
Expand Up @@ -22,3 +22,7 @@ def start_server():
server.start()
logger.info("gRPC server started")
server.wait_for_termination()


if __name__ == "__main__":
start_server()
88 changes: 47 additions & 41 deletions examples/grpc/locustfile.py
@@ -1,18 +1,21 @@
# make sure you use grpc version 1.39.0 or later,
# because of https://github.com/grpc/grpc/issues/15880 that affected earlier versions
from typing import Callable, Any
import time

import grpc
import hello_pb2_grpc
import hello_pb2
import grpc.experimental.gevent as grpc_gevent
import gevent
from locust import events, User, task
from locust.exception import LocustError
from locust.user.task import LOCUST_STATE_STOPPING
from grpc_interceptor import ClientInterceptor

import hello_pb2_grpc
import hello_pb2

from hello_server import start_server
import gevent
import time

# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent

grpc_gevent.init_gevent()


Expand All @@ -22,36 +25,38 @@ def run_grpc_server(environment, **_kwargs):
gevent.spawn(start_server)


class GrpcClient:
def __init__(self, environment, stub):
class LocustInterceptor(ClientInterceptor):
def __init__(self, environment, *args, **kwargs):
super().__init__(*args, **kwargs)

self.env = environment
self._stub_class = stub.__class__
self._stub = stub

def __getattr__(self, name):
func = self._stub_class.__getattribute__(self._stub, name)

def wrapper(*args, **kwargs):
request_meta = {
"request_type": "grpc",
"name": name,
"start_time": time.time(),
"response_length": 0,
"exception": None,
"context": None,
"response": None,
}
start_perf_counter = time.perf_counter()
try:
request_meta["response"] = func(*args, **kwargs)
request_meta["response_length"] = len(request_meta["response"].message)
except grpc.RpcError as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
self.env.events.request.fire(**request_meta)
return request_meta["response"]

return wrapper

def intercept(
self,
method: Callable,
request_or_iterator: Any,
call_details: grpc.ClientCallDetails,
):
response = None
exception = None
start_perf_counter = time.perf_counter()
response_length = 0
try:
response = method(request_or_iterator, call_details)
response_length = response.result().ByteSize()
except grpc.RpcError as e:
exception = e

self.env.events.request.fire(
request_type="grpc",
name=call_details.method,
response_time=(time.perf_counter() - start_perf_counter) * 1000,
response_length=response_length,
response=response,
context=None,
exception=exception,
)
return response


class GrpcUser(User):
Expand All @@ -64,10 +69,12 @@ def __init__(self, environment):
for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
if attr_value is None:
raise LocustError(f"You must specify the {attr_name}.")

self._channel = grpc.insecure_channel(self.host)
self._channel_closed = False
stub = self.stub_class(self._channel)
self.client = GrpcClient(environment, stub)
interceptor = LocustInterceptor(environment=environment)
self._channel = grpc.intercept_channel(self._channel, interceptor)

self.stub = self.stub_class(self._channel)


class HelloGrpcUser(GrpcUser):
Expand All @@ -76,6 +83,5 @@ class HelloGrpcUser(GrpcUser):

@task
def sayHello(self):
if not self._channel_closed:
self.client.SayHello(hello_pb2.HelloRequest(name="Test"))
self.stub.SayHello(hello_pb2.HelloRequest(name="Test"))
time.sleep(1)

0 comments on commit e60c1f5

Please sign in to comment.