googollee / go-socket.io

socket.io library for golang, a realtime application framework.

Concurrent encoding/decoding #203

Open eyudkin opened 6 years ago

eyudkin commented 6 years ago

I found that the mutex passed to the encoder/decoder does not work, because it is passed by value and should be passed by pointer (by reference). Currently it looks like this:

func NewDecoder(r FrameReader, mtx sync.Mutex) *Decoder {
    return &Decoder{
        r: r,
        mtx: mtx,
    }
}

(It should be NewDecoder(r FrameReader, mtx *sync.Mutex).) As a result, the decoder and encoder hold different mutexes, which causes problems when the application tries to emit something at the same moment as a PING request, etc.
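To illustrate the point about by-value mutexes, here is a minimal sketch. The types valueHolder and pointerHolder and their constructors are hypothetical stand-ins, not part of go-socket.io; it assumes Go 1.18+ for sync.Mutex.TryLock. Note that go vet's copylocks check flags the by-value constructor, which is exactly the mistake described above.

```go
package main

import (
	"fmt"
	"sync"
)

// valueHolder is a hypothetical stand-in for a decoder that stores
// the mutex by value, as in the forked NewDecoder shown above.
type valueHolder struct{ mtx sync.Mutex }

// pointerHolder is the same idea, but it stores a pointer instead.
type pointerHolder struct{ mtx *sync.Mutex }

// newValueHolder receives a copy of the caller's mutex.
func newValueHolder(mtx sync.Mutex) *valueHolder { return &valueHolder{mtx: mtx} }

// newPointerHolder shares the caller's mutex.
func newPointerHolder(mtx *sync.Mutex) *pointerHolder { return &pointerHolder{mtx: mtx} }

// copyIsIndependent reports whether the by-value holder can take its
// lock while the original mutex is held (true means nothing is shared).
func copyIsIndependent() bool {
	var shared sync.Mutex
	h := newValueHolder(shared) // copies the unlocked mutex
	shared.Lock()
	defer shared.Unlock()
	if h.mtx.TryLock() {
		h.mtx.Unlock()
		return true
	}
	return false
}

// pointerIsShared reports whether the pointer holder is blocked while
// the original mutex is held (true means both sides use one lock).
func pointerIsShared() bool {
	var shared sync.Mutex
	h := newPointerHolder(&shared)
	shared.Lock()
	defer shared.Unlock()
	if h.mtx.TryLock() {
		h.mtx.Unlock()
		return false
	}
	return true
}

func main() {
	fmt.Println("by value, copy is independent:", copyIsIndependent())
	fmt.Println("by pointer, lock is shared:", pointerIsShared())
}
```

Both functions return true: the by-value copy never contends with the original, while the pointer version does.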

Unfortunately, I cannot fix it quickly, because simply sharing one mutex by pointer causes a deadlock: the DecodeHeader method locks the mutex and then enters an endless NextReader() loop, so the mutex is not released until a non-ping message arrives from the client (which blocks server->client messages).

googollee commented 6 years ago

Which branch? v1.4 or master?

eyudkin commented 6 years ago

1.4

googollee commented 6 years ago

You mean https://github.com/googollee/go-socket.io/blob/v1.4/parser/decoder.go#L29 ?

eyudkin commented 6 years ago

Oops, sorry, I forgot that I had made some changes in my fork. This mutex was my fix for some concurrency problems.

eyudkin commented 6 years ago

One of them is the writePacket call in the encoder's Encode function.

func (e *Encoder) Encode(h Header, args []interface{}) (err error) {
    var w io.WriteCloser
    w, err = e.w.NextWriter(engineio.TEXT)
    if err != nil {
        return
    }

    var buffers [][]byte
    buffers, err = e.writePacket(w, h, args)

    if err != nil {
        return
    }

    for _, b := range buffers {
        w, err = e.w.NextWriter(engineio.BINARY)
        if err != nil {
            return
        }
        err = e.writeBuffer(w, b)
        if err != nil {
            return
        }
    }
    return
}

So, calls like e.w.NextWriter(engineio.TEXT) go through the session (which has the upgradeLocker mutex), and that is safe. But writePacket does not, and (as I understand it) in some cases it closes the writer while the reader is still working with it.

eyudkin commented 6 years ago

So, @googollee, the issue is still relevant (at least for me) ;)

googollee commented 6 years ago

Encoder.writePacket() calls should not race with each other. NextWriter() calls block each other, and only one of them returns a writer to do writePacket(). Could you give an example that shows the issue?

eyudkin commented 6 years ago

[Code screenshot] [Stack traces screenshot] As I understand the situation: serveWrite -> encode -> writePacket closes the connection without any mutex (through the WriteCloser interface), while serveRead -> DecodeHeader -> ... is still working with it in another goroutine. The error text I get is: websocket: write closed

sshaplygin commented 4 years ago

@eyudkin Hi, is this still a problem?

julienmathevet commented 4 years ago

I think we have the same issue. We will try to write a test to reproduce it. It seems to appear when we broadcast many events within a short time with at least one subscriber.