Closed lukasswm closed 7 months ago
How do you close your application? Do you stop the stream properly, with stream.Dispose() or by cancelling the token passed to stream.Start(token), or not?
I do both. The stream is hosted in a BackgroundService: await _kafkaStream.StartAsync(stoppingToken); gets the cancellation token from ExecuteAsync(CancellationToken stoppingToken), and I override Dispose() of the background service to dispose the KafkaStream.
using Microsoft.Extensions.Hosting;
using Streamiz.Kafka.Net;

namespace Test.Streams;

public class Streamer : BackgroundService
{
    private KafkaStream? _kafkaStream;
    private readonly TopologyBuilder _TopologyBuilder;
    private readonly StreamConfigBuilder _StreamConfigBuilder;

    public Streamer(TopologyBuilder TopologyBuilder, StreamConfigBuilder StreamConfigBuilder)
    {
        _TopologyBuilder = TopologyBuilder;
        _StreamConfigBuilder = StreamConfigBuilder;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var topology = _TopologyBuilder.Build();
        var config = _StreamConfigBuilder.Build();
        _kafkaStream = new KafkaStream(topology, config);
        // stoppingToken is cancelled by the host on shutdown.
        await _kafkaStream.StartAsync(stoppingToken);
    }

    public override void Dispose()
    {
        // Also dispose the stream explicitly when the service is disposed.
        _kafkaStream?.Dispose();
        base.Dispose();
    }
}
@lukasswm Can you enable debug logs and attach the debug log of your application when you reproduce the bug, please?
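For anyone unsure how to turn on debug output, here is a minimal sketch. It assumes the stream is configured through StreamConfig and that its Logger property accepts a Microsoft.Extensions.Logging ILoggerFactory, which is how I understand the Streamiz configuration (please check it against your version). The application id and bootstrap server are the ones from this thread, and AddConsole needs the Microsoft.Extensions.Logging.Console package.

```csharp
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

// Assumption: StreamConfig exposes a Logger property of type ILoggerFactory.
var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "domain.data.service",
    BootstrapServers = "localhost:9095",
    Logger = LoggerFactory.Create(builder =>
    {
        // Emit everything from Debug upwards to the console.
        builder.SetMinimumLevel(LogLevel.Debug);
        builder.AddConsole();
    })
};
```

If the application already runs inside a generic host, raising the log level for the Streamiz.Kafka.Net category in the host's logging configuration may be enough instead.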
Here is the log. I removed the message payloads and some other information, but I hope it is still useful.
info: Streamiz.Kafka.Net.KafkaStream[0]
stream-application[domain.data.service] Start creation of the stream application with this configuration:
Stream property:
client.id:
num.stream.threads: 1
default.key.serdes: Streamiz.Kafka.Net.SerDes.StringSerDes
default.value.serdes: Streamiz.Kafka.Net.SerDes.StringSerDes
default.timestamp.extractor: Streamiz.Kafka.Net.Processors.Internal.FailOnInvalidTimestamp
commit.interval.ms: 1000
processing.guarantee: AT_LEAST_ONCE
transaction.timeout: 00:00:10
poll.ms: 100
max.poll.records: 500
max.poll.restoring.records: 1000
max.task.idle.ms: 0
buffered.records.per.partition: 1000
inner.exception.handler: System.Func`2[System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
production.exception.handler: System.Func`2[Confluent.Kafka.DeliveryReport`2[System.Byte[],System.Byte[]],Streamiz.Kafka.Net.ExceptionHandlerResponse]
deserialization.exception.handler: System.Func`4[Streamiz.Kafka.Net.ProcessorContext,Confluent.Kafka.ConsumeResult`2[System.Byte[],System.Byte[]],System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
rocksdb.config.setter: System.Action`2[System.String,Streamiz.Kafka.Net.State.RocksDb.RocksDbOptions]
follow.metadata: False
state.dir: C:\Users\lukas.schwendemann\AppData\Local\Temp\streamiz-kafka-net
replication.factor: 1
windowstore.changelog.additional.retention.ms: 86400000
offset.checkpoint.manager:
metrics.interval.ms: 30000
metrics.recording.level: INFO
metrics.reporter: System.Action`1[System.Collections.Generic.IEnumerable`1[Streamiz.Kafka.Net.Metrics.Sensor]]
expose.librdkafka.stats: False
start.task.delay.ms: 5000
parallel.processing: False
max.degree.of.parallelism: 8
application.id: domain.data.service
schema.registry.url: localhost:8081
avro.serializer.auto.register.schemas: True
protobuf.serializer.auto.register.schemas: True
Client property:
bootstrap.servers: localhost:9095
metadata.max.age.ms: 500
Consumer property:
max.poll.interval.ms: 300000
enable.auto.commit: False
enable.auto.offset.store: False
partition.assignment.strategy: cooperative-sticky
heartbeat.interval.ms: 200
auto.offset.reset: earliest
Producer property:
queue.buffering.max.kbytes: 10240
Admin client property:
None
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] Creating shared producer client
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] Creating consumer client
info: Streamiz.Kafka.Net.KafkaStream[0]
stream-application[domain.data.service] State transition from CREATED to REBALANCING
info: Streamiz.Kafka.Net.KafkaStream[0]
stream-application[domain.data.service] Starting Streams client with this topology : Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: KSTREAM-SOURCE-0000000000 (topics: [meta-topic])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [table-store])
--> none
<-- KSTREAM-SOURCE-0000000000
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000002 (topics: [value-topic])
--> KSTREAM-JOIN-0000000003
Processor: KSTREAM-JOIN-0000000003 (stores: [])
--> KSTREAM-FILTER-0000000004
<-- KSTREAM-SOURCE-0000000002
Processor: KSTREAM-FILTER-0000000004 (stores: [])
--> KSTREAM-MAP-0000000005
<-- KSTREAM-JOIN-0000000003
Processor: KSTREAM-MAP-0000000005 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KSTREAM-FILTER-0000000004
Sink: KSTREAM-SINK-0000000006 (topic: domain.private.data)
<-- KSTREAM-MAP-0000000005
dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
Starting to apply internal topics for topology 0 in topic manager (try: 1, max retry : 10).
dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
Complete to apply internal topics in topic manager
dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
Starting to apply internal topics for topology 1 in topic manager (try: 1, max retry : 10).
dbug: Streamiz.Kafka.Net.Processors.DefaultTopicManager[0]
Complete to apply internal topics in topic manager
info: Streamiz.Kafka.Net.Processors.GlobalStreamThread[0]
global-stream-thread domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-GlobalStreamThread Starting
warn: Streamiz.Kafka.Net.State.OffsetCheckpointFile[0]
Read checkpoint offsets from recovery file : C:\Users\lukas.schwendemann\AppData\Local\Temp\streamiz-kafka-net\domain.data.service\global\.checkpoint.rec
info: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Restoring state for global store table-store
dbug: Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter[0]
Log consumer domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-GlobalStreamThread#consumer-1 - [thrd:main]: meta-topic [0]: offset reset (at offset 2, broker 1001) to BEGINNING: fetch failed due to requested offset not available on the broker: Broker: Offset out of range
info: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Global store table-store is completely restored
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateUpdateTask[0]
Initializing topology with processor source : Streamiz.Kafka.Net.Processors.SourceProcessor`2[System.String,Test.domain.Data.A.Models.MetaData]
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
Process context initialized
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
Process context initialized
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateUpdateTask[0]
Initializing topology with processor source : Streamiz.Kafka.Net.Processors.KTableSourceProcessor`2[System.String,Test.domain.Data.A.Models.MetaData]
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KTableSourceProcessor[0]
Process context initialized
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] Starting
info: Streamiz.Kafka.Net.Processors.GlobalStreamThread[0]
global-stream-thread domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-GlobalStreamThread State transition from CREATED to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] State transition from CREATED to STARTING
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks since 2884.9211ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks since 1085.2583ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks since 1090.3973ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.TaskCreator[0]
Created task 1-0 with assigned partition value-topic [[0]]
dbug: Streamiz.Kafka.Net.Processors.Internal.TaskCreator[0]
Created task 1-0 with assigned partition value-topic [[0]]
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Initializing state stores.
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Initializing state stores
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Initializing topology with theses source processors : KSTREAM-SOURCE-0000000002.
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Initializing process context
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Process context initialized
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Process context initialized
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] State transition from STARTING to PARTITIONS_ASSIGNED
info: Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener[0]
Partition assignment took 00:00:00.0337474 ms.
Currently assigned active tasks: 1-0
Revoked assigned active tasks:
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Adding new record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Record added in queue. New size : 1
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Added record into the buffered queue of partition value-topic [[0]], new queue size is 1
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Add 1 records in tasks in 00:00:00.2112423
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1016.2206ms has elapsed (commit interval is 1000ms)
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] State transition from PARTITIONS_ASSIGNED to RUNNING
info: Streamiz.Kafka.Net.KafkaStream[0]
stream-application[domain.data.service] State transition from REBALANCING to RUNNING
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
stream-thread[domain.data.service-b08971f1-20c2-4fc8-bf6a-73b3d04e8ddb-stream-thread-0] State is RUNNING, initializing and restoring tasks if necessary
info: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Task 1-0 state transition from CREATED to RUNNING
info: Streamiz.Kafka.Net.Processors.StreamThread[0]
Restoration took 365ms for all tasks 1-0
dbug: Streamiz.Kafka.Net.Processors.Internal.StoreChangelogReader[0]
Finished restoring all changelogs
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Polling record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Rrecord polled. (Record info [Topic:value-topic|Partition:[0]|Offset:68])
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Start processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Forward<String,RawData> message with key 89818 and value com.Test.timeseriesanalytics.avro.RawData to each next processor
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Forward<String,String> message with key 123456789 and value {...} to each next processor
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Process<String,String> message with key 123456789 and {...} with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Completed processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1012.8492ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Comitting
dbug: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
stream-task[1|0] Flushing all stores registered in the state manager
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
stream-task[1|0] Flushing producer
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
stream-task[1|0] Record persisted: (timestamp 1669989569475) topic=[domain.private.data] partition=[[0]] offset=[62]
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committed all active tasks 1-0 in 45.9156ms
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Processing 1 records in 00:00:00.9474470
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Adding new record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Record added in queue. New size : 1
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Added record into the buffered queue of partition value-topic [[0]], new queue size is 1
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Add 1 records in tasks in 00:00:00.0002425
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Polling record in queue
dbug: Streamiz.Kafka.Net.Processors.Internal.RecordQueue[0]
stream-task[1|0] - recordQueue [record-queue-value-topic-1-0] Rrecord polled. (Record info [Topic:value-topic|Partition:[0]|Offset:68])
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Start processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.SourceProcessor[0]
stream-task[1|0]|processor[KSTREAM-SOURCE-0000000002]- Forward<String,RawData> message with key 89818 and value com.Test.timeseriesanalytics.avro.RawData to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamKTableJoinProcessor[0]
stream-task[1|0]|processor[KSTREAM-JOIN-0000000003]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamFilterProcessor[0]
stream-task[1|0]|processor[KSTREAM-FILTER-0000000004]- Forward<String,TagInfo> message with key 89818 and value Test.domain.Data.A.Models.TagInfo to each next processor
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Process<String,TagInfo> message with key 89818 and Test.domain.Data.A.Models.TagInfo with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.KStreamMapProcessor[0]
stream-task[1|0]|processor[KSTREAM-MAP-0000000005]- Forward<String,String> message with key 123456789 and value {...} to each next processor
dbug: Streamiz.Kafka.Net.Processors.SinkProcessor[0]
stream-task[1|0]|processor[KSTREAM-SINK-0000000006]- Process<String,String> message with key 123456789 and {...} with record metadata [topic:value-topic|partition:0|offset:68]
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Completed processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475]
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Processing 1 records in 00:00:00.7518565
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
stream-task[1|0] Record persisted: (timestamp 1669989569475) topic=[domain.private.data] partition=[[0]] offset=[63]
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1079.5559ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.StreamTask[0]
stream-task[1|0] Comitting
dbug: Streamiz.Kafka.Net.Processors.Internal.ProcessorStateManager[0]
stream-task[1|0] Flushing all stores registered in the state manager
dbug: Streamiz.Kafka.Net.Kafka.Internal.RecordCollector[0]
stream-task[1|0] Flushing producer
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committed all active tasks 1-0 in 3.1334ms
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1071.6336ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1076.8629ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1083.3077ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1088.1938ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1088.9109ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1089.1086ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1084.8793ms has elapsed (commit interval is 1000ms)
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing all global globalStores registered in the state manager
dbug: Streamiz.Kafka.Net.Processors.Internal.GlobalStateManager[0]
Flushing store table-store
dbug: Streamiz.Kafka.Net.Processors.StreamThread[0]
Committing all active tasks 1-0 since 1084.4777ms has elapsed (commit interval is 1000ms)
@lukasswm,
Indeed, it seems that this record, Topic:value-topic|Partition:0|Offset:68|Timestamp:1669989569475, is processed twice.
Did you delete the topic value-topic and recreate it with the same name within a short time? The record is not only processed twice, it is also polled twice directly from the consumer.
Can you consume the topic with the CLI, like this:
kafka-console-consumer --bootstrap-server localhost:9095 --topic value-topic --property print.key=true --from-beginning
and post the results?
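If the CLI is not at hand, the same check can be sketched with the Confluent.Kafka client, which also prints the partition and offset of each record so that a repeated offset would stand out. This is only a rough sketch: the broker address and topic name come from this thread, the group id is a made-up throwaway value, and values are read as raw bytes because the payload format is not shown here.

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9095",
    // Hypothetical throwaway group so no committed offsets are reused.
    GroupId = "duplicate-check-" + Guid.NewGuid(),
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();
consumer.Subscribe("value-topic");

while (true)
{
    // Consume returns null when nothing arrives within the timeout.
    var result = consumer.Consume(TimeSpan.FromSeconds(10));
    if (result == null)
    {
        break;
    }

    Console.WriteLine(
        $"{result.TopicPartitionOffset} key={result.Message.Key} " +
        $"timestamp={result.Message.Timestamp.UnixTimestampMs}");
}

consumer.Close();
```

If every offset shows up exactly once here, the topic itself holds no duplicates and the double processing happens on the application side.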
Description
Referencing the same setup as in #212.
Starting the application when there are already messages on a topic will produce the messages twice during the restart process. The issue does not occur every time; it feels like a race condition. But when it occurs, every message is produced twice. At least, I tested it with 3 messages and all of them were duplicated.
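One way to confirm the duplication from a debug log is to count the "Start processing one record" lines per topic, partition and offset. The helper below is a small sketch using only the .NET base library; the class and method names are made up for illustration, and the pattern is based on the log lines as they appear in this thread, so it may need adjusting for other versions.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class DuplicateRecordFinder
{
    // Matches lines such as:
    //   Start processing one record [Topic:value-topic|Partition:0|Offset:68|Timestamp:...]
    // The label before the topic name is matched loosely (\w+).
    private static readonly Regex StartLine = new Regex(
        @"Start processing one record \[\w+:(?<topic>[^|\]]+)\|Partition:(?<partition>\d+)\|Offset:(?<offset>\d+)",
        RegexOptions.Compiled);

    // Returns every (topic, partition, offset) that was processed more than once,
    // together with the number of times it was processed.
    public static Dictionary<(string Topic, int Partition, long Offset), int> FindDuplicates(
        IEnumerable<string> logLines)
    {
        return logLines
            .Select(line => StartLine.Match(line))
            .Where(match => match.Success)
            .GroupBy(match => (
                Topic: match.Groups["topic"].Value,
                Partition: int.Parse(match.Groups["partition"].Value),
                Offset: long.Parse(match.Groups["offset"].Value)))
            .Where(g => g.Count() > 1)
            .ToDictionary(g => g.Key, g => g.Count());
    }
}
```

Calling DuplicateRecordFinder.FindDuplicates(File.ReadLines(path)) on a saved log file returns an empty dictionary when no record was processed twice.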
How to reproduce