Open Artur- opened 5 years ago
I think this exact case is tested by https://github.com/vaadin/framework/blob/master/uitest/src/test/java/com/vaadin/tests/push/ReconnectWebsocketTest.java for FW8
I think this exact case is tested by https://github.com/vaadin/framework/blob/master/uitest/src/test/java/com/vaadin/tests/push/ReconnectWebsocketTest.java for FW8
Apparently it's not tested by this test since the test is migrated and works with Flow.
We should retest this with the latest versions especially considering the fact that we have passing ReconnectWebsocketTest.java test.
Upgraded example to 14.2.0.alpha7. Still reproduces
Alright, thanks for the info.
With Atmosphere trace logging you can see what is happening here:
The browser receives push message 8
Received push (websocket) message: for(;;);[{"syncId":8,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-4.342586269301345,"latitude":-33.80385646477082},"return $0.personUpdated($1)"]],"timings":[385,10]}]
The websocket connection is disconnected
The server creates and pushes message 9. Atmosphere happily writes it to the closed connection and does not notice anything is wrong. The message is lost forever.
[Atmosphere-Shared-101] TRACE org.atmosphere.websocket.WebSocket - WebSocket.write() for(;;);[{"syncId":9,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":1,"longitude":165.69204276168028,"latitude":-12.009253616658},"return $0.personUpdated($1)"]],"timings":[385,10]}]
[Atmosphere-Shared-101] TRACE org.atmosphere.cpr.AtmosphereResourceImpl - Invoking listener [com.vaadin.flow.server.communication.PushAtmosphereHandler$AtmosphereResourceListener@11f7e790] for 25dfa7c1-98d7-4559-840e-e2004f64168f
[Atmosphere-Shared-101] TRACE org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter - AtmosphereResourceEventImpl{ isCancelled=false isClosedByClient=false isClosedByApplication=false isResumedOnTimeout=false throwable=null resource=null}
[Atmosphere-Shared-101] TRACE org.atmosphere.cpr.BroadcasterListenerAdapter - onComplete /*
[Thread-21] DEBUG com.vaadin.flow.server.communication.UidlWriter - * Creating response to client
[Atmosphere-Shared-0] TRACE org.atmosphere.cpr.DefaultBroadcaster - /* is about to broadcast Entry{message=for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}], type=RESOURCE, future=org.atmosphere.cpr.BroadcasterFuture@5a3c362d}
[Atmosphere-Shared-0] TRACE org.atmosphere.cache.UUIDBroadcasterCache - Adding for AtmosphereResource 25dfa7c1-98d7-4559-840e-e2004f64168f cached messages for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}]
[Atmosphere-Shared-0] TRACE org.atmosphere.cache.UUIDBroadcasterCache - Active clients {25dfa7c1-98d7-4559-840e-e2004f64168f=1584532781663, 89f371b2-c4cc-4424-b2ec-49d47556b70b=1584532761618}
[Atmosphere-Shared-0] TRACE org.atmosphere.cpr.BroadcasterListenerAdapter - onMessage for broadcaster /* for Entry{message=for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}], type=RESOURCE, future=org.atmosphere.cpr.BroadcasterFuture@5a3c362d}
[Atmosphere-Shared-0] TRACE org.atmosphere.cpr.DefaultBroadcaster - AtmosphereResource 25dfa7c1-98d7-4559-840e-e2004f64168f available for for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}]
[Atmosphere-Shared-0] TRACE org.atmosphere.cpr.DefaultBroadcaster - Broadcaster /* is about to queueWriteIO for AtmosphereResource 25dfa7c1-98d7-4559-840e-e2004f64168f
This is one problem.
The server creates and pushes message 10. Atmosphere notices that the connection does not work and caches the message.
[Atmosphere-Shared-102] TRACE org.atmosphere.cpr.DefaultBroadcaster - About to write to AtmosphereResource{
uuid=25dfa7c1-98d7-4559-840e-e2004f64168f,
transport=WEBSOCKET,
isInScope=true,
isResumed=false,
isCancelled=false,
isSuspended=true,
broadcasters=/*,
isClosedByClient=false,
isClosedByApplication=false,
action=Action{timeout=-1, type=SUSPEND}}
[Atmosphere-Shared-102] TRACE org.atmosphere.cache.UUIDBroadcasterCache - Removing for AtmosphereResource 25dfa7c1-98d7-4559-840e-e2004f64168f cached message for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}]
[Atmosphere-Shared-102] TRACE org.atmosphere.cpr.DefaultBroadcaster - /* is broadcasting to 25dfa7c1-98d7-4559-840e-e2004f64168f
[Atmosphere-Shared-102] TRACE org.atmosphere.websocket.WebSocket - WebSocket.write() for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}]
[Atmosphere-Shared-102] TRACE org.atmosphere.container.version.JSR356WebSocket - WebSocket AtmosphereResource{
uuid=25dfa7c1-98d7-4559-840e-e2004f64168f,
transport=WEBSOCKET,
isInScope=true,
isResumed=false,
isCancelled=false,
isSuspended=true,
broadcasters=/*,
isClosedByClient=false,
isClosedByApplication=false,
action=Action{timeout=-1, type=SUSPEND}} failed to write 197|for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}]
[Atmosphere-Shared-102] TRACE org.atmosphere.cache.UUIDBroadcasterCache - Adding for AtmosphereResource 25dfa7c1-98d7-4559-840e-e2004f64168f cached messages 197|for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}]
Caching happens AFTER TrackMessageSizeInterceptor
has been invoked and thus the cached message starts with the message length: 197|
Channel is reconnected
Atmosphere pushes the cached message but passes it AGAIN through TrackMessageSizeInterceptor
, which adds an extra 201|
so the message becomes 201|197|for(;;);[{"syncId":10,"clientId":0,"meta":{"async":true},"execute":[[[0,3],{"id":8,"longitude":-3.5774276745521707,"latitude":-32.5840109255126},"return $0.personUpdated($1)"]],"timings":[385,10]}]
.
Client side fails because it gets a double wrapped message. This it the problem you first see
To reproduce, start project with push:
Create a SSH tunnel
Open app through tunnel
Kill SSH tunnel using
exit
+ ctrl-cRe-establish tunnel using
The first message delivered to the application contains an extra
length|
prefix, e.g.when it should be