SignalK / signalk-server-java

DEPRECATED - see https://github.com/SignalK/signalk-java
Apache License 2.0

Race condition in FullExportProcessor #19

Open jboynes opened 8 years ago

jboynes commented 8 years ago

I'm getting repeated test failures for this class. @rob42 I'm not sure what you're running on, or whether it's multi-core; I'm seeing this on an iMac with an i7 processor (so 4 cores, 8 threads). I think it's due to a race condition in sending instant deltas: there's a window between when the queue is drained and when the thread dies (i.e. isActive() becomes false) in which additional messages can be queued but not sent. They should get sent when the next change event is received, but that is not the behaviour the tests expect.
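One common way to close this kind of window is to re-check the queue after the drain task has marked itself inactive. The sketch below is only an illustration of that pattern: `InstantSender`, `enqueue`, and the `sink` callback are hypothetical names, not the actual FullExportProcessor code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sender illustrating the pattern; not the real FullExportProcessor. */
class InstantSender<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean draining = new AtomicBoolean(false);
    private final Executor executor;
    private final Consumer<List<T>> sink;

    InstantSender(Executor executor, Consumer<List<T>> sink) {
        this.executor = executor;
        this.sink = sink;
    }

    /** Queue first, then make sure a drain task is (or will be) running. */
    void enqueue(T message) {
        queue.add(message);
        schedule();
    }

    private void schedule() {
        // Only one drain task may be active at a time.
        if (draining.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        try {
            List<T> batch = new ArrayList<>();
            for (T m; (m = queue.poll()) != null; ) {
                batch.add(m);
            }
            if (!batch.isEmpty()) {
                sink.accept(batch);
            }
        } finally {
            draining.set(false);
        }
        // Re-check after clearing the flag: a message queued between the last
        // poll() and the flag reset would otherwise wait for the next event.
        if (!queue.isEmpty()) {
            schedule();
        }
    }
}
```

Because the producer always adds to the queue before testing the flag, and the drain task always tests the queue after clearing the flag, at least one of the two sides sees the late message and schedules another drain.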

Another factor in this test is that there are now three components to the change: in addition to value, there are also events for source and timestamp. The way this is written, it will immediately send the first and then wait to send the other two. That should happen minPeriod later but will actually happen when the next change event is received. As a subscriber I would expect to see all three changes grouped together in the first message.

I can see a quick fix to support the FIXED and INSTANT policies, but supporting IDEAL now that we have multiple leaf nodes changing will be a larger change. I'll submit a PR for the quick fix, with the IDEAL mode change to follow.

rob42 commented 8 years ago

I have a now lowly laptop with 2 cores - but I have a Rpi with 4 cores.... I've been meaning to look at the whole event propagation thing with very large scaling - any ideas welcome.

I thought the events for source and timestamp were ignored?

jboynes commented 8 years ago

The source/timestamp events are being generated by the test. I've not checked whether the actual model filters them, but I don't think it should, as updates to those should be sent to the subscriber as well (otherwise the value will get out of sync with its metadata). In general, we will see bursts of node changes even for values. For example, a change in a "true" value may also affect the "magnetic" nodes, and a change in "depth from transducer" would also change the "depth from surface" and "depth under keel" nodes: basically any results from a calculation chain deriving from an affected input.

The burst comes from the model sending individual events for each leaf node that changes. That does not seem correct to me. Instead, I would expect it to send one event containing all nodes that were modified by a change, perhaps represented as a delta message. IOW, a change comes in from a sensor, it gets applied to the model, triggering a ripple of derivative changes, resulting in a delta message containing all changes, that can then be committed atomically. The delta (containing all changes) is then sent to subscribers, either instantly or merged with other deltas for rate limited subscribers.
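To make the idea concrete, here is a rough sketch of a delta that carries every path touched by one change, plus a buffer that merges deltas for a rate-limited subscriber. `Delta`, `DeltaBuffer`, and the path strings are all hypothetical, chosen for illustration; they are not existing classes in this project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical delta: every path touched by one change, with its new value. */
final class Delta {
    final Map<String, Object> values = new LinkedHashMap<>();

    Delta set(String path, Object value) {
        values.put(path, value);
        return this;
    }
}

/** Coalesces deltas for a rate-limited subscriber; the latest value per path wins. */
final class DeltaBuffer {
    private final Map<String, Object> pending = new LinkedHashMap<>();

    /** Called once per committed delta, instead of once per leaf node. */
    synchronized void merge(Delta delta) {
        pending.putAll(delta.values);
    }

    /** Returns everything accumulated since the last flush and clears the buffer. */
    synchronized Delta flush() {
        Delta out = new Delta();
        out.values.putAll(pending);
        pending.clear();
        return out;
    }
}
```

An instant subscriber would forward each delta as it is committed; a rate-limited one would call `merge` on every commit and `flush` once per period, so a sensor change and its derived values always arrive in the same message.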

This can be scaled in a couple of ways. Calculating and applying the changes can be separated from distributing them to subscribers. Calculation can be serialized for simplicity, or parallelized with optimistic locking for high update rates. Calculations can be checkpointed if needed to avoid redoing too much work. The distribution mechanism simply relies on the commit delta stream and can be parallelized as needed.

For a small boat, all updates could be handled by a single master. For a high-reliability environment you could run multiple masters with consensus voting on the commit stream. You could use a similar approach for a large-scale application tracking thousands of vessels.

The biggest API change here would be to SignalKModel, replacing the put operations with one that applies an atomic change. I think apply(delta) would be a simple alternative, but you could also have some kind of lock, put, commit combination.
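As a rough sketch of what an apply(delta) operation could look like, the class below keeps the model as an immutable snapshot and commits each delta with a compare-and-set, retrying if another writer got there first. `AtomicModel` and its methods are assumptions made for illustration and are not the existing SignalKModel interface; a real model would hold a tree rather than a flat map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for SignalKModel; not the project's actual interface. */
final class AtomicModel {
    private final AtomicReference<Map<String, Object>> state =
            new AtomicReference<>(Collections.<String, Object>emptyMap());

    /** Applies every entry of the delta or none: readers never see a half-applied change. */
    void apply(Map<String, Object> delta) {
        while (true) {
            Map<String, Object> before = state.get();
            Map<String, Object> after = new HashMap<>(before);
            after.putAll(delta);
            // Optimistic commit: retry if another writer published a snapshot first.
            if (state.compareAndSet(before, Collections.unmodifiableMap(after))) {
                return;
            }
        }
    }

    /** Returns the current immutable snapshot. */
    Map<String, Object> snapshot() {
        return state.get();
    }
}
```

Copying the whole map on every commit is simple but costly for a large model; the lock, put, commit combination would trade that cost for explicit locking.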