Surreal-Net / Surreal.Net

Database driver for SurrealDB available for REST and RPC sessions.
Apache License 2.0
57 stars 7 forks source link

Full duplex Ws transfer using channels #110

Open ProphetLamb opened 1 year ago

ProphetLamb commented 1 year ago

Creates one channel for receiving (tx - server transfer). The channel has a associated producer and consumer.

Significant members

Class Purpose
WsRx Sends a stream over the ClientWebSocket
WsTxConsumer Pulls the tx channel and invokes registered handlers once a response has arrived. This supports multiple events, such as notification/live queries. The consumer uses a cache that evicts dead handles.
WsTxProvider Listens to blocks from ClientWebSocket and pumps the tx channel with messages. Messages can consist of multiple blocks, and are streamed. They are pumped as soon as the first part arrives. WsMessageReader handles multipart messages.

To-Do

Postscriptum: I kind of bricked my IDE by debugging tests, so I am clearly doing sth wrong kek Postpostscriptum: Still in development, sometimes a message gets lost... Postpostpostscriptum: thats what I get for cheating the initial implementation...

ProphetLamb commented 1 year ago

Debug logging shows, that sometimes the Inflater repeats the last block indefinitely, until the channel is at capacity. Then it waits for the deflater indefinitely, because it is already disposed.


10:09:18 WsReceiverInflaterEventSource: Waiting to receive a block from the socket (E:SocketWaitingCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Received a block from the socket (Count = 65, EndOfMessage = True, Closed = False) (E:SockedReceivedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Finished receiving the message from the socket (E:MessageReceiveFinishedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Pushed the message to the channel (E:MessagePushedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Waiting to receive a block from the socket (E:SocketWaitingCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Received a block from the socket (Count = 65, EndOfMessage = True, Closed = False) (E:SockedReceivedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Finished receiving the message from the socket (E:MessageReceiveFinishedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Pushed the message to the channel (E:MessagePushedCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Waiting to receive a block from the socket (E:SocketWaitingCore, T:161158)
10:09:18 WsReceiverInflaterEventSource: Received a block from the socket (Count = 62, EndOfMessage = True, Closed = False) (E:SockedReceivedCore, T:1611

At some point the debug log stops, the line is not written to the end. The log is disposed by the DbHandle, therefore we can assume that the DbHandle was disposed. This is not the case, because the database is not killed. The remaining option is that the xunit thread is softlocked at this point.... Which would explain that the test does not terminate. Interestingly enough, this occurs during the flush of the FileStream, this is highly unlikely. All in all, very wired behaviour.

I observed, that the softlock only ever occurs at the last block of the last message in the unit test.

Note that the code is followed by a ct.ThrowIfCancellationRequested(); suggesting, that the cancellation is never requested, the on top of that, the client is never disposed, otherwise we expect a log message. This supports the scenario, that the main thread is softlocked.

// log that we are waiting for the socket
log.SocketWaiting();
// receive the first part
var result = await _socket.ReceiveAsync(buffer, ct).Inv();
// log that we have received a message from the socket
log.SockedReceived(result);

ct.ThrowIfCancellationRequested();

Why the ClientWebSocket repeats the last message continuously is unknown, it might be the case that the WebSocketDeflater interacts weirdly with the network stream... I do not know.

ProphetLamb commented 1 year ago

For now I reorder the ManagedWebSocket.Dispose after the WsReceiver*.Dispose