spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

Thread Safety Issue in serializeOnOutbound Method of KafkaStreamsMessageConversionDelegate #2904

Status: Closed (LazroLeader closed this issue 9 months ago)

LazroLeader commented 9 months ago

Description

I've identified a thread safety issue in the serializeOnOutbound method of the KafkaStreamsMessageConversionDelegate class. Specifically, the PerRecordContentTypeHolder it uses is shared and not thread-safe, which could lead to incorrect behavior in multi-threaded environments (for example, when the inbound topic has multiple partitions and the stream is processed concurrently).

Steps to Reproduce

The issue arises in scenarios where the serializeOnOutbound method is invoked concurrently by multiple threads, leading to potential race conditions and incorrect handling of the contentType for each record.

I've implemented a POC application that reproduces this scenario: https://github.com/LazroLeader/CloudStreamRaceConditionPOC
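
The interleaving behind the null header can be sketched in plain Java, independent of Kafka Streams. This is a minimal, deterministic sketch under stated assumptions: `ContentTypeHolder` is a hypothetical stand-in for PerRecordContentTypeHolder, and the set/read/clear sequence (including the hypothetical `unsetContentType()` step) is an assumed simplification of per-record handling, not the binder's actual code. Two CountDownLatches force thread B to run between thread A's write and thread A's read, which is the window a real stream thread could hit by chance.

```java
import java.util.concurrent.CountDownLatch;

public class SharedHolderRace {

    // Hypothetical stand-in for PerRecordContentTypeHolder: a single mutable field.
    static class ContentTypeHolder {
        private String contentType;

        void setContentType(String contentType) { this.contentType = contentType; }

        String getContentType() { return contentType; }

        // Assumed per-record cleanup step (illustrative, not taken from the binder).
        void unsetContentType() { this.contentType = null; }
    }

    // Forces B to run between A's write and A's read; returns what A read back.
    static String runInterleaving() {
        ContentTypeHolder shared = new ContentTypeHolder(); // ONE instance used by both threads
        CountDownLatch aHasSet = new CountDownLatch(1);
        CountDownLatch bHasCleared = new CountDownLatch(1);
        String[] observedByA = new String[1];

        Thread a = new Thread(() -> {
            shared.setContentType("application/json"); // A records its content type
            aHasSet.countDown();
            awaitQuietly(bHasCleared);                  // B runs inside this window
            observedByA[0] = shared.getContentType();  // A reads it back for the header
        });
        Thread b = new Thread(() -> {
            awaitQuietly(aHasSet);
            shared.setContentType("text/plain");        // B overwrites A's value
            shared.unsetContentType();                  // B clears the shared holder
            bHasCleared.countDown();
        });

        a.start();
        b.start();
        joinAll(a, b);
        return observedByA[0];
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinAll(Thread... threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Thread A read contentType = " + runInterleaving());
    }
}
```

Running `main` prints `Thread A read contentType = null`: thread A set a value, but the single shared instance was overwritten and then cleared by thread B before A read it back.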

Expected Behavior

Each thread should have its own instance of PerRecordContentTypeHolder, so that each record's contentType is handled without interference from other threads and no record produced to the outbound topic ends up with a null contentType.

Actual Behavior

Because PerRecordContentTypeHolder is shared, concurrent modifications by multiple threads can lead to unpredictable behavior and incorrect contentType processing. In practice, some messages produced to the outbound topic carry a contentType header whose value is null.

Proposed Solution

I have addressed this issue by introducing a ThreadLocal<PerRecordContentTypeHolder> to ensure that each thread has its own isolated instance of PerRecordContentTypeHolder, thus making the serializeOnOutbound method thread-safe. Here's a snippet of the proposed change:

final ThreadLocal<PerRecordContentTypeHolder> perRecordContentTypeHolderThreadLocal = ThreadLocal.withInitial(PerRecordContentTypeHolder::new);

I also changed every occurrence of:

perRecordContentTypeHolder

to

perRecordContentTypeHolderThreadLocal.get()
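
The effect of the ThreadLocal change can be checked with the same kind of forced interleaving. In this sketch, `ContentTypeHolder` is again a hypothetical stand-in for PerRecordContentTypeHolder, and the static field only mirrors the shape of the proposed `perRecordContentTypeHolderThreadLocal`; it is not the binder's code. `ThreadLocal.withInitial(...)` creates a separate holder the first time each thread calls `get()`, so thread B clearing its holder cannot affect thread A's.

```java
import java.util.concurrent.CountDownLatch;

public class ThreadLocalHolderDemo {

    // Hypothetical stand-in for PerRecordContentTypeHolder.
    static class ContentTypeHolder {
        private String contentType;

        void setContentType(String contentType) { this.contentType = contentType; }

        String getContentType() { return contentType; }

        // Assumed per-record cleanup step (illustrative only).
        void unsetContentType() { this.contentType = null; }
    }

    // Mirrors the shape of the proposed field: one holder per thread, created lazily.
    static final ThreadLocal<ContentTypeHolder> perRecordContentTypeHolderThreadLocal =
            ThreadLocal.withInitial(ContentTypeHolder::new);

    // Same forced interleaving as the shared-holder case; returns what A read back.
    static String runInterleaving() {
        CountDownLatch aHasSet = new CountDownLatch(1);
        CountDownLatch bHasCleared = new CountDownLatch(1);
        String[] observedByA = new String[1];

        Thread a = new Thread(() -> {
            perRecordContentTypeHolderThreadLocal.get().setContentType("application/json");
            aHasSet.countDown();
            awaitQuietly(bHasCleared);
            // A's own holder was never touched by B.
            observedByA[0] = perRecordContentTypeHolderThreadLocal.get().getContentType();
        });
        Thread b = new Thread(() -> {
            awaitQuietly(aHasSet);
            ContentTypeHolder mine = perRecordContentTypeHolderThreadLocal.get(); // B's own instance
            mine.setContentType("text/plain");
            mine.unsetContentType(); // clears only B's holder
            bHasCleared.countDown();
        });

        a.start();
        b.start();
        joinAll(a, b);
        return observedByA[0];
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void joinAll(Thread... threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Thread A read contentType = " + runInterleaving());
    }
}
```

Running `main` prints `Thread A read contentType = application/json`. The approach assumes a record's set and read of the holder happen on the same thread; if those two steps could run on different threads, a ThreadLocal would not carry the value across.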

sobychacko commented 9 months ago

@LazroLeader Thanks for finding and addressing this bug. I will merge soon. Just out of curiosity, what prevented you from using the native de/serialization from Kafka Streams, which is the default in the SCSt Kafka Streams binder? The fact that you are opting into the framework-provided message conversion indicates that you have a compelling use case to do so. I am just curious about that use case.

sobychacko commented 9 months ago

Could you add your name as an author and update the copyright year on the class you changed to 2024?

sobychacko commented 9 months ago

@LazroLeader Merged the PR. More PR contributions are welcome.

Thanks for confirming your use case; that makes sense. Here are the two reasons provided:

Keeping Schemas Separate: Our team works on a bunch of different projects that all evolve on their own. We didn't want to lock ourselves into sharing schemas across these projects to keep things flexible and make life a bit easier.

Ser/Des Config Hassles: Honestly, we've always had problems when trying to get SCSt to play nice with Kafka's native serialization/deserialization. It felt like we were spending too much time on config. Using the framework's conversion methods just streamlined things for us.

This might not be the most efficient way, but the ease of use has been super valuable.

sobychacko commented 9 months ago

@LazroLeader I also backported this change to the 4.0.x branch via https://github.com/spring-cloud/spring-cloud-stream/commit/b7de020e165fe07ca2c5b30d2bc2caca26a6f66a. It didn't cherry-pick cleanly, so I had to backport it manually.