Open ngbrown opened 7 months ago
Can you please elaborate more?
You didn't have problems given the same stream / data with <=3.13, right?
Do you use the stream client to send and receive data?
Do you have this problem with all the messages?
It seems that the problem occurs while decoding some value from a map, ApplicationProperties or Properties.
You can find all the tests we use to validate the map here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/blob/main/Tests/Amqp10Tests.cs#L469. Can you see if you are using something different?
The .NET stream client is only reading from the streams. The source of data in the stream is a binding to an exchange. [The source of the data for the exchange is an exchange on another RabbitMQ cluster, copied by a shovel.] There was no problem in RabbitMQ 3.12.12. The only thing that changed was upgrading the RabbitMQ cluster.
All streams started having the same problem. I haven't yet tried skipping messages or re-starting the client offset.
OK, so you publish messages with an AMQP client, right?
Yes, and the publisher didn't change.
When I rewind the client starting point, it correctly receives messages that were published on the exchange before the RabbitMQ server was upgraded. All new messages after the upgrade have a problem. The RabbitMQ.Stream.Client proceeds to skip through every stream offset (incrementing ChunkId) without finding any more valid messages.
These messages are copied into the exchange by either shovel or federation. When copied by federation, the error is slightly different:
RabbitMQ.Stream.Client.AMQP.AmqpParseException: Read Any: Invalid type: 192
So when the message is copied by federation, this could be referring to FormatCode.List8 = 0xc0 (192 in decimal), and when copied by a shovel, the stream message contains FormatCode.List32 = 0xd0.
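To make the decimal/hex correspondence concrete, here is a small Python sketch that maps the format codes involved to their AMQP 1.0 type names. The table is only the subset relevant to this issue, and the `FORMAT_CODES` and `describe` names are made up for illustration; they are not part of the .NET client.

```python
# Subset of AMQP 1.0 format codes that show up in this issue.
# FORMAT_CODES and describe() are illustrative names, not client APIs.
FORMAT_CODES = {
    0xA1: "str8-utf8",
    0xA3: "sym8",
    0xC0: "list8",
    0xC1: "map8",
    0xD0: "list32",
    0xD1: "map32",
}


def describe(code: int) -> str:
    """Render a format code as decimal, hex, and AMQP 1.0 type name."""
    name = FORMAT_CODES.get(code, "unknown")
    return f"{code} = 0x{code:02X} ({name})"


print(describe(192))   # the value from "Invalid type: 192"
print(describe(0xD0))  # the code seen in the shovel case
```

The first call prints `192 = 0xC0 (list8)`, which is why the federation error points at a list value.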
I captured a Wireshark trace of the connections to port 5553 from the client.
At least currently, this is happening at offset 0x54 (RabbitMQ.Stream.Client.dll!RabbitMQ.Stream.Client.AMQP.Map<object>.Parse<RabbitMQ.Stream.Client.AMQP.Annotations>(ref System.Buffers.SequenceReader<byte> reader, ref int byteRead)).
It appears that the difference (breaking change) is that there is now a new message annotation x-received-from that was not in the old messages. This is in the message annotations section (53 72). The parser then crashes while parsing the value, right after parsing the key:
A3 0F 78 2D 72 65 63 65 69 76 65 64 2D 66 72 6F 6D C0 A3 01 C1 A0 0A A3 03 75 72 69 A1 2A 61 6D 71 70 3A 2F 2F ..x-received-from......uri.*amqp://
Alternatively, there is a message annotation called x-shovelled with binary data like:
A3 0B 78 2D 73 68 6F 76 65 6C 6C 65 64 D0 00 00 01 11 00 00 00 01 D1 00 00 01 08 00 00 00 12 A3 0C 73 68 6F 76 65 6C 6C 65 64 2D 62 79 A1 13 72 61 62 62 69 74 40 38 32 30 63 35 30 64 32 31 30 37 62 A3 0B 73 68 6F 76 65 6C 2D 74 79 70 65 A1 07 64 79 6E 61 6D 69 63 ..shovelled-by..rabbit@820c50d2107b..shovel-type..dynamic
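The two dumps above can be read item by item with a few lines of Python. This is only a sketch for inspecting the bytes: it handles just the six format codes that occur here, it flattens lists and maps instead of nesting them, and `walk` is a made-up helper name. The first dump is cut off in the middle of the URI string, so that item is flagged as truncated.

```python
import struct

# Bytes copied from the two dumps above.
RECEIVED_FROM = bytes.fromhex(
    "A3 0F 78 2D 72 65 63 65 69 76 65 64 2D 66 72 6F 6D C0 A3 01 C1 A0 0A "
    "A3 03 75 72 69 A1 2A 61 6D 71 70 3A 2F 2F"
)
SHOVELLED = bytes.fromhex(
    "A3 0B 78 2D 73 68 6F 76 65 6C 6C 65 64 D0 00 00 01 11 00 00 00 01 "
    "D1 00 00 01 08 00 00 00 12 A3 0C 73 68 6F 76 65 6C 6C 65 64 2D 62 79 "
    "A1 13 72 61 62 62 69 74 40 38 32 30 63 35 30 64 32 31 30 37 62 "
    "A3 0B 73 68 6F 76 65 6C 2D 74 79 70 65 A1 07 64 79 6E 61 6D 69 63"
)

NAMES = {0xA1: "str8", 0xA3: "sym8", 0xC0: "list8",
         0xC1: "map8", 0xD0: "list32", 0xD1: "map32"}


def walk(data: bytes) -> list:
    """Describe each encoded item in order, without nesting."""
    out = []
    pos = 0
    while pos < len(data):
        code = data[pos]
        name = NAMES.get(code)
        pos += 1
        if code in (0xA1, 0xA3):      # 1-byte length, then UTF-8/ASCII bytes
            n = data[pos]
            raw = data[pos + 1 : pos + 1 + n]
            note = "" if len(raw) == n else " (truncated)"
            out.append(f"{name} {raw.decode('utf-8')!r}{note}")
            pos += 1 + n
        elif code in (0xC0, 0xC1):    # 1-byte size, 1-byte element count
            size, count = data[pos], data[pos + 1]
            out.append(f"{name} size={size} count={count}")
            pos += 2
        elif code in (0xD0, 0xD1):    # 4-byte size, 4-byte count, big-endian
            size, count = struct.unpack_from(">II", data, pos)
            out.append(f"{name} size={size} count={count}")
            pos += 8
        else:
            out.append(f"unhandled format code 0x{code:02X}")
            break
    return out


for line in walk(RECEIVED_FROM) + walk(SHOVELLED):
    print(line)
```

In both cases the annotation key is followed by a list (list8 for x-received-from, list32 for x-shovelled) that wraps a single map, which matches the 0xc0 / 0xd0 codes mentioned earlier.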
This is probably a result of the 3.13 RabbitMQ instance receiving data from another cluster (still on 3.12) either by shovel or federation.
Since the message annotations section in AMQP 1.0 is a map, a value in that map could be anything, including a list. The current client doesn't take this into account.
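As an illustration of what "the value could be anything" means for a parser, here is a minimal recursive decoder sketch in Python. It is not the .NET client's code or the fix itself: `read_any`, `sym8`, and `str8` are hypothetical helpers, only a handful of format codes are supported, and the sample bytes are synthetic (shaped like the x-received-from annotation but with a made-up URI).

```python
import struct


def read_any(buf: bytes, pos: int = 0):
    """Decode one AMQP 1.0 value at pos; return (value, next_pos)."""
    code = buf[pos]
    pos += 1
    if code == 0x40:                          # null
        return None, pos
    if code in (0xA1, 0xA3):                  # str8-utf8 / sym8
        n = buf[pos]
        return buf[pos + 1 : pos + 1 + n].decode("utf-8"), pos + 1 + n
    if code in (0xC0, 0xC1, 0xD0, 0xD1):      # list8, map8, list32, map32
        if code in (0xC0, 0xC1):
            _size, count = buf[pos], buf[pos + 1]
            pos += 2
        else:
            _size, count = struct.unpack_from(">II", buf, pos)
            pos += 8
        items = []
        for _ in range(count):                # elements may be compound too
            value, pos = read_any(buf, pos)
            items.append(value)
        if code in (0xC0, 0xD0):
            return items, pos
        # A map's count covers keys and values, so pair them up.
        return dict(zip(items[0::2], items[1::2])), pos
    raise ValueError(f"unsupported format code 0x{code:02X}")


def sym8(s: str) -> bytes:
    return bytes([0xA3, len(s)]) + s.encode("ascii")


def str8(s: str) -> bytes:
    return bytes([0xA1, len(s)]) + s.encode("utf-8")


# Synthetic annotations map: {"x-received-from": [{"uri": "amqp://host"}]}
inner = sym8("uri") + str8("amqp://host")
inner_map = bytes([0xC1, len(inner) + 1, 2]) + inner
as_list = bytes([0xC0, len(inner_map) + 1, 1]) + inner_map
body = sym8("x-received-from") + as_list
annotations = bytes([0xC1, len(body) + 1, 2]) + body

value, end = read_any(annotations)
print(value)
```

The point of the sketch is that the map branch calls back into the same `read_any` for each value, so a list (or another map) nested inside an annotation is decoded like any other value instead of being rejected.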
Partially fixed by https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/372
Describe the bug
I upgraded a RabbitMQ cluster to 3.13.1 and my client using RabbitMQ.Stream.Client version 1.8.2 started throwing exceptions, with the following stack trace:
Reproduction steps
Connect to a stream on RabbitMQ 3.13.1, where the stream is bound to an exchange that is either shoveled or federated with another cluster that is providing the data on its own exchange (3.12.x in this case). Clients are providing data to the exchange on the source cluster.
Expected behavior
Work correctly with new versions of RabbitMQ.
Additional context
I did not enable any new feature flags after upgrading. All feature flags were enabled in 3.12.12. I did turn on the configuration classic_queue.default_version = 2 as part of the upgrade. (I tried again without this configuration; it doesn't make a difference.)