deniszykov / WebSocketListener

A lightweight and highly scalable asynchronous WebSocket listener
http://vtortola.github.io/WebSocketListener

BeginWrite lock error and message not sending. #33

Open MattComb opened 5 years ago

MattComb commented 5 years ago

Hi Denis,

I've finally looped around to convert from Val's code base to yours. I'm encountering a problem when I attempt to run using the rx code.

When the message tries to send, it is getting locked on the following code


        public void BeginWriting()
        {
            if (Interlocked.CompareExchange(ref this.ongoingMessageWrite, 1, 0) == 1)
                throw new WebSocketException("There is an ongoing message that is being written from somewhere else. Only a single write is allowed at the time.");
        }

The first message send is OK, but the message is never received, and the second send is blocked here, so it appears the message is failing to be dequeued/sent.

Any idea why this might be?

Regards.

MattComb commented 5 years ago

The Rx code looks good and looks like

                Observable.FromAsync(server.AcceptWebSocketAsync)
                    .Select(ws => new ChatSession(ws)
                    {
                        In = Observable.FromAsync(ws.ReadDynamicAsync)
                            .DoWhile(() => ws.IsConnected)
                            .Where(msg => msg != null),

                        Out = Observer.Create<dynamic>(ws.WriteDynamic)
                    })
                    .OnErrorResumeNext(Observable.Empty<ChatSession>())
                    .DoWhile(() => server.IsStarted && !cancellation.IsCancellationRequested)
                    .Subscribe(chatSessionObserver);

The ws.WriteDynamic which occurs on the send of the message hits the following code in both the first send and the second send, but is locking on the second:

        public static void WriteDynamic(this WebSocket ws, dynamic data)
        {

            using (var writer = ws.CreateMessageWriter(WebSocketMessageType.Text))
            using (var sw = new StreamWriter(writer, Encoding.UTF8))
                sw.Write((string)data);
        }

I have tried WriteAsync here, but to no avail.

deniszykov commented 5 years ago

Hmm, writing should end with writer.CloseAsync(). There is a huge warning message about this in the logger. Did you provide your logger implementation in WSListenerOptions? Also, all sync methods (like writer.Write()) are not allowed in my fork, so sw.WriteAsync() should be used. For strings, there is a WriteStringAsync() extension method.

Do you use v4.2.5?

MattComb commented 5 years ago

I had converted to the following, but it did not seem to improve the situation. At what point is the WebSocket lock removed?

        public static async void WriteDynamicAsync(this WebSocket ws, dynamic data)
        {
            using (var writer = ws.CreateMessageWriter(WebSocketMessageType.Text))
            using (var sw = new StreamWriter(writer, Encoding.UTF8))
            {
                //sw.AutoFlush = true;
                await sw.WriteAsync((string)data);
                sw.Close();
            }
        }

CloseAsync is unavailable, and yes, I believe I'm using 4.2.5 as per the NuGet package.

deniszykov commented 5 years ago

async void creates an untrackable async method: no one knows when it ends. This will cause parallel calls to WriteDynamicAsync() and the "There is an ongoing message that is being written from somewhere else" error.

Also, you need to call CloseAsync() on writer, not on sw.

deniszykov commented 5 years ago

I have updated readme.md. Previously, Close() was not required. Now synchronous methods are prohibited and it is necessary to finish writing to the stream with CloseAsync().

MattComb commented 5 years ago

Hi Denis,

Thanks for the quick feedback. I have tried to implement the following:

        public static async void WriteDynamicAsync(this WebSocket ws, dynamic data)
        {
            using (var messageWriter = ws.CreateMessageWriter(WebSocketMessageType.Text))
            using (var streamWriter = new StreamWriter(messageWriter, Encoding.UTF8))
            {
                await streamWriter.WriteAsync("Hello World!");
                await messageWriter.CloseAsync();
            }
        }

I am getting "Can not access a closed Stream" at the inner bracket close.

I realise that the method still returns void; I'm trying to work out how to correctly use this with the Rx "Out" line, which is

                Observable.FromAsync(server.AcceptWebSocketAsync)
                    .Select(ws => new ChatSession(ws)
                    {
                        In = Observable.FromAsync(ws.ReadDynamicAsync)
                            .DoWhile(() => ws.IsConnected)
                            .Where(msg => msg != null),

                        Out = Observer.Create<dynamic>(ws.WriteDynamic)
                    })

deniszykov commented 5 years ago

Can not access a closed Stream at the inner bracket close.

Can you provide full exception message and stack trace?

MattComb commented 5 years ago

Exception:

    System.ObjectDisposedException: Can not access a closed Stream.
       at System.IO.Compression.DeflateStream.ThrowStreamClosedException()
       at System.IO.Compression.DeflateStream.WriteAsyncMemory(ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
       at System.IO.Compression.DeflateStream.WriteAsync(Byte[] array, Int32 offset, Int32 count, CancellationToken cancellationToken)
       at vtortola.WebSockets.Deflate.WebSocketDeflateWriteStream.WriteAsync(Byte[] buffer, Int32 offset, Int32 count, CancellationToken cancellationToken)

Stack Trace:

       at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
       at vtortola.WebSockets.WebSocketMessageStream.Write(Byte[] buffer, Int32 offset, Int32 count)
       at System.IO.Stream.Write(ReadOnlySpan`1 buffer)
       at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder)
       at System.IO.StreamWriter.Dispose(Boolean disposing)
       at System.IO.TextWriter.Dispose()
       at ChatServer.WsExtensions.WriteDynamicAsync(WebSocket ws, Object data) in core-websocket-new\WsExtensions.cs:line 32

deniszykov commented 5 years ago

StreamWriter buffers data, so you need to call FlushAsync() on the StreamWriter after you finish writing. If you don't, it will flush on Dispose(), which runs right after CloseAsync(). This causes the "Can not access a closed Stream" error.

.NET does some stuff behind your back :)

Here is the WriteStringAsync method from the library:

        public static async Task WriteStringAsync([NotNull] this WebSocket webSocket, [NotNull] string data, CancellationToken cancellationToken = default(CancellationToken))
        {
            if (webSocket == null) throw new ArgumentNullException(nameof(webSocket));
            if (data == null) throw new ArgumentNullException(nameof(data));

            cancellationToken.ThrowIfCancellationRequested();

            using (var msg = webSocket.CreateMessageWriter(WebSocketMessageType.Text))
            using (var writer = new StreamWriter(msg, Utf8NoBom))
            {
                await writer.WriteAsync(data).ConfigureAwait(false);
                await writer.FlushAsync().ConfigureAwait(false);
                await msg.CloseAsync().ConfigureAwait(false);
            }
        }
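
The flush-on-Dispose behaviour described above can be reproduced without the library. The following is a minimal sketch, not the library's code: `ClosableStream` and `FlushDemo` are hypothetical names, and `ClosableStream` only imitates a message writer that rejects writes after `CloseAsync()`.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for a message writer: rejects writes once closed.
public sealed class ClosableStream : MemoryStream
{
    public bool IsClosed { get; private set; }

    public Task CloseAsync()
    {
        IsClosed = true;
        return Task.CompletedTask;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (IsClosed) throw new ObjectDisposedException(nameof(ClosableStream));
        base.Write(buffer, offset, count);
    }
}

public static class FlushDemo
{
    // Failing pattern: the text is still buffered inside StreamWriter when the
    // stream is closed, so the implicit flush in Dispose() hits a closed stream.
    public static async Task<bool> WriteWithoutFlushAsync(string text)
    {
        var stream = new ClosableStream();
        try
        {
            using (var writer = new StreamWriter(stream, new UTF8Encoding(false)))
            {
                await writer.WriteAsync(text);
                await stream.CloseAsync();
            }
            return true;
        }
        catch (ObjectDisposedException)
        {
            return false;
        }
    }

    // Working pattern: flush explicitly before closing, so Dispose() has nothing left to write.
    public static async Task<string> WriteWithFlushAsync(string text)
    {
        var stream = new ClosableStream();
        using (var writer = new StreamWriter(stream, new UTF8Encoding(false)))
        {
            await writer.WriteAsync(text);
            await writer.FlushAsync();
            await stream.CloseAsync();
        }
        return Encoding.UTF8.GetString(stream.ToArray());
    }
}
```

The only difference between the two methods is the explicit `FlushAsync()` call before `CloseAsync()`, which is the same ordering used in WriteStringAsync.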

deniszykov commented 5 years ago

Also, any synchronous Write/Read/Flush calls completely destroy the performance of the application. They block thread pool threads and cause thread starvation. So you need to make sure no explicit or implicit synchronous calls are made.

MattComb commented 5 years ago

:) indeed!

Thanks Denis, I am now seeing messages flowing again. Appreciate the support!

Two more things. Given this observer pattern, do I still need to return something other than void from this method? If so, how would I best go about structuring this, and what should the return type be?

Once I get an answer to this, I should be complete, and I think now that I have converted over to your code base, I can provide a working version of the chatroom app as we discussed a year ago.

Best Regards

deniszykov commented 5 years ago

Maybe it is better to make Out a Subject and have an infinite async method that polls this subject and writes the messages?

        public static async Task WriteMessageAsync(WebSocket ws, IObservable<dynamic> messages)
        {
            while (ws.IsConnected)
            {
                // Note: a hot source such as Subject does not buffer, so items
                // published between two FirstAsync() calls are not seen here.
                var message = (string)await messages.FirstAsync();
                using (var messageWriter = ws.CreateMessageWriter(WebSocketMessageType.Text))
                using (var streamWriter = new StreamWriter(messageWriter, Encoding.UTF8))
                {
                    await streamWriter.WriteAsync(message);
                    // Flush buffered text before closing; otherwise Dispose() flushes into a closed stream.
                    await streamWriter.FlushAsync();
                    await messageWriter.CloseAsync();
                }
            }
        }

But I'm not familiar with Rx.NET. Maybe there is a better pattern for this.

Also, I have seen that Unit is used for empty results in Rx.NET.
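
One possible alternative to polling a Subject is to put outgoing messages on a queue and drain it from a single async loop. The sketch below swaps in `System.Threading.Channels` for that purpose (built into .NET Core 3.0 and later, a NuGet package on older targets); it is not something the thread or the library prescribes. `OutgoingQueue` and its members are hypothetical names, and the `send` delegate is assumed to wrap a call such as `ws.WriteStringAsync(...)`.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: buffers outgoing messages and sends them one at a time.
public sealed class OutgoingQueue
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions { SingleReader = true });

    // Non-blocking, so it can be called from a synchronous callback such as an observer's OnNext.
    public bool Enqueue(string message) => _channel.Writer.TryWrite(message);

    // Signals that no more messages will be enqueued; PumpAsync ends once the queue is drained.
    public void Complete() => _channel.Writer.TryComplete();

    // Single consumer loop: each send is awaited before the next message is dequeued,
    // so two writes never overlap and the order of messages is preserved.
    public async Task PumpAsync(
        Func<string, CancellationToken, Task> send,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            while (_channel.Reader.TryRead(out var message))
            {
                await send(message, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
```

With this shape, Out could stay a plain observer whose callback only calls `Enqueue`, while one `PumpAsync` task per connection does the actual writing and returns a Task that can be awaited or observed for errors.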

weweilep commented 5 years ago

Hi @deniszykov

What would the equivalent be for writing binary data? I seem to run into the same issue on the binary side, and my best approximation to the WriteStringAsync code does not seem to work correctly. Because this is async, locks are out of the question. Would it be possible to see a WriteBytesAsync in the future?

deniszykov commented 5 years ago

my best approximation to the WriteStringAsync code does not seem to work correctly

Hi @weweilep! Could you provide your version of WriteBytesAsync for review?

weweilep commented 5 years ago

@deniszykov unfortunately, I haven't found any clean code that really works well.

Simply using

        using (var writer = _socket.CreateMessageWriter(WebSocketMessageType.Binary))
        {
            await writer.WriteAsync(data, 0, data.Length, _cancellationTokenSource.Token).ConfigureAwait(false);
        }

seems to fall into the trap of the ongoing message exception. BinaryWriter (in an attempt to emulate WriteStringAsync) is not asynchronous, and I'm not sure it's even necessary when the source of the data is already in byte[] format.

The only solution I have found is to use SemaphoreSlim(1,1) .WaitAsync()/.Release() (the async version of locks) around the CreateMessageWriter in order to disallow multiple writes, but even that has some pitfalls. I haven't yet tried to see if WriteStringAsync falls into the same trap, as my message has always been binary.

weweilep commented 5 years ago

@deniszykov I was curious so I made a test project, and I was able to get the BeginWrite lock exception to occur with WriteStringAsync as well. This is obviously a very crude and exaggerated example, but https://github.com/weweilep/WebSocketListener/tree/BeginWriteLock/Samples/BeginWriteLock is an example that will reproduce it on an echo server with multiple clients. There is a useSemaphoreSlim variable (essentially async lock) that if you enable will cause the error to go away, but I'm unsure if that is a proper solution or not (and it could be implemented on the library side as well).

deniszykov commented 5 years ago

Yes, you need some synchronization before CreateMessageWriter because parallel read/write is not supported. SemaphoreSlim is fine; a queue with an async state machine or a TPL Dataflow queue is also a solution for this problem.

Since there are many solutions, this library doesn't provide a default one (only an exception in case of misuse).
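
As a rough illustration of the SemaphoreSlim option mentioned above, the sketch below serializes writes with an async gate. `SerializedSender` is a hypothetical helper, not part of the library; the actual write (CreateMessageWriter, WriteAsync, CloseAsync, or a call to WriteStringAsync) is assumed to be passed in as the `writeMessage` delegate.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: allows only one message write to be in flight at a time.
public sealed class SerializedSender : IDisposable
{
    // Initial count 1, max count 1: behaves like an async-friendly mutex.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public async Task SendAsync(
        Func<CancellationToken, Task> writeMessage,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        // Wait asynchronously instead of blocking a thread pool thread.
        await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            await writeMessage(cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            // Always release, even if the write throws, so later sends are not stuck.
            _gate.Release();
        }
    }

    public void Dispose() => _gate.Dispose();
}
```

One instance per WebSocket would be needed, since the restriction applies per connection. The method returns a Task rather than void, so callers can await it and see any exception thrown by the write.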

weweilep commented 5 years ago

Makes sense. Would be nice to see something reflected as such in samples or documentation, especially with CreateMessageWriter being hidden behind something like WriteStringAsync.

deniszykov commented 5 years ago

True, no mention of this behavior in docs. I will fix this.