Consider the following scenario:
A subscription drop event is enqueued (the volatile subscription listener's onClose calls enqueueSubscriptionDropNotification). This also sets dropData (wrapping the drop reason and exception).
Before the drop event is processed, the connection to ES is re-established. The reconnect hook calls runSubscription (via onReconnect).
In runSubscription, the following fields are reset/cleared:
stopped
allowProcessing
isDropped
dropData (as per issue #10)
The drop event is (finally) processed. However, since dropData is no longer set at this point, the drop reason is reported as unknown to CatchUpSubscriptionListener, which limits the possible reactions. (In our app, we shut down the subscription/module entirely at this point because the drop reason is unknown, whereas it would be possible to simply wait for the reconnect if the original drop reason, SubscriptionDropReason.ConnectionClosed, were available.)
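The ordering above can be reproduced in a single-threaded sketch. This is not the esjc source: the class, the queue, and the method bodies are simplified stand-ins I made up for illustration, and only the field and method names (dropData, isDropped, runSubscription, enqueueSubscriptionDropNotification) are borrowed from the description above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class DropRaceSketch {
    // Hypothetical, reduced version of SubscriptionDropReason.
    enum DropReason { ConnectionClosed, Unknown }

    // Simplified stand-ins for the fields named in the issue.
    static final AtomicReference<DropReason> dropData = new AtomicReference<>();
    static final AtomicBoolean isDropped = new AtomicBoolean();
    // Stand-in for whatever queue delivers the drop notification.
    static final Queue<Runnable> eventQueue = new ArrayDeque<>();
    static DropReason reported;

    // onClose: remember the reason and enqueue the drop notification.
    static void enqueueSubscriptionDropNotification(DropReason reason) {
        dropData.compareAndSet(null, reason);
        eventQueue.add(DropRaceSketch::processDrop);
    }

    // Reconnect hook: resets the state, including dropData.
    static void runSubscription() {
        isDropped.set(false);
        dropData.set(null);
    }

    // Drop event processing: falls back to Unknown when dropData is gone.
    static void processDrop() {
        DropReason reason = dropData.get();
        if (isDropped.compareAndSet(false, true)) {
            reported = (reason != null) ? reason : DropReason.Unknown;
        }
    }

    static DropReason simulate() {
        dropData.set(null);
        isDropped.set(false);
        eventQueue.clear();
        reported = null;
        enqueueSubscriptionDropNotification(DropReason.ConnectionClosed); // drop enqueued
        runSubscription();                                                // reconnect wins the race
        eventQueue.remove().run();                                        // drop processed late
        return reported;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints Unknown
    }
}
```

The listener sees Unknown even though the real cause was ConnectionClosed, because the reset ran between enqueueing and processing.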
Note that various other semi-problematic scenarios are possible when a reconnect happens before the drop event is processed, but most of them seem to be covered by various checks. The most obvious potential problem is as follows:
All of the above steps, up to the drop event processing, are the same.
Before the drop event is processed, runSubscription (the Runnable lambda it submits to the executor, to be precise) is executed up to and including the creation of the new volatile subscription (line 193 in CatchUpSubscription in esjc 2.1.0). The new volatile subscription is created, started, and assigned to the subscription field.
The drop event is processed. The volatile subscription is stopped (by calling subscription.unsubscribe() in dropSubscription), since the subscription field is not null. However, it is the new subscription that gets stopped, which halts event processing in a seemingly 'live' CatchUpSubscription.
The suggested solution is to wait in runSubscription until the previous drop has been processed (if one is enqueued).
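A reduced sketch of this second ordering, again using made-up stand-ins rather than esjc code: FakeSubscription and the static subscription field are hypothetical, and dropSubscription only mirrors the null check and unsubscribe() call described above.

```java
public class StaleDropSketch {
    // Hypothetical stand-in for a volatile subscription.
    static final class FakeSubscription {
        final String name;
        boolean active = true;

        FakeSubscription(String name) {
            this.name = name;
        }

        void unsubscribe() {
            active = false;
        }
    }

    // Stand-in for the subscription field of CatchUpSubscription.
    static FakeSubscription subscription;

    // Mirrors the described behaviour: unsubscribe whatever the field holds now.
    static void dropSubscription() {
        if (subscription != null) {
            subscription.unsubscribe();
        }
    }

    static boolean newSubscriptionSurvives() {
        FakeSubscription old = new FakeSubscription("old");
        subscription = old;
        old.active = false;            // connection closed; drop enqueued, not yet processed

        FakeSubscription fresh = new FakeSubscription("new");
        subscription = fresh;          // runSubscription after reconnect replaces the field

        dropSubscription();            // the stale drop event is processed only now
        return fresh.active;
    }

    public static void main(String[] args) {
        System.out.println(newSubscriptionSurvives()); // prints false
    }
}
```

The drop event belongs to the old subscription, but by the time it runs the field already points at the new one, so the new one is stopped.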
Illustrative code:
Replace
isDropped.set(false);
dropData.set(null);
with
while (dropData.get() != null && !isDropped.get()) {}
isDropped.set(false);
dropData.set(null);
(Some kind of wait/signal mechanism would probably make more sense than busy-waiting in a while loop, but the idea is the same: if dropData is set and isDropped is still false, wait.)
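One possible shape for such a wait/signal mechanism is sketched below with a ReentrantLock and a Condition from java.util.concurrent. The DropGate class and its method names are hypothetical and not part of esjc. The assumption is that markEnqueued would be called where the drop notification is enqueued, markProcessed at the end of drop processing, and awaitProcessed in runSubscription just before the fields are reset.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class DropGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition dropProcessed = lock.newCondition();
    private boolean dropPending;

    // Call when the drop notification is enqueued.
    void markEnqueued() {
        lock.lock();
        try {
            dropPending = true;
        } finally {
            lock.unlock();
        }
    }

    // Call once the drop event has been fully processed.
    void markProcessed() {
        lock.lock();
        try {
            dropPending = false;
            dropProcessed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until no drop is pending; returns false on timeout or interrupt.
    boolean awaitProcessed(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (dropPending) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = dropProcessed.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DropGate gate = new DropGate();
        gate.markEnqueued();
        Thread dropProcessor = new Thread(gate::markProcessed);
        dropProcessor.start();
        boolean safeToReset = gate.awaitProcessed(1000);
        dropProcessor.join();
        System.out.println("safe to reset: " + safeToReset);
    }
}
```

The timeout is there because blocking forever would be risky: if the drop processing and the waiting code ever ended up on the same single thread, the wait could never be satisfied. What to do on timeout (reset anyway, or give up) is left open here.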