ppat / storm-rabbitmq

A library of tools for interacting with RabbitMQ from Storm.
MIT License

need help #19

Closed yuhengd closed 9 years ago

yuhengd commented 9 years ago

Hi,

I have just started using Storm and want to integrate it with RabbitMQ. Can anyone provide sample code that implements backtype.storm.spout.Scheme to deserialize a RabbitMQ message payload?

A simple example would be a great help, as I am confused about this part. My RabbitMQ message payloads are JSON.

Thanks.

arinto commented 9 years ago

Here's my simple scheme to deserialize a payload as a String:

import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;

import com.google.common.base.Charsets;

public class SimpleStringScheme implements Scheme {

    /**
     * Scheme extends Serializable, so the class declares a serialVersionUID.
     */
    private static final long serialVersionUID = 3195794623690787473L;
    private static final Logger logger = LoggerFactory.getLogger(SimpleStringScheme.class);

    private final String encoding;

    public SimpleStringScheme(String inEncoding) {
        this.encoding = inEncoding;
    }

    public SimpleStringScheme() {
        this(Charsets.UTF_8.toString());
    }

    @Override
    public List<Object> deserialize(byte[] ser) {
        String chars = null;
        try {
            chars = new String(ser, encoding);
        } catch (UnsupportedEncodingException e) {
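            // Log the encoding and the exception so the failure can be diagnosed.
            logger.error("Failed to deserialize payload with encoding {}", encoding, e);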
        }

        return Collections.singletonList((Object)chars);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("str");
    }

}

Then you can use the message scheme as below:

       Scheme simpleStringRmqMsgScheme = new SimpleStringScheme();
       Declarator directExchWorkQueue = new DirectExchWorkQueueDecl(exchange, queue, routing);
       IRichSpout imgRmqSpout = new RabbitMQSpout(simpleStringRmqMsgScheme, directExchWorkQueue);
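
As a possible variation on the decoding step above, here is a minimal stand-alone sketch that resolves the encoding through java.nio.charset.Charset, so an unknown encoding name fails in the constructor instead of producing a null string at runtime. It deliberately does not implement Storm's Scheme, so it runs without Storm on the classpath; the class name PayloadDecoder and the method name decode are hypothetical and not part of storm-rabbitmq.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/**
 * Stand-alone sketch of the decoding step only (hypothetical names).
 * The body of decode() could be reused inside Scheme.deserialize().
 */
public class PayloadDecoder {

    // Store the charset by name: java.nio.charset.Charset is not Serializable,
    // and Storm's Scheme interface extends Serializable.
    private final String charsetName;

    public PayloadDecoder(String charsetName) {
        // Throws an unchecked IllegalArgumentException subclass if the
        // name is illegal or the charset is unsupported on this JVM.
        Charset.forName(charsetName);
        this.charsetName = charsetName;
    }

    public PayloadDecoder() {
        this(StandardCharsets.UTF_8.name());
    }

    /** Decodes the raw payload into a one-element list holding the String. */
    public List<Object> decode(byte[] payload) {
        String text = new String(payload, Charset.forName(charsetName));
        return Collections.<Object>singletonList(text);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"deviceId\":\"0\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(new PayloadDecoder().decode(payload));
    }
}
```

With this approach there is no checked UnsupportedEncodingException to catch in the decode path, so the returned list never contains null because of a bad encoding name.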

yuhengd commented 9 years ago

Thank you Arinto, that's very helpful!

I will implement mine according to this example.

best,


arinto commented 9 years ago

You're welcome. Please close the issue accordingly :+1:

yuhengd commented 9 years ago

Hi guys,

I am using RabbitMQSpout to read data from RabbitMQ. Here is what I see in the console when I run the topology locally:

101703 [Thread-16-storm-obser-spout] INFO backtype.storm.daemon.executor - Acking message 33
113099 [Thread-16-storm-obser-spout] INFO backtype.storm.daemon.task - Emitting: storm-obser-spout default [{"readings":[0.0,0.0,0.0,88.007,0.0,0.0,0.0,17.83,64.0,0.0,0.0,0.0,0.0,11.071,0.2,1.69,0.0,0.0,-0.23,88.007,0.0,0.0,0.0,0.0,0.0,0.0],"observationId":"f45ef06f-88dc-4d50-b28e-24937dda5240","deploymentId":" http://www.xxxx.org/resource/deployment#aiken_8","deviceId":"0","observationDateTime":"11-05-2014 19:05:15 UTC"}, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@2a8ec790, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@3cfd4c4]
113099 [Thread-16-storm-obser-spout] INFO backtype.storm.daemon.task - Emitting: storm-obser-spout __ack_init [-3424274568001459772 0 2]

Can anybody tell me where these two values in the emitted tuple come from: "io.latent.storm.rabbitmq.RabbitMQMessageScheme$Envelope@2a8ec790, io.latent.storm.rabbitmq.RabbitMQMessageScheme$Properties@3cfd4c4"?

How can I parse individual fields of my JSON messages, such as "readings" and "observationDateTime"?

The storm-json library at https://github.com/rapportive-oss/storm-json seems to work only with Storm versions <= 0.6.0.

Is there a newer version of storm-json?

I attached my topology source code.

Thanks.
