LGouellec / streamiz

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/streamiz/
MIT License

Exactly Once Issues #385

Open hedmavx opened 4 days ago

hedmavx commented 4 days ago

Description

When running on either the dev branch (the future 1.7 version) or 1.6, we encounter the following issues (OS: macOS 15.1, build 24B83):

[09:59:31:140 DBG] Fabians-MBP 78 Log producer trex-b687fc29-5c70-4206-bc61-a6e8c100dd2c-stream-thread-6-streamiz-producer#producer-35 - [thrd:main]: TxnCoordinator/1: Failed to add partition "aoshima-impersonation-events" [6] to transaction: Broker: Producer attempted an operation with an old epoch <s:Streamiz.Kafka.Net.Kafka.KafkaLoggerAdapter>
[09:59:31:140 DBG] Fabians-MBP 78 Log producer trex-b687fc29-5c70-4206-bc61-a6e8c100dd2c-stream-thread-6-streamiz-producer#producer-35 - [thrd:main]: Fatal transaction error: Failed to add partitions to transaction: Local: This instance has been fenced by a newer instance (_FENCED) <s:Streamiz.Kafka.Net.Kafka.KafkaLoggerAdapter>
[09:59:31:140 DBG] Fabians-MBP 78 Log producer trex-b687fc29-5c70-4206-bc61-a6e8c100dd2c-stream-thread-6-streamiz-producer#producer-35 - [thrd:main]: Fatal error: Local: This instance has been fenced by a newer instance: Failed to add partitions to transaction: Local: This instance has been fenced by a newer instance <s:Streamiz.Kafka.Net.Kafka.KafkaLoggerAdapter>
[09:59:31:142 ERR] Fabians-MBP 64 stream-thread[.NET Long Running Task] Error producer trex-b687fc29-5c70-4206-bc61-a6e8c100dd2c-stream-thread-6-streamiz-producer#producer-35 - Failed to add partitions to transaction: Local: This instance has been fenced by a newer instance <s:Streamiz.Kafka.Net.Kafka.KafkaLoggerAdapter>
[09:59:31:145 ERR] Fabians-MBP 64 stream-task[2|6] Error encountered sending record to topic trex-transaction-events for task 2-6 due to:
stream-task[2|6] Error Code : Local_PurgeQueue
stream-task[2|6] Message : Local: Purged in queue
stream-task[2|6] Exception handler choose to FAIL the processing, no more records would be sent.
 <s:Streamiz.Kafka.Net.Kafka.Internal.RecordCollector>
[09:59:31:145 ERR] Fabians-MBP 64 stream-task[2|6] Error encountered sending record to topic trex-transactions-ktable-repartition for task 2-6 due to:
stream-task[2|6] Error Code : Local_PurgeQueue
stream-task[2|6] Message : Local: Purged in queue
stream-task[2|6] Exception handler choose to FAIL the processing, no more records would be sent.
 <s:Streamiz.Kafka.Net.Kafka.Internal.RecordCollector>
[09:59:31:145 ERR] Fabians-MBP 64 stream-task[4|6] Error encountered sending record to topic aoshima-impersonation-events for task 4-6 due to:
stream-task[4|6] Error Code : Local_PurgeQueue
stream-task[4|6] Message : Local: Purged in queue
stream-task[4|6] Exception handler choose to FAIL the processing, no more records would be sent.
 <s:Streamiz.Kafka.Net.Kafka.Internal.RecordCollector>
[09:59:31:149 ERR] Fabians-MBP 65 stream-task[4|6] Error encountered sending record to topic trex-impersonations-ktable-repartition for task 4-6 due to:
stream-task[4|6] Error Code : Local_Fenced
stream-task[4|6] Message : Local: This instance has been fenced by a newer instance
stream-task[4|6] Exception handler choose to FAIL the processing, no more records would be sent.
 <s:Streamiz.Kafka.Net.Kafka.Internal.RecordCollector>
[09:59:31:149 ERR] Fabians-MBP 65 Failed to process stream task 4-6 due to
[09:59:31:149 ERR] Fabians-MBP 65 Failed to process stream task 4-6 due to the following error: <s:Streamiz.Kafka.Net.Processors.Internal.TaskManager>
Streamiz.Kafka.Net.Errors.StreamsException: Error encountered trying to send record to topic trex-impersonations-ktable-repartition [stream-task[4|6] ] : Local: This instance has been fenced by a newer instance
   at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.ManageProduceException(ProduceException`2 produceException)
   at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.SendInternal[K,V](String topic, K key, V value, Headers headers, Nullable`1 partition, Int64 timestamp, ISerDes`1 keySerializer, ISerDes`1 valueSerializer)
   at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.Send[K,V](String topic, K key, V value, Headers headers, Int64 timestamp, ISerDes`1 keySerializer, ISerDes`1 valueSerializer)
   at Streamiz.Kafka.Net.Processors.SinkProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.<>c__DisplayClass39_0.<Forward>b__0(IProcessor genericProcessor)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
   at Streamiz.Kafka.Net.Processors.KStreamFilterProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.<>c__DisplayClass37_0`2.<Forward>b__0(IProcessor processor)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward[K1,V1](K1 key, V1 value)
   at Streamiz.Kafka.Net.Processors.KStreamMapProcessor`4.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.<>c__DisplayClass39_0.<Forward>b__0(IProcessor genericProcessor)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(IEnumerable`1 processors, Action`1 action)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Forward(K key, V value)
   at Streamiz.Kafka.Net.Processors.SourceProcessor`2.Process(K key, V value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(Object key, Object value)
   at Streamiz.Kafka.Net.Processors.AbstractProcessor`2.Process(ConsumeResult`2 record)
   at Streamiz.Kafka.Net.Processors.StreamTask.<>c__DisplayClass46_0.<Process>b__0()
   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
   at Streamiz.Kafka.Net.Processors.StreamTask.Process()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Process(Int64 now)
   at Streamiz.Kafka.Net.Processors.StreamThread.<>c__DisplayClass66_0.<Run>b__4()
   at Streamiz.Kafka.Net.Crosscutting.ActionHelper.MeasureLatency(Action action)
   at Streamiz.Kafka.Net.Processors.StreamThread.Run()
[09:59:31:151 ERR] Fabians-MBP 65 stream-thread[trex-b687fc29-5c70-4206-bc61-a6e8c100dd2c-stream-thread-6] Failed to close stream thread due to the following error: <s:Streamiz.Kafka.Net.Processors.StreamThread>
Streamiz.Kafka.Net.Errors.ProductionException: stream-task[2|6] Error encountered sending record to topic trex-transactions-ktable-repartition for task 2-6 due to:
stream-task[2|6] Error Code : Local_PurgeQueue
stream-task[2|6] Message : Local: Purged in queue
stream-task[2|6] Exception handler choose to FAIL the processing, no more records would be sent.

   at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.CheckForException()
   at Streamiz.Kafka.Net.Kafka.Internal.RecordCollector.Flush()
   at Streamiz.Kafka.Net.Processors.StreamTask.FlushState()
   at Streamiz.Kafka.Net.Processors.StreamTask.PrepareCommit()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.Close()
   at Streamiz.Kafka.Net.Processors.StreamThread.CompleteShutdown()
[09:59:42:472 ERR] Fabians-MBP 35 stream-thread[trex-b687fc29-5c70-4206-bc61-a6e8c100dd2c-stream-thread-1] Encountered the following unexpected Kafka exception during assignment partitions (aoshima-impersonations [[2]],aoshima-impersonations [[3]],test-topic-impersonations [[2]],test-topic-impersonations [[3]],trex-roles-ktable-repartition [[2]],trex-roles-ktable-repartition [[3]],trex-transactions [[2]],trex-transactions [[3]],test-topic-transactions [[2]],test-topic-transactions [[3]],trex-transactions-by-ref-ktable-repartition [[2]],trex-transactions-by-ref-ktable-repartition [[3]],trex-impersonations-ktable-repartition [[2]],trex-impersonations-ktable-repartition [[3]],trex-transactions-ktable-repartition [[2]],trex-transactions-ktable-repartition [[3]]), this usually indicate Streams internal errors <s:Streamiz.Kafka.Net.Processors.StreamThread>
System.IndexOutOfRangeException: Index was outside the bounds of the array.
   at System.Collections.Generic.Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
   at Streamiz.Kafka.Net.Metrics.StreamMetricsRegistry.GetSensor[T](String name, String description, MetricsRecordingLevel metricsRecordingLevel, Sensor[] parents)
   at Streamiz.Kafka.Net.Metrics.StreamMetricsRegistry.GetSensor(IDictionary`2 listSensors, String sensorName, String key, String description, MetricsRecordingLevel metricsRecordingLevel, Sensor[] parents)
   at Streamiz.Kafka.Net.Metrics.StreamMetricsRegistry.TaskLevelSensor(String threadId, TaskId taskId, String sensorName, String description, MetricsRecordingLevel metricsRecordingLevel, Sensor[] parents)
   at Streamiz.Kafka.Net.Metrics.Internal.TaskMetrics.ActiveRestorationSensor(String threadId, TaskId taskId, StreamMetricsRegistry metricsRegistry)
   at Streamiz.Kafka.Net.Processors.StreamTask.RegisterSensors()
   at Streamiz.Kafka.Net.Processors.StreamTask..ctor(String threadId, TaskId id, IEnumerable`1 partitions, ProcessorTopology processorTopology, IConsumer`2 consumer, IStreamConfig configuration, IKafkaSupplier kafkaSupplier, StreamsProducer producer, IChangelogRegister changelogRegister, StreamMetricsRegistry streamMetricsRegistry)
   at Streamiz.Kafka.Net.Processors.Internal.TaskCreator.CreateTask(IConsumer`2 consumer, StreamsProducer producer, TaskId id, IEnumerable`1 partitions)
   at Streamiz.Kafka.Net.Processors.Internal.AbstractTaskCreator`1.CreateTasks(IConsumer`2 consumer, StreamsProducer producer, IDictionary`2 tasksToBeCreated)
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.CreateTasks(ICollection`1 assignment)
   at Streamiz.Kafka.Net.Kafka.Internal.StreamsRebalanceListener.PartitionsAssigned(IConsumer`2 consumer, List`1 partitions)
[09:59:42:477 ERR] Fabians-MBP 59 stream-thread[trex-b687fc29-5c70-4206-bc61-a6e8c100dd2c-stream-thread-5] Encountered the following unexpected Kafka exception during assignment partitions (aoshima-impersonations [[7]],test-topic-impersonations [[7]],trex-roles-ktable-repartition [[7]],trex-transactions [[7]],test-topic-transactions [[7]],trex-transactions-by-ref-ktable-repartition [[7]],trex-impersonations-ktable-repartition [[7]],trex-transactions-ktable-repartition [[7]]), this usually indicate Streams internal errors <s:Streamiz.Kafka.Net.Processors.StreamThread>
Streamiz.Kafka.Net.Errors.ProcessorStateException: Error opening store roles-ktable at location /var/folders/ms/bfhknk7906s4smmy39w5ntd00000gn/T/streamiz-kafka-net/trex/7-7/rocksdb/roles-ktable
 ---> RocksDbSharp.RocksDbException: IO error: lock hold by current process, acquire time 1731059970 acquiring thread 29079318528: /var/folders/ms/bfhknk7906s4smmy39w5ntd00000gn/T/streamiz-kafka-net/trex/7-7/rocksdb/roles-ktable/LOCK: No locks available
   at RocksDbSharp.Native.rocksdb_open_column_families(IntPtr options, IntPtr name, Int32 num_column_families, String[] column_family_names, IntPtr[] column_family_options, IntPtr[] column_family_handles)
   at RocksDbSharp.RocksDb.Open(DbOptions options, String path, ColumnFamilies columnFamilies)
   at Streamiz.Kafka.Net.State.RocksDbKeyValueStore.OpenRocksDb(DbOptions dbOptions, ColumnFamilyOptions columnFamilyOptions)
   --- End of inner exception stack trace ---
   at Streamiz.Kafka.Net.State.RocksDbKeyValueStore.OpenRocksDb(DbOptions dbOptions, ColumnFamilyOptions columnFamilyOptions)
   at Streamiz.Kafka.Net.State.RocksDbKeyValueStore.OpenDatabase(ProcessorContext context)
   at Streamiz.Kafka.Net.State.RocksDbKeyValueStore.Init(ProcessorContext context, IStateStore root)
   at Streamiz.Kafka.Net.State.Internal.WrappedStateStore`1.Init(ProcessorContext context, IStateStore root)
   at Streamiz.Kafka.Net.State.Internal.WrappedStateStore`1.Init(ProcessorContext context, IStateStore root)
   at Streamiz.Kafka.Net.State.Cache.CachingKeyValueStore.Init(ProcessorContext context, IStateStore root)
   at Streamiz.Kafka.Net.State.Internal.WrappedStateStore`1.Init(ProcessorContext context, IStateStore root)
   at Streamiz.Kafka.Net.State.Internal.WrappedKeyValueStore`2.Init(ProcessorContext context, IStateStore root)
   at Streamiz.Kafka.Net.State.Metered.MeteredKeyValueStore`2.Init(ProcessorContext context, IStateStore root)
   at Streamiz.Kafka.Net.Processors.AbstractTask.RegisterStateStores()
   at Streamiz.Kafka.Net.Processors.StreamTask.InitializeStateStores()
   at Streamiz.Kafka.Net.Processors.Internal.TaskManager.CreateTasks(ICollection`1 assignment)

How to reproduce

streamiz

config

StreamConfig<StringSerDes, StringSerDes>(
    ApplicationId = config.ServiceConfig.ServiceName,
    BootstrapServers = config.Kafka.BootstrapServers,
    AutoRegisterSchemas = true,
    SchemaRegistryUrl = config.Kafka.SchemaRegistry.ToString(),
    NumStreamThreads = 8,
    ReplicationFactor = 1,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    Guarantee = ProcessingGuarantee.EXACTLY_ONCE,
    CommitIntervalMs = 100,
    LingerMs = 0,
    PollMs = 10,
    MaxPollIntervalMs = 10000,
    SessionTimeoutMs = 10000,
    CompressionType = CompressionType.Zstd,
    Logger = loggerFactory,
    MetricsRecording = MetricsRecordingLevel.DEBUG,
    InnerExceptionHandler = (fun _ -> ExceptionHandlerResponse.FAIL),
    ProductionExceptionHandler = (fun _ -> ProductionExceptionHandlerResponse.FAIL),
    DeserializationExceptionHandler = (fun _ _ _ -> ExceptionHandlerResponse.FAIL)
)

topology

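open System
open System.Collections.Generic
open Streamiz.Kafka.Net
open Streamiz.Kafka.Net.SerDes
open Streamiz.Kafka.Net.Table

let streamBuilder = StreamBuilder()

streamBuilder
    .Stream<string, string>("test-input-topic")
    // Re-key every record with a fresh GUID, used as both key and value
    .Map<string, string>(fun _ _ _ ->
        let newKey = Guid.NewGuid().ToString()
        KeyValuePair.Create(newKey, newKey))
    // Grouping after a key-changing Map requires a repartition topic
    .GroupByKey()
    .Aggregate<string>(
        (fun _ -> Guid.NewGuid().ToString()),   // initializer: a random GUID
        (fun key curr acc -> acc),               // aggregator: keep the accumulator unchanged
        RocksDb.As<string, string, StringSerDes, StringSerDes>("test-ktable")
    )
    .ToStream()
    .To("test-output-topic")

streamBuilder.Build()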

docker

services:
  kafka:
    image: confluentinc/cp-kafka:7.7.1
    hostname: kafka
    container_name: kafka
    ports:
      - 29092:29092
      - 29997:9997
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      KAFKA_NUM_PARTITIONS: 8
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=9997

  karapace-registry:
    image: ghcr.io/aiven-open/karapace:4.0.0
    entrypoint:
      - /bin/bash
      - /opt/karapace/start.sh
      - registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      KARAPACE_ADVERTISED_HOSTNAME: karapace-registry
      KARAPACE_BOOTSTRAP_URI: kafka:9092
      KARAPACE_PORT: 8081
      KARAPACE_HOST: 0.0.0.0
      KARAPACE_CLIENT_ID: karapace
      KARAPACE_GROUP_ID: karapace-registry
      KARAPACE_MASTER_ELIGIBILITY: "true"
      KARAPACE_TOPIC_NAME: _schemas
      KARAPACE_LOG_LEVEL: WARNING
      KARAPACE_COMPATIBILITY: FULL
      KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
      KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

  karapace-rest:
    image: ghcr.io/aiven-open/karapace:4.0.0
    entrypoint:
      - /bin/bash
      - /opt/karapace/start.sh
      - rest
    depends_on:
      - kafka
      - karapace-registry
    ports:
      - "8082:8082"
    environment:
      KARAPACE_PORT: 8082
      KARAPACE_HOST: 0.0.0.0
      KARAPACE_ADVERTISED_HOSTNAME: karapace-rest
      KARAPACE_BOOTSTRAP_URI: kafka:9092
      KARAPACE_REGISTRY_HOST: karapace-registry
      KARAPACE_REGISTRY_PORT: 8081
      KARAPACE_ADMIN_METADATA_MAX_AGE: 0
      KARAPACE_LOG_LEVEL: WARNING
      KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
      KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

  kafdrop:
    platform: linux/amd64
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      SCHEMAREGISTRY_CONNECT: "http://karapace-registry:8081"
      CMD_ARGS: "--message.format=PROTOBUF"
    depends_on:
      - kafka
      - karapace-registry

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8888:8080
    depends_on:
      - kafka
      - karapace-registry
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://karapace-registry:8081
      # KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
      # KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
      DYNAMIC_CONFIG_ENABLED: 'true'  # not necessary, added for tests
      KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
      KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'


hedmavx commented 4 days ago

Before starting the application, I produced 3 messages to test-input-topic.

LGouellec commented 3 days ago

Hey @hedmavx,

Thank you for this amazing report. I'll try to reproduce and fix the issue as soon as possible.

Best regards,