LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

librdkafka stream metric: consumer lag returns -1 #318

Closed by tsuz 2 months ago

tsuz commented 3 months ago

Description

Expected

Consumer lag should be 0 or greater while the consumer is actively consuming.

Observed

stream_librdkafka_consumer_metrics_consumer_lag{thread_id="test-app-stream2-97e42432-fe00-4088-8403-064a54eae71f-stream-thread-0",librdkafka_client_id="test-app-stream2-97e42432-fe00-4088-8403-064a54eae71f-StreamThread-0-streamiz-consumer",application_id="test-app-stream2",broker_id="10",topic="market-rate-v1",partition_id="0"} -1
stream_librdkafka_consumer_metrics_consumer_lag{thread_id="test-app-stream2-97e42432-fe00-4088-8403-064a54eae71f-stream-thread-0",librdkafka_client_id="test-app-stream2-97e42432-fe00-4088-8403-064a54eae71f-StreamThread-0-streamiz-consumer",application_id="test-app-stream2",broker_id="-1",topic="market-rate-v1",partition_id="-1"} -1

How to reproduce

  1. Create a Stream

    var config = new StreamConfig<StringSerDes, StringSerDes>();
    config.ApplicationId = "test-app-stream2";
    config.BootstrapServers = "localhost:9092";
    config.UsePrometheusReporter(9099, true);
    config.Guarantee = ProcessingGuarantee.EXACTLY_ONCE;
    config.MetricsRecording = Streamiz.Kafka.Net.Metrics.MetricsRecordingLevel.DEBUG;

    var isRunningState = false;
    StreamBuilder builder = new StreamBuilder();

    var storeName = "mystore";
    var inmemoryStateStore = InMemory.As<string, string>(storeName);

    var mystream = builder
        .Stream<string, string>("input-topic", new StringSerDes(), new StringSerDes());

    mystream.SelectKey((k, v) => {
            // Declared type matches the deserialization target (was MarketRate vs. SomeStruct).
            var member = JsonSerializer.Deserialize<SomeStruct>(v);
            return member!.foo;
        })
        .To("another-topic");

    Topology t = builder.Build();
    KafkaStream stream = new KafkaStream(t, config);

    stream.StateChanged += (old, @new) =>
    {
        if (@new.Equals(KafkaStream.State.RUNNING))
        {
            isRunningState = true;
            Console.WriteLine("Running now");
        }
    };

    await stream.StartAsync();
  2. Produce to input-topic using kafka-producer-perf-test.
  3. Run curl localhost:9099 to get the metrics. Notice that the lag is -1.
   <PackageReference Include="Streamiz.Kafka.Net" Version="1.5.1" />
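
To make the last step easier to check, here is a minimal sketch of a filter that picks out only the negative consumer lag samples from the reporter output. The helper name negative_lag is hypothetical. The sketch assumes the endpoint serves the Prometheus text format with the sample value as the last field on each line, as in the output under Observed.

```shell
# Hypothetical helper: reads Prometheus text-format metrics on stdin and prints
# "topic partition value" for every consumer_lag sample whose value is negative.
# Assumes the sample value is the last whitespace-separated field on the line.
negative_lag() {
  awk '
    index($0, "stream_librdkafka_consumer_metrics_consumer_lag{") == 1 {
      value = $NF
      if (value + 0 < 0) {
        topic = "?"; partition = "?"
        # Pull the label values out of the label set between the braces.
        if (match($0, /topic="[^"]*"/))
          topic = substr($0, RSTART + 7, RLENGTH - 8)
        if (match($0, /partition_id="[^"]*"/))
          partition = substr($0, RSTART + 14, RLENGTH - 15)
        print topic, partition, value
      }
    }'
}

# Usage against the reporter port configured in step 1:
#   curl -s localhost:9099 | negative_lag
```

An empty result means no partition is currently reporting a negative lag. Any printed line identifies the topic and partition that would be worth including in a report.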


tsuz commented 2 months ago

This is no longer reproducible. Closing.