sociomantic-tsunami / swarm

Asynchronous client/node framework library
Boost Software License 1.0

Fix synchronization for RecordStream #408

Closed daniel-zullo closed 4 years ago

daniel-zullo commented 4 years ago

suspended() must report the stream as suspended only if suspend() has been called and is still in effect. This is required because ISuspendableThrottler (from ocean v5) ensures that the suspend state of an ISuspendable instance is consistent with the throttler state when the instance is added to a throttler. Without this fix, if a RecordStream is added as a suspendable to a throttler, it will try to process the stream, because suspended() was checking the running state of the fiber; that makes the application crash in the best case, or behave unexpectedly otherwise. To avoid this, the suspended() implementation now only checks whether the stream is suspended, meaning that suspend() has been called and is currently in effect.
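
To make the change described above concrete, here is a minimal sketch in Python rather than D. It is not the swarm or ocean code: the class `RecordStreamSketch`, its fields, and the method `suspended_old` are hypothetical names, and the "old" behaviour is an assumption based only on the description that suspended() used to check the running state of the fiber.

```python
class RecordStreamSketch:
    """Hypothetical stand-in for RecordStream; NOT the swarm API."""

    def __init__(self):
        # Assumption: True only while the processing fiber is executing.
        self.fiber_running = False
        # True between a call to suspend() and the next call to resume().
        self.suspend_requested = False

    def start(self):
        # Starts the (simulated) fiber.
        self.fiber_running = True

    def suspend(self):
        self.suspend_requested = True
        self.fiber_running = False

    def resume(self):
        self.suspend_requested = False
        self.fiber_running = True

    def suspended_old(self):
        # Assumed old behaviour: derived from the fiber state, so a
        # stream whose fiber never started also reports "suspended".
        return not self.fiber_running

    def suspended(self):
        # Behaviour described in this PR: only the explicit request counts.
        return self.suspend_requested


stream = RecordStreamSketch()
# Never started, never suspended: the two definitions disagree.
print(stream.suspended_old(), stream.suspended())
```

In this sketch the two definitions differ only for a stream that has not started yet, which is the case a throttler sees when a freshly constructed stream is added to it.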

don-clugston-sociomantic commented 4 years ago

It seems the problem is that we're mapping 3 states onto a bool. Everything assumed that running meant !suspended, and suspended meant !running. The state where it hadn't started used to be mapped onto suspended, with your PR it gets mapped onto !suspended.

My fear is that this will just shift the "mapping 3 states onto bool" problem into different places.

I think ISuspendableThrottler is wrong. It is a place that definitely needs to know about the three states. (However that might not be the only place that's wrong).
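
The "3 states onto a bool" point can be illustrated with a small sketch. It is written in Python for brevity and is not taken from swarm or ocean; `StreamState`, `suspended_before` and `suspended_after` are hypothetical names used only to show the two mappings discussed in this comment.

```python
from enum import Enum


class StreamState(Enum):
    """Hypothetical three-state model of the stream."""
    NOT_STARTED = "not started"
    RUNNING = "running"
    SUSPENDED = "suspended"


def suspended_before(state):
    # Mapping before the PR: anything that is not running counts as
    # suspended, so NOT_STARTED maps onto "suspended".
    return state is not StreamState.RUNNING


def suspended_after(state):
    # Mapping with the PR: only an explicit suspension counts, so
    # NOT_STARTED maps onto "not suspended".
    return state is StreamState.SUSPENDED


for state in StreamState:
    print(state.value, suspended_before(state), suspended_after(state))
```

Both mappings agree on RUNNING and SUSPENDED and differ only on NOT_STARTED. A caller that needs to tell that state apart cannot do so from the bool alone, whichever mapping is chosen.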

don-clugston-sociomantic commented 4 years ago

At an absolute minimum we need to improve the docs to state exactly what suspended means, and what it should return in the case where the fiber has never started. Probably the real solution is to make sure that this half-constructed state can never exist; it's really complicated and annoying.

daniel-zullo commented 4 years ago

The MessageFiber API will throw if you try to suspend/resume a fiber that hasn't started. We can't do that for suspended(), though, and the fiber API does not provide suspended() functionality.

Based on your comments, and to be in line with the current ISuspendable interface, suspended() should only check the flag indicating whether suspension is currently requested.

I'll amend the PR and documentation if that sounds good to you.

don-clugston-sociomantic commented 4 years ago

Yeah, that sounds good. As far as I can see, this isn't an implementation bug; it's a conceptual problem in the design, so it needs a change to the docs.

daniel-zullo commented 4 years ago

Adapted/updated based on the suggestions and points made by Don.

don-clugston-sociomantic commented 4 years ago

LGTM