6.0.0 has a way of supporting fairly arbitrary backends. That means that we could make an event emitter that can hook onto a Celery app. Maybe like:
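(Sketch only - CeleryEventEmitter doesn't exist, and everything about its shape here is a guess:)

```python
from celery import Celery
from pyee import CeleryEventEmitter

# A standard Celery app - the broker choice is arbitrary here;
# the emitter would register handlers as tasks on this app
app = Celery('sample_app', broker='redis://localhost:6379/0')

ee = CeleryEventEmitter(app=app)

# Registering a handler would create a corresponding task
# under the hood
@ee.on('data')
def handler(data):
    print(data)
```

then you can spin up workers like normal:

```sh
celery -A sample_app worker
```

and emit from your main program:

```python
from sample_app import ee

# Under the hood this would call the task created by @ee.on,
# routing the payload through the broker to the worker
ee.emit('data', dict(some='data'))
```

See: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application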
Likely in this case there would be no special error handling, since Celery doesn't try to do anything smart with errors by default either.
RQ is a little weirder:
```python
from pyee import RQEventEmitter

# Sets a (name, BaseEventEmitter()) key/val pair in a
# shared dict namespace. The worker and the
# client have to agree on these names, so no
# autogeneration on the fly unfortunately.
ee = RQEventEmitter(name='a_unique_deterministic_name')

# Registers the handler for the event with the underlying base EE
@ee.on('data')
def handler(data):
    print(data)
```
Then you spin up your rq worker:

```sh
rq worker
```
and then call the EE in your app
from redis import Redis
from rq import Queue
from sample_app import ee
# You only need to set up a queue object if you're emitting,
# so in this example we do it after ee is assembled
ee.queue = Queue(connection=Redis())
# Enqueues a global event handler function (pyee._rq.emit or
# something) with the name of the EE, the name of the event,
# and the payload. On the worker, the underlying base EE gets
# called.
ee.emit('data', dict(some='data'))
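For concreteness, that global function might look something like this (pyee._rq and _emitters are hypothetical names, not real pyee internals):

```python
# pyee/_rq.py (hypothetical)
_emitters = {}  # the shared (name, BaseEventEmitter()) namespace

def emit(name, event, *args, **kwargs):
    # Runs on the worker: look up the base EE registered under
    # this name and fire it
    _emitters[name].emit(event, *args, **kwargs)
```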
RQ seems to allow returning results but doesn't require us to do so.
Error handling is a little involved: http://python-rq.org/docs/exceptions/. I think the best bet is going to be to have no special behavior, meaning that, because everything runs synchronously by default, raised exceptions will trigger RQ's standard DLQ behavior.
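If we did want custom behavior, RQ's hook for it is worker-level exception handlers (per the docs linked above) - a minimal sketch, where report_failure is just an illustrative name:

```python
from redis import Redis
from rq import Queue, Worker

def report_failure(job, exc_type, exc_value, traceback):
    # Custom reporting would go here; returning True lets the
    # next handler run, so the job still lands in the failed queue
    print('job %s failed: %s' % (job.id, exc_value))
    return True

redis = Redis()
q = Queue(connection=redis)
w = Worker([q], connection=redis, exception_handlers=[report_failure])
w.work()
```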
Fun fact: Celery supports not just RabbitMQ but also Redis, SQS and *checks notes* Zookeeper:
http://docs.celeryproject.org/en/latest/getting-started/brokers/
One edge case here is what happens if a worker emits an event on its own emitter.
I would hope that Celery handles this - that if you call app.handler on the worker, it gets inserted into the queue like everything else. I would hope!
I think RQ doesn't have any special handling here, but we can say that if the queue is set then we submit to the queue, and otherwise either raise or emit on the underlying base EE (configurable?).
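For concreteness, a sketch of that dispatch rule on the hypothetical RQEventEmitter, reusing the made-up pyee._rq module from above (none of these names are real):

```python
from pyee import BaseEventEmitter
import pyee._rq  # hypothetical module from the sketch above

class RQEventEmitter:
    def __init__(self, name, queue=None):
        self.name = name
        self.queue = queue
        # Shares the underlying base EE via the module-level namespace,
        # so the worker and client construct the same entry by name
        self._base = pyee._rq._emitters.setdefault(name, BaseEventEmitter())

    def on(self, event, f=None):
        # Registration always goes straight to the base EE
        return self._base.on(event, f)

    def emit(self, event, *args, **kwargs):
        if self.queue is not None:
            # Client side: enqueue the global handler with the EE's
            # name, the event name and the payload
            self.queue.enqueue(pyee._rq.emit, self.name, event, *args, **kwargs)
        else:
            # Worker side (or queue unset): fire the base EE directly;
            # raising here instead could be made configurable
            self._base.emit(event, *args, **kwargs)
```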
Over the years, I've reached the conclusion that the EventEmitter API has a lot of weaknesses as compared to alternative APIs, such as futures, coroutines and channels. For example, it's not type safe and errors regularly lose their context. Moreover, the fact that error handling diverges so wildly between subclasses is itself indicative of a leaky abstraction.
Meanwhile, the Celery and RQ abstractions are tailor-fit for their use cases. Any adaptation to pyee I'd make would necessarily lose fidelity - after all, it would have to adapt to the behavior of message queues, which are pretty different from fire-and-forget events.
I've come to see pyee less as a good idea or an itch that needs scratching, and more as a tool for when you need or want to use Node-like abstractions. For instance, it would be appropriate if you were doing a straight port of some Node.js code, and it would be defensible if you were a Node.js developer finding yourself straying temporarily into Python. However, I would not recommend introducing pyee as a layer of abstraction into a Python project otherwise - which would certainly be the case with a Celery or RQ backend.
Part of me thinks it would be fun to implement this feature. But given that I think actually using it would be a bad idea, it's best to close this issue.