LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

GlobalKTable should provide consumer lag metrics #317

Closed: tsuz closed this issue 1 month ago

tsuz commented 3 months ago

Description

The Java version of Kafka Streams emits a consumer lag metric for GlobalKTable. For example:

# HELP kafka_consumer_consumer_fetch_manager_metrics_records_lag The latest lag of the partition kafka.consumer:name=null,type=consumer-fetch-manager-metrics,attribute=records-lag
# TYPE kafka_consumer_consumer_fetch_manager_metrics_records_lag gauge
kafka_consumer_consumer_fetch_manager_metrics_records_lag{client_id="ks-test-4-28609cab-a6c3-49fb-8a16-effb44f14bfa-global-consumer",topic="transactions",partition="4",client_type="consumer",} 0.0
kafka_consumer_consumer_fetch_manager_metrics_records_lag{client_id="ks-test-4-28609cab-a6c3-49fb-8a16-effb44f14bfa-global-consumer",topic="transactions",partition="2",client_type="consumer",} 0.0
kafka_consumer_consumer_fetch_manager_metrics_records_lag{client_id="ks-test-4-28609cab-a6c3-49fb-8a16-effb44f14bfa-global-consumer",topic="transactions",partition="5",client_type="consumer",} 0.0
kafka_consumer_consumer_fetch_manager_metrics_records_lag{client_id="ks-test-4-28609cab-a6c3-49fb-8a16-effb44f14bfa-global-consumer",topic="transactions",partition="0",client_type="consumer",} 0.0
kafka_consumer_consumer_fetch_manager_metrics_records_lag{client_id="ks-test-4-28609cab-a6c3-49fb-8a16-effb44f14bfa-global-consumer",topic="transactions",partition="3",client_type="consumer",} 0.0
kafka_consumer_consumer_fetch_manager_metrics_records_lag{client_id="ks-test-4-28609cab-a6c3-49fb-8a16-effb44f14bfa-global-consumer",topic="transactions",partition="1",client_type="consumer",} 0.0

However, this metric is not emitted by the .NET library.

How to reproduce

  1. Run the sample below:

    using System;
    using System.Threading;
    using Confluent.Kafka;
    using Streamiz.Kafka.Net;
    using Streamiz.Kafka.Net.SerDes;
    using Streamiz.Kafka.Net.Table;
    using Streamiz.Kafka.Net.Metrics.Prometheus;

    var config = new StreamConfig<StringSerDes, StringSerDes>();
    config.ApplicationId = "test-app2";
    config.BootstrapServers = "localhost:9092";
    config.UsePrometheusReporter(9098, true);
    config.MetricsRecording = Streamiz.Kafka.Net.Metrics.MetricsRecordingLevel.DEBUG;
    config.StateDir = "./state-store";
    config.AutoOffsetReset = AutoOffsetReset.Latest;

    var isRunningState = false;
    StreamBuilder builder = new StreamBuilder();

    // Materialize the global table in a RocksDB state store.
    var storeName = "mystore2";
    var mystore = RocksDb.As<string, string>(storeName);

    builder.GlobalTable("transactions", mystore);

    Topology t = builder.Build();
    KafkaStream stream = new KafkaStream(t, config);

    stream.StateChanged += (old, @new) =>
    {
        if (@new.Equals(KafkaStream.State.RUNNING))
        {
            isRunningState = true;
            Console.WriteLine("Running now");
        }
    };

    await stream.StartAsync();

    while (!isRunningState)
    {
        Thread.Sleep(500);
        Console.WriteLine("Waiting for running state");
    }
  2. Run curl localhost:9098 and inspect the exported metrics.


LGouellec commented 2 months ago

@tsuz I ran some tests on the local branch attached to this issue. We now have the consumer lag per global consumer thread:

stream_librdkafka_consumer_metrics_consumer_lag{thread_id="test-app-146c8352-0c9c-46d6-a0af-3fa8323dca68-GlobalStreamThread",librdkafka_client_id="test-app-146c8352-0c9c-46d6-a0af-3fa8323dca68-GlobalStreamThread",application_id="test-app",broker_id="2",topic="Input2",partition_id="0"} -1

But the lag still reads -1, because a GlobalKTable never commits its offsets.

We will probably use the consumer_lag_stored statistic in this case: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
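
For context, librdkafka's statistics expose both consumer_lag (relative to the committed offset) and consumer_lag_stored (relative to the locally stored offset), so the latter can still advance even when offsets are never committed. Below is a minimal sketch, not the Streamiz implementation, of reading that field through Confluent.Kafka's statistics handler; the topic name, group id, and console output are assumptions for illustration:

    using System;
    using System.Text.Json;
    using Confluent.Kafka;

    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "lag-probe",      // hypothetical group id, for illustration only
        StatisticsIntervalMs = 5000 // ask librdkafka to emit statistics every 5s
    };

    using var consumer = new ConsumerBuilder<byte[], byte[]>(config)
        .SetStatisticsHandler((_, json) =>
        {
            // The payload is the JSON documented in librdkafka's STATISTICS.md.
            using var stats = JsonDocument.Parse(json);
            foreach (var topic in stats.RootElement.GetProperty("topics").EnumerateObject())
            {
                foreach (var partition in topic.Value.GetProperty("partitions").EnumerateObject())
                {
                    if (partition.Name == "-1")
                        continue; // librdkafka's internal "unassigned" (UA) partition
                    var lagStored = partition.Value.GetProperty("consumer_lag_stored").GetInt64();
                    Console.WriteLine($"{topic.Name}[{partition.Name}] consumer_lag_stored={lagStored}");
                }
            }
        })
        .Build();

    consumer.Subscribe("transactions");
    while (true)
        consumer.Consume(TimeSpan.FromSeconds(1)); // statistics callbacks fire during polling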

LGouellec commented 2 months ago

@tsuz Can you test the PR with your use case and let me know if it works for you?

tsuz commented 2 months ago

@LGouellec

Fantastic. Thank you for the quick turnaround.

I see this topic has one partition, yet there are two consumer lag metrics, one of which is -1. Is this a limitation on the Kafka .NET side?

stream_librdkafka_consumer_metrics_consumer_lag{thread_id="test-app-rocksdbconcurrency2-004fb3c0-3c28-42ca-ab2f-79c67a009c42-GlobalStreamThread",librdkafka_client_id="test-app-rocksdbconcurrency2-004fb3c0-3c28-42ca-ab2f-79c67a009c42-GlobalStreamThread",application_id="test-app-rocksdbconcurrency2",broker_id="11",topic="mytopic",partition_id="0"} 249
stream_librdkafka_consumer_metrics_consumer_lag{thread_id="test-app-rocksdbconcurrency2-004fb3c0-3c28-42ca-ab2f-79c67a009c42-GlobalStreamThread",librdkafka_client_id="test-app-rocksdbconcurrency2-004fb3c0-3c28-42ca-ab2f-79c67a009c42-GlobalStreamThread",application_id="test-app-rocksdbconcurrency2",broker_id="-1",topic="mytopic",partition_id="-1"} -1

LGouellec commented 2 months ago

@tsuz

Not a limitation, more of an expected behavior.

librdkafka exposes a metric with partition -1 for its internal "unassigned" (UA) partition: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md#partitions

But a GlobalKTable uses assign(..), so the -1 partition is never used; that is why its lag metric always stays at -1.
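
To illustrate the distinction, here is a minimal sketch, with assumed names and not Streamiz internals, of a GlobalKTable-style consumer that assigns its partitions explicitly; because nothing is ever left unassigned, the internal UA (-1) partition never carries a real lag value:

    using Confluent.Kafka;

    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "global-restore-probe", // hypothetical; no offsets are ever committed under it
        EnableAutoCommit = false
    };

    using var consumer = new ConsumerBuilder<byte[], byte[]>(config).Build();

    // A GlobalKTable-style consumer assigns every partition of the source topic
    // explicitly instead of joining the group via Subscribe(..). Nothing is ever
    // left unassigned, so the UA (-1) partition's lag metric stays at -1.
    consumer.Assign(new[]
    {
        new TopicPartitionOffset("mytopic", 0, Offset.Beginning)
    });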

tsuz commented 2 months ago

@LGouellec Thanks, that makes sense. So far it looks good in my testing. 👍