funkybob opened this issue 4 years ago
I would be for merging something like this, assuming you are (or someone else is) willing to do the work. It seems to me the best route would be to add the notion of a registry, add a way to enable registry mode (e.g. dramatiq.enable_registry()), and make actor and get_broker aware of that mode.
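A minimal sketch of what such a registry mode could look like. Everything here is hypothetical: enable_registry(), set_broker(), get_broker() and actor() are simplified stand-ins modelled on the names above, not dramatiq's real implementations, and FakeBroker stands in for a real broker so the example runs without a message server.

```python
# Hypothetical sketch of a "registry mode"; these are NOT dramatiq's real functions.
from typing import Callable, List, Optional


class FakeBroker:
    """Stand-in for a dramatiq broker; only declare_actor() is modelled."""

    def __init__(self):
        self.actors = {}

    def declare_actor(self, actor):
        self.actors[actor.actor_name] = actor


class SketchActor:
    """Stand-in for dramatiq.Actor: declares itself eagerly only if given a broker."""

    def __init__(self, fn: Callable, broker: Optional[FakeBroker] = None):
        self.fn = fn
        self.actor_name = fn.__name__
        self.broker = broker
        if broker is not None:
            broker.declare_actor(self)


_registry_mode = False
_global_broker: Optional[FakeBroker] = None
_pending: List[SketchActor] = []  # actors declared before any broker existed


def enable_registry() -> None:
    """Switch to registry mode: actor() stops creating a broker implicitly."""
    global _registry_mode
    _registry_mode = True


def set_broker(broker: FakeBroker) -> None:
    """Install the global broker and bind every actor still waiting for one."""
    global _global_broker
    _global_broker = broker
    while _pending:
        pending_actor = _pending.pop(0)
        pending_actor.broker = broker
        broker.declare_actor(pending_actor)


def get_broker() -> FakeBroker:
    if _global_broker is None:
        if _registry_mode:
            raise RuntimeError("registry mode: call set_broker() before using the broker")
        set_broker(FakeBroker())  # current behaviour: create a default broker on demand
    return _global_broker


def actor(fn: Callable) -> SketchActor:
    if _registry_mode and _global_broker is None:
        deferred = SketchActor(fn)  # no broker yet: park the actor until set_broker()
        _pending.append(deferred)
        return deferred
    return SketchActor(fn, broker=get_broker())
```

With registry mode enabled, importing a module full of actors no longer creates a broker; the actors are bound when set_broker() is finally called.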
Rather than making brokers registry-aware, you'd probably want to add an explicit way to bind the actors in a registry to a broker. That way no changes will be required to any of the brokers that aren't a part of dramatiq (like the PG broker that you use or the SQS broker).
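A sketch of that explicit binding step, assuming a broker only needs a declare_actor(actor) method (dramatiq brokers do have one). Registry, RegisteredActor and bind() are hypothetical names, and ThirdPartyBroker is a stand-in for an external broker such as a Postgres-backed one; note that it contains no registry-specific code.

```python
# Hypothetical sketch: binding is a free function, so brokers need no changes.
from typing import Callable, List


class ThirdPartyBroker:
    """Stand-in for a broker outside dramatiq; it only implements declare_actor()."""

    def __init__(self):
        self.declared: List[str] = []

    def declare_actor(self, actor) -> None:
        self.declared.append(actor.actor_name)


class RegisteredActor:
    def __init__(self, fn: Callable, queue_name: str):
        self.fn = fn
        self.actor_name = fn.__name__
        self.queue_name = queue_name
        self.broker = None  # filled in by bind()


class Registry:
    """Holds actors declared at import time, before any broker exists."""

    def __init__(self):
        self.actors: List[RegisteredActor] = []

    def actor(self, queue_name: str = "default"):
        def decorator(fn: Callable) -> RegisteredActor:
            registered = RegisteredActor(fn, queue_name)
            self.actors.append(registered)
            return registered
        return decorator


def bind(registry: Registry, broker) -> None:
    """Attach every actor in the registry to the broker, using only declare_actor()."""
    for registered in registry.actors:
        registered.broker = broker
        broker.declare_actor(registered)
```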
Makes sense.
In that case, the Registry would provide its own actor decorator. This allows us to declare tasks in advance and still get the middleware parameter checks, albeit delayed until the registry is bound to the Broker.
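To sketch how the middleware parameter check could be delayed: as far as I know, dramatiq brokers expose the option names their middleware accepts as broker.actor_options, and the actor decorator rejects unknown options immediately. Below, the same kind of check runs when the registry is bound instead. DeferredRegistry, PendingActor and OptionsBroker are hypothetical stand-ins, and the option names in the usage are only examples.

```python
# Hypothetical sketch: option validation is deferred until a broker is bound.
from typing import Callable, Dict, List, Set


class OptionsBroker:
    """Stand-in broker: actor_options mimics the set of option names middleware allows."""

    def __init__(self, actor_options: Set[str]):
        self.actor_options = set(actor_options)
        self.actors: Dict[str, "PendingActor"] = {}

    def declare_actor(self, actor) -> None:
        self.actors[actor.actor_name] = actor


class PendingActor:
    def __init__(self, fn: Callable, options: Dict[str, object]):
        self.fn = fn
        self.actor_name = fn.__name__
        self.options = options
        self.broker = None


class DeferredRegistry:
    def __init__(self):
        self.actors: List[PendingActor] = []

    def actor(self, **options):
        def decorator(fn: Callable) -> PendingActor:
            # No broker yet, so the options cannot be checked here.
            pending = PendingActor(fn, options)
            self.actors.append(pending)
            return pending
        return decorator

    def bind(self, broker: OptionsBroker) -> None:
        # Validate everything first so a bad option leaves the broker untouched.
        for pending in self.actors:
            unknown = set(pending.options) - broker.actor_options
            if unknown:
                raise ValueError(
                    f"actor {pending.actor_name!r} uses undefined options: {sorted(unknown)}"
                )
        for pending in self.actors:
            pending.broker = broker
            broker.declare_actor(pending)
```

This makes the trade-off described in the issue visible: a misspelled option such as max_retry is only reported when bind() runs, not at import time.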
Additionally, I could get fancy and teach the Registry to proxy all attributes on to its attached Broker, so when declaring a Task it can pass itself as the broker.
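A minimal sketch of that proxying idea using Python's __getattr__, which is only consulted for attributes not found on the object itself. ProxyRegistry and RecordingBroker are hypothetical; before a broker is attached, proxied lookups fail with AttributeError, and actors declared early are replayed onto the broker when it is attached.

```python
# Hypothetical sketch: the registry forwards unknown attributes to its bound broker.
from typing import List, Optional


class RecordingBroker:
    """Stand-in broker that records declared actors and enqueued messages."""

    def __init__(self):
        self.declared: List[str] = []
        self.enqueued: List[str] = []

    def declare_actor(self, actor) -> None:
        self.declared.append(actor.actor_name)

    def enqueue(self, message: str) -> str:
        self.enqueued.append(message)
        return message


class ProxyRegistry:
    def __init__(self):
        self._broker: Optional[RecordingBroker] = None
        self.pending: List[object] = []

    def declare_actor(self, actor) -> None:
        # Defined on the registry itself, so this method is never proxied.
        if self._broker is None:
            self.pending.append(actor)
        else:
            self._broker.declare_actor(actor)

    def attach(self, broker: RecordingBroker) -> None:
        self._broker = broker
        for actor in self.pending:
            broker.declare_actor(actor)
        self.pending.clear()

    def __getattr__(self, name: str):
        # Only called when normal lookup fails, e.g. for enqueue().
        # Reading __dict__ directly avoids recursing into __getattr__.
        broker = self.__dict__.get("_broker")
        if broker is None:
            raise AttributeError(f"no broker attached; cannot access {name!r}")
        return getattr(broker, name)
```

Raising AttributeError (rather than another exception type) keeps hasattr() and similar introspection well-behaved on an unbound registry.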
Just for reference, here is a quick hack that works in my projects:
import dramatiq
from typing import List, Optional


class ActorCollector:
    """Collects actors at import time and hands them to a real broker later."""

    def __init__(self):
        self.actors = []  # type: List[dramatiq.Actor]

    def declare_actor(self, actor: dramatiq.Actor):
        # Called from dramatiq.Actor.__init__, because the collector is passed as the broker.
        self.actors.append(actor)

    def actor(
        self,
        fn=None,
        *,
        actor_class=dramatiq.Actor,
        actor_name=None,
        queue_name="default",
        priority=0,
        **options
    ):
        def decorator(fn):
            nonlocal actor_name
            actor_name = actor_name or fn.__name__
            return actor_class(
                fn,
                actor_name=actor_name,
                queue_name=queue_name,
                priority=priority,
                broker=self,
                options=options,
            )

        if fn is None:
            return decorator
        return decorator(fn)

    def enqueue(self, message, *, delay=None):
        # Sending a message before transfer_actors() has run is a programming error.
        raise RuntimeError(
            "ActorCollector has not transferred its actors to an actual broker. "
            "Ensure transfer_actors() is called."
        )

    def transfer_actors(self, broker: Optional[dramatiq.Broker] = None):
        broker = broker or dramatiq.get_broker()
        for a in self.actors:
            a.broker = broker
            broker.declare_actor(a)


def transfer_actors(broker, collectors: List[ActorCollector]):
    for ac in collectors:
        ac.transfer_actors(broker)
To be used like this:
# a_module.py
...
collector = ActorCollector()

@collector.actor()
def an_actor():
    ...

# another_module.py
broker = SomeBroker()
transfer_actors(
    broker,
    [
        package_a.actors.collector,
        package_b.worker.actors.collector,
        package_c.worker.actors.collector,
    ],
)
I also have this issue. At the moment I just want to decouple the declare_actor call from Actor.__init__, which I do by wrapping the Broker in a subclass that doesn't call the parent's declare_actor method when the broker shouldn't process those actors.
I don't really like this because it breaks the behaviour of the constructor, but it gets the job done. If you're open to it, I'd suggest moving the declare_actor call into the decorator instead, to keep the "batteries included" feeling.
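A rough sketch of that subclass work-around. In dramatiq, Actor.__init__ calls broker.declare_actor(self), so overriding declare_actor in a broker subclass is enough to skip actors. BaseBroker below is a stand-in for whichever broker class you actually use, and filtering by queue name is only one possible (assumed) rule for deciding which actors belong to this process.

```python
# Sketch of the "broker subclass that skips declare_actor" work-around.
from typing import Dict, Iterable


class BaseBroker:
    """Stand-in for the real broker class you would subclass."""

    def __init__(self):
        self.actors: Dict[str, object] = {}

    def declare_actor(self, actor) -> None:
        self.actors[actor.actor_name] = actor


class SelectiveBroker(BaseBroker):
    """Only declares actors whose queue this process is meant to consume."""

    def __init__(self, queues: Iterable[str]):
        super().__init__()
        self.queues = set(queues)

    def declare_actor(self, actor) -> None:
        if actor.queue_name in self.queues:
            super().declare_actor(actor)
        # Otherwise the parent is never called, so this broker never learns about the actor.
```

This is exactly the constructor-level oddity described above: the actor's constructor still runs normally, but its declare_actor call silently does nothing for filtered actors.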
I spent some more time on this today looking into the change I mentioned, but because of the cyclical relationship between broker, middleware, actor, and worker, I found it would be impossible without releasing a version that breaks backwards compatibility, and it would require far too much rework.
In the meantime, I found a work-around that gives me much of the functionality I want, with a minor downside.
I like to run multiple workers in production, one for each of several task.py files, because different tasks in the system need different resources. For example, one task might need a lot of memory and CPU but run only 100 times per day, while another needs minimal CPU but runs 50,000 times per day. Ideally I could just run dramatiq and point it at each task.py, but if a task in one file ever enqueues something from the other file, the wrong worker ends up consuming that queue the moment a worker thread is restarted.
A simple enough work-around for me is to use the -Q flag of the CLI and manually specify each queue I want to consume. This has some downsides: it's a tiny bit of extra work, and it's prone to simple human error.
For my use case I no longer need this work-around, but it would be interesting if there were a way to define something like "only consume for actors in this file/list", just to reduce the likelihood that someone on my project mistypes a queue name.
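One way to reduce queue-name typos with the -Q work-around is to derive the queue list from the actors themselves instead of typing it by hand. This sketch assumes each actor exposes a queue_name attribute (dramatiq actors do) and that the CLI accepts several queue names after a single -Q; check dramatiq --help for your version. queues_for() and worker_command() are hypothetical helpers.

```python
# Hypothetical helpers: build a worker command line from a list of actors.
from typing import Iterable, List


def queues_for(actors: Iterable) -> List[str]:
    """Unique queue names used by the given actors, sorted for a stable command line."""
    return sorted({actor.queue_name for actor in actors})


def worker_command(module: str, actors: Iterable) -> List[str]:
    """argv for a worker that consumes only the queues used by `actors`."""
    return ["dramatiq", module, "-Q", *queues_for(actors)]
```

The resulting list can be handed to subprocess or written into a deployment script; since the names come from the actor declarations, there is no second copy of each queue name to mistype.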
Feature Request
For reference, this would help here
Currently there is no way to declare a Task without having a Broker instance.
This can lead to sequencing issues at startup, since a Broker has to be configured before any module that declares tasks is imported.
Potentially, having a Task Registry you can bind a Broker to could alleviate this issue.
Benefits
It would still allow for multiple brokers, but also detach task-groupings (per Registry) from Broker configurations.
Importing modules that declare tasks would no longer implicitly create a Broker instance.
Issues
Currently when a task is registered, the params passed to it are checked against those supported by the Broker and its plugins.
This work would have to be delayed until Broker binding, potentially hiding configuration issues until later in the process.
Ideal solution
An ideal solution would preserve the current behavior: if no registry is explicitly given, a task is bound to the default broker.
Potentially Brokers could present a Registry interface, so this behavior would simply be a pass-through, also allowing the default broker to be passed when no registry is specified.
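A sketch of that pass-through idea, assuming registries and brokers share a single declare_actor(actor) method, described here with typing.Protocol. ActorSink, SimpleActor, DefaultBroker, get_default_broker() and the registry keyword are all hypothetical names; when no registry is given, the actor goes straight to the default broker, as it does today.

```python
# Hypothetical sketch: a broker satisfies the registry interface, so the default case is a pass-through.
from typing import Callable, List, Optional, Protocol


class ActorSink(Protocol):
    def declare_actor(self, actor: "SimpleActor") -> None: ...


class SimpleActor:
    def __init__(self, fn: Callable):
        self.fn = fn
        self.actor_name = fn.__name__


class DefaultBroker:
    """Stand-in broker; declaring on it takes effect immediately, like today."""

    def __init__(self):
        self.actors: List[str] = []

    def declare_actor(self, actor: SimpleActor) -> None:
        self.actors.append(actor.actor_name)


class Registry:
    """Collects actors until bound; satisfies the same ActorSink interface."""

    def __init__(self):
        self.actors: List[SimpleActor] = []

    def declare_actor(self, actor: SimpleActor) -> None:
        self.actors.append(actor)

    def bind(self, broker: ActorSink) -> None:
        for actor in self.actors:
            broker.declare_actor(actor)


_default_broker = DefaultBroker()


def get_default_broker() -> DefaultBroker:
    return _default_broker


def actor(fn: Optional[Callable] = None, *, registry: Optional[ActorSink] = None):
    def decorator(fn: Callable) -> SimpleActor:
        new_actor = SimpleActor(fn)
        # No registry given: the default broker plays the registry's role.
        sink = registry if registry is not None else get_default_broker()
        sink.declare_actor(new_actor)
        return new_actor
    return decorator if fn is None else decorator(fn)
```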