Closed reith closed 4 months ago
In workflow tests, in which I merely mock activities, I don't really need to run multiple workers, one per task queue
It should be harmless to do so, and it is helpful because you may catch errors such as a misconfigured task queue name during tests.
My current approach is to subclass my workflow in the test suite, override the task queue names, and forward run to the actual superclass, but I don't like it because I have to deal with workflow and data validation for my dummy workflow that I only added for tests.
The best approach is to just run as many workers as you need in tests. But if you are sure you don't want to run multiple workers in tests, consider using workflow input to drive the task queue name instead of subclassing.
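As a rough illustration of driving the task queue name from workflow input, here is a minimal sketch using only the standard library. The names WorkflowAInput, order_id, activity_task_queue, and for_single_worker_test are hypothetical and do not come from the SDK or from the issue; only the queue names queue_a and queue_b are taken from this thread.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class WorkflowAInput:
    """Hypothetical workflow input; the field names are illustrative only."""

    order_id: str
    # Task queue for the activities that normally run on the second worker.
    activity_task_queue: str = "queue_b"


def for_single_worker_test(params: WorkflowAInput, test_queue: str) -> WorkflowAInput:
    """Return a copy whose activities are routed to the test worker's queue."""
    return replace(params, activity_task_queue=test_queue)


# Inside the workflow's run method, the queue would then come from the input,
# for example by passing task_queue=params.activity_task_queue when the
# activity is executed, instead of hard-coding the queue name.
```

With this shape, production callers keep the default queue, while a test that runs a single worker on queue_a can pass for_single_worker_test(params, "queue_a") and needs no test-only workflow subclass.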
Note that there is no straightforward way to run multiple workers in test suites, or I haven't found it yet.
Running multiple workers is no different in tests or production. The library code is intentionally the same.
The below code: errors with "activity function activity_b is not registered on this worker". I can run the other worker in a separate thread though.
This sounds like you are trying to call activity_b in your workflow from queue_a.
testing.WorkflowEnvironment can provide a with_overriden_task_queue which modifies the client to run all workflows in a single queue.
The environment is unrelated to the workers you start on it. It has no knowledge of workers, it is simply a built-in Temporal server. Some people use the environment for
support nested Workers() (above code)
These are supported. If it is not working there must be something else incorrect such as improperly calling an activity on a task queue worker that it is not registered on.
provide a helper, WorkerGroup, that can act as an async context provider for multiple workers.
This is easy enough for anyone to write. You don't have to nest, you can just use run, e.g. maybe something like:
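import asyncio
from temporalio.worker import Worker

# Assumes `client` is an already-connected temporalio.client.Client
workers = [
    Worker(client, task_queue=queue_a, activities=[activity_a], workflows=[WorkflowA]),
    Worker(client, task_queue=queue_b, activities=[activity_b], workflows=[WorkflowB]),
    Worker(client, task_queue=queue_c, activities=[activity_c], workflows=[WorkflowC]),
]
async with asyncio.TaskGroup() as tg:  # TaskGroup requires Python 3.11+
    worker_tasks = [tg.create_task(w.run()) for w in workers]
    # Do stuff
    # Cancel the run() tasks so the task group can exit
    for task in worker_tasks:
        task.cancel()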
That is untested (just typed here in GitHub), but the idea is there. Use Python asyncio utilities like task group or gather to run multiple things concurrently (but nested is fine too, like your previous code). Feel free to come join us on Slack in #python-sdk to discuss general test questions/approaches.
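For reference, the WorkerGroup idea from the issue could be sketched roughly as follows, using asyncio.gather instead of a task group. This is an untested-against-Temporal sketch: worker_group and Runnable are hypothetical names, the helper accepts any object with an async run() method, and it assumes (as the snippet above does) that cancelling run() is an acceptable way to stop a worker.

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Iterable, Protocol


class Runnable(Protocol):
    """Anything with an async run() method, e.g. a worker (assumption)."""

    async def run(self) -> None: ...


@asynccontextmanager
async def worker_group(workers: Iterable[Runnable]) -> AsyncIterator[list[asyncio.Task]]:
    """Run every worker concurrently for the duration of the `async with` body."""
    # Start each worker's run() as a background task on the current event loop.
    tasks = [asyncio.create_task(w.run()) for w in workers]
    try:
        yield tasks
    finally:
        # Ask every worker to stop by cancelling its run() task.
        for task in tasks:
            task.cancel()
        # Wait for all of them to finish. return_exceptions=True collects
        # CancelledError (and any worker failure) instead of raising it here,
        # so inspect the yielded tasks if worker errors matter to the test.
        await asyncio.gather(*tasks, return_exceptions=True)
```

Usage would look like `async with worker_group([worker_a, worker_b]): ...`, with the test body placed inside the block. The trade-off of this design is that worker failures are swallowed on exit, which keeps teardown simple but hides fatal worker errors unless the test checks the tasks explicitly.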
Thanks @cretz for the detailed response.
This sounds like you are trying to call activity_b in your workflow from queue_a.
This is the intended usage. I run my workflow in queue_a, along with some activities, but that workflow also executes some specific activities in task queue queue_b.
This is easy enough for anyone to write. You don't have to nest, you can just use run, e.g. maybe something like.
I'll try this. Thanks!
Is your feature request related to a problem? Please describe.
I have a workflow that runs activities in an explicitly set task queue. One reason for this is that I have two different task queues, but the issue affects any workflow that sets task queues explicitly. In workflow tests, in which I merely mock activities, I don't really need to run multiple workers, one per task queue. My current approach is to subclass my workflow in the test suite, override the task queue names, and forward run to the actual superclass, but I don't like it because I have to deal with workflow and data validation for my dummy workflow that I only added for tests.
Note that there is no straightforward way to run multiple workers in test suites, or I haven't found it yet. The below code:
errors with "activity function activity_b is not registered on this worker". I can run the other worker in a separate thread though.
Describe the solution you'd like
I can think of some solutions:
testing.WorkflowEnvironment can provide a with_overriden_task_queue which modifies the client to run all workflows in a single queue.
support nested Workers() (above code)
provide a helper, WorkerGroup, that can act as an async context provider for multiple workers.