zeromq / jeromq

Pure Java ZeroMQ
Mozilla Public License 2.0

New client cannot receive any data in PUB/SUB mode #954

Open 317787106 opened 1 year ago

317787106 commented 1 year ago

I run one ZeroMQ server and one client in PUB/SUB mode, and it works well at first. I then open and close the client repeatedly. Some hours later, a newly opened client can no longer receive any messages, although the server is still sending them. No warning or error is logged. I used the tool Arthas (https://gitee.com/arthas/arthas) to debug the server with the following command:

watch zmq.socket.pubsub.Dist distribute -x 4

It produced the following output:

method=zmq.socket.pubsub.Dist.distribute location=AtExit
ts=2023-05-15 09:24:18; [cost=0.019258ms] result=@ArrayList[
    @Object[][
        @Msg[
            MORE=@Integer[1],
            COMMAND=@Integer[2],
            CREDENTIAL=@Integer[32],
            IDENTITY=@Integer[64],
            SHARED=@Integer[128],
            metadata=null,
            flags=@Integer[1],
            type=@Type[
                DATA=@Type[DATA],
                DELIMITER=@Type[DELIMITER],
                $VALUES=@Type[][isEmpty=false;size=2],
                name=@String[DATA],
                ordinal=@Integer[0],
            ],
            fileDesc=null,
            size=@Integer[12],
            data=@byte[][
                @Byte[98],
                @Byte[108],
                @Byte[111],
                @Byte[99],
                @Byte[107],
                @Byte[84],
                @Byte[114],
                @Byte[105],
                @Byte[103],
                @Byte[103],
                @Byte[101],
                @Byte[114],
            ],
            buf=@HeapByteBuffer[
                hb=@byte[][isEmpty=false;size=12],
                offset=@Integer[0],
                isReadOnly=@Boolean[false],
                bigEndian=@Boolean[true],
                nativeByteOrder=@Boolean[false],
                SPLITERATOR_CHARACTERISTICS=@Integer[16464],
                mark=@Integer[-1],
                position=@Integer[0],
                limit=@Integer[12],
                capacity=@Integer[12],
                address=@Long[0],
            ],
            writeIndex=@Integer[0],
            readIndex=@Integer[0],
        ],
    ],
    @Dist[
        pipes=@ArrayList[
            @Pipe[
                inpipe=@YPipe[zmq.pipe.YPipe@52f8dd9b],
                outpipe=@YPipe[zmq.pipe.YPipe@5013c41a],
                inActive=@Boolean[false],
                outActive=@Boolean[false],
                hwm=@Integer[1000],
                lwm=@Integer[500],
                msgsRead=@Long[1],
                msgsWritten=@Long[1000],
                peersMsgsRead=@Long[0],
                peer=@Pipe[zmq.pipe.Pipe@4ae245d5(SessionBase[2]->Pub[3])],
                sink=@Pub[Pub[1]],
                state=@State[ACTIVE],
                delay=@Boolean[false],
                identity=null,
                credential=null,
                conflate=@Boolean[false],
                parent=@Pub[Pub[1]],
                $assertionsDisabled=@Boolean[true],
                ctx=@Ctx[zmq.Ctx@65caf4e8],
                tid=@Integer[3],
            ],
        ],
        matching=@Integer[0],
        active=@Integer[0],
        eligible=@Integer[0],
        more=@Boolean[false],
    ],
    null,
]

The output is a combination of {params,target,returnObj}. It seems that the closed client's pipe is still in the pipes array of Dist, and the new client's pipe cannot be added to it, so the new client cannot receive any messages. Is there a solution?
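
One possible reading of the dump: the remaining pipe shows hwm=1000, msgsWritten=1000 and peersMsgsRead=0, with outActive=false and active=0 in Dist, which would be consistent with a pipe that filled up to its high-water mark and was never drained. The sketch below only models that arithmetic. It assumes the check has the same shape as libzmq's check_hwm; the class and method names are hypothetical and are not JeroMQ's code.

```java
final class HwmModel {
    /**
     * Simplified model of a high-water-mark check (assumed shape, not JeroMQ source).
     * Returns true while the writer may still push a message into the pipe.
     * An hwm of 0 is treated as "no limit".
     */
    static boolean canWrite(int hwm, long msgsWritten, long peersMsgsRead) {
        boolean full = hwm > 0 && msgsWritten - peersMsgsRead >= hwm;
        return !full;
    }

    public static void main(String[] args) {
        // Values copied from the dump above: hwm=1000, msgsWritten=1000, peersMsgsRead=0
        System.out.println(canWrite(1000, 1000, 0)); // prints "false"
        // One message fewer written and the pipe would still accept data
        System.out.println(canWrite(1000, 999, 0));  // prints "true"
    }
}
```

If this reading is right, the stale pipe is not accepting messages because its peer never reported reading any, which matches the closed client never being removed.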

DayJun commented 1 year ago

Can you use netstat to check whether the source port and the destination port are the same? I've run into the same problem.
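
If the netstat output is long, this check can be scripted. The following is a minimal sketch, assuming the Linux `netstat -ant` column layout (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, State). The class name, method names, and the sample lines with port 5555 are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SamePortCheck {
    /** Extracts the text after the last ':' of an endpoint such as 127.0.0.1:5555. */
    static String portOf(String endpoint) {
        int idx = endpoint.lastIndexOf(':');
        return idx < 0 ? "" : endpoint.substring(idx + 1);
    }

    /** Returns the TCP lines whose local port equals the foreign port. */
    static List<String> findSamePortLines(List<String> netstatLines) {
        List<String> hits = new ArrayList<>();
        for (String line : netstatLines) {
            String[] cols = line.trim().split("\\s+");
            // Skip headers and anything that is not a TCP row with all six columns
            if (cols.length < 6 || !cols[0].startsWith("tcp")) {
                continue;
            }
            String localPort = portOf(cols[3]);
            String foreignPort = portOf(cols[4]);
            if (!localPort.isEmpty() && localPort.equals(foreignPort)) {
                hits.add(line);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Made-up sample rows; in practice, feed in the real netstat output
        List<String> sample = Arrays.asList(
            "tcp        0      0 127.0.0.1:5555          127.0.0.1:5555          ESTABLISHED",
            "tcp        0      0 127.0.0.1:40312         127.0.0.1:5555          ESTABLISHED",
            "tcp        0      0 0.0.0.0:5555            0.0.0.0:*               LISTEN");
        for (String hit : findSamePortLines(sample)) {
            System.out.println(hit);
        }
    }
}
```

With the sample rows, only the first line is reported, because it is the only one where both ports are 5555.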