swift-server / swift-kafka-client

Apache License 2.0

Commit messages by topic, partition, offset #160

Open omarkj opened 10 months ago

omarkj commented 10 months ago

Motivation:

In some cases, for instance when messages are handed off to other processes, it is not reasonable to hold on to the whole Kafka message just to be able to commit it later. Holding on to only the details needed for the commit consumes much less memory.

Modifications:

Add new public methods that allow users to commit a message given only its topic, partition, and offset, rather than requiring the whole message.

Result:

Developers no longer need to keep the whole message around to commit it, only the topic, partition, and offset.

blindspotbounty commented 9 months ago

@omarkj thank you for your PR! I have a few notes.

I agree with @FranzBusch that assignment (.partition) currently does not accept several topics. Still, it does make sense to have the topic as an argument, because there is another consumption strategy, .group, which allows subscribing to multiple topics either by specifying them explicitly or by using a wildcard: .group(id: "<group id>", topics: ["topic1", "topic2", "^some_prefixed_topic_wildcard.*"]).
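To illustrate why the topic matters once several topics are subscribed, here is a minimal, self-contained sketch. `TopicPartition` and `PendingOffsets` are hypothetical names invented for this illustration, not types from swift-kafka-client, and plain `Int` / `Int64` values stand in for the library's partition and offset types.

```swift
// Hypothetical stand-in types; not part of swift-kafka-client.
struct TopicPartition: Hashable {
    let topic: String
    let partition: Int
}

struct PendingOffsets {
    // Highest processed offset per (topic, partition) pair.
    private(set) var offsets: [TopicPartition: Int64] = [:]

    mutating func record(topic: String, partition: Int, offset: Int64) {
        let key = TopicPartition(topic: topic, partition: partition)
        offsets[key] = max(offsets[key] ?? offset, offset)
    }
}

var pending = PendingOffsets()
// Partition 0 exists in both topics; only the topic tells them apart.
pending.record(topic: "topic1", partition: 0, offset: 41)
pending.record(topic: "topic2", partition: 0, offset: 7)
pending.record(topic: "topic1", partition: 0, offset: 42)
```

Without the topic in the key, the two partition-0 entries would collide, which is the ambiguity a multi-topic .group subscription would run into.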

However, I would probably organise it as a dedicated struct, e.g. TopicPartitionOffset. That has a couple of advantages, both now and for future use:

  1. It would allow copying this info from the consumer message in one shot
  2. The .partition consumption strategy could later be migrated to the same structure
  3. The same structure could be reused for a future rebalance implementation

For example:

struct TopicPartitionOffset: Hashable { // Hashable already implies Equatable
    public let topic: String
    public let partition: KafkaPartition
    public let offset: KafkaOffset
    /// no public constructor (at least so far)
}

struct KafkaConsumerMessage {
    var topicPartitionOffset: TopicPartitionOffset { .init(topic: self.topic, partition: self.partition, offset: self.offset) }
}

final class KafkaConsumer {
...
    func commit(_ topicPartitionOffset: TopicPartitionOffset) throws
}

The usage would be the following:

for try await msg in consumer.messages {
    // Commit here, or after some time-consuming processing; only topicPartitionOffset needs to be kept around.
    try consumer.commit(msg.topicPartitionOffset)
}
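
The proposal above can be tried out without the library using the following self-contained sketch. Everything in it is an assumption made for illustration: `Message` and `MockConsumer` are hypothetical stand-ins for KafkaConsumerMessage and KafkaConsumer, `Int` and `Int64` replace KafkaPartition and KafkaOffset, and the mock `commit(_:)` only records what it was asked to commit.

```swift
// Stand-in types for illustration only; the real library types differ.
struct TopicPartitionOffset: Hashable {
    let topic: String
    let partition: Int   // stand-in for KafkaPartition
    let offset: Int64    // stand-in for KafkaOffset
}

struct Message {
    let topic: String
    let partition: Int
    let offset: Int64
    let value: [UInt8]   // potentially large payload

    // Copy only the commit-relevant fields in one shot.
    var topicPartitionOffset: TopicPartitionOffset {
        .init(topic: topic, partition: partition, offset: offset)
    }
}

final class MockConsumer {
    private(set) var committed: [TopicPartitionOffset] = []

    // Hypothetical commit overload taking only the lightweight handle.
    func commit(_ topicPartitionOffset: TopicPartitionOffset) throws {
        committed.append(topicPartitionOffset)
    }
}

let consumer = MockConsumer()
let messages = [
    Message(topic: "topic1", partition: 0, offset: 10, value: Array(repeating: 0, count: 1_000)),
    Message(topic: "topic2", partition: 3, offset: 5, value: Array(repeating: 1, count: 1_000)),
]

// Keep only the handles; the payloads are no longer needed for committing.
let handles = messages.map(\.topicPartitionOffset)

do {
    for handle in handles {
        try consumer.commit(handle)
    }
} catch {
    print("commit failed: \(error)")
}
```

The handle carries a string and two integers regardless of payload size, which is the memory saving the PR description is after.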