redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/
8.09k stars 815 forks source link

Rework `kafka_franz` batching and delivery mechanisms #2745

Open Jeffail opened 1 month ago

Jeffail commented 1 month ago

The configuration of the kafka_franz input losely resembles the current kafka one in that it has a checkpoint_limit field as well as a batching field. The batching mechanism is operated per topic-partition, as is the checkpointer. However, one thing we currently do not have parity with is that the checkpointing mechanism can't guarantee ordered processing downstream.

This limitation is due to how message requests are made and distributed out to the batching and checkpointing mechanisms of the input component. In order to correct it we would need to refactor this mechanism which will likely also result in a change of behaviour.

We therefore should consider instead creating a new component, perhaps named the redpanda input, and then deprecated the kafka_franz input (and the old kafka input), once we're confident with it. We should also consider whether we want to ditch the advance batching mechanisms that we currently support in favour of directly forwarding the maximum message count that we receive from each poll request as a config field.

emaxerrno commented 1 month ago

@twmb is this about reducing the in-flight to 1

Jeffail commented 1 month ago

It's complicated. We can have >1 in flight from a partition but they need to be of one batch, and we cannot have >1 batch in flight from a given partition (but only if we need to preserve ordering).

The kafka_franz input right now shares a single client connection for all topic partitions we're consuming from, but the general poll it makes has partitions removed when they hit their checkpoint limit. This mechanism is optimised for when the checkpoint limit is quite high and you just want high throughput, but this solution can't really support a lockstep system where checkpoints are precise.

When you bring the checkpoint down to just 1 it's actually just "very low", which is why the kafka_franz input doesnt say anywhere that it supports strict ordering.

The regular kafka input uses the sarama client library, which has layers and layers of abstraction away from the polling so that it's trivial to apply checkpointing (and back pressure) per partition. The performance is worse and it's less stable, but the advantage there is that we can easily implement lockstepped checkpointing and guarantee ordering (as long as all the other caveats are addressed elsewhere in the config).

Solution A

We remove all of the complexity of the batching mechanisms such that users can no longer have period or byte size based batching. Instead, there will only be a maximum messages count, which is applied per topic-partition and dictates the maximum batch size. This is closer in line with what the underlying client poll is doing and has some performance advantages.

We then ensure all topic-partition batches are consumed, processed and acked in lockstep. This is a major disadvantage for performance compared to systems that spawn a single consume -> process -> write thread per partition, but when the processing pipeline is >1 threads and the output is >1 max in flight, it's the only way to ensure ordered delivery.

Solution B

We continue as we are currently, with batching and checkpointing per partition on a "best attempt" basis. But, we add a new partitioning field to the pipeline configuration section. This would act similarly to kafka partitions where each message would be routed to a single processing thread based on the key, allowing you to ensure all messages of a given partition are processed serially.

You would also need to ensure that the output is configured with max in flight 1 in order to guarantee ordered delivery. However, the performance would be much better and you can perserve the relationship between processing threads and CPU cores.