enwi / dartzmq

A simple Dart ZeroMQ implementation/wrapper around the libzmq C++ library
https://pub.dev/documentation/dartzmq/latest/
MIT License

After two messages, unhandled exception #18

Closed: medcelerate closed this issue 1 year ago

medcelerate commented 1 year ago

When I send messages, the server sends a reply; however, after the second message from the client the following error occurs. I can confirm that I receive a response, but the error still occurs. It seems to leave the library in a state where it can no longer send or receive messages. My current implementation is equivalent to the example code in this repo: it listens on the stream and sends using _socket.sendString().

[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: ZeroMQException(156384763): Operation cannot be accomplished in current state
#0      _checkErrorCode (package:dartzmq/src/exception.dart:56:5)
#1      _checkReturnCode (package:dartzmq/src/exception.dart:49:5)
#2      ZContext._poll (package:dartzmq/src/zeromq.dart:109:9)
#3      ZContext._startPolling.<anonymous closure> (package:dartzmq/src/zeromq.dart:68:70)
#4      _Timer._runTimers (dart:isolate-patch/timer_impl.dart:398:19)
#5      _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:429:5)
#6      _RawReceivePort._handleMessage (dart:isolate-patch/isolate_patch.dart:192:26)
enwi commented 1 year ago

I am not quite sure where this error comes from, but I have also experienced it with the example and a Python test server. However, in a production app using other sockets it works just fine. So how can we find the bug?

medcelerate commented 1 year ago

I'm unsure. I've gotten it using the basic example setup with a C++ server. According to the ZeroMQ documentation, this error happens when a request is not met with a reply. Is it possible that this library is not handling replies on a stream correctly and is blocking the socket?

enwi commented 1 year ago

Is it possible this library is not appropriately handling replies on a stream and blocking the socket?

I think not, since this library is using a compiled version of libzmq under the hood.

cvanvlack commented 1 year ago

Any more information on this? I set up the Python server according to the example docs, and it seems to be running with no issues.

But I am also getting the ZeroMQException(156384763): Operation cannot be accomplished in current state error. I'm fairly new to ZeroMQ, so I am unsure what you mean when you say it works just fine "in a production app using other sockets".

Have you simply pointed the Flutter app at something other than localhost? Did you then have to change the IP address in your Python server?

I'm doing this testing on Windows in case it's relevant.

enwi commented 1 year ago

@cvanvlack We are using it in a production app that connects to a backend written in Java, which uses JeroMQ. Maybe the Python implementation is not working right. This could be tested by writing a C++ program that connects to the Python server: if the same exception occurs there as well, then there is a bug in either libzmq or the Python implementation. If not, the issue must be in dartzmq.

enwi commented 1 year ago

I did some testing and can confirm that a simple C++ test works fine:

#include <atomic>
#include <chrono>
#include <csignal>
#include <iostream>
#include <string>
#include <thread>

#include <zmq.h>

// Set from the signal handler. Named g_shutdown rather than shutdown to avoid
// clashing with the sockets API function of that name on some platforms.
std::atomic<bool> g_shutdown{false};

void signalHandler(const int signal)
{
    // Only set the flag here: std::cout is not async-signal-safe.
    if (signal == SIGTERM)
    {
        g_shutdown = true;
    }
}

void loop()
{
    // Create context and REQ socket
    void* ctx = zmq_ctx_new();
    void* socket = zmq_socket(ctx, ZMQ_REQ);

    // Connect socket
    if (zmq_connect(socket, "tcp://127.0.0.1:5566") != 0)
    {
        std::cout << "zmq_connect: " << zmq_strerror(zmq_errno()) << std::endl;
    }

    zmq_msg_t message;
    zmq_msg_init(&message);

    // Loop: one request/reply round trip per second
    auto last = std::chrono::steady_clock::now();
    while (!g_shutdown)
    {
        const auto now = std::chrono::steady_clock::now();
        if (now - last < std::chrono::seconds(1))
        {
            // Sleep briefly instead of busy-waiting between requests
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            continue;
        }
        last = now;

        // Send test message. REQ only allows this once the previous reply
        // has been received; otherwise zmq_send fails with EFSM.
        const char msg[] = "Test";
        std::cout << "Sending: " << msg << std::endl;
        if (zmq_send(socket, msg, sizeof(msg) - 1, 0) < 0)
        {
            std::cout << "zmq_send: " << zmq_strerror(zmq_errno()) << std::endl;
            continue;
        }

        // Wait up to 1000 ms for the reply. The third argument of zmq_poll is
        // a timeout in milliseconds; ZMQ_DONTWAIT is a send/recv flag.
        zmq_pollitem_t pollitem = {socket, 0, ZMQ_POLLIN, 0};
        if (zmq_poll(&pollitem, 1, 1000) > 0 && (pollitem.revents & ZMQ_POLLIN))
        {
            // Read every frame of the (possibly multipart) reply
            do
            {
                const int size = zmq_msg_recv(&message, socket, 0);
                if (size < 0)
                {
                    break;
                }
                const auto data = static_cast<const char*>(zmq_msg_data(&message));
                std::cout << "Received: " << std::string(data, size) << std::endl;
            } while (zmq_msg_more(&message));
        }
    }

    // Release the message and close the socket before destroying the context:
    // zmq_ctx_destroy() blocks until every socket has been closed.
    zmq_msg_close(&message);
    zmq_close(socket);
    zmq_ctx_destroy(ctx);

    std::cout << "Done!" << std::endl;
}

int main()
{
    std::cout << "Hello World!" << std::endl;
    std::signal(SIGTERM, signalHandler);
    loop();
}

Output:

Hello World!
Sending: Test
Received: ECHO
Received: Test
Sending: Test
Received: ECHO
Received: Test
Sending: Test
Received: ECHO
Received: Test
enwi commented 1 year ago

Looking a little deeper into this, it seems that the error occurs whenever zmq_send is called while _poll is running. If you wait a second before pressing the button again, the exception does not occur.
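The timing dependence can be illustrated with a toy, dependency-free model. This is not dartzmq's or libzmq's code; it assumes, as discussed in this thread, that replies are only picked up by a poll that runs roughly once per second, and that the reply is already waiting in the socket by the next tick (plausible on localhost). The class name TimerPolledReq and the millisecond timeline are hypothetical.

```cpp
#include <cassert>

// Toy model of a REQ socket drained by a periodic timer. It only captures
// the REQ rule: the reply to one request must be received before the next
// request may be sent.
class TimerPolledReq
{
public:
    explicit TimerPolledReq(long pollIntervalMs) : interval_(pollIntervalMs) {}

    // Returns true if the send is accepted, false if the socket is still
    // waiting for a reply (the situation libzmq reports as EFSM).
    bool send(long nowMs)
    {
        runDueTicks(nowMs);
        if (awaitingReply_)
        {
            return false;
        }
        awaitingReply_ = true;
        return true;
    }

private:
    // Runs every poll tick scheduled at or before nowMs. Under the model's
    // assumption the reply has already arrived, so any tick after a send
    // receives it and clears the waiting state.
    void runDueTicks(long nowMs)
    {
        while (nextTickMs_ <= nowMs)
        {
            awaitingReply_ = false;
            nextTickMs_ += interval_;
        }
    }

    long interval_;
    long nextTickMs_ = 0;
    bool awaitingReply_ = false;
};
```

With a 1000 ms interval, a first press at t = 100 ms is accepted, a second press at t = 500 ms is rejected because no tick has drained the reply yet, and a press at t = 1200 ms is accepted again after the tick at 1000 ms.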

enwi commented 1 year ago

Alright, I think the issue is with the REQ and REP sockets: you cannot send another message on a REQ socket until you have received the reply from the REP socket. Otherwise the socket's internal state is still expecting a reply and sees another message being sent, which the synchronous REQ/REP pattern forbids. Since the _poll function only gets called every second, the error is easy to notice, because you have one second to screw up. So the workaround is to use the asynchronous counterparts of REQ and REP, namely DEALER and ROUTER. The long-term solution is fixing #4, though this issue might still occur when sending more than one message at a time, like:

_socket.send([_presses]);
_socket.send([_presses]);