ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.
https://watermill.io
MIT License
7.07k stars 375 forks source link

amqp message rejected if header is not string #65

Open danielllek opened 5 years ago

danielllek commented 5 years ago

I've got message with header that is not string, and it gets rejected by watermill: [watermill] 2019/04/17 15:48:14.371452 subscriber.go:245: level=ERROR msg="Processing message failed, sending nack" amqp_exchange_name=x amqp_queue_name=y err="metadata x-death is not a string, but []interface {}....

x-death header is added by dead letter, and I would like to process this message again after some time in DL.

My flow looks like this:

  1. try to process message,
  2. after few NACKs message gets moved to dead letter queue not to block main queue,
  3. there's policy on DL that moves message back to main queue after some few minutes to try to process it again.

Most simple solution is to ignore headers that are not strings and not add them to Metadata - this would work me.

I'm willing to make fix/PR, but please advise if that's good solution or should I do it in some other way (maybe flattening

roblaszczak commented 5 years ago

Metadata are strings because almost all of Pub/Subs are supporting only simple types for metadata. So the main thing that we need to keep is to make amqp.Marshaler transparent, so you can make multiple Pub/Subs pipelines.

So to solve it and keep backward compatibility we can make custom marshaling of x-death to string. What values it contains? Some ints? And do you know about any other headers in rabbit which are not string?

Unfortunately it will not work for all non-string headers, but the question is that is there any default non-string headers? If someone have something custom, he should probably to implement custom marshaller.

danielllek commented 5 years ago

Until now it's the only one that makes problems, my example value (dump by spew):

([]interface {})[

(amqp.Table)map[
time:(time.Time)2019-04-16 18:50:19 +0000 UTC 
count:(int64)642 
exchange:(string)TEST.E.Fanout.xxx.Worker.DL 
queue:(string)TEST.Q.xxx.Worker.DL 
reason:(string)expired 
routing-keys:([]interface {})[(string)TEST.Q.xxx.Worker]
] 

(amqp.Table)map[
count:(int64)642 
exchange:(string) 
queue:(string)TEST.Q.xxx.Worker 
reason:(string)rejected 
routing-keys:([]interface {})[(string)TEST.Q.xxx.Worker] 
time:(time.Time)2019-04-16 18:49:49 +0000 UTC
]
]
danielllek commented 5 years ago

OK, custom marshaler works for me, thanks. But nevertheless watermill shouldn't fail on standard headers ;)

roblaszczak commented 5 years ago

But nevertheless watermill shouldn't fail on standard headers ;) Yup, it is not so good :)

I have couple ideas how it can be fixed, but nothing sounds good:

  1. Using gob to marshal Rabbit's metadata to message metadata, but it will break all string headers
  2. Storing some metada fields in ctx, but if someone will overwrite ctx we will lose this information
  3. We can use for some "special" metadata fields (maybe some prefix in name) which will serialized with gob, to keep compatibility with simple string fields. All other fields will be not marshaled.
  4. We can make explicit blacklist of ignored fields, but we will lose transparency

But 3 and 4 sounds most reasonable. What do you think?

danielllek commented 5 years ago

clarifying 3:

  1. normal headers will be strings like currently
  2. complex types will be gob'ed and key will be something like: x-death__gob?

If you mean something like that, I like that :)

roblaszczak commented 5 years ago

Exactly :)

checkmunza commented 1 year ago

Does anyone know how many data types of a header can be other than string, in Golang context and AMQP context?

checkmunza commented 1 year ago

Does anyone know how many data types of a header can be other than string, in Golang context and AMQP context?

I think I found how each AMQP type is mapped to a Go type. https://github.com/rabbitmq/amqp091-go/blob/5c9eb2241d43a1d2ff3ae0c98b7153a38625973d/read.go#L142-L159

I think gob cannot be used in this case because it is possible for an AMQP type to be mapped to nil in Go, and gob cannot encode nil values.

kasderooi commented 2 months ago

Hi I am encountering this issue as well. Im using the delayed message plugin for amqp and also quorum type queues. These functionalities respectively utilize the "x-delay" and add the "x-delivery-count" keys with a numeric value.

Adding a prefix will break the functionality of the "x-delay" value and is not possible for the "x-delivery-count" header that is added by amqp itself, or am I misunderstanding the solution?