Closed: achille-roussel closed this issue 5 years ago.
Thanks for opening this thread. Here was my thinking on each of those:
As for exporting: by exporting, you'll enable people who use kafka-go to build on it and add features that you might not have the time or desire to add yourself. If you look at the relationship that bsm/sarama-cluster and Burrow have with sarama, the former exists because sarama exposes its broker-level API. I can understand if you want to keep it private, but I think there's real value in opening it up.
As for the naming style, I'm happy to change things to match the current style. I prefixed the Kafka broker API files separately because I was having a hard time parsing the existing code base to tell what was specific to kafka-go and what was part of the Kafka protocol API.
The api prefix follows the same line of thinking as above. As I was working with the code, I really wanted to create a Broker or Client abstraction, as it feels like Conn is becoming overloaded. It would also simplify things for other people using the library, as the distinction between what's required for the Kafka protocol and what Segment has added would be clearer.
Happy to split things up between Kafka protocol and the consumer group work.
I considered using the existing types as is, but ended up going with a separate type. Here was my thinking.
I agree exporting APIs allows people to reuse the code and build other packages on top of it. However, we have no immediate need for this, and once exposed we have to maintain the code forever, because there is no way to tell how many dependencies there will be out there (we'd end up with a really large public API like sarama has; there's no point redoing what's already been done, and this project intends to take a different approach). If someone ever opens an issue requesting to expose the internal types, we'll probably do so by moving some of the lower-level components into a different package. For now we must aim for the smallest public API needed to use consumer groups (and having access to the low-level message types must not be needed).
Conn becoming overloaded is very subjective: to one developer it could seem too large, to another it could seem just fine. I stand on the side of not adding abstractions just to split things up. We must add abstractions when they serve a purpose or clearly represent different concepts. Conn is intended to be the low-level representation of a connection to a Kafka server, and it is OK to add more methods to it if needed; the only thing we must aim for is readability (short methods serving one purpose, for example, and your code seemed fine in this regard). If you look at the files of the standard library (which stands as one of the highest quality Go codebases out there), there are some very large files, but the code is still easy to read and navigate.
As to "what Segment has added", there is nothing Segment-specific in this repository, and I intend to keep it this way. Do you mind explaining your concern here?
I understand that re-using the Reader concept would be more restrictive (for example it would only allow balancing on a single topic), but I'm convinced that this is a good first step as it would solve the majority of use cases without requiring new abstractions (which means programs using kafka-go would have more work to do to support both the Reader and ConsumerGroup types). In my experience, convenience is more important than features, so sacrificing some features for an API that's simpler to use is a trade-off worth taking (kafka-go is actually a great example of this, as the project got good traction even though it had fewer features than sarama). I wouldn't be against supporting multiple topics on the Reader type if that's needed, but I'd advocate for supporting single-topic consumer groups with no breaking changes in the first place.
You make good points about not wanting to support more than you have to. I'll go ahead and un-export the API and update the naming within Conn.
I agree that being overloaded is subjective. I will say that, as a newcomer to the codebase, the blending of API calls with higher-order concepts took me a little while to get used to. I thought an abstraction would have cleared up the confusion for future contributors. That said, I'm happy to take your lead on this.
As to Segment's opinion. When I look at the Kafka api, especially wrt consumer groups, it views the world as a set of topics with their associated partitions. kafka-go on the other hand views the world as topic partition pairs. I noticed in putting together consumer groups the conflict between these perspectives. It comes front and center during rebalancing.
For using Reader, I think there are a couple of questions that would help my thinking:
When I look at the Kafka api, especially wrt consumer groups, it views the world as a set of topics with their associated partitions. kafka-go on the other hand views the world as topic partition pairs. I noticed in putting together consumer groups the conflict between these perspectives. It comes front and center during rebalancing.
Like I said, I'm not against supporting multiple topics on the Reader type, it may even be a great improvement of the current code. I was just saying that, as a first step and to reuse as much of the existing code as possible, we could start with single topics, then improve the code to be fully featured.
Is compatibility with other consumers desired? e.g. can I gradually co-mingle kafka-go and sarama consumers using consumer groups?
I believe this is a requirement, companies have programs written in multiple languages or using multiple libraries. As library writers we can build much more powerful tools if we offer cross-compatibility instead of building something new. This is also what's recommended by the Kafka documentation:
The primary use case for the membership API is consumer groups, but the requests are intentionally generic to support other cases (e.g. Kafka Connect groups). The cost of this generality is that specific group semantics are pushed into the client. For example, the JoinGroup/SyncGroup requests defined below have no explicit fields supporting partition assignment for consumer groups. Instead, they contain generic byte arrays in which assignments can be embedded by the consumer client implementation.
But although this allows each client implementation to define its own embedded schema, compatibility with Kafka tooling requires clients to use the standard embedded schema used by the client shipped with Kafka. The consumer-groups.sh utility, for example, assumes this format to display partition assignments. We therefore recommend that clients should follow the same schema so that these tools will work for all client implementations.
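To make the quoted recommendation concrete, here is a hedged sketch of encoding a subscription in the standard embedded schema that the Kafka-shipped client uses: a version (int16), an array of topic names (int32 count, each a length-prefixed string), and opaque user data (length-prefixed bytes). The field layout is my reading of the compatibility notes above, not code from kafka-go.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeSubscription serializes a consumer subscription in the standard
// embedded schema, using big-endian encoding as the Kafka protocol does.
func encodeSubscription(version int16, topics []string, userData []byte) []byte {
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.BigEndian, version)
	binary.Write(buf, binary.BigEndian, int32(len(topics)))
	for _, t := range topics {
		binary.Write(buf, binary.BigEndian, int16(len(t)))
		buf.WriteString(t)
	}
	binary.Write(buf, binary.BigEndian, int32(len(userData)))
	buf.Write(userData)
	return buf.Bytes()
}

func main() {
	b := encodeSubscription(0, []string{"events"}, nil)
	fmt.Printf("%d bytes: % x\n", len(b), b)
}
```

Clients that agree on this wire format can coexist in one group, which is exactly the cross-compatibility argument being made here.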
What is the expected behavior if the client uses the same consumer group id for multiple topic-partitions? For example, if we assume a consumer-group-aware Reader, what would be the expected behavior if one service has two Readers that use the same consumer group id but point to different topic-partitions?
Requiring that one group equals one topic is fine as a first step if that means less public API changes and a simpler implementation. I've never seen groups being used to load balance across topics (I'm sure there are use cases for it but it's not common in my experience).
What are your thoughts on using a custom rebalance strategy?
Let's focus on one implementation first, then once we have it working we can add an abstraction for supporting custom strategies. In my experience, it's always been easier to first solve one problem, then step back and think about the generic solution. It also means we'll already have tests to ensure that as we introduce an abstraction we won't be breaking existing code.
These answers help a lot. The reason I ask about compatibility vs. multiple topics has to do with how consumer groups rebalance. Conceptually, Kafka consumers handle rebalancing as follows: each consumer sends a JoinGroup request to the group coordinator; the coordinator elects one member as the group leader and hands it the full membership list; the leader computes the partition assignments for every member; and the coordinator distributes those assignments back to each member through the SyncGroup round trip.
The challenge I see is this: a single leader is responsible for assignments across all topic-partitions, not just a single topic-partition pair.
Consequently, if we had a heterogeneous environment with both sarama-cluster and kafka-go consumers, and leadership were given to a sarama-cluster consumer, the kafka-go consumer may not be able to handle the assignment it is given, e.g. one where both the topic and the partition change.
Conversely, if the kafka-go consumer were elected leader, without a custom strategy it would be unaware of which topic-partitions each of the existing consumers was bound to. A workaround would be to create a custom rebalancing strategy. However, if we create a custom strategy, then there would be no common strategy between sarama-cluster consumers and kafka-go consumers, and new consumers would get an InconsistentGroupProtocol error.
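To illustrate why mixing clients can fail, here is a small sketch of the protocol negotiation that produces InconsistentGroupProtocol: every member of a group proposes the assignment strategies it supports, and the coordinator must pick one common to all of them. The function and names are hypothetical, not kafka-go or broker code.

```go
package main

import "fmt"

// chooseProtocol picks the first protocol (in the first member's
// preference order) that every member supports. When no common protocol
// exists, the broker would answer with InconsistentGroupProtocol.
func chooseProtocol(members [][]string) (string, bool) {
	if len(members) == 0 {
		return "", false
	}
	for _, candidate := range members[0] {
		supported := true
		for _, m := range members[1:] {
			found := false
			for _, p := range m {
				if p == candidate {
					found = true
					break
				}
			}
			if !found {
				supported = false
				break
			}
		}
		if supported {
			return candidate, true
		}
	}
	return "", false
}

func main() {
	// a sarama-cluster member and a kafka-go member with a custom strategy
	saramaCluster := []string{"range", "roundrobin"}
	kafkaGoCustom := []string{"custom-strategy"}
	_, ok := chooseProtocol([][]string{saramaCluster, kafkaGoCustom})
	fmt.Println(ok) // false: no common protocol, the join would be rejected
}
```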
Hence my thinking in allowing ConsumerGroup to support multiple topic-partitions. I considered making the change to Reader, but ran into the following:
Adding Topic/Partition to kafka.Message seems like a very fine change to me, maybe we can even make it its own PR because it would likely touch a lot of the existing code and we should probably set those fields even for a reader that doesn't use consumer groups (for consistency purposes).
I understand the leader is in charge of all topic and partitions of the group, however if we document the restriction of supporting a single topic in the first place, then kafka-go is compatible with programs using sarama clusters for a single topic as well, isn't it?
I'm not sure I understand the motivation for adding Partitions []int to the configuration; what's the use case for setting the partitions of the topic? This information can be discovered via the metadata API, and is dynamic anyway (the number of partitions may be increased on a topic, for example).
I like that first suggestion a lot. I'll go ahead and start there.
Partitions shouldn't be in the configuration. It was something I experimented with but clearly didn't remove completely before committing.
I noticed that ReaderStats has an embedded partition. How would you like to handle that if the Reader can handle more than one partition? Options I see would include:
or a combination of the above.
That's a good point. I think it's fine to set the partition to -1 for now when the reader is used with a consumer group. It's not ideal but I'm not a big fan of adding new methods for solving this.
Ideally I think we should return a slice of ReaderStats, one for each topic/partition that the reader handles, but this is an API-breaking change, so I'd rather delay it until we have consumer groups fully implemented; maybe we'll make a major version release where we can introduce those kinds of changes.
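The interim convention discussed above (reporting -1 as the partition when the reader belongs to a consumer group) can be sketched as follows. The type and function names are illustrative, not kafka-go's actual API.

```go
package main

import "fmt"

// readerStats is a minimal stand-in for a stats snapshot that carries a
// single partition number.
type readerStats struct {
	Topic     string
	Partition int64 // -1 when the reader is part of a consumer group
}

// statsFor returns the stats for a reader; in group mode the reader may
// own several partitions, so no single partition value applies and we
// report -1 instead.
func statsFor(topic string, partition int64, groupMode bool) readerStats {
	if groupMode {
		return readerStats{Topic: topic, Partition: -1}
	}
	return readerStats{Topic: topic, Partition: partition}
}

func main() {
	fmt.Println(statsFor("events", 3, true))  // {events -1}
	fmt.Println(statsFor("events", 3, false)) // {events 3}
}
```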
sounds good to me
At this point, I have a working version of consumer groups with kafka-go.
As you've suggested, I split the work into smaller commits to make it easier to digest. The first commit, expand Conn to handle Kafka API calls, increases the surface area of *Conn to support the API calls required by consumer groups. I've intentionally made the calls shallow so the functions could be a simple adapter to Kafka.
In a separate commit, I can introduce the code that modifies Reader to support consumer groups.
I've updated the link above, https://github.com/savaki/kafka-go/tree/consumer-groups, to point to the working version. Still has a number of open issues and needs more tests, but should serve as a good straw man for discussion.
Some of the open issues include:
What to do with (*Reader).Offset and (*Reader).SetOffset
Could Offset and SetOffset simply return errors if the Reader is configured to be part of a consumer group? Because in this case offset management is automatic, so the program cannot be given control over where the Reader is supposed to start.
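The suggested behavior could look something like the sketch below: manual offset control fails once the reader belongs to a consumer group. All names here are hypothetical, not kafka-go's actual types.

```go
package main

import (
	"errors"
	"fmt"
)

// groupReader is a minimal stand-in for a reader that may be part of a
// consumer group.
type groupReader struct {
	groupID string // non-empty means offsets are managed by the group
	offset  int64
}

var errGroupManaged = errors.New("SetOffset: offsets are managed by the consumer group")

// SetOffset rejects manual positioning in group mode, as suggested above.
func (r *groupReader) SetOffset(offset int64) error {
	if r.groupID != "" {
		return errGroupManaged
	}
	r.offset = offset
	return nil
}

// Offset reports -1 in group mode, where no single value is meaningful.
func (r *groupReader) Offset() int64 {
	if r.groupID != "" {
		return -1
	}
	return r.offset
}

func main() {
	r := &groupReader{groupID: "example-group"}
	fmt.Println(r.SetOffset(42)) // prints the error
	fmt.Println(r.Offset())      // -1
}
```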
Should the Reader hold a persistent connection to the coordinator, or just create one on demand
I'd advocate for getting a working implementation first (no matter what approach you choose), but if connections occur quite often we'll probably want to keep a persistent connection established to the coordinator.
Should any attempt be made to buffer commits? Currently I'm using a naive implementation that opens a connection, writes the commit, and closes the connection
Same answer as the previous question, let's aim for a simple and working solution first, no matter how inefficient, then we can improve it.
Could Offset and SetOffset simply return errors if the Reader is configured to be part of a consumer group? Because in this case offset management is automatic so the program cannot be given control over where the Reader is supposed to start.
+1
I'd advocate for getting a working implementation first (no matter what approach you choose), but if connections occur quite often we'll probably want to keep a persistent connection established to the coordinator.
Agreed. I have a working implementation that we're testing internally. However, I'm breaking up the check-ins as per our discussion to ease the review process. As it stands, not reusing connections was simpler to reason about so I went that direction. The main driver of a persistent connection is the commit which currently opens a new conn on commit.
Same answer as the previous question, let's aim for a simple and working solution first, no matter how inefficient, then we can improve it.
Agreed. I'm coming from the point of view that I have a working version currently and have capacity to look at additional issues. I'll make sure to submit the changes we're discussing as separate pull requests to not muddy the waters.
My suggestion is that we add a time.Duration field, CommitInterval, to the Config. When zero, commits are pushed synchronously with no buffering. When greater than zero, commits are buffered and pushed at the specified interval. Close on the Reader will push any buffered commits to the broker.
Could Offset and SetOffset simply return errors if the Reader is configured to be part of a consumer group? Because in this case offset management is automatic so the program cannot be given control over where the Reader is supposed to start.
(Not directly about Offset() or SetOffset(), mostly about ReaderConfig.) While I really like the ease of use of the current implementation, this is my main issue with it: without support for starting the consumer group from the latest offset, I'll have to keep using separate consumers for each partition.
The use case here is having a temporary consumer group read the latest data from a topic. With topics possibly containing terabytes of data and a retention of a couple of weeks, starting from the beginning each time is not OK.
That's a good point, would you have the time to work on adding this feature?
I think I can find some time soonish, and I know what to change. Just need to decide where to expose the configuration for this.
The reader config seems like the right place for this, but feel free to suggest something else if you think otherwise.
@errnoh, I might do some work on this soon. Have you written anything?
Yeah, sorry. A bit busy at work but my proof-of-concept is now at https://github.com/segmentio/kafka-go/pull/88
Any opinion if that's the way we should do it or do you have any other suggestions?
Just so I understand: kafka-go supports consumer groups, but not entirely (or, natively)? There has been an effort to implement it better, but that stopped in March? We really want to use (and contribute to) this library rather than Sarama (with the sarama-cluster lib for consumer groups), but reading the open issues I get the feeling kafka-go is not quite there yet? @achille-roussel @Pryz @abraithwaite
Catching up here. @mariusw, consumer group support in kafka-go should be usable, and we are using this code in production at Segment, so you should be able to as well.
If there are features that you think are missing feel free to open a discussion about it or a PR if you have a fix ready to be merged.
Regarding this issue, I think that all the work that it tracked has landed or has other issues/PRs tracking it, so I'll go ahead and close it.
Thank you all for your contributions in the discussions and the code!
@savaki
I'm opening this issue as a communication thread on the consumer groups implementation that you're working on here: https://github.com/savaki/kafka-go/tree/consumer-groups
First of all, thanks a lot for taking the time to work on this; it's a highly demanded feature that I'm sure a lot of people will benefit from.
Here is some feedback I had on your work:
All types that represent structured protocol messages (like DescribeGroupsRequestV1 for example) must be non-exported. One of the main goals of the kafka-go project is to hide away implementation details of the kafka protocol into high-level concepts that fit well into Go programs. We must aim for exposing the minimal public API so the package can focus on useful and programmer-friendly concepts. Take a look at how the other protocol messages are implemented for example.
The file naming that you used is not following the current scheme; aim for shorter names with no underscores (the way it was done for all other files). For example, api_describe_groups.go could likely be renamed describegroups.go to match the current style.
The naming scheme you used with the api prefix for method names like apiListGroups and apiLeaveGroup seems unnecessary. Simply naming those methods listGroups and leaveGroup would make the code shorter and more expressive.
It is more common in Go to declare duration constants as <value> * <unit> instead of the opposite. The reason for this is better readability, because 30 * time.Second kind of reads like natural English: "30 seconds". For example, defaultSessionTimeout = time.Second * 30 would be better written defaultSessionTimeout = 30 * time.Second.
This is a considerable amount of work; what do you think about splitting it into two steps? First adding all the protocol messages with tests, then adding the high-level APIs and the actual consumer group implementation. That way we can better focus on the changes being made, ease review, and improve our chances of producing quality software.
What do you think about supporting consumer groups in the Reader type instead of introducing a new public API (ConsumerGroup)? It seems like we could simply add a Group string property to the ReaderConfig to configure the reader as being part of a group instead of directly reading from a partition. This may be less flexible, for example we lose the ability to read from multiple topics, or from multiple static partitions, but as a first step I think we're better off focusing on reading from a single topic and keeping the API as simple as possible. This would have the benefit of reusing the existing public API and a bunch of the Reader implementation, making it easier for programs based on kafka-go to introduce support for consumer groups without having to create abstractions if they need to support both.
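The shape of that proposal could look something like the sketch below: a group field on the reader configuration, with a validation rule reflecting that a group member cannot also pin itself to an explicit partition. All names are illustrative, not kafka-go's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// readerConfig is a minimal stand-in for a reader configuration that
// supports both direct-partition and consumer-group modes.
type readerConfig struct {
	Brokers   []string
	Topic     string
	Partition int
	GroupID   string // when non-empty, the reader joins a consumer group
}

// validate enforces the constraint implied by the design: when a group is
// set, the group coordinator decides partition ownership, so an explicit
// partition makes no sense.
func (c readerConfig) validate() error {
	if c.GroupID != "" && c.Partition != 0 {
		return errors.New("cannot set both GroupID and Partition")
	}
	return nil
}

func main() {
	cfg := readerConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "events",
		GroupID: "example-group",
	}
	fmt.Println(cfg.validate()) // <nil>
}
```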