I am trying the below grpc locust script but it gives the following error :
events.request_success.fire(request_type="grpc", ^^^^^^^^^^^^^^^^^^ AttributeError: 'Events' object has no attribute 'request_success'
Script
import sys
import grpc
import inspect
import time
import gevent
# Libs
from locust.contrib.fasthttp import FastHttpUser
from locust import task, events, constant
from locust import events as ev
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner
from **** import PublicAPIStub
import ****.v1.public_api_pb2 as dtos
import domain.v1.dto_pb2 as domain
import domain.v1.geo_pb2 as geo
def stopwatch(func):
"""To be updated"""
def wrapper(*args, **kwargs):
"""To be updated"""
# get task's function name
previous_frame = inspect.currentframe().f_back
_, _, task_name, _, _ = inspect.getframeinfo(previous_frame)
start = time.time()
result = None
try:
result = func(*args, **kwargs)
except Exception as e:
total = int((time.time() - start) * 1000)
ev.request_failure.fire(request_type="grpc",
name=task_name,
response_time=total,
response_length=0,
exception=e)
else:
total = int((time.time() - start) * 1000)
ev.request_success.fire(request_type="grpc",
name=task_name,
response_time=total,
response_length=0)
return result
return wrapper
class GRPCMyLocust(FastHttpUser):
host = 'http://*********:443'
wait_time = constant(0)
def on_start(self):
""" on_start is called when a Locust start before any task is scheduled """
pass
def on_stop(self):
""" on_stop is called when the TaskSet is stopping """
pass
@task
@stopwatch
def grpc_client_task(self):
"""To be updated"""
try:
ssl_metadata = grpc.ssl_channel_credentials()
with grpc.secure_channel('laas-zz-lt.usehurrier.com:443', ssl_metadata) as channel:
input_data = dtos.Request(type='test')
stub = PublicAPIStub(channel)
stub.ListAvailableVendors(input_data)
except (KeyboardInterrupt, SystemExit):
sys.exit(0)
# Stopping the locust if a threshold (in this case the fail ratio) is exceeded
def checker(environment):
while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
time.sleep(1)
if environment.runner.stats.total.fail_ratio > 0.4:
print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
environment.runner.quit()
return
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, WorkerRunner):
gevent.spawn(checker, environment)
Any idea what might be the problem with the script?
Note: I am receiving a response from the server so the connection works fine.
locust --version: locust 2.15.1 from /opt/homebrew/lib/python3.11/site-packages/locust (python 3.11.3)
Locust 1.5 deprecated request_success
and request_failure
in favor of a unified request
event.
https://docs.locust.io/en/stable/changelog.html#id47
2.15 removed the deprecated ones.
https://docs.locust.io/en/stable/changelog.html#id2
Here's the doc for request
event.
https://docs.locust.io/en/stable/api.html#locust.event.Events.request
So for you, it'd be something like:
ev.request.fire(request_type="grpc",
name=task_name,
response_time=total,
response_length=0,
exception=e)
and
ev.request.fire(request_type="grpc",
name=task_name,
response_time=total,
response_length=0)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With