rabbitmq / rabbitmq-amqp1.0-client

Erlang AMQP 1.0 client

Continuation frames without delivery_id are not handled correctly. #11

Closed Lutziferbox closed 6 years ago

Lutziferbox commented 6 years ago

Hi,

I am using rabbitmq-amqp1.0-client in an Elixir project to read AMQP messages from an Azure Event Hub. Yesterday a broken message appeared in the queue, and the client crashed while handling it. The message looks something like this: "\":null,\"CorrelationId\":null,...". The client first logs the warning "Unhandled session frame {{:"v1_0.transfer", {:uint, 0}, :undefined, :undefined, :undefined, :undefined, false, :undefined, :undefined, :undefined, :undefined, :undefined},", followed by the frame and "in state {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>,". It then terminates with "State machine #PID<0.5511.0> terminating Last event in was {{:"v1_0.transfer", {:uint, 0}, {:uint, 30586}, {:binary, ""}, {:uint, 0}, true, false, :undefined, :undefined, :undefined, :undefined, true}, <<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113, 117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 129, 0, 0, 0, 0, 0, 12, 20, 56, 163, 12, 120, 45, 111, 112, 116, 45, 111, 102, 102, ...>>} When State == :mapped Data == {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>, {:ssl, {:sslsocket, {:gen_tcp, #Port<0.158324>, :tls_connection, :undefined},

#PID<0.5503.0>}},

%{0 => {:link, "receiver", {:link_ref, :receiver, #PID<0.5511.0>, 0}, :attached, #PID<0.5499.0>, 0, 0, :receiver, :undefined, {:pid, #PID<0.5499.0>}, 30585, 415, 415, 0, false, {{:"v1_0.transfer", {:uint, 0}, {:uint, 30585}, {:binary, ""}, {:uint, 0}, true, true, :undefined, :undefined, :undefined, :undefined, true}," followed by the frame again and something like this: "<<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113, 117, 101, 110, ...>>]}, :never}}, %{}, %{0 => 0}, 1, [], %{address: 'xxx', container_id: "xxx", hostname: "xxx", notify: #PID<0.5499.0>, outgoing_max_frame_size: 65536, port: 5671, sasl: {:plain, "xxx", "xxx"}, tls_opts: {:secure_port, []}, transfer_limit_margin: 0}, %{}, %{},

#PID<0.5499.0>}

Reason for termination = {:bad_return_value, {:primitive_type_unsupported, 176}}"

I did not find a way to react to this error from within my wrapping code.

Could you please help me figure out what exactly is going wrong and what to do about it? Thanks a lot in advance!
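On reacting to the crash from wrapping code: the state machine that terminates (#PID<0.5511.0>) is the same pid that appears in the receiver's link_ref in the dump, which suggests it is the session process. A minimal sketch, assuming the `session` value returned by `:amqp10_client.begin_session/1` is that pid, is to watch it with `Process.monitor/1` so that a crash arrives in the wrapper as a `:DOWN` message it can act on. The module name `Reconnector` and the `connect_fun` argument are hypothetical; `connect_fun` stands in for the open_connection/begin_session/attach_receiver_link chain so the example runs without a broker.

```elixir
defmodule Reconnector do
  use GenServer

  # Hypothetical sketch: connect_fun must return {:ok, pid}. In the real app it
  # would wrap the amqp10_client calls and return the session pid (assumption).
  def start_link(connect_fun), do: GenServer.start_link(__MODULE__, connect_fun)

  @impl true
  def init(connect_fun) do
    send(self(), :connect)
    {:ok, %{connect_fun: connect_fun, session: nil, last_down: nil}}
  end

  @impl true
  def handle_info(:connect, state) do
    {:ok, session} = state.connect_fun.()
    # A crash of the monitored process is delivered to us as a :DOWN message.
    Process.monitor(session)
    {:noreply, %{state | session: session}}
  end

  def handle_info({:DOWN, _ref, :process, pid, reason}, %{session: pid} = state) do
    # The session died (e.g. after a decode error): record why and reconnect.
    send(self(), :connect)
    {:noreply, %{state | session: nil, last_down: reason}}
  end

  @impl true
  def handle_call(:state, _from, state), do: {:reply, state, state}
end
```

In the GenServer from this thread, the same idea would mean calling `Process.monitor(session)` once `begin_session/1` succeeds and resetting `status` to `:disconnected` in the `:DOWN` clause before reconnecting.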

Lutziferbox commented 6 years ago

Hi again, and first of all thank you for your answer and your tips on how to get support for my issue. My problem is that I am not using RabbitMQ at all; I only use this client to connect to an Azure Event Hub.

My client is an Elixir umbrella app running in a Docker container, and it uses rabbitmq-amqp1.0-client like this:

def start_link(partition) do
  GenServer.start_link(__MODULE__, partition)
end

def init(partition) do
  # Connect asynchronously so init/1 returns without waiting on the network
  send(self(), :connect)
  {:ok, %{status: :disconnected, receiver: nil, partition: partition}}
end

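def handle_info(:connect, %{status: :disconnected, partition: partition} = state) do
  # Connection settings for the Event Hub (values elided); 5671 is the AMQPS port
  connection_info = %{
    address: 'xxx',
    hostname: "xxx",
    container_id: "xxx",
    port: 5671,
    sasl: {:plain, "xxx-policy", "xxx-key"},
    tls_opts: {:secure_port, []}
  }

  with {:ok, connection} <- :amqp10_client.open_connection(connection_info),
       {:ok, session} <- :amqp10_client.begin_session(connection),
       {:ok, receiver} <- :amqp10_client.attach_receiver_link(session, "receiver", partition),
       # Grant 1000 credits once; :never means the credit is not renewed automatically
       :ok <- :amqp10_client.flow_link_credit(receiver, 1000, :never)
  do
    {:noreply, %{state | status: :connected, receiver: receiver}}
  else
    # Without this clause a failed step would be returned as-is, which is not a valid handle_info/2 result
    error -> {:stop, {:connect_failed, error}, state}
  end
end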

# Each delivered message arrives as {:amqp10_msg, link_ref, message}
def handle_info({:amqp10_msg, _link, message}, state) do
  {_message_content, _message_annotations} = do_something_with_message(message)
  {:noreply, state}
end

The code worked fine until recently, when we started seeing the following error:

` 17:43:18.712 [warn] Unhandled session frame {{:"v1_0.transfer", {:uint, 0}, :undefined, :undefined, :undefined, :undefined, false, :undefined, :undefined, :undefined, :undefined, :undefined}, "xxx-very-long-not-valid-json-string" <> ...} in state {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>, {:ssl, {:sslsocket, {:gen_tcp, #Port<0.158324>, :tls_connection, :undefined},

#PID<0.5503.0>}},

%{0 => {:link, "receiver", {:link_ref, :receiver, #PID<0.5511.0>, 0}, :attached, #PID<0.5499.0>, 0, 0, :receiver, :undefined, {:pid, #PID<0.5499.0>}, 30585, 415, 415, 0, false, {{:"v1_0.transfer", {:uint, 0}, {:uint, 30585}, {:binary, ""}, {:uint, 0}, true, true, :undefined, :undefined, :undefined, :undefined, true}, ["xxx-another-very-long-not-valid-json-string (truncated) 17:43:18.717 [error] State machine #PID<0.5511.0> terminating Last event in was {{:"v1_0.transfer", {:uint, 0}, {:uint, 30586}, {:binary, ""}, {:uint, 0}, true, false, :undefined, :undefined, :undefined, :undefined, true}, <<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113, 117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 129, 0, 0, 0, 0, 0, 12, 20, 56, 163, 12, 120, 45, 111, 112, 116, 45, 111, 102, 102, ...>>} When State == :mapped Data == {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>, {:ssl, {:sslsocket, {:gen_tcp, #Port<0.158324>, :tls_connection, :undefined},

#PID<0.5503.0>}},

%{0 => {:link, "receiver", {:link_ref, :receiver, #PID<0.5511.0>, 0}, :attached, #PID<0.5499.0>, 0, 0, :receiver, :undefined, {:pid, #PID<0.5499.0>}, 30585, 415, 415, 0, false, {{:"v1_0.transfer", {:uint, 0}, {:uint, 30585}, {:binary, ""}, {:uint, 0}, true, true, :undefined, :undefined, :undefined, :undefined, true}, ["xxx-another-very-long-not-valid-json-string" <> ..., <<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113, 117, 101, 110, ...>>]}, :never}}, %{}, %{0 => 0}, 1, [], %{address: 'xxx', container_id: "xxx", hostname: "xxx", notify: #PID<0.5499.0>, outgoing_max_frame_size: 65536, port: 5671, sasl: {:plain, "xxx", "xxx"}, tls_opts: {:secure_port, []}, transfer_limit_margin: 0}, %{}, %{},

#PID<0.5499.0>}

Reason for termination = {:bad_return_value, {:primitive_type_unsupported, 176}} `

I am still trying to reproduce this issue with a short script, but I haven't found a good way to do so yet.

Any help is much appreciated.

kjnilsson commented 6 years ago

We are happy to answer questions on the mailing list about the wider RabbitMQ ecosystem, which this client is part of.

That said, what would really help in this case is a network capture of the offending frame (tcpdump -w or Wireshark) as well as full stack traces.

kjnilsson commented 6 years ago

OK, I think I know what is going on. The delivery_id of the transfer frame you received isn't set. The client doesn't expect this; however, according to the spec, delivery_id can be omitted for continuation transfers. I think I can see a partial frame in the state dump, which means this is a continuation transfer. I'm not sure why it results in the parsing error at the end, but there is definitely a bug around continuation transfers that needs fixing.
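The rule described above can be made concrete with a small reassembly function. This is a hypothetical sketch, not the client's fix (which lives in its Erlang session code): the module name, the map shape used for transfers, and the return values are all assumptions. It treats a transfer whose delivery_id is omitted as a continuation of the delivery in progress and completes the delivery when a transfer arrives with `more` set to false.

```elixir
defmodule TransferAssembler do
  # Hypothetical sketch. A transfer is modelled as
  # %{delivery_id: integer | nil, more: boolean, payload: binary},
  # where nil means delivery_id was omitted on the wire.
  # State is nil (no delivery in progress) or {delivery_id, chunks},
  # with chunks kept in reverse order of arrival.
  # Returns {:more, state}, {:done, delivery_id, binary, nil} or {:error, reason}.

  # Nothing in progress, so an omitted delivery_id cannot be a continuation.
  def handle(nil, %{delivery_id: nil}),
    do: {:error, :continuation_without_delivery}

  # First transfer of a delivery: delivery_id is present.
  def handle(nil, %{delivery_id: id} = transfer),
    do: step({id, []}, transfer)

  # Continuation transfer: delivery_id may be omitted or repeat the current one.
  def handle({current_id, _chunks} = partial, %{delivery_id: id} = transfer)
      when is_nil(id) or id == current_id,
      do: step(partial, transfer)

  # A different delivery_id mid-delivery is a protocol error.
  def handle({current_id, _chunks}, %{delivery_id: other}),
    do: {:error, {:unexpected_delivery_id, current_id, other}}

  defp step({id, chunks}, %{more: true, payload: payload}),
    do: {:more, {id, [payload | chunks]}}

  defp step({id, chunks}, %{more: false, payload: payload}),
    do: {:done, id, IO.iodata_to_binary(Enum.reverse([payload | chunks])), nil}
end
```

A client that only accepts transfers with delivery_id set would, in effect, be missing the third clause, which matches the "Unhandled session frame" warning for a transfer whose delivery_id is `:undefined`.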

Lutziferbox commented 6 years ago

Wow, thank you so much for continuing to think about this. Sadly I am not much help in solving this issue since my Erlang is very basic, but I really appreciate your work here. I will try to encourage some of my colleagues to have a look as well! Your assessment of the situation looks right to me, though, since the message in question was very long and might have been bigger than the allowed frame size.

lukebakken commented 6 years ago

@Lutziferbox, is there a chance you can try out @kjnilsson's fix in the rabbitmq-amqp1.0-client-11 branch? If there is a packaging step or other assistance I can provide, let me know.

Lutziferbox commented 6 years ago

Hey Luke, first of all, thanks for the quick action from all of you. I pulled the new master and tried processing the broken message again, and it looks like the issue still persists. Testing is not very easy at the moment, so it will take me some time to provide the full stack trace and all additional information. I'll add to this thread as soon as I have more evidence.

michaelklishin commented 6 years ago

We should really provide an alpha package here to rule out testing the wrong version.

What kind of package (generic UNIX, Debian, etc.) do you need?

(Email reply to Lutziferbox's comment above of Wed, Dec 20, 2017 at 7:29 PM.)

-- MK

Staff Software Engineer, Pivotal/RabbitMQ

Lutziferbox commented 6 years ago

My solution runs in an Ubuntu Docker image.

kjnilsson commented 6 years ago

I just ran a manual test against the Qpid AMQP broker, which, like Azure Event Hub, omits the delivery_id on continuation frames. The client handles that fine on current master, so I'm hoping the "Unhandled session frame" warning no longer appears. That said, there may be some other issue that occurs after that point.

michaelklishin commented 6 years ago

3.8.0-alpha.51 should include #13. @Lutziferbox, can you please run your test with that artifact and, if it goes well, report back here? If it doesn't, let's continue on the mailing list until we find out enough to file a follow-up issue.

michaelklishin commented 6 years ago

Err, meant to link to 3.7.1-alpha.49 but they are effectively identical at the moment.

Lutziferbox commented 6 years ago

I created two test projects to test the broken messages with your new version and to extract the complete message from the stream. They are not finished yet, so I will need until tomorrow to complete everything and give more detailed feedback. I will write as soon as I have more information.

Lutziferbox commented 6 years ago

I was finally able to reconstruct the full message sent by the Azure Event Hub that cannot be processed correctly. Structurally it is no different from the other messages: multiple JSON objects separated by newlines. However, its length is enormous: 65 separate JSON objects joined by newlines, resulting in a single message of 124340 characters. I think the real issue is somewhere in the decoder, which may not be able to handle a buffer that large. The message shouldn't be pasted here, but it is not really confidential, so I could send it to you some other way.
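The message size on its own supports the continuation-frame explanation. A rough lower bound on the number of transfer frames is a ceiling division, assuming a frame size limit of 65536 bytes (the `outgoing_max_frame_size` in the state dump; that the same limit applies to frames the Event Hub sends is an assumption) and counting only the 8-byte AMQP frame header as per-frame overhead. Since every UTF-8 character takes at least one byte, 124340 characters is at least 124340 bytes. `FrameMath.min_frames/3` is a hypothetical helper.

```elixir
defmodule FrameMath do
  # Hypothetical helper: lower bound on the number of frames needed to carry
  # payload_bytes. Only the 8-byte frame header is subtracted per frame; the
  # encoded transfer performative and message sections add more in practice,
  # so the real frame count is at least this value.
  def min_frames(payload_bytes, max_frame_size, overhead \\ 8)
      when payload_bytes >= 0 and max_frame_size > overhead do
    per_frame = max_frame_size - overhead
    # Integer ceiling division.
    div(payload_bytes + per_frame - 1, per_frame)
  end
end

IO.inspect(FrameMath.min_frames(124_340, 65_536))
```

This prints 2: even under generous assumptions the message cannot fit in a single frame, so the Event Hub has to deliver it as a multi-frame transfer with at least one continuation frame.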

Lutziferbox commented 6 years ago

Concerning 3.7.1-alpha.49: I am not running any RabbitMQ server, only the Azure services, and have not set up any other infrastructure. That's why it's so hard to dig into this.

michaelklishin commented 6 years ago

Moved to https://github.com/rabbitmq/rabbitmq-amqp1.0-client/issues/14. We can upload builds of only this client easily. Most people currently use it via the cross-protocol Shovel feature in 3.7.0.

Lutziferbox commented 6 years ago

It's a little late by now, but I can confirm that the error is fixed and the newest version of rabbitmq-amqp1.0-client can handle even very large messages from the Azure Event Hub. Thank you so much for your support with this issue!

kjnilsson commented 6 years ago

Great, thanks for reporting back.