ppat / storm-rabbitmq

A library of tools for interacting with RabbitMQ from Storm.
MIT License
126 stars 77 forks source link

Empty message headers due to ByteArrayLongString #31

Closed daroay closed 8 years ago

daroay commented 9 years ago

In class RabbitMQMessageScheme

In function private Map<String, Object> serialiazableHeaders(Map<String, Object> headers)

The values are instances of com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString

I expect them to be String

As a result, after looping with the instanceof the serializableHeaders comes as empty Map.

My only rabbit dependency is:

io.latent storm-rabbitmq 0.6.0
daroay commented 9 years ago

I had to extend a local RabbitMQMessageScheme for my needs that patches that.

It only works in case the headers are strings.

Can't figure out the root cause yet.

public class RabbitMQMessageScheme2 extends RabbitMQMessageScheme {

    public RabbitMQMessageScheme2(Scheme payloadScheme, String envelopeFieldName, String propertiesFieldName) {
        super(payloadScheme, envelopeFieldName, propertiesFieldName);
    }

    @Override
    public List<Object> deserialize(Message message) {
        List<Object> values = super.deserialize(message);
        Properties p = (Properties) values.get(values.size() - 1);
        setHeaders(message, p);
        return values;
    }

    private void setHeaders(Message m, Properties p){
        Message.DeliveredMessage dm = (Message.DeliveredMessage)m;
        p.getHeaders().putAll(serialiazableHeaders(dm.getHeaders()));
    }

    private Map<String, Object> serialiazableHeaders(Map<String, Object> headers) {
        if (headers == null) {
            return new HashMap<String, Object>();
        }
        Map<String, Object> serializableHeaders = new HashMap<String, Object>(headers.size());
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            serializableHeaders.put(entry.getKey(), entry.getValue().toString());
        }
        return serializableHeaders;
    }

}
SeanTAllen commented 9 years ago

Do you have a self contained test case that demonstrates the problem?

ppat commented 8 years ago

What you are noticing is the intended behavior for headers containing RabbitMQ's internal data types.

RabbitMQMessageScheme will include standard data type (String's, Int's, etc..) values in the resulting headers. But if you are using any other data types, you will need to provide your own implementation of a Scheme, so your solution above is good for that.

RabbitMQ client library uses some strange data types like com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString internally for RabbitMQ own headers. Your solution is good for that, so closing issue.