Farfetch / kafkaflow

Apache Kafka .NET Framework to create applications simple to use and extend.
https://farfetch.github.io/kafkaflow/
MIT License

[Bug Report]: Distribution strategy by message/partition key issue. #490

Closed: adambajguz closed this issue 8 months ago

adambajguz commented 9 months ago

Description

WorkerDistributionContext has the following properties:

https://github.com/Farfetch/kafkaflow/blob/06978533c18b8cdf6063b3bdba7692b340f2286f/src/KafkaFlow.Abstractions/WorkerDistributionContext.cs#L33C1-L56C71

I don't understand why the documentation of BytesSumDistributionStrategy states the following: https://github.com/Farfetch/kafkaflow/blob/06978533c18b8cdf6063b3bdba7692b340f2286f/src/KafkaFlow/Consumers/DistributionStrategies/BytesSumDistributionStrategy.cs#L8 while it clearly uses the message key instead of the partition key, resulting in messages that can be delivered out of order.

Related issue (without any feedback from the KafkaFlow team): #440

Expected behavior

Documentation should be changed to:

/// <summary>
/// This strategy sums all bytes in the message key and applies a modulo operator with the total number of workers; the resulting number is the worker ID to be chosen.
/// This algorithm is fast and creates a good work balance. Messages with the same message key are always delivered to the same worker, so message order is guaranteed when multiple messages share the same key.
/// Set an optimal message buffer value to avoid idle workers (this will depend on how many messages with the same message key are consumed).
/// </summary>
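
For reference, the algorithm that comment describes boils down to something like this (a standalone sketch, not the library's actual code):

using System;

// Sum the key bytes and take the result modulo the worker count;
// equal keys always map to the same worker ID.
static int SelectWorkerId(ReadOnlySpan<byte> messageKey, int workerCount)
{
    uint sum = 0;
    foreach (byte b in messageKey)
        sum += b; // unchecked wrap-around is fine; only the remainder matters

    return (int)(sum % (uint)workerCount);
}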

A new strategy named PartitionKeyDistributionStrategy should be added. Optionally, another strategy could be added where a fixed part of the message key is used for worker selection and the remaining part is the actual unique ID of the message, as sketched below.
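
For illustration, a hypothetical selector where only a fixed key prefix drives the worker choice (the method name and the prefixLength parameter are purely illustrative):

using System;

// Only the fixed prefix participates in worker selection; the remaining
// bytes can carry a per-message unique ID without affecting ordering.
static int SelectWorkerIdByKeyPrefix(ReadOnlySpan<byte> messageKey, int prefixLength, int workerCount)
{
    var prefix = messageKey.Length > prefixLength
        ? messageKey.Slice(0, prefixLength)
        : messageKey;

    uint sum = 0;
    foreach (byte b in prefix)
        sum += b;

    return (int)(sum % (uint)workerCount);
}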

KafkaFlow version

All

joelfoliveira commented 8 months ago

Hi @adambajguz ,

Indeed, the strategy description is incorrect, since it uses the message key instead of the partition ID. The reason that strategy is the default one is that, when the number of partitions is small, using the partition key limits the number of workers available to distribute message consumption: the number of workers in use will never exceed the number of partitions assigned to the consumer, so with 3 assigned partitions at most 3 of, say, 10 configured workers would ever do work.

The advantage of using the message key is that it usually has higher cardinality, which makes the worker distribution more effective. We created a PR to fix the documentation and to add a new strategy that uses the partition ID; it can be used instead of the default one by configuring the consumer with the .WithWorkerDistributionStrategy() method.

Please check the PR: https://github.com/Farfetch/kafkaflow/pull/491
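
Wiring a non-default strategy into a consumer looks roughly like this (a sketch; PartitionKeyDistributionStrategy stands in for whatever class the PR actually introduces, and the generic overload is assumed, so check the merged code):

using KafkaFlow;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddConsumer(consumer => consumer
            .Topic("orders")                // illustrative topic/group names
            .WithGroupId("orders-consumer")
            .WithWorkersCount(10)
            .WithBufferSize(100)
            // Hypothetical: replace the default byte-sum strategy with the
            // partition-based one added by the PR.
            .WithWorkerDistributionStrategy<PartitionKeyDistributionStrategy>())));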

adambajguz commented 8 months ago

Hi @joelfoliveira Maybe when the number of partitions is small it would be possible to assign workers using a round-robin algorithm for each partition?

joelfoliveira commented 8 months ago

The issue with using round-robin to distribute messages to workers is that the processing order is lost.

On the KafkaFlow v3 release, the WorkerDistributionStrategy was refactored to include a WorkerDistributionContext (documentation here), which contains properties like the message key or the partition ID.

You can always create your own custom worker distribution strategy by implementing IWorkerDistributionStrategy and passing the implementation to the consumer configuration method .WithWorkerDistributionStrategy(), in case the built-in distribution strategies aren't fully compatible with your requirements.
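
A minimal sketch of such a custom strategy, assuming the v3 interface shape (an Initialize call that receives the workers and a GetWorkerAsync that receives the WorkerDistributionContext); verify the exact member signatures against the KafkaFlow.Abstractions source:

using System.Collections.Generic;
using System.Threading.Tasks;
using KafkaFlow;

// Routes every message from the same partition to the same worker, so
// per-partition ordering is preserved even with many workers configured.
public class PartitionModuloDistributionStrategy : IWorkerDistributionStrategy
{
    private IReadOnlyList<IWorker> _workers;

    public void Initialize(IReadOnlyList<IWorker> workers) => _workers = workers;

    public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context)
    {
        var worker = _workers[context.Partition % _workers.Count];
        return new ValueTask<IWorker>(worker);
    }
}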

joelfoliveira commented 8 months ago

Hi @adambajguz

This issue has been closed since the PR (https://github.com/Farfetch/kafkaflow/pull/491) was merged, adding worker distribution by partition key as a possible strategy; see also the reply left in the previous issue where this was reported (https://github.com/Farfetch/kafkaflow/issues/440).

I want to reiterate that with version 3.0 of KafkaFlow, for more specific use cases, you can create your own worker distribution strategy.

Please feel free to respond to this message or reopen the issue if you'd like to continue the discussion or if you've encountered any new developments related to it.