spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

In concurrent scenarios when using function output, the "scst_partition" header may be randomly omitted #2961

Closed: hightea closed this issue 3 months ago.

hightea commented 3 months ago

This issue is similar to https://github.com/spring-cloud/spring-cloud-stream/issues/2299, but in the context of function output.

In a concurrent context, the function enhancer (responsible for setting the target partition) can be null. This is caused by the PartitionAwareFunctionWrapper#apply() method. If the enhancer is null, Spring Cloud Stream does not set the partition header (scst_partition), which can be a major problem for message partitioning: messages can end up in the wrong partition.
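A check of this kind boils down to pushing many messages through one shared function from several threads and counting outputs that lack the partition header. The sketch below is hypothetical: messages are modelled as plain header maps, and `PartitionHeaderStressCheck` and `countMissingHeaders` are illustrative names, not Spring Cloud Stream APIs or the test from the linked PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

// Hypothetical stress check: many threads push messages (modelled as header maps)
// through ONE shared function and count outputs missing the partition header.
class PartitionHeaderStressCheck {

    static final String PARTITION_HEADER = "scst_partition";

    static int countMissingHeaders(UnaryOperator<Map<String, Object>> sharedFunction,
                                   int threads, int messagesPerThread) {
        AtomicInteger missing = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    Map<String, Object> out = sharedFunction.apply(new HashMap<>());
                    if (!out.containsKey(PARTITION_HEADER)) {
                        missing.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            // Wait for all submitted work before reading the counter.
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stress check did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return missing.get();
    }
}
```

For example, `countMissingHeaders(h -> { h.put("scst_partition", 0); return h; }, 8, 1_000)` returns 0, because that lambda keeps no shared state. A wrapper that stores its enhancer in a field and clears it after each call can produce a non-zero count that varies from run to run.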

If the input message already has an scst_partition header set by another application, this header can be reused, which causes problems if the source topic has more partitions than the destination topic.
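To make the partition-count mismatch concrete, here is a hypothetical sketch. All names are illustrative, and the hash-modulo selection is a simplification, not Spring Cloud Stream's exact partition-selection logic. It models the faulty path: when no partition is computed for a call, the header copied from the input message is used as-is.

```java
import java.util.Map;

// Hypothetical sketch: a stale inbound "scst_partition" header can point past the
// last partition of a destination topic that has fewer partitions than the source.
class PartitionReuseDemo {

    static final String PARTITION_HEADER = "scst_partition";

    // Simplified key-based selection (hash modulo partition count); an assumption
    // for illustration, not Spring Cloud Stream's exact algorithm.
    static int selectPartition(Object key, int destinationPartitionCount) {
        return Math.floorMod(key.hashCode(), destinationPartitionCount);
    }

    // Faulty path: when nothing was computed for this call (computed == null,
    // i.e. the enhancer was null), the inbound header leaks through unchanged.
    static Integer outboundPartition(Map<String, Object> inboundHeaders, Integer computed) {
        return computed != null ? computed : (Integer) inboundHeaders.get(PARTITION_HEADER);
    }

    // A partition index is only usable if it exists on the destination topic.
    static boolean isValid(int partition, int destinationPartitionCount) {
        return partition >= 0 && partition < destinationPartitionCount;
    }
}
```

With an inbound header of 6 (valid for an 8-partition source topic) and a 4-partition destination, `outboundPartition(headers, null)` returns 6, which `isValid(6, 4)` rejects, whereas `selectPartition(key, 4)` always falls in 0..3.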

Detection environment:

Because the scst_partition header is already set on incoming messages, in concurrent scenarios our application tries to produce data to non-existent partitions, and the sends fail.

To Reproduce

Run the StreamBridgeTests#scstPartitionAlwaysSetEvenInConcurrentScenariosWithFunctions() test in the linked PR. The test shouldn't fail.

Version of the framework
latest (4.1.2)

sobychacko commented 3 months ago

@hightea Thanks for the report. It looks like the test in your PR passes. I haven't fully followed through, but could you clarify what needs to be fixed? Maybe provide a way to make the test fail?

hightea commented 3 months ago

@sobychacko Oh, that's weird! Did you run StreamBridgeTests#scstPartitionAlwaysSetEvenInConcurrentScenariosWithFunctions() and not StreamBridgeTests#scstPartitionAlwaysSetEvenInConcurrentScenarios()? The test should fail out of the box.

The issue is that the PartitionAwareFunctionWrapper is shared between threads, so the enhancer can be null for one thread because another thread resets it to null after each call to the underlying function's apply(). I don't see a particular reason to reset the enhancer to null after each call, but I may be missing something.
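The race can be replayed deterministically with a heavily simplified, hypothetical model. `SharedFieldWrapper`, `PerCallWrapper` and `InterleavingDemo` are illustrative names, not Spring Cloud Stream classes, and the per-call variant is one generic way to avoid shared mutable state rather than the change that was made upstream.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model (not the real PartitionAwareFunctionWrapper):
// a single wrapper instance, shared by all callers, keeps the per-call enhancer in a field.
class SharedFieldWrapper {
    private UnaryOperator<Map<String, Object>> enhancer; // visible to every caller

    void prepare(int partition) {
        enhancer = headers -> {
            headers.put("scst_partition", partition);
            return headers;
        };
    }

    Map<String, Object> emit(Map<String, Object> headers) {
        // Uses whatever the field holds right now: possibly another caller's enhancer, or null.
        return enhancer == null ? headers : enhancer.apply(headers);
    }

    void reset() {
        enhancer = null; // models "reset to null after each apply"
    }
}

// One way to avoid the race (not necessarily the upstream fix): keep per-call state local.
class PerCallWrapper {
    Map<String, Object> apply(Map<String, Object> headers, int partition) {
        UnaryOperator<Map<String, Object>> enhancer = h -> {
            h.put("scst_partition", partition);
            return h;
        };
        return enhancer.apply(headers);
    }
}

class InterleavingDemo {
    // Replays, on one thread, an ordering that two real threads could hit.
    static Map<String, Object> replayRace() {
        SharedFieldWrapper wrapper = new SharedFieldWrapper();
        wrapper.prepare(1);                                         // thread A sets its enhancer
        wrapper.prepare(2);                                         // thread B overwrites it
        wrapper.emit(new HashMap<>());                              // thread B emits (partition 2)
        wrapper.reset();                                            // thread B resets the field
        Map<String, Object> fromA = wrapper.emit(new HashMap<>());  // thread A: field is now null
        wrapper.reset();                                            // thread A resets too
        return fromA;
    }
}
```

`replayRace()` returns a header map without `scst_partition`, so thread A's message leaves with no partition header. B overwriting A's enhancer also shows that, even without the reset, a shared field can attach the wrong partition; `PerCallWrapper` avoids both because each call only sees its own local enhancer.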

sobychacko commented 3 months ago

Sorry, I haven't run the test yet; I was just going by your comment in the original description above:

> Run the StreamBridgeTests#scstPartitionAlwaysSetEvenInConcurrentScenariosWithFunctions() test in the linked PR. The test shouldn't fail.

Which test demonstrates the issue? The one in the PR?

hightea commented 3 months ago

Oh, sorry, I wasn't clear enough in the description. Yes, the test in the PR demonstrates the issue. It currently fails, but the expected behavior is that it passes.

sobychacko commented 3 months ago

@hightea Fixed this issue on upstream/main. Can you try your scenario using the latest snapshot (4.1.3-SNAPSHOT)? Please let us know if you find more issues related to this. Thanks for bringing this issue up!

hightea commented 3 months ago

@sobychacko thanks for the fix, it works fine!

However, I don't see when it could be useful to reset the enhancer to null (avoiding the need to check whether there is a partition header). It was added for #2249, and if I remove it, the corresponding unit test (StreamBridgeTests#test_2249()) still passes.

Anyway, thanks again for the fix and the quick response!

sobychacko commented 3 months ago

@hightea I didn't fully look into the history of that change. @olegz added it as part of a bug fix, but I didn't want to remove it due to potential (unknown) regressions. If you are up to thoroughly evaluating that code and making sure there are no such regressions, please feel free to send a PR. Then we can review it.