justinstenning / SharedMemory

C# shared memory classes for sharing data between processes (Array, Buffer and Circular Buffer)
https://www.nuget.org/packages/SharedMemory

RpcBuffer message timeouts #52

Open deviousasti opened 4 years ago

deviousasti commented 4 years ago

I have a very simple implementation using RpcBuffer in F#. But it keeps missing messages.

If I send messages between host and client in a loop, about a tenth of the responses time out. I'm not sure exactly how to reproduce the failures; they seem like heisenbugs.

This isn't an accurate repro, but it has the ratio of timeouts I'm seeing.

    static class Program
    {
        private static async Task Main(string[] args)
        {
            // Ensure a unique channel name
            var rpcName = $"rpc-{Guid.NewGuid()}";
            var rpcMaster = new RpcBuffer(rpcName, bufferNodeCount: 2);
            var input = Encoding.Unicode.GetBytes(new String('I', 1024));
            var output = Encoding.Unicode.GetBytes(new String('O', 1024));
            var rpcSlave = new RpcBuffer(rpcName,
                (_, payload) => Task.Run(async () =>
                {
                    await Task.Delay(100);
                    return output;
                })
            );

            async Task Run()
            {
                var result = await rpcMaster.RemoteRequestAsync(input, timeoutMs: 1000);
                if (!result.Success)
                {
                    Console.WriteLine("Timed out");
                }
                else
                {
                    var sresult = Encoding.Unicode.GetString(result.Data);
                    Console.WriteLine($"Received: {sresult.Substring(1, 5)}...");
                }
            }

            void LogIfFailed(Task task)
            {
                if(task.IsFaulted)
                    Console.WriteLine("Failed");
            }

            var eval = 
            Parallel.For(1, 20, 
                new ParallelOptions { MaxDegreeOfParallelism = 2 }, _ => LogIfFailed(Run()));

            Console.ReadLine();
        }

    }
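
One detail of the repro above may matter when reading the results: `Run()` is async and is not awaited inside the `Parallel.For` body, so `MaxDegreeOfParallelism = 2` limits only how many bodies start at once, not how many requests are in flight. `LogIfFailed` also inspects `IsFaulted` before the task has had a chance to finish. The following is a minimal sketch using only BCL types; `FakeRequestAsync` is a hypothetical stand-in for `RemoteRequestAsync`, and it assumes the real call returns an incomplete task promptly.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class InFlightDemo
{
    static readonly object Gate = new object();
    static int inFlight;
    static int maxInFlight;

    // Hypothetical stand-in for an async RPC call; not the SharedMemory API.
    static async Task FakeRequestAsync()
    {
        lock (Gate)
        {
            inFlight++;
            if (inFlight > maxInFlight) maxInFlight = inFlight;
        }

        await Task.Delay(200); // simulated round trip

        lock (Gate) { inFlight--; }
    }

    // Starts `requests` calls the same way the repro does and reports
    // the highest number that were outstanding at the same time.
    public static int MeasureMaxInFlight(int requests)
    {
        lock (Gate) { inFlight = 0; maxInFlight = 0; }

        var tasks = new Task[requests];
        Parallel.For(0, requests,
            new ParallelOptions { MaxDegreeOfParallelism = 2 },
            i => tasks[i] = FakeRequestAsync()); // returns at the first await

        Task.WaitAll(tasks); // keep the tasks so they can actually be observed
        return maxInFlight;
    }

    public static void Main()
    {
        // Parallel.For(1, 20, ...) in the repro runs 19 iterations.
        Console.WriteLine($"Max requests in flight: {MeasureMaxInFlight(19)}");
    }
}
```

If the reported maximum is well above 2, the same pattern in the repro would put many more requests in flight than a `bufferNodeCount` of 2 suggests. Collecting the tasks and awaiting them (for example with `Task.WhenAll`) is one way to make failures observable.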

justinstenning commented 4 years ago

Have not had a chance to look at this in detail yet, but can you try with a bigger node count/buffer size?

deviousasti commented 4 years ago

I did, and it's still the same. It fails more or less at random. The original, simpler case is in F#. I can post it if you'd like.

raegwolf commented 4 years ago

@deviousasti were you able to resolve this? We're seeing a similar issue.

deviousasti commented 4 years ago

Sorry Daniel, I tried several things, but eventually gave up. This was for a VS extension, and synchronization with VS's threading model proved to be difficult.

I switched to StreamJsonRpc from the VS team, and it works fine. The RPC interface is similar enough, and you can use named pipes instead of memory-mapped files.

raegwolf commented 3 years ago

Thanks Asti. Such a pity, as this is an otherwise great implementation and I love the fact that it has hardly any dependencies. Unfortunately I wasn't able to track down the problem, and we're going to move over to the MS RPC option you suggest.

A huge thank you to @spazzarama for providing this library. It was enormously helpful to us as part of a large architectural change in our platform.

justinstenning commented 3 years ago

@DanielChilcott @deviousasti sorry for the late reply. I wonder if this is caused by timing out while waiting for a free slot in a full buffer? Perhaps an infinite timeout could have been used as a workaround?

justinstenning commented 3 years ago

@DanielChilcott @deviousasti I think this is actually a performance-related issue: if you leave the timeoutMs at the default (30s), they all succeed.

deviousasti commented 3 years ago

Thank you for the continued investigation. IIRC, I did try various timeout values, but messages were still missing. The situation might have changed now, though.

deviousasti commented 3 years ago

Shall I close this issue?

justinstenning commented 3 years ago

I might keep it open a little longer. The performance was not what I expected, and even without the 100ms delay there was some weirdness.

mightypanda commented 3 years ago

Hello, thank you for this super-useful library! I've been using it with RpcBuffer to send high-frequency messages, such as value changes from a WPF slider component, and I noticed a pattern that seems independent of the buffer size or the number of allocated nodes.

The very first time I send a burst of messages, the buffer fills up quickly and requests start to time out. If I send around 500 messages, I get the first ones and then some of the others arrive delayed (not related to the timeout value; they arrive about once per second). After this "burst" everything goes lightning fast, no matter how many messages I send or how frequently I send them.

Without digging into the code, it "feels" like an initialization problem. Hope this helps!

UPDATE: it seems that if I send one message first and wait "a little", everything is smooth afterwards, even if I send at high frequency.

justinstenning commented 3 years ago

@mightypanda thanks for that information. That is an interesting behaviour, so clearly something is going on.

justinstenning commented 2 years ago

I think this is probably related to ThreadPool exhaustion / starvation
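
For readers unfamiliar with the term, here is a minimal sketch of thread pool starvation using only BCL types. It is not taken from the SharedMemory code and does not claim to reproduce this issue. It queues more blocking work items than the pool's minimum worker count, so the pool has to grow, and the runtime throttles how quickly it adds threads. The `--raise-min` switch and the burst size of eight times the minimum are arbitrary choices for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class StarvationDemo
{
    // Queues `count` work items that each block a pool thread for `blockMs`
    // and returns how long it takes until all of them have finished.
    public static TimeSpan RunBlockingBurst(int count, int blockMs)
    {
        var sw = Stopwatch.StartNew();
        var tasks = Enumerable.Range(0, count)
            .Select(_ => Task.Run(() => Thread.Sleep(blockMs)))
            .ToArray();
        Task.WaitAll(tasks);
        return sw.Elapsed;
    }

    public static void Main(string[] args)
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        int burst = minWorker * 8; // arbitrary: well above the minimum

        if (args.Contains("--raise-min"))
        {
            // Lets the pool create up to `burst` workers on demand.
            // SetMinThreads returns false if the value is rejected.
            bool ok = ThreadPool.SetMinThreads(burst, minIo);
            Console.WriteLine($"SetMinThreads({burst}) accepted: {ok}");
        }

        TimeSpan elapsed = RunBlockingBurst(burst, 100);
        Console.WriteLine($"{burst} blocking items took {elapsed.TotalMilliseconds:F0} ms");
    }
}
```

Running the program once without arguments and once with `--raise-min` gives a rough comparison. A slow first burst followed by fast steady-state behaviour, as described earlier in the thread, would be consistent with a pool that starts small and grows gradually, although this sketch alone does not confirm that as the cause.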

justinstenning commented 2 years ago

@deviousasti I know you found another solution, however I believe this has been fixed on the following branch: https://github.com/spazzarama/SharedMemory/tree/non-async-benchmarks

Any chance you can retest?

@mightypanda if you could also test that would be really helpful.

justinstenning commented 2 years ago

This is the updated example - with no failures:

    static class Program
    {
        private static async Task Main(string[] args)
        {
            // Ensure a unique channel name
            var rpcName = $"rpc-{Guid.NewGuid()}";
            var rpcMaster = new RpcBuffer(rpcName, bufferNodeCount: 2);
            var input = Encoding.Unicode.GetBytes(new String('I', 1024));
            var output = Encoding.Unicode.GetBytes(new String('O', 1024));
            var rpcSlave = new RpcBuffer(rpcName,
                (_, payload) =>
                {
                    Thread.Sleep(100);
                    return output;
                }, receiveThreads: 4
            );

            async Task Run()
            {
                var result = rpcMaster.RemoteRequest(input, timeoutMs: 1000);
                if (!result.Success)
                {
                    Console.WriteLine("Timed out");
                }
                else
                {
                    var sresult = Encoding.Unicode.GetString(result.Data);
                    Console.WriteLine($"Received: {sresult.Substring(1, 5)}...");
                }
            }

            void LogIfFailed(Task task)
            {
                if (task.IsFaulted)
                    Console.WriteLine("Failed");
            }

            var eval =
            Parallel.For(1, 80,
                new ParallelOptions { MaxDegreeOfParallelism = 2 }, _ => LogIfFailed(Run()));

            Console.ReadLine();
        }
    }