microsoft / psi

Platform for Situated Intelligence
https://github.com/microsoft/psi/wiki

Thread-safety bug for the pipeline's stopping state #272

Open austinbhale opened 1 year ago

austinbhale commented 1 year ago

Hi, I receive exceptions when trying to dispose of a data replay pipeline close to its end. Two threads enter Pipeline.Stop at nearly the same time, which leads one thread to treat the pipeline as being in the Completed state while the other is still trying to stop it.

Context

I am implementing a seeking capability for replaying data from a store using the PsiStoreStreamReader and Importer. The importer reads data from a store and disposes of the pipeline when there are no more messages. The user can stop the replay at any time, which calls Pipeline.Dispose from another thread.

When (T1) the replay stops on its own and (T2) the user disposes of the pipeline at nearly the same time, there are cases where both threads (T1 + T2) enter Pipeline.Stop simultaneously. The current handling is insufficient because the following IsStopping check evaluates to false for both threads; the state is set to Stopping only after the check: https://github.com/microsoft/psi/blob/35cb04ce24569677d3a8da0216e9450e66cbbd83/Sources/Runtime/Microsoft.Psi/Executive/Pipeline.cs#L836
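The interleaving described above can be sketched in isolation. This is a minimal illustration of the check-then-set pattern, not the actual Pipeline code: NaiveStopper, RaceDemo and their members are hypothetical names, and the Barrier is there only to force both threads past the check before either one sets the flag.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the check-then-set pattern; not the real Pipeline class.
public sealed class NaiveStopper
{
    // Used only to force the problematic interleaving deterministically.
    private readonly Barrier bothPastCheck = new Barrier(2);

    private volatile bool isStopping;
    private int stopBodyEntries;

    // Number of times the body of Stop actually ran.
    public int StopBodyEntries => this.stopBodyEntries;

    public void Stop()
    {
        if (this.isStopping)
        {
            return; // another caller is already stopping
        }

        // Both threads arrive here before either one has set the flag.
        this.bothPastCheck.SignalAndWait();

        this.isStopping = true; // too late: the other thread already passed the check
        Interlocked.Increment(ref this.stopBodyEntries);
    }
}

public static class RaceDemo
{
    public static int Run()
    {
        var stopper = new NaiveStopper();
        var t1 = new Thread(stopper.Stop); // T1: replay reaching its end
        var t2 = new Thread(stopper.Stop); // T2: user-initiated dispose
        t1.Start();
        t2.Start();
        t1.Join();
        t2.Join();
        return stopper.StopBodyEntries;
    }

    public static void Main()
    {
        Console.WriteLine(RaceDemo.Run()); // prints 2: both threads ran the stop body
    }
}
```

In real code there is no barrier, so the overlap only happens occasionally, which matches the intermittent exceptions seen near the end of a replay.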

Solution

I'm not sure whether, to stay consistent with your state pattern, you want to ensure thread safety for every pipeline state. The most pressing fix was to change if (this.IsStopping) to if (Interlocked.CompareExchange(ref this.isPipelineStopping, 1, 0) != 0)

This is similar to how you check whether a pipeline has been disposed: https://github.com/microsoft/psi/blob/35cb04ce24569677d3a8da0216e9450e66cbbd83/Sources/Runtime/Microsoft.Psi/Executive/Pipeline.cs#L682

You could also declare the private field as a long, private long isPipelineStopping = 0; so that you can keep the IsStopping getter, implemented as IsStopping => Interlocked.Read(ref this.isPipelineStopping) == 1;
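Put together, the suggested guard might look like the sketch below. It is an assumption-laden illustration rather than a patch: GuardedStopper and GuardDemo are hypothetical names, the real Pipeline.Stop does far more work, and only the isPipelineStopping field, the IsStopping getter and the CompareExchange check come from the suggestion above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in; not the real Pipeline class.
public sealed class GuardedStopper
{
    // 0 = not stopping, 1 = stopping. Declared as long so Interlocked.Read applies.
    private long isPipelineStopping = 0;
    private int stopBodyEntries;

    public bool IsStopping => Interlocked.Read(ref this.isPipelineStopping) == 1;

    // Number of times the body of Stop actually ran.
    public int StopBodyEntries => this.stopBodyEntries;

    public void Stop()
    {
        // Atomically set the flag to 1 only if it is currently 0.
        // CompareExchange returns the previous value, so exactly one caller sees 0.
        if (Interlocked.CompareExchange(ref this.isPipelineStopping, 1, 0) != 0)
        {
            return; // another caller won the race and owns the stop
        }

        // Placeholder for the actual stop work.
        Interlocked.Increment(ref this.stopBodyEntries);
    }
}

public static class GuardDemo
{
    public static int Run(int callers = 8)
    {
        var stopper = new GuardedStopper();
        Parallel.For(0, callers, _ => stopper.Stop());
        return stopper.StopBodyEntries;
    }

    public static void Main()
    {
        Console.WriteLine(GuardDemo.Run()); // prints 1: only one caller ran the stop body
    }
}
```

Note that in this sketch a caller that loses the race returns immediately, without waiting for the winning thread to finish stopping. Code that must block until the stop has completed would need additional synchronization.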

chitsaw commented 1 year ago

Thanks for pointing this out and for your suggestions! Yes, this has been broken for some time and we should definitely fix it. However, looking more closely at the code, I see a few additional issues that potentially affect state consistency, so we might need a more holistic solution, which will require a bit more thought. Hopefully the workaround you have above will be adequate until we come up with a more permanent fix.