spreedly / kaffe

An opinionated Elixir wrapper around brod, the Erlang Kafka client, that supports encrypted connections to Heroku Kafka out of the box.
https://hex.pm/packages/kaffe
MIT License

Support per-topic offsets #14

Open rwdaigle opened 7 years ago

rwdaigle commented 7 years ago

There is currently a start_with_earliest_message option, but you can't specify it on a per-topic basis. Kaffe should allow per-topic configuration instead of the current global approach. I think this will meaningfully change the config structure so that it is explicit about the relationship between consumers, Kafka instances, and topics. Something more like:

config :kaffe,
  consumers: [
    [
      endpoint: [kafka: 9092],
      consumer_group: "your-app-consumer-group",
      topics: [
        [
          topic: "topic1",
          mode: :sync,
          message_handler: MessageProcessor,
          offset: :earliest # or integer or :latest?
        ], [
          topic: "topic2",
          mode: :async,
          message_handler: MessageProcessor2,
          offset: :latest
        ]
      ]
    ],
    [
      endpoint: [kafka: 8092],
      # ....
    ]
  ]

Not sure if that's the perfect delineation, but it feels like we're trying to overload the config as is.
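To make the proposed shape concrete, here is a minimal sketch of how such a nested config could be flattened into one spec per topic. `ConsumerConfigSketch`, `topic_specs/1`, and the default values are hypothetical names and choices for illustration only, not part of Kaffe. Settings at the consumer level are carried down to each topic, and a topic-level key (for example a per-topic `consumer_group`) overrides the shared one.

```elixir
defmodule ConsumerConfigSketch do
  # Hypothetical helper, not part of Kaffe.
  # Assumed defaults for keys a topic does not set.
  @topic_defaults [mode: :async, offset: :latest]

  # Takes the list under `consumers:` and returns one flat keyword list
  # per topic. Precedence: defaults < consumer-level keys < topic-level keys.
  def topic_specs(consumers) do
    for consumer <- consumers,
        topic <- Keyword.get(consumer, :topics, []) do
      shared = Keyword.delete(consumer, :topics)

      @topic_defaults
      |> Keyword.merge(shared)
      |> Keyword.merge(topic)
    end
  end
end

consumers = [
  [
    endpoint: [kafka: 9092],
    consumer_group: "your-app-consumer-group",
    topics: [
      [topic: "topic1", mode: :sync, message_handler: MessageProcessor, offset: :earliest],
      [topic: "topic2", message_handler: MessageProcessor2, consumer_group: "consumer-group-for-topic-2"]
    ]
  ]
]

specs = ConsumerConfigSketch.topic_specs(consumers)
```

Each element of `specs` then holds everything needed to start one subscriber, which keeps per-topic offsets and handlers independent of each other.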

duff commented 6 years ago

@rwdaigle I'm mulling over whether the consumer_group could be made specific to the topic too:

config :kaffe,
  consumers: [
    [
      endpoint: [kafka: 9092],
      topics: [
        [
          topic: "topic1",
          mode: :sync,
          message_handler: MessageProcessor,
          consumer_group: "consumer-group-for-topic-1",
          offset: :earliest # or integer or :latest?
        ], [
          topic: "topic2",
          mode: :async,
          message_handler: MessageProcessor2,
          consumer_group: "consumer-group-for-topic-2",
          offset: :latest
        ]
      ]
    ],
    [
      endpoint: [kafka: 8092],
      # ....
    ]
  ]

rwdaigle commented 6 years ago

> I'm mulling over whether the consumer_group could be made specific to the topic too:

Yes! Definitely seems like this should be the case.

madshargreave commented 6 years ago

@duff That would be great, yeah

dams commented 6 years ago

I'm also very much in need of such a feature!

objectuser commented 6 years ago

Maybe we can rework the config for the next major revision.

dams commented 6 years ago

Kaffe seems to assume that developers want only one consumer group and one producer. Most of the time, people need several of each at the same time. So yes, a rework of that would be really awesome.

objectuser commented 6 years ago

What does having a separate consumer group per topic provide? The consumer group is still specific to the topic.

dams commented 6 years ago

You might want to have multiple consumer groups doing different things, like consuming from two different sets of topics independently. You could also consume from two different Kafka clusters and write to a third. I have some cases where I need to consume data from a lot of topics using one consumer group, and then I want a second consumer group to consume a set of "instructions" topics, from which it can read which actions to perform. I don't want the data reading to affect the instruction reading. Etc.

objectuser commented 6 years ago

Yes, if you're consuming from the same topic multiple times, two consumer groups would be necessary.

However, I don't think consuming from two different clusters would necessitate different consumer groups. Each cluster would just store the same consumer group name associated with whatever topics your client is subscribed to.

In the example above, if the "instructions" topics are different from the first set of topics, using the same consumer group (which is just a name that refers to the partition offsets of a topic) shouldn't be a problem, right?

LMK if I'm not understanding.

dams commented 6 years ago

Well, it's an issue because (correct me if I'm wrong) Kaffe can only set up one message handler, so you've got one handler for both data and instruction topics, and it's not easy to maintain isolation of processing. Also, if the data topics have a lot of partition reassignment, it's going to impact your instruction topic consumer, as the consumer group will have to pause. But maybe I'm missing something, like a way to have two different message handlers? From what it seems, Kaffe is really tailored towards doing one consuming thing and one producing thing, which is limiting.

For instance, you can't set up one producer to send messages with a :random partitioner and another one with an :md5 partitioner. Also, producing to different Kafka clusters is not supported.

objectuser commented 6 years ago

> Well, it's an issue because (correct me if I'm wrong) Kaffe can only set up one message handler, so you've got one handler for both data and instruction topics, and it's not easy to maintain isolation of processing.

This is true, only one message handler is supported. Isolation would have to be handled downstream.
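As an illustration of handling this downstream, the sketch below shows a single handler that dispatches each batch to a per-topic module. It assumes the handler receives a list of maps that each carry a `:topic` key and that the callback is named `handle_messages/1`; `TopicRouter`, `DataHandler`, `InstructionHandler`, and the topic names are hypothetical. This only routes messages: everything still runs through one handler, so it does not isolate load or failures between topics.

```elixir
# Hypothetical per-topic handlers; here they only report the batch size
# back to the calling process so the routing is observable.
defmodule DataHandler do
  def handle_messages(batch) do
    send(self(), {:data, length(batch)})
    :ok
  end
end

defmodule InstructionHandler do
  def handle_messages(batch) do
    send(self(), {:instructions, length(batch)})
    :ok
  end
end

defmodule TopicRouter do
  # Assumed mapping from topic name to handler module.
  @routes %{
    "data-topic" => DataHandler,
    "instructions-topic" => InstructionHandler
  }

  # Groups the incoming messages by topic and forwards each group
  # to the module registered for that topic.
  def handle_messages(messages) do
    messages
    |> Enum.group_by(& &1.topic)
    |> Enum.each(fn {topic, batch} ->
      case Map.fetch(@routes, topic) do
        {:ok, handler} -> handler.handle_messages(batch)
        # No route registered for this topic: skip the batch.
        :error -> :ok
      end
    end)

    :ok
  end
end

messages = [
  %{topic: "data-topic", key: "k1", value: "v1"},
  %{topic: "instructions-topic", key: "k2", value: "pause"},
  %{topic: "data-topic", key: "k3", value: "v2"},
  %{topic: "unrouted-topic", key: "k4", value: "ignored"}
]

result = TopicRouter.handle_messages(messages)
```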

> Also, if the data topics have a lot of partition reassignment, it's going to impact your instruction topic consumer, as the consumer group will have to pause.

I might be wrong, but I don't think this is true. The GroupMembers are specific to a topic. If one is rebalancing, it shouldn't impact subscribers to the other topics. However, even if Kaffe isn't doing the right thing here, it shouldn't have anything to do with consumer groups.

> From what it seems, Kaffe is really tailored towards doing one consuming thing and one producing thing, which is limiting.

Yes, I can see this from the standpoint of isolating the processing, so that errors in one don't impact another, for example. In this case, I think you'd need separate apps.

> For instance, you can't set up one producer to send messages with a :random partitioner and another one with an :md5 partitioner. Also, producing to different Kafka clusters is not supported.

Correct on both counts.

But I still don't think ~any of that~ most of that has anything to do with consumer groups (except for possibly the rebalance thing). This is a general design issue, which starts with how Kaffe is configured. Kaffe would require some refactoring to support this, but most of the internals are topic/subscriber/worker specific.

dams commented 6 years ago

> This is true, only one message handler is supported. Isolation would have to be handled downstream.

Which might be an issue if, for instance, one topic is DDoS-ed with messages: you can't keep a separate handler that would be unaffected. Well, you can, but it's not easy to implement, I think.

> I might be wrong, but I don't think this is true. The GroupMembers are specific to a topic. If one is rebalancing, it shouldn't impact subscribers to the other topics. However, even if Kaffe isn't doing the right thing here, it shouldn't have anything to do with consumer groups.

Hm, it's weird, because I do see rebalancing affecting all the group members, and actually all the nodes that are part of the consumer group. But maybe it's something on my side, so it's interesting to know that it shouldn't happen. I'll dig into this.

> Yes, I can see this from the standpoint of isolating the processing, so that errors in one don't impact another, for example. In this case, I think you'd need separate apps.

That would increase complexity by a substantial margin

> Correct on both counts.
>
> But I still don't think ~any of that~ most of that has anything to do with consumer groups (except for possibly the rebalance thing). This is a general design issue, which starts with how Kaffe is configured. Kaffe would require some refactoring to support this, but most of the internals are topic/subscriber/worker specific.

Also, don't get me wrong, Kaffe is nice. It's just weird, because to me the internal code and module layout seem to have been designed to allow a lot of flexibility and different ways to use it, but in reality it's arbitrarily rigid about what it allows users to do. Maybe that's on purpose?

It looks like I could implement a thin layer on top of kaffe's internals, with a different approach to configuration and pluggability, and achieve total freedom of use. Which seems strange :)

Anyway, keep up the great work!

objectuser commented 6 years ago

> Which might be an issue if, for instance, one topic is DDoS-ed with messages: you can't keep a separate handler that would be unaffected.

👍

> Hm, it's weird, because I do see rebalancing affecting all the group members, and actually all the nodes that are part of the consumer group. But maybe it's something on my side, so it's interesting to know that it shouldn't happen. I'll dig into this.

I might totally be wrong. I used a one_for_all strategy in the supervisors to ensure everything was in sync if something went wrong. Maybe that's too aggressive for the supervision strategy Kaffe currently has.
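For readers less familiar with OTP, the following is a small standalone sketch of what `:one_for_all` implies: when one child dies, the supervisor stops and restarts its siblings too. It is not Kaffe's supervision tree; `Worker` and the two registered names are hypothetical stand-ins for per-topic processes.

```elixir
defmodule Worker do
  # Stand-in for a per-topic process; it just holds its own name.
  use Agent

  def start_link(name), do: Agent.start_link(fn -> name end, name: name)
end

children = [
  Supervisor.child_spec({Worker, :topic_a_worker}, id: :topic_a_worker),
  Supervisor.child_spec({Worker, :topic_b_worker}, id: :topic_b_worker)
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_all)

a_pid = Process.whereis(:topic_a_worker)
b_pid = Process.whereis(:topic_b_worker)

# Watch worker B, then kill worker A only.
ref = Process.monitor(b_pid)
Process.exit(a_pid, :kill)

# Under :one_for_all the supervisor also takes down B before restarting both.
sibling_stopped? =
  receive do
    {:DOWN, ^ref, :process, ^b_pid, _reason} -> true
  after
    1_000 -> false
  end

IO.inspect(sibling_stopped?, label: "sibling stopped")
```

With `strategy: :one_for_one` instead, only the killed worker would be restarted and the receive would time out, which is the less aggressive behaviour alluded to above.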

> That would increase complexity by a substantial margin

Too true! I wish umbrellas were a solution here, but they all share the same config.

> Also, don't get me wrong, Kaffe is nice. It's just weird, because to me the internal code and module layout seem to have been designed to allow a lot of flexibility and different ways to use it, but in reality it's arbitrarily rigid about what it allows users to do. Maybe that's on purpose?

Kaffe started out as a very simple consumer with the same process handling all partitions. It was a very limited use case.

When we discovered that the throughput wasn't good, we followed the design of Brucke and got great throughput. But we were still using the old style config. So it was a bit of a split personality. 😉

> Anyway, keep up the great work!

Thanks very much. We also appreciate your contributions, your willingness to argue your point and submit PRs. That all helps make it better for everyone!