Jroland / kafka-net

Native C# client for Kafka queue servers.
Apache License 2.0

KafkaTcpSocketTests.WriteAndReadShouldBeAsyncronous fails intermittently. #37

Open AdeMiller opened 9 years ago

AdeMiller commented 9 years ago

This test occasionally fails with the following output:

FakeTcpServer: Accepting clients.
No connection to:http://localhost:8999/.  Attempting to re-connect...
Connection established to:http://localhost:8999/.
FakeTcpServer: Connected client
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: Client exception...  Exception:Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host.
FakeTcpServer: Client Disconnected.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
FakeTcpServer: writing 4 bytes.
  Expected is <System.Collections.Generic.List`1[System.Int32]> with 10 elements, actual is <System.Linq.OrderedEnumerable`2[System.Int32,System.Int32]>
  Values differ at index [7]

   at NUnit.Framework.Assert.That(Object actual, IResolveConstraint expression, String message, Object[] args)
   at kafka_tests.Unit.KafkaTcpSocketTests.WriteAndReadShouldBeAsyncronous() in KafkaTcpSocketTests.cs: line 416
AdeMiller commented 9 years ago

It looks like the following change fixes this. Several other test cases look like they may have similar concurrency issues.


        [Test]
        public void WriteAndReadShouldBeAsyncronous()
        {
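            var write = new List<int>();
            // The read continuations may run concurrently on thread-pool threads,
            // so collect their results in a thread-safe collection.
            var read = new ConcurrentBag<int>();
            var expected = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };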

            using (var server = new FakeTcpServer(FakeServerPort))
            {
                server.OnBytesReceived += data => write.AddRange(data.Batch(4).Select(x => x.ToArray().ToInt32()));

                var test = new KafkaTcpSocket(new DefaultTraceLog(), _fakeServerUrl);

                var tasks = Enumerable.Range(1, 10)
                    .SelectMany(i => new[]
                    {
                        test.WriteAsync(i.ToBytes()),
                        test.ReadAsync(4).ContinueWith(t => read.Add(t.Result.ToInt32())),
                        server.SendDataAsync(i.ToBytes())
                    }).ToArray();

                Task.WaitAll(tasks);
                Assert.That(write.OrderBy(x => x).ToList(), Is.EqualTo(expected));
                Assert.That(read.OrderBy(x => x).ToList(), Is.EqualTo(expected));
                // Don't let server go out of scope before test has finished accessing it.
                server.DropConnection();
            }
        }
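
For other tests with the same shape, here is a minimal standalone sketch of the pattern used above: results from parallel continuations go into a `ConcurrentBag<int>` and are sorted before comparing. It assumes the intermittent failure came from continuations adding to a non-thread-safe collection at the same time, which this thread does not confirm. `ConcurrentCollectDemo`, `CollectConcurrently`, and `FakeReadAsync` are hypothetical names for illustration and are not part of kafka-net.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrentCollectDemo
{
    // Hypothetical stand-in for an async read such as ReadAsync(4):
    // it completes on a thread-pool thread and returns the given value.
    private static Task<int> FakeReadAsync(int value)
    {
        return Task.Run(() => value);
    }

    // Collects results from continuations that may run in parallel.
    public static List<int> CollectConcurrently(int count)
    {
        // ConcurrentBag<T> allows Add from several threads at once;
        // List<T> does not guarantee that.
        var read = new ConcurrentBag<int>();

        Task[] tasks = Enumerable.Range(1, count)
            .Select(i => FakeReadAsync(i).ContinueWith(t => read.Add(t.Result)))
            .ToArray();

        // Wait for every continuation before inspecting the results.
        Task.WaitAll(tasks);

        // The bag is unordered, so sort before comparing with expected values.
        return read.OrderBy(x => x).ToList();
    }
}
```

Calling `ConcurrentCollectDemo.CollectConcurrently(10)` returns the values 1 through 10 in ascending order, whichever order the continuations happened to run in.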
Jroland commented 9 years ago

Thanks Ade. A few of the tests are definitely not handling concurrency correctly, and some are not testing exactly what their names say they test. They all need a review.

Jroland commented 9 years ago

Hey Ade, do you want to submit the test change as a pull request so you get credit for the fix? Otherwise I can just paste it in myself. Let me know.