Open TarasLevelUp opened 9 years ago
I did look at the example that uses loop.set_exception_handler(connection_lost_handler), but I think that is a horrible solution. We really should have a normal callback (e.g. on the connection).
@TarasLevelUp Thanks for getting in touch. asynqp is definitely still somewhat deficient in the area of error handling. Would your proposed API look something like this?
def f(exc):
    # custom error handling logic
    print(exc)

connection.on_connection_lost(f)
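A minimal sketch of how such a callback hook could behave. `FakeConnection` and its `_connection_lost` method are hypothetical stand-ins, not asynqp's actual classes; only the `on_connection_lost(f)` shape comes from the proposal above.

```python
class FakeConnection:
    """Hypothetical stand-in for a connection; not asynqp's real class."""

    def __init__(self):
        self._lost_callbacks = []

    def on_connection_lost(self, callback):
        # Register a callable that receives the exception describing the loss.
        self._lost_callbacks.append(callback)

    def _connection_lost(self, exc):
        # Assumed to be invoked by the protocol layer when the socket drops.
        for callback in self._lost_callbacks:
            callback(exc)


seen = []
connection = FakeConnection()
connection.on_connection_lost(seen.append)
connection._connection_lost(ConnectionResetError("server closed the connection"))
```

The callback fires whether or not any coroutine is currently waiting on the connection, which is the property the thread is asking for.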
Looks good to me, for a start at least. It is consistent with the other parts of the API.
But above this low-level API, how about we add a higher-level one that will contain features like:
yield from the queue to get single element), that could raise a normal DisconnectError.
I can make a prototype for that.
The problem is that the server can terminate the connection at any time, not necessarily while you're waiting to get an item from a queue. And if you're using consume instead of get then you won't even be waiting; there's no coroutine to throw an exception into.
I do agree that attempting to use a connection which we know to be closed should throw an error (and I think in some cases it already does), and that any running consumers should be cancelled. Some of that functionality already works, I think.
The problem is that the server can terminate the connection at any time, not necessarily while you're waiting to get an item from a queue. And if you're using consume instead of get then you won't even be waiting; there's no coroutine to throw an exception into.
Yes, I do know that, that is why the higher-level API for consume is needed, so we have a coroutine to throw an exception to. The only way to go with consume is to implement the on_connection_lost callback.
I do agree that attempting to use a connection which we know to be closed should throw an error (and I think in some cases it already does), and that any running consumers should be cancelled. Some of that functionality already works, I think.
Yes, I did see this behaviour, but again we really should create a separate Exception for Disconnected cases, so people can catch just that.
@TarasLevelUp I think you're right. So I think this work would involve a few things:
consume API to make it possible to communicate an error to a consumer. I think the object you pass to consume would have a signature which looks something like this:
class MyConsumer:
    def __call__(self, msg):
        pass
    def error(self, exception):
        pass
It should also continue to work with plain functions for the sake of simplicity (if you don't need error handling it should be easy to use a function) and backwards compatibility.
consume) which caches messages in a queue, so clients can yield from it. When the consumer errors, the exception is thrown into the waiting coroutines.
on_connection_lost method for Connection, which takes a callback which will be invoked when the connection is closed.
I won't really have time to work on this for a little while, though I would accept a pull request.
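The requirement that consume keeps working with plain functions while also notifying objects that define an error method could be sketched as below. The helper names `deliver` and `deliver_error` are hypothetical and only illustrate the dispatch rule; they are not part of asynqp.

```python
def deliver(consumer, msg):
    # Plain functions and consumer objects are both just callables.
    consumer(msg)


def deliver_error(consumer, exc):
    # Only consumers that define `error` are notified; plain functions are skipped.
    handler = getattr(consumer, "error", None)
    if handler is None:
        return False
    handler(exc)
    return True


class MyConsumer:
    def __init__(self):
        self.messages = []
        self.errors = []

    def __call__(self, msg):
        self.messages.append(msg)

    def error(self, exception):
        self.errors.append(exception)


received = []
obj = MyConsumer()
deliver(obj, "m1")
deliver(received.append, "m2")
notified_obj = deliver_error(obj, RuntimeError("connection closed"))
notified_fn = deliver_error(received.append, RuntimeError("connection closed"))
```

With this rule, existing function-based consumers keep working unchanged, and error handling is opt-in.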
Wow, didn't expect such TestSuite... It's hard to read all the cases with so much OOP involved. Kinda expected simple Python TestCase's. It will take some time to hack into those =)
@TarasLevelUp Does the API defined in the docstring in my latest commit look like what you had in mind?
Hi, sorry for the slow feedback; I could not find enough time. I think this interface is quite hard to use as is. Asyncio libraries aim to give a "synchronous" interface instead of a "callback" one. Callbacks should be possible, but for most users it's good to hide them behind a simple synchronous interface. I see working with the consumer as follows:
queue = yield from connect_and_declare_queue()
consumer = queue.consume(no_ack=True)
while True:
    try:
        msg = yield from consumer.get()
    except ConnectionLostError:
        print("Connection lost.")
        # Do reconnect here or exit application...
    # Process message
I do understand that this can be achieved by using queue.get, but that one is less efficient.
In the case above I presume that there will be another consumer if we don't pass in a callback.
My thought was that you'd be able to pass in an object exposing a yield from-based interface. Something like this:
import asyncio

class QueuingConsumer:
    def __init__(self):
        self.q = asyncio.Queue()
        self.exc = None

    def __call__(self, msg):
        self.q.put_nowait(msg)

    def onerror(self, exc):
        self.exc = exc

    def get(self):
        if self.exc is not None:
            raise self.exc
        # return the dequeued message to whoever does `yield from qc.get()`
        return (yield from self.q.get())

queue = yield from connect_and_declare_queue()
qc = QueuingConsumer()
queue.consume(qc)
yield from qc.get()
This API is more general than your proposal because it allows clients to choose a strategy for consuming messages and handling errors.
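One detail the sketch above leaves open is what happens to a coroutine that is already blocked in get() when the error arrives. The following is a hedged variant, written with async/await rather than yield from so it runs on current Python; routing the error through the queue is an assumption made for illustration, not something the thread settled on.

```python
import asyncio


class QueuingConsumer:
    """Sketch only: buffers messages and re-raises a reported error in get()."""

    def __init__(self):
        self.q = asyncio.Queue()

    def __call__(self, msg):
        self.q.put_nowait((msg, None))

    def onerror(self, exc):
        # Queue the error so a coroutine already blocked in get() wakes up.
        self.q.put_nowait((None, exc))

    async def get(self):
        msg, exc = await self.q.get()
        if exc is not None:
            # Put the error back so later get() calls fail as well.
            self.q.put_nowait((None, exc))
            raise exc
        return msg


async def demo():
    qc = QueuingConsumer()
    qc("hello")
    first = await qc.get()
    waiter = asyncio.ensure_future(qc.get())
    await asyncio.sleep(0)  # let the waiter block on the empty queue
    qc.onerror(ConnectionError("connection lost"))
    try:
        await waiter
    except ConnectionError as exc:
        return first, str(exc)


result = asyncio.run(demo())
```

Messages that were queued before the error are still delivered first, which may or may not be the desired ordering.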
1) on_cancel - this will lead to confusion, as you would expect https://www.rabbitmq.com/consumer-cancel.html from this function. As it is implemented now I can't find a use case, as you will always have to cancel the consumer yourself, in the same loop, for it to fire.
2) on_error - this is confusing to me. Is there any other error that can be passed, except for a disconnect? If there aren't any, users can use the on_connection_lost callback described above to do the same, can't they? I think it's more flexible to have the disconnect callback in the connection class.
It is quite cheap to implement the QueueConsumer on the library side, so why not?
1) From the article you linked:
channel.queueDeclare(queue, false, true, false, null);
Consumer consumer = new QueueingConsumer(channel) {
    @Override
    public void handleCancel(String consumerTag) throws IOException {
        // consumer has been cancelled unexpectedly
    }
};
channel.basicConsume(queue, consumer);
This is more-or-less exactly the API I'm proposing.
2) on_error would be called in addition to an on_connection_lost callback in the Connection class.
3) It may be easy to implement, but public APIs require thought and careful design. I want something which fulfils as many use-cases as possible while maintaining compatibility. I'm not against providing the above QueuingConsumer as a useful class in the library, but I want users to be able to write it themselves if they need something specialised.
1) We should add the API after we add support for Rabbit cancel notification. Because it's so similar, it can be confused for a full implementation. Currently it is only called if the consumer is cancelled by the client itself.
How about we add a QueuingConsumer that will be the default behaviour when callback is not passed to queue.consume? And if we want, we can pass a consumer instance to queue.consume that can have an on_error callback. I really don't want the user to create the QueuingConsumer object himself. This looks bad to me:
from asynqp import QueuingConsumer
queue = yield from connect_and_declare_queue()
qc = QueuingConsumer()
queue.consume(qc)
yield from qc.get()
It looks pretty idiomatic to me. What don't you like about it in particular?
This one's easier, isn't it:
queue = yield from connect_and_declare_queue()
qc = queue.consume(no_ack=True)
yield from qc.get()
And I don't want to expose the class as part of the public API. Reasons:
1) I want it to evolve into the following in Python 3.5:
async def start():
    queue = await connect_and_declare_queue()
    async for msg in queue.consume(no_ack=True):
        # Process message
2) Maybe add auto-reconnect later.
So I want a simple API that will not break backward compatibility later. If we give a class, people will subclass it and might get things broken after the next release.
I'm +1 to this ticket. I agree that throwing exceptions to eventloop is pretty ugly.
I've looked through your suggestions about the interface, and I have something to offer too.
error_callback as parameter:
def consume(callback, *, error_callback=None, no_local=False, no_ack=False, exclusive=False, arguments=None)
Send exception object instead of IncomingMessage object to current callback (breaks compatibility with older callback handlers).
Honestly, I still prefer the Consumer class.
queue = yield from connect_and_declare_queue()
qc = queue.consume(no_ack=True)
yield from qc.get()
async def start():
    queue = await connect_and_declare_queue()
    async for msg in queue.consume(no_ack=True):
        # Process message
With both of these proposals, the interface of consume depends on the arguments you passed in to it. This makes for an inconsistent API. In the first example, if you pass no callback then it returns you something from which you can get messages; if you do pass a callback then you just get something you can cancel (but calling get on that object doesn't make sense). The second proposal is even stranger - if you don't pass in a callback then you get an async iterable back. (How do you cancel such a consumer?)
With the QueuingConsumer, consume always takes a callback and always returns something you can use to cancel it; you can optionally turn on extra features such as error handling by adding methods to the callback object.
def consume(callback, *, error_callback=None, no_local=False, no_ack=False, exclusive=False, arguments=None)
This is not a bad idea; it's roughly isomorphic to the Consumer object I'm proposing. (Your proposal is to pass multiple functions; in the other option you effectively pass a record of functions.) How do we decide between the two? The Consumer object works best when the on_error callback naturally belongs as part of the same object as the message callback. The error_callback proposal works best when the two functions are unrelated. Which of these two use-cases is the more common?
Note that you can fairly straightforwardly convert between the two interfaces:
# one way
class ConsumerAdapter:  # or something
    def __init__(self, callback, error_callback):
        self._callback = callback
        self._error_callback = error_callback

    def __call__(self, *args, **kwargs):
        return self._callback(*args, **kwargs)

    def on_error(self, *args, **kwargs):
        return self._error_callback(*args, **kwargs)

queue.consume(ConsumerAdapter(my_callback, my_error_callback))

# the other way
queue.consume(my_consumer, error_callback=my_consumer.on_error)
So the question is, which do we want to be more awkward - the case when your two callbacks are unrelated (you have to wrap them in an object) or the case when your two callbacks naturally form an object (you have to unpack the methods and pass them in)?
I suppose there are some secondary concerns - how many arguments to consume is too many? (If we're going to end up with like 10 different callback arguments then the argument list of consume will be unmanageable.) How likely is it that people will want to subclass a given implementation of Consumer?
Intuition tells me that the Consumer object is the way to go here but I'm open to other arguments in favour of the error_callback idea.
Send exception object instead of IncomingMessage object to current callback (breaks compatibility with older callback handlers).
I don't like this because it requires users to do a run-time type-test of the callback's argument every time. This code will be duplicated in every consumer:
def my_consumer(x):
    if isinstance(x, Exception):
        # handle the error
    else:
        # process the message
Hi again.
@anton-ryzhov Thanks for your interest.
@benjamin-hodgson What I don't like is the callback ideology of the consumer. I do understand that all AMQP libraries do that, but it's not the asyncio way of handling things. Look at the aiozmq, aiopg and aiohttp libraries - they don't operate on callbacks. Asyncio was written to allow asynchronous code to be written in a synchronous way. What I would like to do is to add synchronous behaviour to consuming.
I understand that people are using the library and we don't want to break that, but I think working with something that can raise an exception is way better than working with 2 or more callbacks: on_error, on_cancel, callback.
About your concerns on the interface of consume - how about making the synchronous consumer a separate coroutine, say create_consumer?
As for the 2nd proposal I meant to say:
async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:
        # Process message
Missed the await call. It's the same interface as with .get(), just added async for support.
What is good about this behaviour is that we will have normal Python exceptions propagated through code. Let's say:
@asyncio.coroutine
def consume(connection):
    queue = yield from declare_queue(connection)
    consumer = yield from queue.create_consumer(no_ack=True)
    while True:
        try:
            msg = yield from consumer.get()
        except asynqp.Cancelled:
            break
        # Process message

# Somewhere else in code
consumer.cancel()

connection = yield from connect()
while True:
    try:
        yield from consume(connection)
    except asynqp.Disconnected:
        connection = yield from reconnect()
# We can finalize other code here, as we clearly know we are finished here.
How do you implement clean reconnect code with callbacks? I came up with:
@asyncio.coroutine
def process_message(msg):
    # Process message
    pass

@asyncio.coroutine
def on_error(error):
    if isinstance(error, asynqp.Disconnected):
        connection = yield from reconnect()
        asyncio.async(consume(connection))
    else:
        # We can finalize other code here, I think...
        pass

@asyncio.coroutine
def consume(connection):
    queue = yield from declare_queue(connection)
    # Lets even say we can pass coroutines in the ConsumingAdapter
    consumer = yield from queue.consume(
        ConsumingAdapter(process_message, on_error), no_ack=True)

# Somewhere else in code
consumer.cancel()

connection = yield from connect()
asyncio.async(consume(connection))
About use cases of async for - I think there are a lot of cases where you don't need to call cancel from another block of code. It's OK to let it cancel after you leave the async for block. It's not that hard to do (say with a __del__ method, which works well in 3.5+). We can always cancel the coroutine also:
async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:
        # Process message

consuming_task = asyncio.async(start())
# Somewhere else
consuming_task.cancel()  # We cancel the task here, not our consumer
And we can also leave the cancel method if that is a concern for the interface, or if you really hate to use __del__ for closing the consumer:
async def start():
    queue = await connect_and_declare_queue()
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:  # This will just stop after consumer is cancelled
        # Process message

# Somewhere else in another coroutine
await consumer.cancel()
We can write the same use case in Python 3.4 using the get() function. It is the same proposal as I described above:
@asyncio.coroutine
def start():
    queue = yield from connect_and_declare_queue()
    consumer = yield from queue.create_consumer(no_ack=True)
    while True:
        try:
            msg = yield from consumer.get()
        except asynqp.Cancelled:
            break
        # Process message

# Somewhere else in code
yield from consumer.cancel()
@benjamin-hodgson
The error_callback proposal works best when the two functions are unrelated. Which of these two use-cases is the more common?
It's a pretty common use case: a callback handles messages, one handler per consumer; and one error handler per connection, which tries to reconnect.
# one way
class ConsumerAdapter: # or something
It's too big to reimplement for every consumer. I'd prefer such a thing to be left inside the library, not be part of its interface.
queue.consume(ConsumerAdapter(my_callback, my_error_callback))
It doesn't change anything except adding extra characters to type.
@TarasLevelUp
I'm +1 for the asyncio callback-less style; raising is better than a bunch of callbacks.
@anton-ryzhov I agree that ConsumerAdapter should be provided as part of the library - the question is which of the two APIs is more convenient? It seems usual to me that the message callback and the error callback might need to access the same resources, so putting them in a class seems natural. On the other hand, it's quite trivial (though maybe not obvious) to convert an object into a set of callbacks; going the other way requires some boilerplate which I'd have to maintain.
It's not a binary choice between callbacks and async. It's trivial to convert between the two styles, so at its core the choice we make is arbitrary. Do you want to create an object, pass it in, and async for through that, or do you want to async for over subscribe directly?
Again, it's a question of which is more convenient. Both options seem quite natural to me, though I do agree that async for msg in subscribe(...) wins on simplicity. On the other hand, given that we're targeting Python 3.4+ (not 3.5 only), and we want to maintain backwards compatibility as much as is reasonable, the callback style has its own advantages.
Ultimately, what I value is a simple and consistent API. By 'consistent' I mean consistency within the API, consistency between versions of asynqp and consistency between versions of Python. I don't mind doing extra work in the library to achieve that goal. It's just not clear to me which solution maximises both the 'simple' and 'consistent' constraints. Your collective feedback is very helpful to me in that respect.
Also, @TarasLevelUp:
It's ok to let it cancel after you leave the async for block.
You will never leave the async for block unless you cancel the consumer. A consumer will keep receiving messages for ever, waiting an indefinite amount of time between each one. Hooking into the cancellation of the Task by overriding __del__ seems like a recipe for disaster to me... When a finaliser gets called is implementation-specific; clients of classes with finalisers inevitably run into unexpected problems; and in any case cancelling the Task is not the same as cancelling the consumer.
Looking at your suggested alternative API:
consumer = await queue.create_consumer(no_ack=True)
async for msg in consumer:  # This will just stop after consumer is cancelled
    # Process message

# Somewhere else in another coroutine
await consumer.cancel()
To me, this scarcely looks different than what I'm proposing:
consumer = QueuingConsumer()
await queue.consume(consumer)
async for msg in consumer:  # This will just stop after consumer is cancelled
    # Process message

# Somewhere else in another coroutine
await consumer.cancel()
... but I believe my proposal to be more consistent, as outlined above.
@benjamin-hodgson
ConsumerAdapter should be provided as part of the library… the message callback and the error callback… putting them in a class seems natural.
Am I right that I (the library user) have to put it together by myself? Create some class with handlers, instantiate it with some external links to my application, and then send it to aiomysql's API. Too much extra work, too many extra things to create. It stinks like Java)
the message callback and the error callback might need to access the same resources
But what if they are not the same? Users will have to join unrelated things into one class.
On the other hand, given that we're targeting Python 3.4+ (not 3.5 only), and we want to maintain backwards compatibility as much as is reasonable, the callback style has its own advantages.
What problems do you see with 3.4 compatibility? All the async/await stuff is just syntax sugar around yield from. And the library has to support the yield from style without async/await, but keeping in mind that it can be used with 3.5.
3.4 users have to write out what the syntax sugar expands to until they upgrade to 3.5. And this upgrade will be easier if they only have to fix the syntax of some constructions, not the whole logic.
... but I believe my proposal to be more consistent, as outlined above.
The only difference I see in these examples is that the second one makes me import QueuingConsumer from somewhere and implement it somehow, while the first one does everything for me. I'm choosing the simplest way.
Talking about finalization and task cancellation, I believe it should be used like that:
try:
    consumer = await queue.create_consumer(no_ack=True)
    async for msg in consumer:  # This will just stop after consumer is cancelled
        # Process message
finally:
    await consumer.cancel()

# Somewhere else in another coroutine
await consumer.cancel()  # will naturally stop iteration, finally will do nothing
# or
coro.cancel()  # will raise CancelledError inside coro, finally will do cleanup
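The second path (cancelling the task so that finally performs the cleanup) can be checked with a small runnable sketch. `FakeConsumer` is a hypothetical stand-in that only records whether cancel() was awaited; it is not asynqp's consumer.

```python
import asyncio


class FakeConsumer:
    """Hypothetical consumer; only tracks whether cancel() was awaited."""

    def __init__(self):
        self.cancelled = False

    async def get(self):
        await asyncio.sleep(3600)  # pretend no message ever arrives

    async def cancel(self):
        self.cancelled = True


async def consume_forever(consumer):
    try:
        while True:
            await consumer.get()
    finally:
        # Runs on normal exit and when the surrounding task is cancelled.
        await consumer.cancel()


async def demo():
    consumer = FakeConsumer()
    task = asyncio.ensure_future(consume_forever(consumer))
    await asyncio.sleep(0)  # let the task start waiting
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return consumer.cancelled


cleaned_up = asyncio.run(demo())
```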
I (library user) have to put it together by myself. Create some class with handlers, instantiate it with some external links to my application; and then send it to aiomysqls API. Too much extra work, too much extra things to create.
You have to write that code somewhere.
But what if not the same? Users will have to join into one class non-linked things.
I've already explained that it's straightforward for asynqp to provide an adapter class. The two proposals are entirely equivalent in terms of power; it's just a question of convenience and natural-ness. Which is more convenient more of the time?
All async/await stuff is just syntax sugar around yield from things.
At its core, yes, but you don't get things like async for for free. I'd have to implement __aiter__ and __anext__ alongside a 3.4-compatible version of the same API.
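As a rough illustration of the extra work involved, async for support can be layered over a get()-style method with two small methods. `IterableConsumer`, `feed` and the `Cancelled` exception are hypothetical names for this sketch, and it uses async/await throughout so that it runs on current Python.

```python
import asyncio


class Cancelled(Exception):
    """Hypothetical signal that the consumer was cancelled."""


class IterableConsumer:
    def __init__(self):
        self._q = asyncio.Queue()

    def feed(self, msg):
        self._q.put_nowait(msg)

    def cancel(self):
        # Queue a marker so that a pending get() raises Cancelled.
        self._q.put_nowait(Cancelled())

    async def get(self):
        item = await self._q.get()
        if isinstance(item, Cancelled):
            raise item
        return item

    def __aiter__(self):
        return self

    async def __anext__(self):
        # async for is a thin layer over get(): cancellation ends the loop.
        try:
            return await self.get()
        except Cancelled:
            raise StopAsyncIteration


async def demo():
    consumer = IterableConsumer()
    for msg in ("a", "b"):
        consumer.feed(msg)
    consumer.cancel()
    return [msg async for msg in consumer]


collected = asyncio.run(demo())
```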
I'm choosing simplest way.
An idea which seems simple can often have unexpected consequences or interactions with other parts of a system. I've already explained why I am concerned about inconsistency.
Talking about finalization and task cancellation
Your try/finally idea looks OK to me. What are the consequences of forgetting to use a try/finally? You'll end up with an un-cancelled consumer that does nothing upon receiving a message, though I suppose you'd probably be able to end up in that situation however you slice it.
What are the consequences of forgetting to use a try/finally
It doesn't make things worse than the current callback implementation, where it is much easier to forget about cancellation.
I feel like we're not making much progress here. To achieve some clarity, let me enumerate some of the proposals and my thoughts on the pros and cons of each one, with regards to the common use case of queuing up messages in memory and getting them "synchronously".
on_error parameter to consume
def my_consumer(msg):
    ...
def my_error_callback(exc):
    ...
queue.consume(my_consumer, error_callback=my_error_callback)
Pros:
error_callback and my_consumer don't naturally form an object
Cons:
error_callback - then there's no way to be notified of an error
QueuingConsumer.)
QueuingConsumer, pass in the methods individually, then yield from the thing you just passed in.
on_error method on callback object
class MyConsumer:
    def __call__(self, msg):
        ...
    def on_error(self, exc):
        ...
c = MyConsumer()
queue.consume(c)
# then...
yield from c.get()
asynqp would provide some standard consumer classes like QueuingConsumer and ConsumerAdapter.
Pros:
QueuingConsumer would be simple to use - just create it and pass it in, rather than passing in the methods separately.)
yield from or async for consuming strategy (eg QueuingConsumer)
QueuingConsumer - they will be thrown into your coroutine.
Cons:
__call__ method on consumer object not obvious
__call__, use a different name such as on_msg.
on_msg or __call__ in the implementation of consume, but we'd have to set up rules of what takes precedence and it feels a little weird
queue.consume(ConsumerAdapter(my_callback, my_error_handler))
QueuingConsumer
yield from the thing you pass in, but cancel the thing you get back.
consume creates a consumer object when you don't pass in a callback
c = MyConsumer()
queue.consume(c)
# or...
c = queue.consume()
# then...
yield from c.get()
Under this proposal, calling consume without a callback argument would create and return a QueuingConsumer.
Pros:
QueuingConsumer.
Cons:
consume method is not stable, as outlined above
consume always creates a QueuingConsumer - no callbacks
This is effectively proposal 3 without the option of passing in a callback. This solves the issue of API consistency.
c = queue.consume()
# then...
yield from c.get()
Pros:
cancel the consumer: async for msg in queue.consume().
Cons:
QueuingConsumer?)
None of these options preclude an async for-based API. The first two build it out of a lower-level callback-based API; the latter two build it in.
In my opinion, Option 2 strikes the best balance, even though it's a bit strange to cancel a different object than the one you yield from. Option 4 is a strong contender too, but it has problems with backwards compatibility and has no provisions for customisation; there's no way to drop down to a lower level if you need to do something unusual.
Do you have any more suggestions, or can you think of any other pros and cons of these options? If not, then consider the decision made.
Hello again. The 4 cases above don't cover my last proposal, so let me describe it here in the same manner as the above:
consume as backward-compatible function and introduce create_consumer without callbacks
This is effectively proposal 4, but we leave consume. This solves the backward incompatibility issue.
c = queue.create_consumer()
# then...
yield from c.get()
Pros:
consume if we think that is appropriate.
c.close()
c.close() before we actually started a consumer
consume. Even now it is very confusing.
Cons:
callback style can not make their own QueuingConsumer by subclassing one.
unusual
Now in another comment I will describe why I care so much about it. I tried to implement proposal 2: this commit. The code is quite simple, but I have big problems with the documentation:
consume a consumer. With this in mind it is hard to explain why we pass in a QueuedConsumer and receive another consumer as a result. Moreover those are different objects, and the other one is used to cancel only.
QueuedConsumer requires a loop to create tasks. So the example is somewhat more extensive:
consumer = QueuedConsumer(loop=loop)
handle = yield from self.queue.consume(consumer, no_ack=True)
msg = yield from consumer.get()
# Somewhere else
yield from handle.close()
lower-level of proposal 2 is hard to express. If we give QueuedConsumer as an example, its code is hard for users. Look for yourselves in the commit above, and that is just a 1-hour implementation; I think it will be even bigger once we do speed optimisation and go through all cases.
I think that by trying to combine 2 interfaces into 1 we will make it hard for users to understand how to use it. It's quite easy to describe with proposal 5, as it is a new, simple, straightforward method without class inheritance, callbacks or adapters.
there's no way to drop down to a lower level if you need to do something unusual.
@benjamin-hodgson Why would you want to do something unusual? Any examples of use cases? I can't come up with any (.
I would be amenable to implementing create_consumer in addition to a lower-level API. Then create_consumer would basically be a convenience function that creates, subscribes and returns the consumer.
I'm also up for renaming the existing Consumer task to (something like) CancellationToken. I don't expect this to result in any broken code.
If we do make 'create_consumer', you understand that it will return 1 object, not 2. So cancellation will be as in proposal 5.
It can just return 2 objects.
cancellation_token, consumer = yield from queue.create_consumer(...)
I'm +1 for the fifth proposal.
I didn't get what CancellationToken is and why it should return 2 objects.
I remind you that throwing exceptions inside connection_lost is ugly. I expect the library shouldn't do it, at least if some new API is used.
It can just return 2 objects.
cancellation_token, consumer = yield from queue.create_consumer(...)
Why do you want create_consumer to be built on top of consume? It's ugly to return 2 objects when you expect 1. What I want from a library is a simple API, and I think most will be up for it too.
The implementation is simple if we don't set QueuedConsumer as public API. And I will repeat myself: I think it's a good idea NOT to make it part of the public API; it's too complex to be used as an example, and will be getting more complex in the future.
Made some progress on the issue in my branch, and now I can see another reason to have a separate API for QueuedConsumer:
The logic for no_ack=True and no_ack=False is different:
no_ack=True we must wait for all messages to get processed, as they were removed from queue already.
no_ack=False we will need to purge all pending messages on errors, as they can not be ack'ed nor reject'ed afterwards.
In a separate branch I created the 5th proposal implementation with examples for Python 3.4 and Python 3.5. A note on the Python 3.5 implementation:
queued_consumer in an async context manager. Now the problem of not calling consumer.cancel() will be lessened.
async with self.queue.queued_consumer() as consumer:
    async for msg in consumer:
        pass
Good idea! Nice way to ensure the consumer gets cancelled in the event of an application error.
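A hedged sketch of why the context manager helps: the cancel call moves into the manager's exit path, so it runs even when the body raises. `FakeConsumer` and this free-standing `queued_consumer` are hypothetical; the real method would subscribe to a queue and live on the queue object.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConsumer:
    """Hypothetical consumer that only records cancellation."""

    def __init__(self):
        self.cancelled = False

    async def cancel(self):
        self.cancelled = True


@asynccontextmanager
async def queued_consumer():
    # Hypothetical factory; a real one would start consuming from a queue here.
    consumer = FakeConsumer()
    try:
        yield consumer
    finally:
        # Always cancel, whether the body finished or raised.
        await consumer.cancel()


async def demo():
    try:
        async with queued_consumer() as consumer:
            raise ValueError("application error while processing")
    except ValueError:
        pass
    return consumer.cancelled


was_cancelled = asyncio.run(demo())
```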
Hello, thanks for the great library! Currently I am creating a consumer for indexing objects into another DB. It's quite simple, here's the snippet, that illustrates, what I want to do:
The problem is, that when connection is lost I have no way to see it except of the log: