zeromq / netmq

A 100% native C# implementation of ZeroMQ for .NET
Other
2.95k stars 744 forks source link

NetMQQueue.Enqueue blocks after 2000 Enqueue calls #759

Closed sabrogden closed 4 years ago

sabrogden commented 6 years ago

Environment

NetMQ Version: 4.0.0.1    
Operating System: Windows 10
.NET Version:  .NET Standard 2.0   

Expected behavior

You should be able to Enqueue up to your capacity you passed into your constructor. Then either items won't be added or dropped or anything other than block forever!

Actual behavior

The app never returns from Enqueue after adding 2000 items

Steps to reproduce the behavior

Always blocks when i = 2000 in x.Enqueue

var x = new NetMQQueue(10000); for (int i = 0; i < 3000; i++) { x.Enqueue(i); }

somdoron commented 6 years ago

You are right.

We can fix it by setting the receive highwater mark to zero of the receiving socket.

Do you want to make a PR?

On Tue, Oct 9, 2018, 8:23 PM sabrogden notifications@github.com wrote:

Environment

NetMQ Version: 4.0.0.1 Operating System: Windows 10 .NET Version: .NET Standard 2.0

Expected behavior

You should be able to Enqueue up to your capacity you passed into your constructor. Then either items won't be added or dropped or anything other than block forever! Actual behavior

The app never returns from Enqueue after adding 2000 items Steps to reproduce the behavior

Always blocks when i = 2000 in x.Enqueue var x = new NetMQQueue(10000); for (int i = 0; i < 3000; i++) { x.Enqueue(i); }

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/zeromq/netmq/issues/759, or mute the thread https://github.com/notifications/unsubscribe-auth/AClv9h49VRbwTvBY9FSEwH1keyapsgvJks5ujNtzgaJpZM4XTwiF .

sabrogden commented 6 years ago

I'm not seeing that make a difference, just to test I added the following code in the constructor of NetMQQueue, but still seeing the same issue.

//m_writer.Options.SendHighWatermark = m_reader.Options.ReceiveHighWatermark = capacity / 2; m_writer.Options.SendHighWatermark = 0; m_writer.Options.ReceiveHighWatermark = 0;

m_reader.Options.SendHighWatermark = 0; m_reader.Options.ReceiveHighWatermark = 0;

somdoron commented 6 years ago

Yes, you are right.

The problem is that the watermarks are set aftee the connection between the writer and reader.

The call to PairSocket.CreateSocketPair is creating the connection.

The solution should be to maybe add an overload to the create socket pair that accept initializer function.

On Tue, Oct 9, 2018, 9:00 PM sabrogden notifications@github.com wrote:

I'm not seeing that make a difference, just to test I added the following code in the constructor of NetMQQueue, but still seeing the same issue.

//m_writer.Options.SendHighWatermark = m_reader.Options.ReceiveHighWatermark = capacity / 2; m_writer.Options.SendHighWatermark = 0; m_writer.Options.ReceiveHighWatermark = 0;

m_reader.Options.SendHighWatermark = 0; m_reader.Options.ReceiveHighWatermark = 0;

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/zeromq/netmq/issues/759#issuecomment-428290383, or mute the thread https://github.com/notifications/unsubscribe-auth/AClv9teAnRH53U0IoGVDjbEpaRz8rtjfks5ujORNgaJpZM4XTwiF .

sabrogden commented 6 years ago

That works.

Now what can we do if we do hit the max, currently it just hangs, can't think that would be wanted, i certanly don't.

somdoron commented 6 years ago

That will only happen with capacity other thab zero, which is the default.

We can leave the current behavior for the enqueue method and add another tryEnqueue.

However that will be hard to implement.

On Tue, Oct 9, 2018, 10:07 PM sabrogden notifications@github.com wrote:

That works.

Now what can we do if we do hit the max, currently it just hangs, can't think that would be wanted, i certanly don't.

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/zeromq/netmq/issues/759#issuecomment-428313385, or mute the thread https://github.com/notifications/unsubscribe-auth/AClv9msgCqOBXcy6xHt1uj681OziWx0Tks5ujPQBgaJpZM4XTwiF .

sabrogden commented 6 years ago

Isn't TryEnqueue just a matter of passing in a timeout into TrySend

        public bool TryEnqueue(T value, TimeSpan timeout)
        {
            bool added = false;            

            var msg = new Msg();
            msg.InitGC(EmptyArray<byte>.Instance, 0);

            lock (m_writer)
            {
                added = m_writer.TrySend(ref msg, timeout, false);
            }

            if(added)
            {
                m_queue.Enqueue(value);
            }

            msg.Close();

            return added;
        }
somdoron commented 6 years ago

You switched the order of the signal and the enqueueing, which can cause the reader to miss the message.

On Wed, Oct 10, 2018, 12:28 AM sabrogden notifications@github.com wrote:

Isn't TryEnqueue just a matter of passing in a timeout into TrySend

`public bool TryEnqueue(T value, TimeSpan timeout) { bool added = false;

    var msg = new Msg();
    msg.InitGC(EmptyArray<byte>.Instance, 0);

    lock (m_writer)
    {
        added = m_writer.TrySend(ref msg, timeout, false);
    }

    if(added)
    {
        m_queue.Enqueue(value);
    }

    msg.Close();

    return added;
}`

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/zeromq/netmq/issues/759#issuecomment-428359058, or mute the thread https://github.com/notifications/unsubscribe-auth/AClv9tktWt1OMeX453Hxaf2vtsn_Dbp0ks5ujRT1gaJpZM4XTwiF .

somdoron commented 6 years ago

The way to do it is to first lock the writer, check if you can send a signal using HasOut, enqueue and send the signal.

On Wed, Oct 10, 2018, 8:11 AM Doron Somech somdoron@gmail.com wrote:

You switched the order of the signal and the enqueueing, which can cause the reader to miss the message.

On Wed, Oct 10, 2018, 12:28 AM sabrogden notifications@github.com wrote:

Isn't TryEnqueue just a matter of passing in a timeout into TrySend

`public bool TryEnqueue(T value, TimeSpan timeout) { bool added = false;

    var msg = new Msg();
    msg.InitGC(EmptyArray<byte>.Instance, 0);

    lock (m_writer)
    {
        added = m_writer.TrySend(ref msg, timeout, false);
    }

    if(added)
    {
        m_queue.Enqueue(value);
    }

    msg.Close();

    return added;
}`

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/zeromq/netmq/issues/759#issuecomment-428359058, or mute the thread https://github.com/notifications/unsubscribe-auth/AClv9tktWt1OMeX453Hxaf2vtsn_Dbp0ks5ujRT1gaJpZM4XTwiF .

abiratur commented 5 years ago

Yes, you are right. The problem is that the watermarks are set aftee the connection between the writer and reader. The call to PairSocket.CreateSocketPair is creating the connection. The solution should be to maybe add an overload to the create socket pair that accept initializer function. On Tue, Oct 9, 2018, 9:00 PM sabrogden @.***> wrote: I'm not seeing that make a difference, just to test I added the following code in the constructor of NetMQQueue, but still seeing the same issue. //m_writer.Options.SendHighWatermark = m_reader.Options.ReceiveHighWatermark = capacity / 2; m_writer.Options.SendHighWatermark = 0; m_writer.Options.ReceiveHighWatermark = 0; m_reader.Options.SendHighWatermark = 0; m_reader.Options.ReceiveHighWatermark = 0; — You are receiving this because you commented. Reply to this email directly, view it on GitHub <#759 (comment)>, or mute the thread https://github.com/notifications/unsubscribe-auth/AClv9teAnRH53U0IoGVDjbEpaRz8rtjfks5ujORNgaJpZM4XTwiF .

Created pull request #766

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.