**PugachA** opened this issue 3 years ago (Open)
Thanks for filing this - I'm taking a look at a similar issue now.

FWIW, unrelated to this, but it came up during my investigation: `.WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider))` does not restart failed Kafka Source stages when a downstream fails, which seems off to me. Going to file a separate bug for it.
**Version Information**
**Describe the bug**
When consuming partitions in parallel with `KafkaConsumer.CommittablePartitionedSource`, the SubSource for a partition gets re-created. These errors only occur when there is a slow stage in the pipeline, and they lead to data loss.
**To Reproduce**
Steps to reproduce the behavior:

Prepare the configuration and the code:
```csharp
var configuration = ConfigurationFactory.ParseString(File.ReadAllText("application.conf"));

using var system = ActorSystem.Create("system", configuration);
using var materializer = system.Materializer();

var consumerSettings = ConsumerSettings<string, IpmRequestStatistic>
    .Create(system, Deserializers.Utf8, new KafkaJsonSerializer())
    .WithBootstrapServers("localhost:15000")
    .WithGroupId("aidb-group")
    .WithProperty("auto.offset.reset", "earliest");

var committerDefaults = CommitterSettings.Create(system);

var maxPartitions = 4;
var batchSize = 10000;
var parallel = 3;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:15001",
    ClientId = Dns.GetHostName(),
    Acks = Acks.All
};

var producer = new ProducerBuilder<long, ApplicationInstallationConnect>(config)
    .SetKeySerializer(Serializers.Int64)
    .SetValueSerializer(new KafkaJsonSerializer())
    .Build();
```
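The snippet above stops before the stream graph itself. Based on the stage named in the logs (`Select(b => MergeDataToDb(b))`) and the topic name `ipmrequeststatistics` that appears there, the wiring presumably looks roughly like the sketch below; `MergeDataToDb` is the user's slow stage, and the commit plumbing is an assumption (member names may vary between Akka.Streams.Kafka versions):

```csharp
// Sketch only, not taken from the report: a plausible reconstruction of the
// missing stream definition for a partitioned committable source.
var control = KafkaConsumer
    .CommittablePartitionedSource(consumerSettings, Subscriptions.Topics("ipmrequeststatistics"))
    .MergeMany(maxPartitions, tuple =>
    {
        var (topicPartition, source) = tuple;
        return source
            .Grouped(batchSize)               // batch up to 10000 messages
            .Select(b => MergeDataToDb(b))    // slow stage named in the logs
            .SelectMany(b => b.Select(m => (ICommittable)m.CommitableOffset));
    })
    .ToMaterialized(Committer.Sink(committerDefaults), Keep.Left)
    .Run(materializer);
```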
I see errors in the log when the app starts.

**Logs**
```
[DEBUG][28.09.2021 18:14:36][Thread 0001][EventStream(system)] Logger log1-DefaultLogger [DefaultLogger] started
[DEBUG][28.09.2021 18:14:36][Thread 0001][EventStream(system)] StandardOutLogger being removed
[DEBUG][28.09.2021 18:14:36][Thread 0001][EventStream(system)] Default Loggers started
Press any key to stop consumer.
[DEBUG][28.09.2021 18:14:38][Thread 0005][akka://system/system/kafka-consumer-2] Creating Kafka consumer with settings: {"KeyDeserializer":{},"ValueDeserializer":{},"PollInterval":"00:00:00.5000000","PollTimeout":"00:00:00.5000000","PartitionHandlerWarning":"00:00:05","WaitClosePartition":"00:00:00.5000000","CommitTimeWarning":"00:00:01","CommitTimeout":"00:00:15","CommitRefreshInterval":"-00:00:00.0010000","DrainingCheckInterval":"00:00:00.0300000","StopTimeout":"00:00:30","PositionTimeout":"00:00:05","BufferSize":50,"DispatcherId":"akka.kafka.default-dispatcher","AutoCreateTopicsEnabled":true,"Properties":{"bootstrap.servers":"localhost:15000","group.id":"aidb-group","enable.auto.commit":"false","auto.offset.reset":"earliest"},"MetadataRequestTimeout":"00:00:05","ConnectionCheckerSettings":{"Enabled":false,"MaxRetries":3,"CheckInterval":"00:00:15","Factor":2.0},"GroupId":"aidb-group"}
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceLogic`3 (akka://system)] #2 Assigning new partitions: ipmrequeststatistics [[0]], ipmrequeststatistics [[1]], ipmrequeststatistics [[2]], ipmrequeststatistics [[3]]
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[0]]
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[1]]
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[2]]
[DEBUG][28.09.2021 18:14:42][Thread 0018][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[3]]
[DEBUG][28.09.2021 18:14:54][Thread 0028][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[0]]
[DEBUG][28.09.2021 18:15:02][Thread 0026][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[3]]
[DEBUG][28.09.2021 18:15:02][Thread 0027][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[2]]
28.09.2021 21:15:02 ipmrequeststatistics [[1]]: Message count=10000 | elapsed=20626 ms | Rps=484,80988
[WARNING][28.09.2021 18:15:03][Thread 0004][akka://system/system/kafka-consumer-2] RequestMessages from topic/partition ipmrequeststatistics [[2]] already requested by other stage ipmrequeststatistics [[2]]
28.09.2021 21:15:12 ipmrequeststatistics [[1]]: Message count=20000 | elapsed=9881 ms | Rps=1011,98175
28.09.2021 21:15:14 ipmrequeststatistics [[0]]: Message count=10000 | elapsed=20359 ms | Rps=491,1697
28.09.2021 21:15:20 ipmrequeststatistics [[2]]: Message count=10000 | elapsed=17892 ms | Rps=558,8913
28.09.2021 21:15:23 ipmrequeststatistics [[3]]: Message count=10000 | elapsed=20394 ms | Rps=490,33783
```

If I remove the slow stage `Select(b => MergeDataToDb(b))`, the error is not reproducible.

**Logs**
```
[DEBUG][29.09.2021 9:57:34][Thread 0001][EventStream(system)] Logger log1-DefaultLogger [DefaultLogger] started
[DEBUG][29.09.2021 9:57:34][Thread 0001][EventStream(system)] StandardOutLogger being removed
[DEBUG][29.09.2021 9:57:34][Thread 0001][EventStream(system)] Default Loggers started
Press any key to stop consumer.
[DEBUG][29.09.2021 9:57:36][Thread 0018][akka://system/system/kafka-consumer-2] Creating Kafka consumer with settings: {"KeyDeserializer":{},"ValueDeserializer":{},"PollInterval":"00:00:00.5000000","PollTimeout":"00:00:00.5000000","PartitionHandlerWarning":"00:00:05","WaitClosePartition":"00:00:00.5000000","CommitTimeWarning":"00:00:01","CommitTimeout":"00:00:15","CommitRefreshInterval":"-00:00:00.0010000","DrainingCheckInterval":"00:00:00.0300000","StopTimeout":"00:00:30","PositionTimeout":"00:00:05","BufferSize":50,"DispatcherId":"akka.kafka.default-dispatcher","AutoCreateTopicsEnabled":true,"Properties":{"auto.offset.reset":"earliest","group.id":"aidb-group","enable.auto.commit":"false","bootstrap.servers":"localhost:15000"},"MetadataRequestTimeout":"00:00:05","ConnectionCheckerSettings":{"Enabled":false,"MaxRetries":3,"CheckInterval":"00:00:15","Factor":2.0},"GroupId":"aidb-group"}
[DEBUG][29.09.2021 9:57:39][Thread 0005][SubSourceLogic`3 (akka://system)] #2 Assigning new partitions: ipmrequeststatistics [[0]], ipmrequeststatistics [[1]], ipmrequeststatistics [[2]], ipmrequeststatistics [[3]]
[DEBUG][29.09.2021 9:57:39][Thread 0014][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[0]]
[DEBUG][29.09.2021 9:57:39][Thread 0019][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[1]]
[DEBUG][29.09.2021 9:57:39][Thread 0018][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[2]]
[DEBUG][29.09.2021 9:57:39][Thread 0016][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[3]]
29.09.2021 12:58:00 ipmrequeststatistics [[0]]: Message count=10000 | elapsed=20332 ms | Rps=491,83264
29.09.2021 12:58:09 ipmrequeststatistics [[0]]: Message count=20000 | elapsed=9037 ms | Rps=1106,4485
29.09.2021 12:58:10 ipmrequeststatistics [[2]]: Message count=10000 | elapsed=30203 ms | Rps=331,09283
29.09.2021 12:58:10 ipmrequeststatistics [[3]]: Message count=10000 | elapsed=30209 ms | Rps=331,02295
29.09.2021 12:58:10 ipmrequeststatistics [[1]]: Message count=10000 | elapsed=30252 ms | Rps=330,55524
```

**Expected behavior**
Data is consumed from the Kafka partitions in parallel, without warnings and without data loss.
**Actual behavior**
The SubSource for a partition is re-created, and data is lost.
**Environment**
Windows, .NET 5.0
**Additional context**
Please help me configure Akka Streams to read data from partitions in parallel without warnings and without data loss.
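Not part of the original report, but one direction worth trying: keep the slow database merge from blocking the sub-stream by running it with bounded asynchronous parallelism (`SelectAsync`) instead of a synchronous `Select`. A minimal sketch, assuming `source` is one partition's sub-stream and `MergeDataToDbAsync` is a hypothetical async variant of `MergeDataToDb`; commit-member names may vary between Akka.Streams.Kafka versions:

```csharp
// Sketch: bounded async parallelism for the slow stage.
// MergeDataToDbAsync is hypothetical; `parallel` is the degree of parallelism
// from the reproduction code above.
source
    .Grouped(batchSize)
    .SelectAsync(parallel, async batch =>
    {
        await MergeDataToDbAsync(batch);  // DB work runs off the stage's synchronous path
        return batch;
    })
    .SelectMany(batch => batch.Select(m => (ICommittable)m.CommitableOffset))
    .RunWith(Committer.Sink(committerDefaults), materializer);
```

The idea is that a long synchronous stage can starve the consumer's poll loop, which is one common trigger for rebalances and re-created partition SubSources; `SelectAsync` preserves downstream element order while letting up to `parallel` batches be in flight.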