akkadotnet / Alpakka

Akka Streams Connectors - Alpakka
https://alpakka.getakka.net/
Apache License 2.0
110 stars 40 forks source link

RabbitMQ docker container doesn't work in Azure Pipeline Linux container #93

Open Arkatufus opened 4 years ago

Arkatufus commented 4 years ago

There are several anomalies related with Akka.Streams.AMQP.V1 and the RabbitMQ docker container running under the ubuntu-18.04 Azure Pipelines image.

For the V1.0 plugin, our code could not establish any AMQP handshake with the container:

Akka.Streams.Amqp.V1.Tests.AmqpConnectorsTest.Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process [FAIL]
       System.OperationCanceledException : The transport 'TcpTransport' is closed.
       Stack Trace:
            at Amqp.Framing.Reader.ReadBuffer(ITransport transport, Byte[] buffer, Int32 offset, Int32 count)
            at Amqp.Framing.Reader.ReadHeader(ITransport transport)
            at Amqp.Sasl.SaslProfile.Open(String hostname, ITransport transport)
            at Amqp.Connection.Connect(SaslProfile saslProfile, Open open)
            at Amqp.Connection..ctor(Address address, IHandler handler)
            at Amqp.Connection..ctor(Address address)
         /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp.Tests/V1/AmqpConnectorsTest.cs(42,0): at Akka.Streams.Amqp.V1.Tests.AmqpConnectorsTest.Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process()
         --- End of stack trace from previous location where exception was thrown ---
       Output:
         [07:51:11.297] SEND AMQP 3 1 0 0
         [DEBUG][08/11/2020 19:51:11][Thread 0016][ActorSystem(test)] System shutdown initiated
         [DEBUG][08/11/2020 19:51:11][Thread 0015][EventStream] Shutting down: StandardOutLogger started
         [DEBUG][08/11/2020 19:51:11][Thread 0015][EventStream] All default loggers stopped

It appears from the frame log that the broker immediately closes the TCP socket as soon as the client sends a SEND AMQP 3 1 0 0 to start a handshake, and refuses any connection attempts after that.

For the standard AMQP protocol, the broker seemed to suddenly drops TCP connection on specific tests:

X Akka.Streams.Amqp.Tests.AmqpConnectorsTest.Pub_sub_from_one_source_with_multiple_sinks [5s 120ms]
   Error Message:
    System.AggregateException : One or more errors occurred. (Did not get at least one element from every fanout branch)
 ---- System.Exception : Did not get at least one element from every fanout branch
   Stack Trace:
      at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
    at System.Threading.Tasks.Task`1.get_Result()
    at Akka.Streams.Amqp.Tests.AmqpConnectorsTest.Pub_sub_from_one_source_with_multiple_sinks() in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp.Tests/AmqpConnectorsTest.cs:line 277
 ----- Inner Stack Trace -----
   Standard Output Messages:
  [ERROR][08/11/2020 19:51:12][Thread 0015][[akka://test/user/StreamSupervisor-2/Flow-0-0-unknown-operation#2121431516]] Error during PreStart in [AmqpSource]
  Cause: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
   ---> System.IO.IOException: Unable to write data to the transport connection: Cannot access a disposed object.
  Object name: 'System.Net.Sockets.Socket'..
   ---> System.ObjectDisposedException: Cannot access a disposed object.
  Object name: 'System.Net.Sockets.Socket'.
     at System.Net.Sockets.Socket.Send(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags, SocketError& errorCode)
     at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
     --- End of inner exception stack trace ---
     at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
     at System.IO.BufferedStream.Flush()
     at System.IO.BinaryWriter.Flush()
     at RabbitMQ.Client.Impl.SocketFrameHandler.SendHeader()
     at RabbitMQ.Client.Framing.Impl.Connection.StartAndTune()
     at RabbitMQ.Client.Framing.Impl.Connection.Open(Boolean insist)
     at RabbitMQ.Client.Framing.Impl.Connection..ctor(IConnectionFactory factory, Boolean insist, IFrameHandler frameHandler, String clientProvidedName)
     at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IFrameHandler fh)
     at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
     at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
     --- End of inner exception stack trace ---
     at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
     at RabbitMQ.Client.ConnectionFactory.CreateConnection(IList`1 endpoints)
     at Akka.Streams.Amqp.AmqpConnector.NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpConnector.cs:line 71
     at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpSourceStage.cs:line 75
     at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart() in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpConnector.cs:line 106
     at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
Arkatufus commented 4 years ago

Note that the docker container image was tested fine on my computer as a background application, so that should rule out that it was a botched/buggy RabbitMQ build.