go-zeromq / zmq4

[WIP] Pure-Go implementation of ZeroMQ-4
BSD 3-Clause "New" or "Revised" License
344 stars 57 forks source link

REP socket races on client connection #119

Closed egorse closed 2 years ago

egorse commented 2 years ago

I noticed that there may be a problem sending the first response over a REP socket with v0.14.0.

The issue seems to be that the receiver listener goroutine starts before the connection is added to repReader/repWriter, so a send over the REP socket may be called before the writer has the connection registered.

The following test fails:

// TestRepSocket stresses one listening REP socket with many short-lived REQ
// clients. Each of maxClients goroutines performs maxMsgs round trips, and
// every round trip uses a brand-new REQ socket that dials, sends "ping",
// expects "pong" and closes again. A single server goroutine answers every
// "ping" it receives on the REP socket with "pong".
//
// Failures are recorded through the assert helper; a failed step ends the
// current round trip (client) or the receive loop (server) instead of going
// on to use a result that is not valid.
func TestRepSocket(t *testing.T) {
    defer goleak.VerifyNone(t)
    assert := asserts.New(t)

    ctx, cancel := context.WithCancel(context.Background())
    rep := zmq4.NewRep(ctx)
    ep := "ipc://@test.rep.socket"
    err := rep.Listen(ep)
    assert.NoError(err)

    maxClients := 100
    maxMsgs := 1000
    wgClients := &sync.WaitGroup{}
    wgServer := &sync.WaitGroup{}

    // client runs maxMsgs independent request/reply round trips.
    client := func() {
        defer wgClients.Done()
        ping := zmq4.NewMsgString("ping")
        for n := 0; n < maxMsgs; n++ {
            // The closure scopes the defers to a single round trip.
            func() {
                ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
                defer cancel()
                req := zmq4.NewReq(ctx)
                // Registered after cancel, so it runs before it: the socket
                // is closed first and the context cancelled second, on every
                // exit path including the early returns below.
                defer func() {
                    assert.NoError(req.Close())
                }()

                err := req.Dial(ep)
                assert.NoError(err)
                if err != nil {
                    return
                }
                err = req.Send(ping)
                assert.NoError(err)
                if err != nil {
                    return
                }
                msg, err := req.Recv() // WARN the ctx is not supported yet!!!
                assert.NoError(err)
                if err != nil {
                    // Do not inspect msg after a failed Recv.
                    return
                }
                if len(msg.Frames) == 0 {
                    t.Errorf("client: reply has no frames")
                    return
                }
                assert.Equal(string(msg.Frames[0]), "pong")
            }()
        }
    }

    // server answers requests until Recv fails; context.Canceled is the
    // expected way out once the test cancels ctx.
    server := func() {
        defer wgServer.Done()
        pong := zmq4.NewMsgString("pong")
        for {
            msg, err := rep.Recv()
            if errors.Is(err, context.Canceled) {
                break
            }
            assert.NoError(err)
            if err != nil {
                break
            }
            if len(msg.Frames) == 0 {
                t.Errorf("server: request has no frames")
            } else {
                assert.Equal(string(msg.Frames[0]), "ping")
            }
            err = rep.Send(pong)
            assert.NoError(err)
        }
    }

    wgServer.Add(1)
    go server()
    wgClients.Add(maxClients)
    for n := 0; n < maxClients; n++ {
        go client()
    }
    // Wait for all clients first, then cancel the REP context so the server
    // loop can terminate, and only then close the socket.
    wgClients.Wait()
    cancel()
    wgServer.Wait()
    rep.Close()
}