socketry / async-websocket

Asynchronous WebSocket client and server, supporting HTTP/1 and HTTP/2 for Ruby.
MIT License

Message corruption when using a connection from multiple tasks #37

Closed: u3shit closed this issue 2 years ago

u3shit commented 2 years ago

I have a client that calls write_message from multiple async tasks, and sometimes the messages seem to get mixed up. Here's a minimal example: the client sends messages that each consist of a single byte repeated many times, and the server verifies that every message it receives still has that form:

#!/usr/bin/env -S falcon serve --bind http://localhost:9900 --count 1 -c
require 'async/websocket/adapters/rack'

# Pass messages through as raw strings instead of the connection's default encoding/decoding.
class BinaryWebSocket < Async::WebSocket::Connection
  def parse x; x; end
  def dump x; x; end
end

class Test
  def call env
    Async::WebSocket::Adapters::Rack.open(env, handler: BinaryWebSocket, &method(:open))
  end

  # Every message should be one byte repeated; otherwise fail and
  # report the first 100 bytes of the corrupted message.
  def open connection
    while msg = connection.read
      fail msg[0,100].inspect unless msg == msg[0] * msg.bytesize
    end
  end
end
run Test.new

And the client:

require 'async'
require 'async/barrier'
require 'async/http/endpoint'
require 'async/websocket/client'

# Pass messages through as raw strings instead of the connection's default encoding/decoding.
class BinaryWebSocket < Async::WebSocket::Connection
  def parse x; x; end
  def dump x; x; end
end

Async do |task|
  ep = Async::HTTP::Endpoint.parse 'ws://localhost:9900'
  Async::WebSocket::Client.connect ep, handler: BinaryWebSocket do |connection|
    i = 0
    barrier = Async::Barrier.new
    # Ten tasks write to the same connection concurrently, with no locking.
    10.times do
      barrier.async do
        loop do
          # Mix small (50 B) and large (1 MiB) messages, each a single repeated byte.
          size = rand(2) == 1 ? 50 : 1024*1024
          char = rand(255).chr
          connection.write char*size
          connection.flush

          puts i if (i+=1) % 100 == 0
        end
      end
    end
    barrier.wait
  end
end

I actually have to either run the client with ruby --jit or use the workaround I mentioned in https://github.com/socketry/protocol-websocket/issues/8. Otherwise the problem doesn't seem to manifest, but I think that's probably just a timing issue. I end up with errors like "\xE4\xC1\xE1[f>\xE1Kf>\x0F`u\x03c\xE9\x9BS\x965\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/\xEAI\xE7/c\xA4f>\xE1[f.\xE1[\xDF\xA9\x1A\x05\x8C\xFAIV\x8C\xFAIV\x8C\xFAIV\x8C\xFAIV", or, with masking disabled, "\x82\x7F\x00\x00\x00\x00\x00\x10\x00\x00\x82\x7F\x00\x00\x00\x00\x00\x10\x00\x00\x82\x7F\x00\x00\x00\x00\x00\x10\x00\x00\x82\x7F\x00\x00\x00\x00\x00\x10\x00\x00\x82\x7F\x00\x00\x00\x00\x00\x10\x00\x00", which eerily looks like a WebSocket frame header repeated many times.

This only happens with big messages. If I only send 50-100 byte messages, it doesn't seem to manifest (perhaps because each one fits into a single write syscall?). Is this a supported usage, or should I wrap my writes in some kind of mutex?
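The unmasked dump can be checked against the base framing layout in RFC 6455 §5.2. Below is a small decoder for the fixed part of a frame header. decode_frame_header is a hypothetical helper written for illustration and is not part of protocol-websocket. It decodes one of the repeated 10-byte chunks: FIN is set, the opcode is 2 (binary), the frame is unmasked, and the 64-bit extended payload length is 1,048,576 bytes. That matches the client's 1024*1024-byte messages and is consistent with frame headers from other tasks' large writes landing inside a frame's payload.

```ruby
# Decode the fixed part of a WebSocket frame header (RFC 6455, section 5.2).
# Hypothetical helper for illustration; not a protocol-websocket API.
def decode_frame_header(bytes)
  b0, b1 = bytes.unpack("C2")
  fin    = (b0 & 0x80) != 0   # final fragment flag
  opcode = b0 & 0x0F          # 1 = text, 2 = binary, 8 = close, ...
  masked = (b1 & 0x80) != 0   # client-to-server frames are normally masked
  length = b1 & 0x7F          # 0..125 = length; 126/127 = extended length follows
  offset = 2

  case length
  when 126
    length = bytes.byteslice(2, 2).unpack1("n")  # 16-bit big-endian length
    offset = 4
  when 127
    length = bytes.byteslice(2, 8).unpack1("Q>") # 64-bit big-endian length
    offset = 10
  end

  # A masked frame carries a 4-byte masking key after the length field.
  { fin: fin, opcode: opcode, masked: masked, length: length,
    header_size: offset + (masked ? 4 : 0) }
end

# One of the 10-byte chunks repeated in the unmasked dump above.
h = decode_frame_header("\x82\x7F\x00\x00\x00\x00\x00\x10\x00\x00".b)
puts "fin=#{h[:fin]} opcode=#{h[:opcode]} masked=#{h[:masked]} length=#{h[:length]}"
# prints: fin=true opcode=2 masked=false length=1048576
```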

ioquatix commented 2 years ago

If you want to write to a WebSocket from multiple tasks, you need to protect it with an Async::Semaphore. Otherwise the underlying socket might perform a partial write of one message, and while that task waits to write the rest, another task's data can be written in between, mixing up the stream. Since a semaphore has a cost, it's not the default at the lower level. There are lots of ways to deal with this: an Async::Queue for buffering messages, a semaphore for mutual exclusion, etc.

u3shit commented 2 years ago

Okay, thanks. I've refactored my code to put messages into an Async::LimitedQueue instead, which I hope means fewer context switches than a semaphore. Right now I have two tasks: one reads messages in a loop and the other writes them (the chat example does something like this, so it should be alright).