SNAS / openbmp

OpenBMP Server Collector
www.openbmp.org
Eclipse Public License 1.0

Unable to parse Messages #14

Closed vsrahul82 closed 8 years ago

vsrahul82 commented 8 years ago

I am consuming messages from the "openbmp.parsed.ls_link" Kafka topic. I am having difficulty unwrapping the messages that I receive from the topic. I have the option of receiving the messages as type Object or as type org.springframework.messaging.Message. I tried parsing the payload in both cases by casting to the type org.openbmp.handler.LsLink, but what I got was a ClassCastException. I have referred to http://www.openbmp.org/#!docs/MESSAGE_BUS_API.md for a description of the message attributes, but I am unable to obtain the individual attributes from the object that I receive when reading messages from the mentioned topic. Please guide me, if possible with a code example, on how I can parse the messages. My code for reading messages from the topic is as follows.

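import org.openbmp.handler.LsLink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

@EnableBinding(Sink.class)
public class CloudStreamConsumer {

    private static Logger logger = LoggerFactory
            .getLogger(CloudStreamConsumer.class);

    /**
     * Method that receives the Kafka message
     * @param payload
     * @throws Exception
     */
    @ServiceActivator(inputChannel = Sink.INPUT)
    //@StreamListener(Sink.INPUT)
    public void logger(Message<?> payload) throws Exception {
        //logger.info("received : " + payload);
        logger.info("Received Payload : " + payload.getPayload());
        logger.info("Received Headers : " + payload.getHeaders());
        // This cast is what throws the ClassCastException described above
        LsLink pbj = (LsLink) payload.getPayload();
        logger.info("Obj = " + pbj.toString());
    }

}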

vsrahul82 commented 8 years ago

Adding further, I am receiving a byte stream from the Kafka topic. How can I proceed to parse the message?

vsrahul82 commented 8 years ago

Please respond to this.

TimEvens commented 8 years ago

Did you solve it? I was just about to check out spring-cloud-stream.

vsrahul82 commented 8 years ago

No. I haven't been able to solve this. Please help.

TimEvens commented 8 years ago

Okay, I'll validate/run a test using: spring-cloud/spring-cloud-stream-samples/sink. Let me know if there is another example you would like me to test/validate with.

TimEvens commented 8 years ago

I have it working. I will be adding a new repo called "openbmp-api-message", which is a library that folks can use for Java parsing. Within this new repo, I have an examples folder which includes a spring-cloud-stream sink example. I should have that new repo up later today... I just have to finish the docs.

vsrahul82 commented 8 years ago

Great!!! Can't wait to get my hands on the library. I appreciate you giving time to this. Thank you. Please upload it as soon as possible and provide the link.

TimEvens commented 8 years ago

No problem.. this is good.

You should be able to use the library under: https://github.com/OpenBMP/openbmp-java-api-message

The spring cloud-stream example is https://github.com/OpenBMP/openbmp-java-api-message/tree/master/examples/spring-cloud-stream-sink.

Please let me know if you have any issues, changes, suggestions, etc... Please submit those to that repo. :)

Feel free to do pull requests as well to make needed updates.
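For readers wondering what the byte stream from the topic contains, here is a minimal sketch of unpacking it by hand. It is not the code in openbmp-java-api-message. It assumes the payload is UTF-8 text, that a blank line separates a block of "Name: value" headers from the records, and that each record is one tab-separated line; check MESSAGE_BUS_API.md before relying on any of that. The class name `BmpMessageSplitter` and its members are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of openbmp-java-api-message. */
class BmpMessageSplitter {

    /** Headers plus the tab-separated rows of one Kafka message. */
    static final class Parsed {
        final Map<String, String> headers = new LinkedHashMap<>();
        final List<String[]> rows = new ArrayList<>();
    }

    static Parsed parse(byte[] payload) {
        // Assumption: the payload is UTF-8 text.
        String text = new String(payload, StandardCharsets.UTF_8);
        Parsed parsed = new Parsed();

        // Assumption: a blank line separates the header block from the records.
        int split = text.indexOf("\n\n");
        String headerBlock = split >= 0 ? text.substring(0, split) : "";
        String body = split >= 0 ? text.substring(split + 2) : text;

        for (String line : headerBlock.split("\n")) {
            int colon = line.indexOf(':');
            if (colon > 0) {
                parsed.headers.put(line.substring(0, colon).trim(),
                                   line.substring(colon + 1).trim());
            }
        }
        for (String line : body.split("\n")) {
            if (!line.isEmpty()) {
                // The -1 limit keeps trailing empty columns so the count is accurate.
                parsed.rows.add(line.split("\t", -1));
            }
        }
        return parsed;
    }
}
```

In a sink like the one earlier in this thread, the argument would be the message payload cast to `byte[]` rather than to `LsLink`, assuming the binder really delivers raw bytes.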

vsrahul82 commented 8 years ago

Getting the following exception while running the app:

org.supercsv.exception.SuperCsvException: The number of columns to be processed (27) must match the number of CellProcessors (29): check that the number of CellProcessors you have defined matches the expected number of columns being read/written
context={lineNo=1, rowNo=1, columnNo=1, rowSource=[add, 16340955, 9b7f67e81167c977d8447b88473dd59c, 393e94c52f0498b3d20f8f9482258fa0, 2001:558:fc10:ff3a::182, 02f184b07ab05bfacc545ebdfab39043, e6a3576a88a6b04d5458a49f1312e64e, 68.86.42.92, 7016, 2016-07-25 16:54:53.340700, 200.10.15.0, 24, 1, igp, 7016 7922 1239 7908, 4, 7908, 68.86.80.1, 0, 0, null, 7922:89 7922:3000, null, null, 0, 1, null]}
at org.supercsv.util.Util.executeCellProcessors(Util.java:78)
at org.supercsv.io.AbstractCsvReader.executeProcessors(AbstractCsvReader.java:203)
at org.supercsv.io.CsvMapReader.read(CsvMapReader.java:99)
at org.openbmp.api.parsed.message.Base.parse(Base.java:74)
at org.openbmp.api.parsed.message.UnicastPrefix.<init>(UnicastPrefix.java:41)
at com.comcast.poc.cloudstream.hadoop.LogSink.log_UnicastPrefix(LogSink.java:49)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:113)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:102)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:347)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:166)
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:317)
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:155)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:93)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
2016-07-25 10:54:55.595 INFO 9948 --- [ kafka-binder-3] c.c.poc.cloudstream.hadoop.LogSink : UNICAST_PREFIX: [ ]

at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171)
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:516)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:513)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209)
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67)
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

I am using Gradle and these are my dependencies:

dependencies {
    compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
    compile('org.openbmp.api:openbmp-api-message:0.1.0-SNAPSHOT')
    compile('org.codehaus.jackson:jackson-mapper-asl:1.9.0')
    compile('net.sf.supercsv:super-csv:2.1.0')

    testCompile('org.springframework.boot:spring-boot-starter-test')
}

Is it because of the wrong jars being used that I am getting this error? The application parses messages properly for certain destinations like LS_NODE and PEER, but gives the above exception for LS_LINK and UNICAST_PREFIX. Please advise.

TimEvens commented 8 years ago

I recently added BGP-LU and add paths support to IPv4/IPv6, which has grown the number of columns from 27 to 29 for unicast_prefix. I also have added SR EPE bgp-ls support, which has grown the number of columns for ls_link. If you upgrade to the latest collector/openbmpd, you should be fine.

http://openbmp.org/#!docs/MESSAGE_BUS_API.md defines the new columns added for schema version 1.1 and 1.2. I put the change/diff at the top of the doc.

The latest (openbmpd -v) is 0.13.0-pre7

Let me know how it goes after you upgrade to the latest.

I can update the java-api to be backwards compatible as version is conveyed via headers. That might be a good idea. :)
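As an illustration of that backwards-compatibility idea, the sketch below picks the expected column list from the schema version before mapping a row, so an older message is never checked against a newer, longer layout. It is not the actual openbmp-java-api-message code. The class `VersionedColumns`, the placeholder column names, the choice of 1.1 as the version that adds columns, and the assumption that new columns are only appended at the end are all hypothetical; the real layouts are in MESSAGE_BUS_API.md.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; not the code in openbmp-java-api-message. */
class VersionedColumns {

    // Placeholder names: the real columns are listed in MESSAGE_BUS_API.md.
    static final List<String> BASE = Arrays.asList("action", "seq", "hash");
    // Assumption: later schema versions only append columns at the end.
    static final List<String> ADDED_IN_1_1 = Arrays.asList("extra_a", "extra_b");

    /** True when the "major.minor" version is at least the given pair. */
    static boolean atLeast(String version, int major, int minor) {
        String[] parts = version.trim().split("\\.");
        int maj = Integer.parseInt(parts[0]);
        int min = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return maj > major || (maj == major && min >= minor);
    }

    /** Column names expected for a message carrying this version header. */
    static List<String> columnsFor(String version) {
        List<String> columns = new ArrayList<>(BASE);
        if (atLeast(version, 1, 1)) {
            columns.addAll(ADDED_IN_1_1);
        }
        return columns;
    }

    /** Maps one tab-split row to named fields, failing clearly on a mismatch. */
    static Map<String, String> toRecord(String version, String[] row) {
        List<String> columns = columnsFor(version);
        if (row.length != columns.size()) {
            throw new IllegalArgumentException("version " + version + " expects "
                    + columns.size() + " columns but the row has " + row.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            record.put(columns.get(i), row[i]);
        }
        return record;
    }
}
```

With this shape, a column-count mismatch like the one in the trace above would only surface when the version header and the row genuinely disagree, and the error message names both numbers.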

vsrahul82 commented 8 years ago

I could not find "collector/openbmpd". Please provide a link.

TimEvens commented 8 years ago

It's this repo. openbmpd is what produces those messages in Kafka. How are you getting those now?

vsrahul82 commented 8 years ago

I tried searching GitHub for "openbmpd" but could not find any repo of that name. I am consuming messages from the topic "openbmp.parsed.ls_link". I have access to only this topic. I don't have control over the messages that are sent to this topic. Please provide more details on what I need to do to parse the messages correctly.

TimEvens commented 8 years ago

ok... so someone else is running the collector, which is producing the messages. I've just updated openbmp-java-api-message to support backwards compatibility.

Can you do:

git pull
mvn clean install
cd examples/spring-cloud-stream-sink
mvn spring-boot:run

You shouldn't get that error anymore as parsing will automatically adapt to the message version.

NOTE:

Can you check with the person that is running the collector and see if they can upgrade to the latest openbmpd? The updated collector has enhancements for bgp-ls, which includes TE and EPE.

vsrahul82 commented 8 years ago

Still getting the exception below when I try parsing messages coming in on "openbmp.parsed.ls_link".

The number of columns to be processed (38) must match the number of CellProcessors (43): check that the number of CellProcessors you have defined matches the expected number of columns being read/written
context={lineNo=1, rowNo=1, columnNo=1, rowSource=[add, 972, 0c278257fc963476ffa3be3ff2428869, 6ff067cf2f5378e9aac998b4163cbbe8, 393e94c52f0498b3d20f8f9482258fa0, 2001:558:fc10:ff3a::182, 694fd7100e011a08d16498ceb0eabc09, 2001:558:0:f412::1, 7922, 2016-07-27 16:00:30.410703, 0680.8600.1024.0000, 2001:558:0:fe80::24, 0, 44560112, null, null, IS-IS_L2, 7922, 0, 0, 2001:558:0:f412::1, 3, 0, 0, ::, ::, 50, 0, 0, 0, null, 0, null, null, null, null, 7a1020a6407314dd160a455cee13510f, e2102cd0de42d7a120bbae58732931e2]}
at org.supercsv.util.Util.executeCellProcessors(Util.java:78)
at org.supercsv.io.AbstractCsvReader.executeProcessors(AbstractCsvReader.java:203)
at org.supercsv.io.CsvMapReader.read(CsvMapReader.java:99)
at org.openbmp.api.parsed.message.Base.parse(Base.java:99)
at org.openbmp.api.parsed.message.LsLink.<init>(LsLink.java:72)
at com.comcast.consumer.CloudStreamConsumer.logger(CloudStreamConsumer.java:41)
at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:113)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:102)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:347)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:166)
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:317)
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:155)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:93)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69)
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171)
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:516)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154)
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:513)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221)
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209)
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67)
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Please advise.

vsrahul82 commented 8 years ago

Please respond to this.

TimEvens commented 8 years ago

What did you find?

vsrahul82 commented 8 years ago

Well, getting the concerned folks to upgrade to the latest openbmpd is a challenge. They are very uncooperative. I used the latest code checked in for backward compatibility and am still getting the same SuperCSV exception for ls_link. What could be the issue? Please advise.

TimEvens commented 8 years ago

From your trace, it looks like you are still not using the latest/updated openbmp-java-api-message.

at org.openbmp.api.parsed.message.LsLink.<init>(LsLink.java:72)

Line 72 is not the right place for that exception in the latest commit. Did you run mvn clean install after you pulled the latest openbmp-java-api-message with git pull?

I reconfirmed that it's working, providing you have the right code installed/used. :)

TimEvens commented 8 years ago

Can you write me directly at tim@openbmp.org or tievens@cisco.com? I'm working with you guys already and I can provide a bit more details over email.

vsrahul82 commented 8 years ago

Sure

vsrahul82 commented 8 years ago

It is working now. Because of some caching somewhere, the old openbmp-java-api-message was still being run. Thank you, Tim, for your time. I appreciate it very much.
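For anyone who hits the same stale-library symptom, one way to check which copy of a class is actually running is to ask the JVM where it loaded the class from. This is a minimal sketch using only the standard `Class.getProtectionDomain()` and `CodeSource.getLocation()` calls; the class name `LoadedFrom` is hypothetical.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper. */
class LoadedFrom {

    /** Returns the jar or directory a class was loaded from, or a note if unknown. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes that belong to the JDK itself may have no code source.
        if (source == null || source.getLocation() == null) {
            return "(no code source available)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        // Prints where this helper class itself was loaded from.
        System.out.println(locationOf(LoadedFrom.class));
    }
}
```

In the setup from this thread, passing `org.openbmp.api.parsed.message.LsLink.class` to `locationOf` would show which jar the parser comes from, which can then be compared with the one just built by `mvn clean install`.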