confluentinc / librdkafka

The Apache Kafka C/C++ library

User-facing function for waiting on full queue #3146

Open andrewthad opened 3 years ago

andrewthad commented 3 years ago

Feature Request Description

It would be easy for me to incorporate librdkafka into a library I'm working on if it provided a function that waits for a message queue to have space available:

void rd_kafka_wait_msg_queue (rd_kafka_t *rk, size_t len)

The behavior of this function would be to block until rk->rk_curr_msgs.size becomes low enough that len bytes are available on the queue. This is really just a hint since another producer thread could get scheduled to run right after rd_kafka_wait_msg_queue. That is, correct use would involve running rd_kafka_wait_msg_queue and rd_kafka_produce* (with message flags for asynchronous behavior) in a loop. The implementation of this function could be almost entirely cribbed from rd_kafka_curr_msgs_add. With such a function, users could simulate the synchronous producer with something like:

while(1) {
  resp = rd_kafka_produce(rk, ...);
  if(resp == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
    rd_kafka_wait_msg_queue(rk,len); // len is the length of the message
  } else {
    break;
  }
}
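To make the intended semantics concrete, here is a minimal, self-contained sketch of the idea using pthreads. It is a toy model and not librdkafka code: msg_queue_t, queue_init, queue_try_add, queue_wait_space, queue_release and produce_blocking are all hypothetical names standing in for the library's internal byte accounting, the nonblocking produce call, and the proposed rd_kafka_wait_msg_queue. The point it illustrates is that the wait function only reports that space seems to be available; it reserves nothing, so the caller still has to retry.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-in for the producer's byte accounting. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  space;    /* signalled whenever bytes are released */
    size_t          size;     /* bytes currently queued */
    size_t          max_size; /* capacity in bytes */
} msg_queue_t;

typedef enum { Q_OK = 0, Q_FULL = 1 } q_err_t;

static void queue_init(msg_queue_t *q, size_t max_size) {
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->space, NULL);
    q->size = 0;
    q->max_size = max_size;
}

/* Nonblocking, like a produce call without a blocking flag:
 * reserve len bytes or fail immediately. */
static q_err_t queue_try_add(msg_queue_t *q, size_t len) {
    q_err_t err = Q_FULL;
    pthread_mutex_lock(&q->lock);
    if (q->size + len <= q->max_size) {
        q->size += len;
        err = Q_OK;
    }
    pthread_mutex_unlock(&q->lock);
    return err;
}

/* Blocking hint, like the proposed wait function: returns once len
 * bytes appear to fit. It does NOT reserve them, so another producer
 * may take the space first. Never returns if len > max_size. */
static void queue_wait_space(msg_queue_t *q, size_t len) {
    pthread_mutex_lock(&q->lock);
    while (q->size + len > q->max_size)
        pthread_cond_wait(&q->space, &q->lock);
    pthread_mutex_unlock(&q->lock);
}

/* Called when a message leaves the queue (e.g. after delivery). */
static void queue_release(msg_queue_t *q, size_t len) {
    pthread_mutex_lock(&q->lock);
    q->size -= len;
    pthread_cond_broadcast(&q->space);
    pthread_mutex_unlock(&q->lock);
}

/* The retry loop from the request, expressed against the toy queue. */
static void produce_blocking(msg_queue_t *q, size_t len) {
    while (queue_try_add(q, len) == Q_FULL)
        queue_wait_space(q, len);
}
```

In this model only queue_wait_space can block, which mirrors the split being requested: the call that touches the message payload stays nonblocking, and the blocking call takes nothing but a length.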

Motivation

For message production, users currently have the option of asynchronous production or synchronous production. In the applications I've been working on, the blocking producer tends to meet my needs a little better. However, now I'm running into a tricky situation. I've started incorporating the work from https://github.com/edenhill/librdkafka/pull/2902 into rdkafka-api, a Haskell library that provides bindings to librdkafka.

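The GHC Haskell runtime includes a relocating stop-the-world garbage collector. The C foreign function interface (FFI) offers two ways to call C code (both named poorly): safe and unsafe. Their relevant behaviors are these: a safe call allows the garbage collector to run while the C function is executing, so any heap memory passed to it must be pinned; an unsafe call prevents the garbage collector from running concurrently, so unpinned memory may be passed, but the C function should not block.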

I cannot speak in any detail about other language runtimes. In GHC Haskell, this ends up meaning that it is preferable to use the unsafe FFI when possible, since that means you don't have to care about whether or not byte arrays have been explicitly pinned. Nonblocking functions are easy to work with since they require the caller to uphold fewer invariants.

So, how does this relate to librdkafka? I'd like to be able to get the behavior that the blocking producer offers but without having to use the safe FFI to call rd_kafka_produce*. Having the extra function to wait until space is probably available means that I could use the safe FFI just for rd_kafka_wait_msg_queue.

andrewthad commented 3 years ago

I'm still interested in implementing a user-facing rd_kafka_wait_msg_queue. @edenhill Would you accept a PR implementing this?

edenhill commented 3 years ago

This is essentially what produce() does if you provide RD_KAFKA_MSG_F_BLOCK, with the caveat that delivery reports need to be polled from another thread. Would that be usable in your setup?

andrewthad commented 3 years ago

Agreed, the example while loop I gave is basically just rd_kafka_produce with RD_KAFKA_MSG_F_BLOCK. In languages without relocating garbage collectors (like C or golang), the two have identical behavior. However, in GHC Haskell, getting to split up rd_kafka_produce (nonblocking) and rd_kafka_wait_msg_queue (blocking) would allow me to pass unpinned (relocatable) memory to rd_kafka_produce. As things are now, I have to resort to pinned memory, which means that I have to make an extra copy of some of the fields before passing them over to librdkafka. I have to use pinned memory because calling rd_kafka_produce with RD_KAFKA_MSG_F_BLOCK requires performing a foreign function call in such a way that the GC might run while rd_kafka_produce is running. When called without RD_KAFKA_MSG_F_BLOCK, I may instead call rd_kafka_produce using a foreign function call that prevents the GC from running concurrently. (GHC confusingly refers to these two types of FFI calls as safe and unsafe for historical reasons.)