kafkaex / kafka_ex

Kafka client library for Elixir
MIT License

Support pull-based consumer groups for GenStage users #230

Status: Open. dantswain opened this issue 7 years ago.

dantswain commented 7 years ago

In #143 we more or less decided that supporting GenStage would be out of scope for this library. On the other hand, after talking with some folks recently, and now that our consumer group implementation is released, I think it would be feasible to at least support someone building their own GenStage producer (and maybe provide an example).

If I understand correctly, the only thing this really requires is a pull-based version of GenConsumer. Looking at the current implementation, I don't think this would be too hard to accomplish. The user would need to create their own GenStage producer module and link it to the GenConsumer. Doing this generically would be tricky, but it should be feasible for users to do on a one-off basis to meet their needs.

I think the API could look something like the following, assuming that the consumer group has already been started.

# somehow have to configure the GenConsumer in setup to be in "pull" mode

# user has to get the pids of the consumers from the ConsumerGroup API (existing IIRC)

# fetch a message from the pid
GenConsumer.fetch(pid, max_messages :: pos_integer) :: {:ok, message_batch :: [Message.t()]} | {:error, term()}

Offset management would require a little logic based on the number of messages requested vs. the number actually fetched, but this is doable. For reference, I think the Stream implementation does this.
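A minimal sketch of that bookkeeping, assuming a pull-mode consumer that fetches a batch starting at its current offset and may get back more or fewer messages than were requested. The `PullOffsets` module, its `settle/3` function, and the plain `%{offset: ..., value: ...}` maps are hypothetical stand-ins, not KafkaEx API. Only delivered messages advance the offset; surplus messages are dropped and re-fetched later, and any shortfall is reported as unmet demand.

```elixir
defmodule PullOffsets do
  @moduledoc """
  Hypothetical helper (not part of KafkaEx): reconciles the number of
  messages requested with the number actually fetched.
  """

  @type message :: %{offset: non_neg_integer, value: binary}

  @doc """
  Given the current offset, the requested count and the fetched batch,
  returns `{delivered, next_offset, unmet_demand}`.
  """
  @spec settle(non_neg_integer, pos_integer, [message]) ::
          {[message], non_neg_integer, non_neg_integer}
  def settle(current_offset, requested, batch) do
    # Hand out at most `requested` messages; the rest is not delivered.
    delivered = Enum.take(batch, requested)

    next_offset =
      case List.last(delivered) do
        # Nothing fetched: keep reading from the same place.
        nil -> current_offset
        # The next offset to read is one past the last delivered message.
        %{offset: last} -> last + 1
      end

    {delivered, next_offset, requested - length(delivered)}
  end
end

batch = for o <- 10..14, do: %{offset: o, value: "msg-#{o}"}

# Fetched more than requested: deliver offsets 10..12, resume at 13.
{_delivered, 13, 0} = PullOffsets.settle(10, 3, batch)

# Fetched fewer than requested: deliver all five, two events still owed.
{_delivered, 15, 2} = PullOffsets.settle(10, 7, batch)
```

Dropping the surplus keeps Kafka as the only buffer, at the cost of re-reading some data on the next fetch.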

bjhaid commented 7 years ago

I experimented with this, and offset management was what put me off. GenStage expects that you can assign counters to the data being streamed (demand is expressed as a number of events), but with Kafka you can only request a number of bytes, and accurately mapping bytes to a message count is difficult. If you model it like the Stream implementation, you lose some of the guarantees GenStage provides, e.g. back pressure all the way to Kafka. There were other problems I noticed that I can't remember clearly, but it just didn't seem right when I tried implementing it.
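To make the bytes-vs-count mismatch concrete, here is a small simulation. It assumes a simplified fetch that returns messages in order until the next one would exceed a `max_bytes` budget; the `ByteBudget` module is hypothetical and ignores framing overhead and other broker-side details. The same budget yields very different message counts depending on message size, so a GenStage demand of N events cannot be passed to Kafka directly.

```elixir
defmodule ByteBudget do
  @moduledoc """
  Hypothetical simulation (not KafkaEx or broker code) of a byte-bounded
  fetch: messages are returned in order until the next one would exceed
  `max_bytes`. Framing overhead is ignored.
  """

  @spec fetch([binary], pos_integer) :: [binary]
  def fetch(log, max_bytes) do
    log
    |> Enum.reduce_while({[], 0}, fn msg, {acc, used} ->
      size = byte_size(msg)

      if used + size <= max_bytes do
        {:cont, {[msg | acc], used + size}}
      else
        # Stop at the first message that does not fit.
        {:halt, {acc, used}}
      end
    end)
    |> elem(0)
    |> Enum.reverse()
  end
end

# 100 one-byte messages vs. 100 forty-byte messages.
small = List.duplicate("x", 100)
large = List.duplicate(String.duplicate("y", 40), 100)

# Same 100-byte budget, very different message counts.
100 = length(ByteBudget.fetch(small, 100))
2 = length(ByteBudget.fetch(large, 100))
```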

dantswain commented 7 years ago

I mostly agree. From my understanding, GenStage is just not a great fit for Kafka, at least not in a generic sense. I am becoming convinced that there are some use cases where it could work fine, though, and mostly what I want to accomplish here is to make it possible. That is, I think we can make some relatively small changes to GenConsumer that do not break the current API but do open up the possibility of a user implementing their own producer.

cdegroot commented 7 years ago

@bjhaid can't you specify the number of messages you want to receive in 0.10 consumers?

bjhaid commented 7 years ago

@cdegroot I have not been following the changes to the API; can you link the relevant docs?

cdegroot commented 7 years ago

Hmm... I think I was referring to the Java driver's "max.poll.records", which I assumed had a corresponding protocol-level setting. Digging through the protocol, though, I can't find it. Too bad; it seems to be a Java-only thing, then?
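As far as I know, the Java consumer's own docs describe "max.poll.records" as purely client-side: it caps how many records a single `poll()` returns without changing how fetching works, because the consumer caches each fetched batch and hands it out incrementally. A minimal Elixir sketch of that idea follows. The `RecordBuffer` module and the `fetch_fun` callback (standing in for a real byte-bounded fetch) are hypothetical, not KafkaEx API.

```elixir
defmodule RecordBuffer do
  @moduledoc """
  Hypothetical sketch (not KafkaEx API) of a client-side record cap in the
  spirit of the Java consumer's max.poll.records: fetch whatever a
  byte-bounded fetch returns, buffer it, and hand out at most
  `max_records` messages per poll.
  """

  defstruct buffer: [], offset: 0

  def new(offset \\ 0), do: %__MODULE__{offset: offset}

  # `fetch_fun.(offset)` stands in for a byte-bounded fetch and returns a
  # list of %{offset: integer, value: binary} maps starting at `offset`.
  def poll(%__MODULE__{buffer: []} = state, max_records, fetch_fun)
      when is_integer(max_records) and max_records > 0 do
    case fetch_fun.(state.offset) do
      # Nothing new in the log yet.
      [] -> {[], state}
      batch -> poll(%{state | buffer: batch}, max_records, fetch_fun)
    end
  end

  def poll(%__MODULE__{buffer: buffer} = state, max_records, _fetch_fun)
      when is_integer(max_records) and max_records > 0 do
    # Serve from the buffer; the surplus waits for the next poll.
    {out, rest} = Enum.split(buffer, max_records)
    %{offset: last} = List.last(out)
    {out, %{state | buffer: rest, offset: last + 1}}
  end
end

# Fake log where each "fetch" happens to return 5 messages.
log = for o <- 0..11, do: %{offset: o, value: "m#{o}"}
fetch = fn offset -> log |> Enum.drop(offset) |> Enum.take(5) end

{first, _state} = RecordBuffer.poll(RecordBuffer.new(), 2, fetch)
# `first` holds offsets 0 and 1; offsets 2..4 stay buffered.
```

Note that this reintroduces an in-process buffer, which is exactly the kind of decoupling from Kafka-level back pressure discussed above.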