Open pawamoy opened 4 years ago
You can achieve this by spawning greenlets within your locust tasks. Here's a small example:
from gevent.pool import Pool
from locust import HttpLocust, TaskSet, task, constant

class MyLocust(HttpLocust):
    host = "https://docs.locust.io"
    wait_time = constant(5)

    class task_set(TaskSet):
        @task
        def dual_greenlet_task(self):
            def do_http_request_or_whatever():
                print("yay, running in separate greenlet")
                response = self.client.get("/")
                print("status code:", response.status_code)

            pool = Pool()
            pool.spawn(do_http_request_or_whatever)
            pool.spawn(do_http_request_or_whatever)
            pool.join()
Locust is heavily reliant on Gevent, and as far as I know gevent and python async are not 100% compatible. Therefore I don't see locust supporting python async
any time soon. There's been discussion about it in the gevent project: https://github.com/gevent/gevent/issues/982
Alright thanks for the explanation, example and link 🙂
Are there any reasons not to switch to asyncio completely?
The main reason is that it would be a ton of work to rewrite everything. Apart from that I think it is a great idea :)
AFAIK using different threads to make many parallel requests with requests.Session is not a good idea, because requests.Session is not thread safe (https://github.com/psf/requests/issues/1871). You need to use another client to do this.
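One common workaround for a non-thread-safe client (not suggested in this thread, just a general pattern) is to give each thread its own client instance via threading.local. A minimal sketch, using a hypothetical stand-in class instead of requests.Session so the example is self-contained:

```python
import threading

# Stand-in for requests.Session (hypothetical); the same pattern applies to
# any client object that must not be shared across threads.
class FakeSession:
    pass

_local = threading.local()

def get_session():
    # Each thread lazily creates and caches its own client instance,
    # so no session object is ever shared between threads.
    if not hasattr(_local, "session"):
        _local.session = FakeSession()
    return _local.session

sessions = []

def worker():
    sessions.append(get_session())

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The two threads each got their own, distinct session object.
```

Note that Locust itself runs tasks in greenlets, not OS threads, so this sketch only illustrates the general thread-safety concern raised in the linked issue.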
From the description of that issue it sounds like it should only be a problem if you're using the same client to connect to many different hosts. In that case one could create another HttpSession instance.
@heyman in my case it happens when I'm executing many requests on the same host (as in your example).
@heyman sorry about the confusion, looks like it was my mistake. Everything is OK with requests.Session.
Do we support Asyncio in locust task now?
hey guys, maintainers, theoretically, do you see any potential issues with using the asgiref project to execute async functions as sync ones?
Haven't tried it, would love to see a proof of concept!
Most asyncio wrappers have issues coexisting with gevent though.. but if it works then it would be awesome!
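For context, the core idea behind a wrapper like asgiref's async_to_sync is just "run the coroutine on an event loop and block until it finishes". A stdlib-only sketch of that idea (this is not asgiref's actual implementation, and it sidesteps the gevent-coexistence question raised above):

```python
import asyncio
from functools import wraps

def async_to_sync(coro_func):
    # Wrap an async function so it can be called from synchronous code.
    # Note: asyncio.run starts a fresh event loop per call, which is
    # simpler (but slower) than reusing a long-lived loop.
    @wraps(coro_func)
    def wrapper(*args, **kwargs):
        return asyncio.run(coro_func(*args, **kwargs))
    return wrapper

@async_to_sync
async def fetch_value(x):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return x * 2

result = fetch_value(21)  # callable from plain sync code
```

Inside a gevent-based Locust worker, the blocking asyncio.run call would stall the whole greenlet hub, which is exactly the coexistence problem being discussed.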
gevent now has an asyncio version, so that should make integration easier: https://github.com/gfmio/asyncio-gevent
Unfortunately that repo hasn't been updated in two years, so I'm not sure if it works any more.
I think it would be doable to implement an asyncio version of the locust worker, but it'll take some work.
Is there a more concrete example of calling an asyncio coroutine inside a Locust gevent task? Sorry, but it's not immediately obvious from the code examples above (where I can't find where asyncio coroutines are actually called...).
Asyncio is not supported right now.
We're planning to build a completely asyncio-based worker implementation, but unless there's a simple way to allow gevent and asyncio to coexist (like asyncio-gevent seemed to promise before it went silent) it might be a while.
In the event that someone wants to take it upon themselves to reimplement the worker on asyncio themselves, it would be most welcome :)
Thanks @cyberw for your prompt reply. After days of struggle, I finally found a way to make asyncio work in Locust. Here is a working example. The basic idea is to create a dedicated thread running an asyncio event loop in each process (since Locust can start multiple workers on multiple CPU cores that run in separate processes), and define a helper function that takes a coroutine and its parameters as arguments and runs it on the event loop belonging to the current process.
import asyncio
import logging
import os
import threading
import time
from typing import Any, Awaitable, Callable

from gevent.event import Event
from locust import User, task, between, events

# Set the logging level for the root logger to WARNING to suppress INFO messages
logging.getLogger("root").setLevel(logging.WARNING)

def thread_func(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    """Run an asyncio coroutine on the background event loop without blocking gevent."""
    logging.debug('In thread_func()')
    try:
        assert loop.is_running(), "Asyncio event loop is not running!"
        future = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), loop)
        # Wait on a gevent Event instead of future.result() directly,
        # so other greenlets keep running while the coroutine executes.
        event = Event()
        future.add_done_callback(lambda _: event.set())
        event.wait()
        return future.result(timeout=3)
    except TimeoutError as te:
        logging.exception(f'TimeoutError: {te}')
        future.cancel()
        raise te
    except RuntimeError as rte:
        logging.exception(f'RuntimeError: {rte}')
        raise rte
    except Exception as e:
        logging.exception(f'Other Exception: {e}')
        raise e

def run_asyncio(loop, coro: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> Any:
    logging.debug('In run_asyncio()')
    return thread_func(loop, coro, *args, **kwargs)

async def async_foo(param1, param2):
    """asyncio coroutine function to be tested"""
    ...

class AsyncioInLocustTest(User):
    shortest_secs, longest_secs = 0.1, 1
    wait_time = between(shortest_secs, longest_secs)

    # Class-level variables to track process initialization
    shared_loop = None
    shared_thread = None
    initialized_pid = None  # Store the PID of the process where initialization has occurred

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Detect newly forked processes
        current_pid = os.getpid()
        if AsyncioInLocustTest.initialized_pid != current_pid:
            # Run initialization for the current process
            self.init_process_resources(current_pid)
        # Use the shared event loop and thread
        self.loop = AsyncioInLocustTest.shared_loop

    @classmethod
    def init_process_resources(cls, current_pid):
        """Initialize resources once for each new process."""
        cls.initialized_pid = current_pid  # Mark the process as initialized
        print(f"Initializing resources for process PID: {current_pid}")
        # Create a shared asyncio event loop and thread for this process
        cls.shared_loop = asyncio.new_event_loop()
        cls.shared_thread = threading.Thread(target=cls.shared_loop.run_forever, daemon=True)
        cls.shared_thread.start()

    @task
    def asyncio_test(self):
        start_time = time.time()
        try:
            run_asyncio(self.loop, async_foo, "param1", "param2")  # placeholder arguments
            # Fire success event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=None,
            )
        except Exception as e:
            # Fire failure event
            events.request.fire(
                request_type="foo",
                name="bar",
                response_time=int((time.time() - start_time) * 1000),
                response_length=0,
                exception=e,
            )
            raise e
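Stripped of the Locust and gevent plumbing, the core pattern above (a background thread owning the event loop, plus asyncio.run_coroutine_threadsafe to submit work to it from synchronous code) can be demonstrated with the stdlib alone:

```python
import asyncio
import threading

# Start one event loop in a daemon thread; coroutines are submitted to it
# from the synchronous caller's thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

async def add(a, b):
    await asyncio.sleep(0)  # stand-in for real async work
    return a + b

# run_coroutine_threadsafe returns a concurrent.futures.Future;
# .result() blocks the calling thread until the coroutine finishes.
future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
result = future.result(timeout=5)

# Shut the loop down cleanly from the outside.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
```

In the Locust version above, the plain blocking `.result()` wait is replaced by a gevent Event so that other greenlets in the worker keep running while the coroutine executes.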
Follow up of #924 and #1079.
Is your feature request related to a problem? Please describe.
Sometimes one needs to simulate an asynchronous user behavior. Hatching more users does not solve the problem.
Describe the solution you'd like
Support defining async tasks or something similar.
Describe alternatives you've considered
One could also write a custom client. Maybe this would solve the problem. It would be great to collect examples or snippets on how to do so in this issue, and add them in https://docs.locust.io/en/stable/testing-other-systems.html or another doc page.