gojekfarm / xtools

XTools is a submodule-based repo hosting reusable Golang code.
MIT License

xkafka: Sequential & Async Consumer Patterns #25

Closed: sonnes closed this issue 6 months ago

sonnes commented 8 months ago

Latest version - https://github.com/gojekfarm/xtools/issues/25#issuecomment-1805073951

v0

This is a proposal to define the behavior of xkafka.Consumer for sequential and async consumption use cases.

The processing mode is determined by the xkafka.Concurrency option.

Sequential Processing

Sequential processing is the default mode. It is the same as xkafka.Concurrency(1). By default, the consumer is initialized with settings as per https://github.com/confluentinc/librdkafka/wiki/Consumer-offset-management.

The offset can also be moved manually by enabling xkafka.ManualOffset option. In this case, the consumer will store the offset after the message is processed and the message.Status is Success or Skip. The stored offsets will be automatically committed, unless the enable.auto.commit option is set to false.

handler := xkafka.HandlerFunc(func(ctx context.Context, message *xkafka.Message) error {
    // process message
    return nil
})

// default sequential processing
consumer := xkafka.NewConsumer(
    "default",
    handler,
    xkafka.Brokers{"localhost:9092"},
    xkafka.Topics{"test"},
)

// manual commit handler
handler := xkafka.HandlerFunc(func(ctx context.Context, message *xkafka.Message) error {
    // ... process message

    message.AckSuccess()

    return nil
})

consumer := xkafka.NewConsumer(
    "manual-commit",
    handler,
    xkafka.Brokers{"localhost:9092"},
    xkafka.Topics{"test"},
    xkafka.ManualOffset(true),
)

Async Processing

Async processing is enabled by setting xkafka.Concurrency to a value greater than 1. The consumer will use a pool of goroutines to process messages concurrently.
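Internally this is a standard fan-out over a channel; a self-contained sketch of the pattern (runPool and process are illustrative names, not xkafka internals):

```go
package main

import (
	"fmt"
	"sync"
)

// process simulates a message handler.
func process(msg string) string { return "handled:" + msg }

// runPool fans messages out to `concurrency` goroutines, mirroring
// what a Concurrency > 1 consumer does internally.
func runPool(msgs []string, concurrency int) []string {
	in := make(chan string)
	out := make(chan string, len(msgs))

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range in {
				out <- process(m)
			}
		}()
	}

	for _, m := range msgs {
		in <- m
	}
	close(in)
	wg.Wait()
	close(out)

	var results []string
	for r := range out {
		results = append(results, r)
	}
	return results
}

func main() {
	results := runPool([]string{"a", "b", "c", "d"}, 3)
	fmt.Println(len(results)) // 4 — all messages processed, completion order not guaranteed
}
```

Note that, as the logs below show, completion order across goroutines is arbitrary, which is exactly why offset management needs extra care in this mode.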

Manual Offset

When ManualOffset is enabled in async processing mode, the consumer will process messages concurrently using a pool, but will store the offset sequentially.

This approach comes with a caveat: if an earlier message fails after later messages have already been processed, the consumer cannot advance the offset past the failed message. Adding a retry middleware or a dead letter queue can help mitigate this.

Error Handling

Error handling is done via xkafka.ErrorHandler, which is called for each error, whether returned by the handler or raised internally by the Kafka client. Unhandled errors cause the consumer to stop processing and panic.

Queues

Both sequential and async processing modes require some form of a queue to guarantee at-least-once processing in case of errors, crashes, or restarts.

The queue can be a local on-disk queue (BadgerDB, SQLite, etc.) or a distributed queue (Redis, RabbitMQ, etc.).

Consumers can be configured to use a queue by setting the xkafka.Queue option. The queue must implement the xkafka.Queuer interface.

type Queuer interface {
    Put(ctx context.Context, message *Message) error
    Get(ctx context.Context, id string) (*Message, error)
    Delete(ctx context.Context, id string) error
    All(ctx context.Context, cb func(message *Message) error) error
}

queue := xkafkaqueue.NewSQLiteQueue("default", "/tmp/xkafka.db")

consumer := xkafka.NewConsumer(
    "default",
    handler,
    xkafka.Brokers{"localhost:9092"},
    xkafka.Topics{"test"},
    xkafka.Queue(queue),
)


sonnes commented 7 months ago

Sequential

❯ go run *.go sequential
12:30PM INF [ADMIN] created topic name=seq-cl686tenr2kh4uig2rc0 partitions=2 result=[seq-cl686tenr2kh4uig2rc0]
12:30PM INF [PRODUCER] published message key=key-0
12:30PM INF [PRODUCER] published message key=key-1
12:30PM INF [PRODUCER] published message key=key-2
12:30PM INF [PRODUCER] published message key=key-3
12:30PM INF [PRODUCER] published message key=key-4
12:30PM INF [PRODUCER] published message key=key-5
12:30PM INF [PRODUCER] published message key=key-6
12:30PM INF [PRODUCER] published message key=key-7
12:30PM INF [PRODUCER] published message key=key-8
12:30PM INF [PRODUCER] published message key=key-9
12:30PM INF [MESSAGE] Completed key=key-4 offset=0 partition=1 duration=136.091541ms status=SUCCESS
12:30PM INF [MESSAGE] Completed key=key-0 offset=0 partition=0 duration=193.991542ms status=SUCCESS
12:30PM INF [MESSAGE] Completed key=key-5 offset=1 partition=1 duration=145.073916ms status=SUCCESS
12:30PM INF [MESSAGE] Completed key=key-1 offset=1 partition=0 duration=182.084584ms status=SUCCESS
12:30PM INF [MESSAGE] Completed key=key-6 offset=2 partition=1 duration=123.971792ms status=FAIL
12:30PM INF [MESSAGE] Completed key=key-2 offset=2 partition=0 duration=196.630375ms status=FAIL
12:30PM INF [SEQUENTIAL] Consumers exited count=6
12:30PM INF [MESSAGE] Completed key=key-7 offset=3 partition=1 duration=128.025958ms status=SUCCESS
12:30PM INF [MESSAGE] Completed key=key-3 offset=3 partition=0 duration=182.017083ms status=SUCCESS
12:30PM INF [MESSAGE] Completed key=key-8 offset=4 partition=0 duration=148.027208ms status=SUCCESS
12:30PM INF [MESSAGE] Completed key=key-9 offset=5 partition=0 duration=165.088125ms status=SUCCESS

Sequential with Manual Offset

❯ go run *.go sequential --manual
12:36PM INF [ADMIN] created topic name=seq-cl689r6nr2kh6ofv0sjg partitions=2 result=[seq-cl689r6nr2kh6ofv0sjg]
12:36PM INF [PRODUCER] published message key=key-0
12:36PM INF [PRODUCER] published message key=key-1
12:36PM INF [PRODUCER] published message key=key-2
12:36PM INF [PRODUCER] published message key=key-3
12:36PM INF [PRODUCER] published message key=key-4
12:36PM INF [PRODUCER] published message key=key-5
12:36PM INF [PRODUCER] published message key=key-6
12:36PM INF [PRODUCER] published message key=key-7
12:36PM INF [PRODUCER] published message key=key-8
12:36PM INF [PRODUCER] published message key=key-9
12:36PM INF [MESSAGE] Completed key=key-0 offset=0 partition=0 duration=85.111958ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-4 offset=0 partition=1 duration=103.899667ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-1 offset=1 partition=0 duration=39.068833ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-2 offset=2 partition=0 duration=39.060875ms status=FAIL
12:36PM INF [MESSAGE] Completed key=key-5 offset=1 partition=1 duration=179.055292ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-6 offset=2 partition=1 duration=24.081667ms status=FAIL
12:36PM INF [SEQUENTIAL] Consumers exited count=6
12:36PM INF [MESSAGE] Completed key=key-6 offset=2 partition=1 duration=70.0275ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-2 offset=2 partition=0 duration=182.020417ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-7 offset=3 partition=1 duration=117.638167ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-3 offset=3 partition=0 duration=103.020375ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-8 offset=4 partition=0 duration=129.025417ms status=SUCCESS
12:36PM INF [MESSAGE] Completed key=key-9 offset=5 partition=0 duration=47.046708ms status=SUCCESS
sonnes commented 7 months ago

Async

❯ go run *.go async 
3:42PM INF [ADMIN] created topic name=seq-cl6b16mnr2khgj4hhhd0 partitions=2 result=[seq-cl6b16mnr2khgj4hhhd0]
3:42PM INF [PRODUCER] published message key=key-0
3:42PM INF [PRODUCER] published message key=key-1
3:42PM INF [PRODUCER] published message key=key-2
3:42PM INF [PRODUCER] published message key=key-3
3:42PM INF [PRODUCER] published message key=key-4
3:42PM INF [PRODUCER] published message key=key-5
3:42PM INF [PRODUCER] published message key=key-6
3:42PM INF [PRODUCER] published message key=key-7
3:42PM INF [PRODUCER] published message key=key-8
3:42PM INF [PRODUCER] published message key=key-9
3:43PM INF [MESSAGE] Completed key=key-4 offset=0 partition=1 duration=28.03525ms status=SUCCESS
3:43PM INF [MESSAGE] Completed key=key-1 offset=1 partition=0 duration=105.686291ms status=SUCCESS
3:43PM INF [MESSAGE] Completed key=key-6 offset=2 partition=1 duration=126.102ms status=FAIL
3:43PM INF [MESSAGE] Completed key=key-2 offset=2 partition=0 duration=54.840125ms status=FAIL
3:43PM INF [MESSAGE] Completed key=key-0 offset=0 partition=0 duration=177.810334ms status=SUCCESS
3:43PM INF [MESSAGE] Completed key=key-5 offset=1 partition=1 duration=197.134792ms status=SUCCESS
3:43PM INF [MESSAGE] Completed key=key-3 offset=3 partition=0 duration=155.128209ms status=FAIL
3:43PM INF [MESSAGE] Completed key=key-7 offset=3 partition=1 duration=166.596292ms status=FAIL
3:43PM INF [SEQUENTIAL] Consumers exited count=8
3:43PM INF [MESSAGE] Completed key=key-9 offset=5 partition=0 duration=151.053375ms status=SUCCESS
3:43PM INF [MESSAGE] Completed key=key-8 offset=4 partition=0 duration=195.054959ms status=SUCCESS

Async with Manual Offset

❯ go run *.go async --manual 
3:46PM INF [ADMIN] created topic name=seq-cl6b33enr2khhg1duvgg partitions=2 result=[seq-cl6b33enr2khhg1duvgg]
3:46PM INF [PRODUCER] published message key=key-0
3:46PM INF [PRODUCER] published message key=key-1
3:46PM INF [PRODUCER] published message key=key-2
3:46PM INF [PRODUCER] published message key=key-3
3:46PM INF [PRODUCER] published message key=key-4
3:46PM INF [PRODUCER] published message key=key-5
3:46PM INF [PRODUCER] published message key=key-6
3:46PM INF [PRODUCER] published message key=key-7
3:46PM INF [PRODUCER] published message key=key-8
3:46PM INF [PRODUCER] published message key=key-9
3:46PM INF [MESSAGE] Completed key=key-4 offset=0 partition=1 duration=29.070666ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-6 offset=2 partition=1 duration=54.871ms status=FAIL
3:46PM INF [MESSAGE] Completed key=key-1 offset=1 partition=0 duration=103.057666ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-2 offset=2 partition=0 duration=21.103833ms status=FAIL
3:46PM INF [MESSAGE] Completed key=key-0 offset=0 partition=0 duration=129.675166ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-7 offset=3 partition=1 duration=50.656959ms status=FAIL
3:46PM INF [MESSAGE] Completed key=key-5 offset=1 partition=1 duration=174.065791ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-3 offset=3 partition=0 duration=57.945041ms status=FAIL
3:46PM INF [SEQUENTIAL] Consumers exited count=8
3:46PM INF [MESSAGE] Completed key=key-6 offset=2 partition=1 duration=54.035375ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-7 offset=3 partition=1 duration=129.027875ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-3 offset=3 partition=0 duration=146.022584ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-2 offset=2 partition=0 duration=146.072125ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-9 offset=5 partition=0 duration=21.040375ms status=SUCCESS
3:46PM INF [MESSAGE] Completed key=key-8 offset=4 partition=0 duration=24.390833ms status=SUCCESS
sonnes commented 7 months ago

Based on the simulation: with the default behavior of storing offsets before processing the message, users will lose the message if the consumer encounters an unhandled error.

xkafka.Consumer should change the default behavior to always store the offsets after processing the message. This will ensure that errored messages are not lost and can be retried when the consumer is restarted.

sonnes commented 7 months ago

This is a proposal to define the behavior of xkafka.Consumer for sequential and async consumption along with offset management strategies.

The processing mode is determined by the xkafka.Concurrency option. By default, the consumer is initialized with enable.auto.offset.store=false. The offset is "stored" after the message is processed. The offset is "committed" based on the enable.auto.commit & auto.commit.interval.ms options.

It is important to understand the difference between "store" and "commit": an offset is "stored" in the consumer's memory and later "committed" to Kafka. The offset is stored after the message is processed and message.Status is Success or Skip. Stored offsets are committed automatically, unless the enable.auto.commit option is set to false.
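In librdkafka terms, this store-then-commit split maps onto a handful of configuration keys. A sketch of the effective settings, expressed as a plain map for illustration (the actual wiring happens inside xkafka; the interval value is librdkafka's documented default):

```go
package main

import "fmt"

// consumerConfig sketches the effective librdkafka settings under this
// proposal: offsets are stored explicitly after processing and
// committed in the background.
func consumerConfig() map[string]string {
	return map[string]string{
		"enable.auto.offset.store": "false", // xkafka stores offsets itself, after processing
		"enable.auto.commit":       "true",  // stored offsets are committed automatically...
		"auto.commit.interval.ms":  "5000",  // ...at this interval (librdkafka's default)
	}
}

func main() {
	fmt.Println(consumerConfig()["enable.auto.offset.store"]) // false
}
```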

Error Handling

By default, xkafka.Consumer will stop processing, commit last stored offset, and panic if there is a Kafka error or if the handler returns an error.

Errors can be handled using the xkafka.ErrorHandler option. It is called for every error that is not handled by the handler or the middlewares, as well as for errors returned by the underlying Kafka client.

Sequential Processing

Sequential processing is the default mode. It is the same as xkafka.Concurrency(1).

handler := xkafka.HandlerFunc(func(ctx context.Context, message *xkafka.Message) error {
    // ... process message

    message.AckSuccess()

    return nil
})

consumer := xkafka.NewConsumer(
    "manual-commit",
    handler,
    xkafka.Brokers{"localhost:9092"},
    xkafka.Topics{"test"},
)

Async Processing

Async processing is enabled by setting xkafka.Concurrency to a value greater than 1. The consumer will use a pool of goroutines to process messages concurrently.

Manual Commit

By default, the consumer will automatically commit the offset based on the enable.auto.commit & auto.commit.interval.ms options. The offsets are committed asynchronously in the background.

The consumer can be configured to commit the offset manually by setting the xkafka.EnableManualCommit option to true. When ManualCommit is enabled, the consumer will synchronously commit the offset after each message is processed.

NOTE: Enabling ManualCommit adds overhead to each message. It is recommended to use ManualCommit only when necessary.
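The overhead is easy to see by counting broker round-trips: synchronous ManualCommit issues one commit per message, while interval-based auto-commit batches them. A toy model (commitsManual, commitsAuto, and the batch size are illustrative, not xkafka APIs):

```go
package main

import "fmt"

// commitsManual counts broker commits when every processed message is
// committed synchronously (the ManualCommit path).
func commitsManual(messages int) int { return messages }

// commitsAuto counts commits when stored offsets are flushed
// periodically; `batch` stands in for auto.commit.interval.ms batching.
func commitsAuto(messages, batch int) int {
	commits := 0
	for i := 1; i <= messages; i++ {
		if i%batch == 0 {
			commits++
		}
	}
	return commits
}

func main() {
	fmt.Println(commitsManual(100), commitsAuto(100, 25)) // 100 4
}
```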

Queues

Both sequential and async processing modes require some form of a queue to guarantee at-least-once processing in case of errors, crashes, or restarts.

The queue can be a local on-disk queue (BadgerDB, SQLite, etc.) or a distributed queue (Redis, RabbitMQ, etc.).

Consumers can be configured to use a queue by setting the xkafka.Queue option. The queue must implement the xkafka.Queuer interface.

type Queuer interface {
    Put(ctx context.Context, message *Message) error
    Get(ctx context.Context, id string) (*Message, error)
    Delete(ctx context.Context, id string) error
    All(ctx context.Context, cb func(message *Message) error) error
}

queue := xkafkaqueue.NewSQLiteQueue("default", "/tmp/xkafka.db")

consumer := xkafka.NewConsumer(
    "default",
    handler,
    xkafka.Brokers{"localhost:9092"},
    xkafka.Topics{"test"},
    xkafka.Queue(queue),
)
ajatprabha commented 7 months ago

It would be great if we could add an example of a consumer that uses async processing but, with the help of either middleware or a wrapped handler, can also acquire locks based on particular criteria.

Example: Say I have a queue where any message can belong to an arbitrary set of IDs; however, I want to process messages for a single ID sequentially, while other IDs can be processed concurrently with the same sequential guarantee. This gives the best of both worlds. The example can be bare minimum, but thinking this through will clarify whether the API can support it. It will also highlight how the offset can be stored in this case: can offsets be stored out of order, or must they be stored in order too?