Open ghost opened 7 years ago
Callbacks are called in the main thread, triggered by proudcer.poll()
.
The GIL is unlocked while the underlying rd_kafka_poll()
C function is called (since it may block if timeout > 0), if a callback is triggered the GIL is re-acquired before calling any registered Python-code callbacks (e.g., on_delivery).
Illustration:
p.poll(10)
rd_kafka_poll(10*1000)
on_delivery
, error_cb
, ..)rd_kafka_poll()
returnsp.poll()
returnsThis all happens in the same thread, main application thread.
Thank you for your answer! I still have some questions, so i try to explain.
Lets imagine situation where we use confluent-kafka-python from single thread Python app.
p.poll(10)
called from Main Threadrd_kafka_poll(10*1000)
. Here interpeter will be blocked untill rd_kafka_poll
returns. Am i right?
Yes, Python execution of the thread calling poll() will halt until the function returns, but other Python threads will continue to run during this time.
This is exactly what i am searching for. This means that we shouldn't use confluent-kafka-python with Tornado, because Tornado has only one exection thread and calling poll()
wil block it. So we will lose perfomance.
Hi @johnsm87 and @edenhill ,
I'm also doing an evaluation of confluent-kafka-python with applications built on the Tornado framework and I'm hoping to understand this conversation a bit better.
This means that we shouldn't use confluent-kafka-python with Tornado, because Tornado has only one exection thread and calling poll() wil block it. So we will lose perfomance.
Did you find a way to do non-blocking calls using Tornado async interfaces? Any way to extend confluent-kafka-python to support tornado coroutines?
As far as I understand the poll(1000)
function creates a busy loop and doesn't allow Tornado to execute other work (or code) while waiting for a response from confluent-kafka-python.
I'm looking for a possible workaround or patch, anything that could help here :-)
I'm not certain about Tornado, but with asyncio I'm pretty sure you'd want to integrate with run_in_executor for any blocking calls. I'd imagine Tornado has something similar since pretty much any async library needs a way to integrate with synchronous libraries by pushing them out to a thread/threadpool.
I think the major issue here is with callbacks since they come from different C threads. Any callbacks that are triggered by C code when the GIL is unlocked would be invoked as soon as they can acquire the lock, which would introduce arbitrary ordering wrt the event thread. This might be ok if those callbacks always appended them to the event queue. I'm not sure off the top of my head what kinds of problems could occur if the user callback is executed immediately.
Side note: pretty much all librdkafka callbacks are triggered from .poll() (or similar) in the application thread.
I used this library along with Tornado here: https://github.com/CrowdStrike/cs.eyrie/blob/master/cs/eyrie/transistor.py#L470 (in this code, self.collector
is an instance of Consumer
). If you call poll(0)
, it will return immediately. It might return None
if there is not a message ready, but it won't block. The above code handles that by waiting (using gen.sleep
) progressively longer before polling again.
Dedicating a thread to poll()
calls definitely works, but hinders scalability.
With one thread per broker, librdkafka is already very liberal in it's use of threads. This may be tolerable for most languages where you run one application process per host and maximize CPU utilization by running more worker threads. With Python and its GIL however you achieve better utilization by running more worker processes.
We typically run over a hundred worker processes per host. In the environment with 10 Kafka brokers we are looking at having a process go from one thread to 13 (main IOLoop thread, main librdkafka thread, 10 broker threads, polling and callback dispatch thread).
Napkin math:
Because of this cascading effect we will also have to keep in mind that whenever we change Kafka broker topology we need to either decrease the number of workers we run (not to run out of memory), or to increase it (to get better utilization).
My understanding is that broker threads mostly sit idle blocked on IO. If that's the case - having one thread manage all broker sockets via poll
or select
should not negatively affect the throughput.
Ideally with asyncio and Tornado you would not need extra threads at all. Sockets can be put in non-blocking mode and registered with IOLoop.
Sure, threading model is not something you can change easily, but it would be a great future improvement for Python clients.
Thanks everyone for your responses, it was very helpful. We're going to look at adding a dedicated thread for polling and will let you know how that goes.
@mikhtonyuk: the code I linked to is designed to write to a zmq socket, so that there's one process reading from Kafka and many processes connected to the zmq PULL socket. Messages are then distributed in round robin fashion.
Just to update this issue on what I did when integrating Tornado and confluent-kafka-python:
Producer().poll
infinitely. I attached a callback function to Producer().produce
that adds another callback function [i] to the Tornado event loop. See IOLoop.add_callback
The second callback function sets the result of the future once the message is delivered to Kafka (i.e. poll
triggers the callback function)
if err is None:
future.set_result(True)
else:
future.set_result(False)
return future
It all works fine, however, it becomes complicated if you try to send two non-blocking messages in different coroutines on the main thread. There is no guarantee that the order in which you send the messages will be the same order that the callback function is triggered.
This means that I need to wait until each message is sent successfully before sending another. That flow results in an average latency around ~2ms per message (withqueue_buffering_max_ms=0
) to a local Kafka cluster. That's roughly ~500 messages / second compared to 20,000 messages / second using the produce
method without the coroutine.
I guess my next step would be to wait until this issue is resolved and set max.inflight=1
:
Question: what is the best way to support two non-blocking requests coming from the same main thread given that the responses may be out-of-order?
Thanks!
@mtrienis One way to accomplish this ordering guarantee would be to do a bit of your own buffering /blocking. In your callback that is setting the future, maintain some state that tracks the next expected message offset (or offsets across multiple topic partitions). I think aside from the initial setup, you shouldn't have to worry about locking because these callbacks are all executing on the main thread. When you see the expected offset, fire it as well as any messages that are queued up behind it. The vast majority of the time you'd expect this buffer to be empty, but in the case of some reordering you do a bit of buffering until the earliest request's response comes through.
Now, there are a couple of issues here -- you need to know the partitions the messages are going to (or have a global order on the produce requests), the quick design I sketched out won't handle the offset jumps in compacted topics, etc. But those could be addressed if you need this before the underlying issue can be fixed in librdkafka. (Falling back to synchronous production is basically never a good solution, for exactly the reason you point out, even if you need to manage a bit more state yourself.)
The producer side is fairly easy due to the non-blocking/callback mechanism already present here.
Building a tornado/asyncio framework around confluent-kafka-python would be much easier if we had the ability to register a Python callback that was triggered every time the internal buffer of messages went from empty to non-empty. After this callback was triggered we would be guaranteed that the next call to Consumer.poll(0)
would return a message, and not None
.
Is adding such a callback feasible?
This notebook gives a basic non-blocking implementation of Consumer.poll
and shows basic benchmarking information.
https://gist.github.com/3feba6adcf9b33ffb261c896b5e6c343
Would it be reasonable to reopen this issue to discuss using confluent_kafka
with Python's async frameworks?
@mrocklin There is indeed community demand for getting this client to work flawlessly with asyncio, see issue #185.
We would be very happy to see community contributed work in this area.
FWIW it is no longer likely that I will implement anything like this near-term. I would no longer plan on productivity from me.
Let's talk about usage of confluent-kafka-python Producer together with Tornado web server.
We know that Tornado is non blocking-server and has event loop for processing socket events. We also know that librdkafka has several threads and its own event loop. And confluent-kafka-python is build on top of librdkafka.
When we call Producer.produce() from something inside Tornado it executes in main thread. Right? Then librdkafka pass data (with callback, probably) to another thread (event loop thread). And when message succefully recieved by Kafka, callback executed.
What happens when librdkafka call callback function from its event loop? Does it lock Python main thread with GIL?