s8sg / goflow

A Golang based high performance, scalable and distributed workflow framework
MIT License
1.06k stars 129 forks

fix flow state update error #54

Closed dan2li closed 11 months ago

dan2li commented 1 year ago

Update the counter in the StateStore using atomic operations, to fix incorrect node indegree calculations when multiple workers update it concurrently.

s8sg commented 1 year ago

Thank you @dan2li for the pull request. Can you please describe the changes this PR makes?

dan2li commented 1 year ago

Hi @s8sg

Issue: Suppose there is a DAG as follows, where node C must be executed only after nodes B1, B2, B3, and B4 have completed. This is achieved by storing a counter in the StateStore: each time a preceding node (B[x]) completes, the counter is incremented, and C can be executed only when the counter equals the indegree of node C.

        A
        ↓
B1  B2  B3  B4
        ↓
        C

In cases where multiple workers run concurrently, multiple B[x] nodes may complete simultaneously and update the counter's value at the same time. However, the current update operation func incrementCounter is not an atomic operation, which may lead to dirty data and cause incorrect real-time indegree calculations for node C.
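To illustrate the failure mode, here is a minimal sketch in plain Go. It assumes the non-atomic update is a read followed by a separate write; the `store` type and the `lostUpdate` and `atomicUpdate` helpers are hypothetical and are not goflow code. `lostUpdate` replays the bad interleaving deterministically (every worker reads before any worker writes), and `atomicUpdate` does the read-modify-write under a single lock.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for a StateStore holding one counter.
type store struct {
	mu    sync.Mutex
	value int
}

// Get and Set are each safe on their own, but a Get followed by a Set
// is not atomic as a pair.
func (s *store) Get() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value
}

func (s *store) Set(v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.value = v
}

// Incr performs the whole read-modify-write under one lock.
func (s *store) Incr(delta int) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.value += delta
	return s.value
}

// lostUpdate replays the problematic interleaving deterministically:
// all workers read the counter before any of them writes it back.
func lostUpdate(workers int) int {
	s := &store{}
	reads := make([]int, workers)
	for i := range reads {
		reads[i] = s.Get()
	}
	for _, r := range reads {
		s.Set(r + 1)
	}
	return s.Get()
}

// atomicUpdate runs the workers concurrently using the atomic Incr.
func atomicUpdate(workers int) int {
	s := &store{}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Incr(1)
		}()
	}
	wg.Wait()
	return s.Get()
}

func main() {
	fmt.Println(lostUpdate(4))   // 1: three of the four increments are lost
	fmt.Println(atomicUpdate(4)) // 4: equals the indegree of C
}
```

With four B[x] nodes, the first case leaves the counter at 1, so C would never see its indegree reached.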

Solution: Use the RedisStateStore's existing func Incr to update the counter. It is implemented with Redis's atomic IncrBy method.

Furthermore, this requires other StateStore implementations to provide atomic methods for updating data.
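As a rough sketch of what that requirement could look like for another backend: the `CounterStore` interface, `memStore`, `parentDone`, and the key name below are all hypothetical, and goflow's real StateStore interface may differ. The point is that the caller decides based on the value returned by the atomic increment rather than on a separate read, so exactly one finishing parent sees the counter reach the indegree.

```go
package main

import (
	"fmt"
	"sync"
)

// CounterStore is a hypothetical interface, not goflow's actual StateStore.
type CounterStore interface {
	// Incr atomically adds delta to key and returns the new value.
	Incr(key string, delta int64) (int64, error)
}

// memStore is an in-memory implementation guarded by a mutex.
type memStore struct {
	mu   sync.Mutex
	data map[string]int64
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string]int64)}
}

func (m *memStore) Incr(key string, delta int64) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] += delta
	return m.data[key], nil
}

// parentDone records one finished parent and reports whether the child
// node may run, using only the value returned by the atomic increment.
func parentDone(s CounterStore, key string, indegree int64) (bool, error) {
	n, err := s.Incr(key, 1)
	if err != nil {
		return false, err
	}
	return n == indegree, nil
}

// triggers counts how many concurrent parents observe the child as ready.
func triggers(indegree int) int {
	s := newMemStore()
	var wg sync.WaitGroup
	var mu sync.Mutex
	ready := 0
	for i := 0; i < indegree; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ok, _ := parentDone(s, "C-indegree", int64(indegree))
			if ok {
				mu.Lock()
				ready++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return ready
}

func main() {
	fmt.Println(triggers(4)) // 1: exactly one parent triggers C
}
```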

s8sg commented 11 months ago

@dan2li I was trying to reproduce it, and I found the same issue. Thank you for raising it.

But is client.IncrBy atomic?

I remember reverting a similar change because it caused the same issue.

dan2li commented 11 months ago

@s8sg My understanding is that client.IncrBy sends an INCRBY command directly to Redis, and Redis's single-threaded command execution ensures that each command is atomic. This change has been running in our environment for several weeks now, and so far we haven't encountered the same issue. :)
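A toy model of that argument, in plain Go: one goroutine owns the counter and applies commands strictly one at a time, so concurrent clients cannot interleave inside an increment. This is only an illustration of serial command execution; `incrCmd`, `serve`, and `run` are hypothetical names and no Redis client is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// incrCmd is a hypothetical command: add delta, send the new value back.
type incrCmd struct {
	delta int64
	reply chan int64
}

// serve models a single-threaded server: one goroutine owns the counter
// and applies commands one at a time, in arrival order.
func serve(cmds <-chan incrCmd) {
	var counter int64
	for c := range cmds {
		counter += c.delta
		c.reply <- counter
	}
}

// run sends one increment from each concurrent client and returns the
// highest value any client received.
func run(clients int) int64 {
	cmds := make(chan incrCmd)
	go serve(cmds)

	var wg sync.WaitGroup
	var mu sync.Mutex
	var highest int64
	for i := 0; i < clients; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			reply := make(chan int64, 1)
			cmds <- incrCmd{delta: 1, reply: reply}
			v := <-reply
			mu.Lock()
			if v > highest {
				highest = v
			}
			mu.Unlock()
		}()
	}
	wg.Wait()
	close(cmds)
	return highest
}

func main() {
	fmt.Println(run(100)) // 100: no increment is lost
}
```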