LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Infinite queue size increase #195

Closed · rav13 closed this issue 1 year ago

rav13 commented 1 year ago

Description

Hi @LGouellec,

We are testing the Streamiz library heavily in production, and I think we have stumbled across a potential issue in the .NET implementation. In some cases our app reads data via the stream and pushes it into the record queue, but the queued messages are never processed further. We also see lag in the consumer group, and no further messages are processed from the moment the first message is added to the queue.

We are using Streamiz version 1.3.2 and Confluent.Kafka version 1.9.3.

Log:

[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[0]] metadata found (begin offset: 0 / end offset : 2)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[1]] metadata found (begin offset: 86 / end offset : 85)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[2]] metadata found (begin offset: 15 / end offset : 14)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[3]] metadata found (begin offset: 61 / end offset : 77)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[4]] metadata found (begin offset: 0 / end offset : 139)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[5]] metadata found (begin offset: 594 / end offset : 594)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[6]] metadata found (begin offset: 76 / end offset : 75)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[7]] metadata found (begin offset: 76 / end offset : 75)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[8]] metadata found (begin offset: 1668 / end offset : 1667)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[9]] metadata found (begin offset: 522 / end offset : 521)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[10]] metadata found (begin offset: 20 / end offset : 19)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      State store AppName.StreamsApp.3-store-aggregation-changelog [[11]] metadata found (begin offset: 63 / end offset : 62)
[2022-10-28 09:40:43] dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
      Added partitions with offsets AppName.StreamsApp.3-store-aggregation-changelog-[0]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[1]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[2]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[3]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[4]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[5]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[6]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[7]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[8]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[9]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[10]#Beginning [-2],AppName.StreamsApp.3-store-aggregation-changelog-[11]#Beginning [-2] to the restore consumer, current assignment is AppName.StreamsApp.3-store-aggregation-changelog-[0],AppName.StreamsApp.3-store-aggregation-changelog-[1],AppName.StreamsApp.3-store-aggregation-changelog-[2],AppName.StreamsApp.3-store-aggregation-changelog-[3],AppName.StreamsApp.3-store-aggregation-changelog-[4],AppName.StreamsApp.3-store-aggregation-changelog-[5],AppName.StreamsApp.3-store-aggregation-changelog-[6],AppName.StreamsApp.3-store-aggregation-changelog-[7],AppName.StreamsApp.3-store-aggregation-changelog-[8],AppName.StreamsApp.3-store-aggregation-changelog-[9],AppName.StreamsApp.3-store-aggregation-changelog-[10],AppName.StreamsApp.3-store-aggregation-changelog-[11]
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[0|7] - recordQueue [record-queue-Topic.0-0-7] Adding new record in queue
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[0|7] - recordQueue [record-queue-Topic.0-0-7] Record added in queue. New size : 1
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[0|7] Added record into the buffered queue of partition Topic.0 [[7]], new queue size is 1
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[0|7] - recordQueue [record-queue-Topic.0-0-7] Adding new record in queue
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[0|7] - recordQueue [record-queue-Topic.0-0-7] Record added in queue. New size : 2
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[0|7] Added record into the buffered queue of partition Topic.0 [[7]], new queue size is 2
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[0|7] - recordQueue [record-queue-Topic.0-0-7] Adding new record in queue
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
      stream-task[0|7] - recordQueue [record-queue-Topic.0-0-7] Record added in queue. New size : 3
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
      stream-task[0|7] Added record into the buffered queue of partition Topic.0 [[7]], new queue size is 3
[2022-10-28 09:40:44] dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
      Add 3 records in tasks in 00:00:00.0042329

Stream configuration:

var streamConfig = new StreamConfig<StringSerDes, StringSerDes>
{
    BootstrapServers = _config["KafkaBootstrapServers"],
    ApplicationId = streamAppId,
    CommitIntervalMs = 60000,
    AutoOffsetReset = AutoOffsetReset.Earliest
};

streamConfig.SaslMechanism = SaslMechanism.ScramSha512;
streamConfig.SecurityProtocol = SecurityProtocol.SaslSsl;
streamConfig.SaslUsername = _config["KafkaSaslUsername"];
streamConfig.SaslPassword = ConfigurationHelper.GetConfigurationFromSystemEnvironment("KAFKA_SASL_PASSWORD");
streamConfig.SslCaLocation = _config["KafkaSslCaLocation"];
streamConfig.SslCertificateLocation = _config["KafkaSslCertificateLocation"];
streamConfig.SslKeyLocation = _config["KafkaSslKeyLocation"];
streamConfig.StateDir = "/opt/app/state";
streamConfig.AllowAutoCreateTopics = true;
streamConfig.Partitioner = Partitioner.Murmur2;

Changelog topic config:

cleanup.policy=compact,delete
segment.bytes=1073741824
retention.ms=172800000
message.timestamp.type=CreateTime
retention.bytes=-1

How to reproduce

I don't know how to reproduce it.

LGouellec commented 1 year ago

AppName.StreamsApp.3-store-aggregation-changelog's topic config:

cleanup.policy=compact,delete
segment.bytes=1073741824
retention.ms=172800000
message.timestamp.type=CreateTime
retention.bytes=-1

LGouellec commented 1 year ago

Hi @rav13 ,

Thanks for reporting this issue. I will look into it as soon as possible.

First thing: a changelog topic with cleanup.policy set to compact,delete is a little bit dangerous. For your information, Streamiz auto-creates its changelog topics with cleanup.policy set to compact, in order to keep at least the last value for each given key.

I imagine you created this changelog topic manually with your specific cleanup.policy.
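For anyone in the same situation, the topic's cleanup policy can be inspected and, if needed, switched back to compact-only with the standard Kafka CLI. This is an illustrative sketch, not part of the original discussion; the broker address is a placeholder, and you should confirm the change is appropriate for your topology before applying it:

```shell
# Inspect the current configuration of the changelog topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name AppName.StreamsApp.3-store-aggregation-changelog \
  --describe

# Switch the cleanup policy to compact only, matching what
# Streamiz would have used when auto-creating the topic
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name AppName.StreamsApp.3-store-aggregation-changelog \
  --alter --add-config cleanup.policy=compact
```

Note that with compact,delete, the retention.ms/retention.bytes settings can delete whole segments, which is presumably why begin offsets in the restore log above can end up greater than end offsets.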

LGouellec commented 1 year ago

Merged into 1.4. The 1.4.0 release is coming soon. Stay tuned!