eclipse / kapua

How to find the topic rules and message format in the broker-core? #755

Closed StevenWash closed 7 years ago

StevenWash commented 7 years ago

Hi, I am sorry to bother you again. I am trying to figure out how broker-core stores topic messages in Elasticsearch through the datastore, but I cannot work out what the topic rules and the message format are. I have learned that the console uses the datastore to read data from Elasticsearch, that the topic may look like 'channel_id/client_id', and that the channel_id may be the scopeId. Could you give me some advice on how to figure out the topic rules and the message format in broker-core? Thank you very much!

ctron commented 7 years ago

I am not sure I understand what you are trying to do.

In a nutshell, the broker takes messages from the topic structure <account-name>/<client-id> and stores them in the Elasticsearch store via the internal DataStore API. After this topic prefix come the application ID and the application's topic, which gives, as a complete topic:

<account-name>/<client-id>/<application-id>/topic/to/my/data.

The application ID is something which is chosen by the developer of the application.
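To make the layout concrete, here is a minimal sketch that splits a topic into the four parts described above. It only illustrates the structure; `TopicParts` and the sample topic are hypothetical names and are not part of the Kapua code base, and the validation rules (at least three non-empty levels) are an assumption.

```java
import java.util.Arrays;

/**
 * Hypothetical helper (not Kapua code): splits a topic of the form
 * <account-name>/<client-id>/<application-id>/topic/to/my/data.
 */
final class TopicParts {
    final String accountName;
    final String clientId;
    final String applicationId;
    final String[] applicationTopic; // remaining levels, may be empty

    private TopicParts(String accountName, String clientId,
                       String applicationId, String[] applicationTopic) {
        this.accountName = accountName;
        this.clientId = clientId;
        this.applicationId = applicationId;
        this.applicationTopic = applicationTopic;
    }

    /** Parses a topic; assumes the first three levels are mandatory and non-empty. */
    static TopicParts parse(String topic) {
        if (topic == null) {
            throw new IllegalArgumentException("topic must not be null");
        }
        // Limit -1 keeps trailing empty levels so they are not silently dropped.
        String[] levels = topic.split("/", -1);
        if (levels.length < 3) {
            throw new IllegalArgumentException("expected at least 3 topic levels: " + topic);
        }
        for (int i = 0; i < 3; i++) {
            if (levels[i].isEmpty()) {
                throw new IllegalArgumentException("empty topic level at position " + i);
            }
        }
        return new TopicParts(levels[0], levels[1], levels[2],
                Arrays.copyOfRange(levels, 3, levels.length));
    }

    /** Joins the application-specific levels back into a single string. */
    String applicationTopicAsString() {
        return String.join("/", applicationTopic);
    }

    public static void main(String[] args) {
        TopicParts parts = TopicParts.parse("acme/device-1/heater/topic/to/my/data");
        System.out.println("account:     " + parts.accountName);
        System.out.println("client:      " + parts.clientId);
        System.out.println("application: " + parts.applicationId);
        System.out.println("app topic:   " + parts.applicationTopicAsString());
    }
}
```

Keeping the application topic as an array of levels leaves the choice of how to interpret the remaining levels to the application.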

StevenWash commented 7 years ago

Hi, yes, you have understood my question. I just want to know where I can find the design of the topic structure in the code, because I may need to change the topic structure to meet our needs.

Another question: when I send a message to a topic, does the message have a fixed format, such as JSON like { "1":"##1", "2":"##2" }, or is the message just a simple string?

riccardomodanese commented 7 years ago

I'm not sure I fully understand your question. The topic structure is handled in two different layers with different scopes. The security layer (at broker level) enforces the ACL. The translator layer (translator modules) translates the outgoing messages into an internal representation (KapuaMessage). So the topic translation is done at the translator level. The current translators in Kapua translate messages from the Kura format to Kapua and vice versa.

About your second question: the message you send to the broker should conform to the format accepted by the translator, which, with the default Kapua implementation, is the Kura payload format.

StevenWash commented 7 years ago

Excuse me, I have tried to use an MQTT client to connect to ActiveMQ, and I have added broker-core to ActiveMQ. I log in with kapua-broker, and it connects to ActiveMQ correctly. But when I try to send a message to the topic /, ActiveMQ reports the following problem: [image] When I subscribe to the topic /, it still receives the message.

So what is the problem? I think it is an error while storing the message in Elasticsearch, is that right? Can you give me some advice on how to deal with this problem?

This is the topic and message that I send: [image]

This is the topic that I subscribe to: [image]

Thank you very much! @riccardomodanese @ctron

riccardomodanese commented 7 years ago

In order to use Kapua inside ActiveMQ, a few JARs must be integrated and some configuration files must be modified. So if you want to replace our deployment with your own, please take our current deployment as a reference. About the message: it seems that your message is not compliant with the Kura protobuf encoding. Please refer to:

https://github.com/eclipse/kura/blob/98349bfdf5eba99a3c8f1a5816e86719aca5711f/kura/org.eclipse.kura.core.cloud/src/main/java/org/eclipse/kura/core/cloud/CloudPayloadProtoBufDecoderImpl.java

https://github.com/eclipse/kura/blob/6a8a69344b1dc1fc5f00c7c4cd62fc9260a70806/kura/org.eclipse.kura.core.cloud/src/main/java/org/eclipse/kura/core/cloud/CloudPayloadProtoBufEncoderImpl.java

StevenWash commented 7 years ago

Hi @riccardomodanese, I added broker-core to ActiveMQ according to your current deployment, and I changed some configuration by taking your deployment as a reference.

I found that broker-core uses conversion methods such as 'JmsUtil.convertToCamelKapuaMessage()' to convert the message, and the exception occurred at line 171: [image] The method is in org.eclipse.kapua.service.device.call.message.kura.data.KuraDataPayload.

I also understand that my message is not compliant with the Kura protobuf encoding. So should I encode the message according to the Kura protobuf encoding before the method readFromByteArray in org.eclipse.kapua.service.device.call.message.kura.data.KuraDataPayload is called?

Thank you very much!

riccardomodanese commented 7 years ago

Yes, to avoid that exception your MQTT payload should be encoded according to the Kura protobuf format. The current translators support the Kura protobuf encoding (but translators are fully configurable and extensible, so if you need to, you can implement your own translator chain).

In other words, the fastest way is to build a Kura-compliant message and send it as the MQTT payload.

@ctron can point you to some examples in the Kura-simulator module, or you can take a look at the link below, which may be helpful:

https://github.com/eurotech/edc-examples/blob/master/edc-java-client/src/main/java/com/eurotech/cloud/examples/EdcJavaClient.java

StevenWash commented 7 years ago

Hi @riccardomodanese, do you mean that the current translators already support the Kura protobuf encoding, so if I send a message, it will be encoded by the current translator? Is that right?

StevenWash commented 7 years ago

Hi @riccardomodanese, I found that there are decode and encode methods in Kapua. When I send a message, the translators call the method 'readFromByteArray' to decode it. But I cannot find where the message that I send is encoded by 'toByteArray'. So where should I encode the message before it is decoded? Thank you very much!

riccardomodanese commented 7 years ago

Sorry, there was a bit of a misunderstanding. What I meant is that you should send a Kura protobuf encoded message, since the broker expects that. The broker will decode the message and store it. Broker-side encoding is done only when the broker sends a message to a device.

StevenWash commented 7 years ago

Hi @riccardomodanese, OK, I understand what you mean. Thank you very much!

But I have run into another problem. I encoded the message and then sent it, and it was stored in Elasticsearch. But line 173 in org.eclipse.kapua.service.datastore.internal.MessageStoreFacade.store() throws a new exception: [image] The new exception looks like this: [image] Do you know what causes this problem? Any advice? If you can help me, I will appreciate it a lot!

Thank you very much!

riccardomodanese commented 7 years ago

Maybe the "cause" part of the stack trace is missing (I don't see any Kapua class involved). Can you provide more detailed information?

StevenWash commented 7 years ago

Hi @riccardomodanese, let me try to describe my problem clearly. First, I encode the message that I send to ActiveMQ. The message is then decoded and stored in Elasticsearch by the datastore.

Second, the problem occurs while the message is being stored. The method 'processMessage' in 'org.eclipse.kapua.broker.core.listener.DataStorageMessageProcessor' is called first. In 'processMessage', the call 'messageStoreService.store(message.getMessage());' throws the exception.

Third, I looked into 'messageStoreService.store()', so I stepped into the method 'store' in 'MessageStoreServiceImpl'. There I found that the exception comes from 'messageStoreFacade.store(message);', so I stepped into 'messageStoreFacade.store()' in 'MessageStoreFacade'.

Last, I found that the exception is thrown at line 173, 'mediator.onAfterMessageStore(messageInfo, messageToStore);' [image] I then found that line 152 in the method 'onAfterMessageStore' in 'DatastoreMediator' causes the exception: [image]

And this is the exception in ActiveMQ: [image]

However, the message that I send is still delivered correctly to the subscriber, and it is also stored in Elasticsearch. But I don't know what causes the problem or how to fix it.

Have I made my problem clear? I hope you can help. Thank you so much!

riccardomodanese commented 7 years ago

Hi, your use case is clear, but the problem is not easy to debug from the stack trace alone. Can you provide a code snippet that reproduces the issue (in your git repo)?

P.S. I'm assuming you are testing the latest develop code, right? Are you using the ES transport client or the REST client?

StevenWash commented 7 years ago

Hi @riccardomodanese, actually, my code is the same as your git repo, but I use the release-0.2.x branch. The exception is related to 'broker-core', and I made almost no changes. The problem occurs when storing the message in Elasticsearch.

Can you help me? Thank you very much!

riccardomodanese commented 7 years ago

To better understand the issue I need your code (or at least the code that creates and sends the message) so that I can reproduce exactly the message you are sending. Can you share it in a repo?

StevenWash commented 7 years ago

Hi @riccardomodanese, here is my broker-core code: https://github.com/StevenWash/kapua/tree/dev/huaxin/broker-core/src/main/java/org/eclipse/kapua/broker/core (it may be a little messy).

I decode the message in 'convertToCamelKapuaMessage': https://github.com/StevenWash/kapua/blob/dev/huaxin/broker-core/src/main/java/org/eclipse/kapua/broker/core/message/JmsUtil.java

And this is processMessage, where the exception occurs at line 77: https://github.com/StevenWash/kapua/blob/dev/huaxin/broker-core/src/main/java/org/eclipse/kapua/broker/core/listener/DataStorageMessageProcessor.java

riccardomodanese commented 7 years ago

Hi, from what I saw, this https://github.com/StevenWash/kapua/blob/dev/huaxin/broker-core/src/main/java/org/eclipse/kapua/broker/core/message/JmsUtil.java#L146 (1) is not the correct place to do this translation. (It should be done externally by your client code.) In other words, your client should send the broker a Kura protobuf compliant message.

Just to clarify, the method you modified is responsible for a two-way translation (see https://github.com/eclipse/kapua/issues/314#issuecomment-281650782 for an explanation). So your translation puts the raw JMS message body (coming from the underlying broker layer) into a Kura message payload; the broker then tries to decode it as a JMS message and then as a Kura message, but the source message was messy. Please move the code (1) to the external client and try again to store the message.

StevenWash commented 7 years ago

Hi @riccardomodanese, OK, thank you very much! I just wanted to test whether the message could be stored in Elasticsearch correctly, so I modified the method like that. I did not realize that it would cause an exception. I will fix it as you advise. Thank you sincerely for your help!

riccardomodanese commented 7 years ago

You are welcome! If the incoming message is correct (that is, a Kura protobuf message), it should be stored in ES and you can retrieve it with the datastore find call.

StevenWash commented 7 years ago

Hi @riccardomodanese, OK, thank you! But I still have a question. Why does modifying the method like that cause that exception? What is the reason?

riccardomodanese commented 7 years ago

Honestly, I cannot answer that without debugging your modified code (and honestly I cannot tell you if and when I'll be able to find time to do that :) ). By the way, with the changes I suggested, were you able to solve the issue?

StevenWash commented 7 years ago

Hi @riccardomodanese, OK, I understand what you mean. Probably because of our different time zones, I have not had time to try what you suggested. I will try tomorrow. Thank you again!

StevenWash commented 7 years ago

Hi @riccardomodanese, I am sorry to bother you again. I am using Angular 2 to write a web MQTT client, and I looked for a protobuf JS library. I found this one: https://github.com/google/protobuf/tree/master/js

But I am sorry to say that I have not figured out how to use the protobuf JS library, so I have not been able to solve the issue.

riccardomodanese commented 7 years ago

Hi, yes, you can use the library you found, but you need to provide it with the file that specifies the Kura payload format. Please refer to https://github.com/eclipse/kura/blob/develop/kura/org.eclipse.kura.core.cloud/src/main/protobuf/kurapayload.proto

riccardomodanese commented 7 years ago

Hi @StevenWash, I'm assuming you solved the issue, so I'm closing it. Please feel free to open a new issue if you have other questions or problems.

Regards