zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.95k stars 744 forks source link

Can't receive everything in Push-Pull pattern #761

Closed lihaoef closed 4 years ago

lihaoef commented 6 years ago

Environment

NetMQ Version:    4.0.0.175
Operating System: windows10
.NET Version:     4.5

Expected behaviour

The client can receive "helloworld".

Actual behaviour

The client receive nothing.

Steps to reproduce the behaviour

The server code is :

Task.Run(() =>
            {
                using (PushSocket push = new PushSocket())
                {
                    push.Connect("tcp://localhost:5555");
                    push.SendFrame("helloworld");
                }
            });
Console.WriteLine("finish");
Console.Read();

The client code is :

            using (PullSocket pull = new PullSocket())
            {
                pull.Bind("tcp://localhost:5555");
                while (true)
                {
                    Console.WriteLine("begin");
                    Console.WriteLine(pull.ReceiveFrameString());
                }
            }
  1. Start the client code.
  2. Start the server code.

Result: The client can not receive everything.How I can Fix this?

Any answer will be appreciated.

dev-nextprogress commented 6 years ago

It looks like the PushSocket has to be connected before the PullSocket binds. I tried a variation of your code and it worked, like this:

class Program
{
    static async Task Main( string[] args )
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        Task clientTask = null;

        var serverTask = Task.Run( () =>
        {
            using ( PushSocket push = new PushSocket("tcp://localhost:5555") )
            {
                clientTask = Task.Run(() => RunClient(cancellationTokenSource.Token), CancellationToken.None);
                Console.WriteLine("Press enter when clients are ready...");
                Console.ReadKey();
                push.SendFrame( "helloworld" );
            }
        } );

        await serverTask;
        cancellationTokenSource.Cancel();
        await clientTask;

        Console.WriteLine( "Press any key to exit..." );
        Console.Read();
    }

    static Task RunClient( CancellationToken cancellationToken )
    {
        using ( PullSocket pull = new PullSocket("tcp://localhost:5555") )
        {
            Console.WriteLine("begin");
            while ( !cancellationToken.IsCancellationRequested )
            {
                if ( pull.TryReceiveFrameString( TimeSpan.FromMilliseconds( 100 ), out var message ) )
                {
                    Console.WriteLine( message );
                }
            }
        }

        return Task.CompletedTask;
    }
}

}

lihaoef commented 5 years ago

It looks like the PushSocket has to be connected before the PullSocket binds. I tried a variation of your code and it worked, like this:

class Program
{
    static async Task Main( string[] args )
    {
        CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        Task clientTask = null;

        var serverTask = Task.Run( () =>
        {
            using ( PushSocket push = new PushSocket("tcp://localhost:5555") )
            {
                clientTask = Task.Run(() => RunClient(cancellationTokenSource.Token), CancellationToken.None);
                Console.WriteLine("Press enter when clients are ready...");
                Console.ReadKey();
                push.SendFrame( "helloworld" );
            }
        } );

        await serverTask;
        cancellationTokenSource.Cancel();
        await clientTask;

        Console.WriteLine( "Press any key to exit..." );
        Console.Read();
    }

    static Task RunClient( CancellationToken cancellationToken )
    {
        using ( PullSocket pull = new PullSocket("tcp://localhost:5555") )
        {
            Console.WriteLine("begin");
            while ( !cancellationToken.IsCancellationRequested )
            {
                if ( pull.TryReceiveFrameString( TimeSpan.FromMilliseconds( 100 ), out var message ) )
                {
                    Console.WriteLine( message );
                }
            }
        }

        return Task.CompletedTask;
    }
}

}

I don't think so. If you remove the code Console.ReadKey(); .It will receive nothing. And if you do this:

            push.SendFrame( "helloworld" );
            Console.ReadKey();

You can receive 'helloworld'. The reason in my mind is the zeroMQ dispose the push socket before it send message completely. How about you? If this is the reason.How to resolve this problem?

marian-gheorghe commented 5 years ago

Try to use a NetMQPoller on both sides (add the sockets to their respective poller). On the server side additionally use NetMQQueue (add the queue to the poller) to enqueue the messages then use the queue.ReceiveReady event to actually do the sendout On the client side use the pullSocket.ReceiveReady event to read the received message instead of busy wait

CLIENT

    using (var poller = new NetMQPoller())
    {
        using (PullSocket pull = new PullSocket())
        {
            pull.Bind("tcp://localhost:5555");
            pull.ReceiveReady += Pull_ReceiveReady;
            poller.Add(pull);
            poller.RunAsync()           
        }
    }

    private void Pull_ReceiveReady(object sender, NetMQSocketEventArgs e)
    {
        e.Socket.ReceiveFrameString()
    }

SERVER

    using (var poller = new NetMQPoller())
    {
        var queue = new NetMQQueue<string>();
        queue.ReceiveReady += Queue_ReceiveReady;
        poller.Add(queue);
        Task.Run(() =>
        {
            using (push = new PushSocket())
            {
                push.Connect("tcp://localhost:5555");
                poller.Add(push);
                poller.RunAsync();
                queue.Enqueue("helloworld");
            }
        });
        Console.WriteLine("finish");
        Console.Read();
    }

    private void Queue_ReceiveReady(object sender, NetMQQueueEventArgs<string> e)
    {
        if (e.Queue.TryDequeue(out string result, TimeSpan.FromSeconds(1.0)))
        {
            push.SendFrame("helloworld");
        }
    }   
stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.