eclipse / kura

Eclipse Kura™ is a versatile framework to supercharge your edge devices, streamlining the process of configuring your gateway, connecting sensors, and IoT devices to seamlessly collect, process, and send data to the cloud.
https://eclipse.dev/kura/
Eclipse Public License 2.0
494 stars 305 forks source link

Documentation for subscribing data to in-built-MQTT Server and sending data to Kapua #2409

Closed bitvijays closed 4 years ago

bitvijays commented 5 years ago

o/ Everyone. Hope you are doing well Thank you for the awesome work!

Is there any documentation (to create a program) which could help in the below :

1a) Connect/ subscribe to the in-built MQTT Server (or any MQTT Server) from Kura. 1b) Subscribe to the specific topics. 1c) Convert the messages to Kura-payload 1d) Send the kura-payload to Kapua (Probably, Example Publisher can be referred here)

Also, as per my understanding, "org.eclipse.kura.cloud.CloudService" is used to connect to the Cloud Platform? and "org.elclipse.kura.cloudconnection.eclipseiot.mqtt.ConnectionManager" is used for connecting to MQTT Servers?

I have a simple application: Let's say A kit which send a data to a MQTT Server in some format.

PS: Not a Java developer and quite new to Kura; Have done the Hello World example and have setup Kura-Development example

I have tried the below: 2a) Tried to directly use paho mqtt client in java. However, it seems it is suggested to use CloudConnections? ; CloudEndpoint? 2b) Tried to modify ExamplePublisher "Cloud connection" from Kapua to MQTT Server. However, that didn't work. 2c) Did looked at the open/ closed issue to figure out any solutions

Plus: Is there any gitter/ irc/ slack channel for faster contribution/ doubts clearance?

Also, I am happy to write a document the learning (If someone is happy to guide me) and contribute. Thank You :)

MMaiero commented 5 years ago

Hi, thank you for trying out Kura. "org.eclipse.kura.cloud.CloudService" and "org.elclipse.kura.cloudconnection.eclipseiot.mqtt.ConnectionManager" are both two factories that can be used to connect to Mqtt brokers. The difference between them is that they use different mqtt namespaces.

To connect to the internal broker you have to instantiate a new cloud connection for example a new instance of "org.eclipse.kura.cloud.CloudService" and point to mqtt://127.0.0.1:1883. To get messages from that connection, you'll have to instantiate a new subscriber. Then you can use, for example, wire components to process the message and publish it to another cloud service.

Regarding the questions, we have our forum and a mailing list.

As you know this project is open source so, any help is more than welcome to grow the project.

bitvijays commented 5 years ago

@MMaiero Hope you are doing well. Thank you for your support. So, I was able to connect to a local MQTT Server and send a sample message to the Kapua platform. I have few more questions, would appreciate if you could answer them

https://github.com/eclipse/kapua/wiki/K-Payload-JSON-Format

a) Does the format sent from the Sensor (constrained device) always have to be in the above format? If so, all constrained device have to follow a standard which is not the case. If no, How we can have accept the message other than the above format? Because as of now, if the format specification is not met, application throws Parse Exception?

b) Does the topic format always have to be like "account_name/client_id/app_id/resource_id"? How can we receive MQTT data from different topics? Considering constrained devices don't have the buffer to set a large topic? and just send like /data/random.identifer(ABCD)/metrics?

c) Is there a way to get the topic published? in Java code? For example I want one application to subscribe on multiple devices as I have multiple devices (Smart-Citizen Kit/ PySense/ PyTrack) sending data.

Thank you for your support. Also, have created a document regarding learnings

https://github.com/eclipse/kura/wiki/Subscribing-data-to-in-built-MQTT-Server-and-sending-data-to-Cloud-Platform-(Kapua)

MMaiero commented 5 years ago

Hi, related to a), you can create a specific cloud service or, even better, define a specific parser for your devices. We tried to design a general approach that is capable to use different message types. The only constraint is that, to enable processing, at the end the message managed by Kura applications has to be a Kura Message.

Apart from the Account_name/client_id, You can craft the topic as you want and, to manage set of devices, define generic subscriptions. But those constraints are for the default cloud service.

If you want to be more generic, you can create your specific Cloud Endpoint and it will follow your own rules.

Best regards, Matteo

bitvijays commented 5 years ago

Hey @MMaiero Thank you for your support. Quick Questions:

a) Is there a way to get the topic published? in Java code?

Example: sensor send the MQTT data on topic /account_name/client_id/app_id and the last topic is changing like

/account_name/client_id/app_id/SCK -- Smart Citizen Kit /account_name/client_id/app_id/PySense - Data from PySense

Currently, whenever the data arrives, onMessageArrived function provides the Kura Payload. If there's a way, to know the topic like above, we can do specific work on it. The other way maybe is to include the "Source":"SCK" in the metrics field.

b) Is there any documentation on creating "define a specific parser and using it" or how we can use "Cloud connection" to get specific format?

c) By specific Cloud Endpoint, you meant utilizing "org.elclipse.kura.cloudconnection.eclipseiot.mqtt.ConnectionManager" and this https://eclipse.github.io/kura/cloud-api/4-stack-dev-guide.html

The pseudo code for my program is currently like Create a Java program

1) onMessageArrived, check the topic 2) If "/account_name/client_id/app_id/SCK" send it to parser_SCK If "/account_name/client_id/app_id/PySense" send it to parser_PySense

3) parser_SCK/PySense, would convert it to Kura Payload format and send it to Kapua. If anything is already in Kura payload, just send it to Kapua.

If this doesn't work, I will have to write a python code for subscribing to custom topics and then send to Kura in Kura payload. (Want to avoid this, want to have everything running on Kura).

Thanks again for your help and patience with (probably) stupid questions.

MMaiero commented 5 years ago

Hi, Regarding a), it depends your setup. But in general you should be able to get the publishing topic. Having a look here (https://github.com/eclipse/kura/blob/develop/kura/org.eclipse.kura.core.cloud/src/main/java/org/eclipse/kura/core/cloud/CloudServiceImpl.java#L529) you’ll see that the dispatching is done based on the topic. If working with subscribers, the subscriber will get in KuraMessage the application topic https://github.com/eclipse/kura/blob/develop/kura/org.eclipse.kura.core.cloud/src/main/java/org/eclipse/kura/core/cloud/CloudServiceImpl.java#L603.

For b), there is no specific documentation, but I’ll suggest you to have a look at the existing cloud connection providers: https://github.com/eclipse/kura/blob/develop/kura/org.eclipse.kura.core.cloud https://github.com/eclipse/kura/tree/develop/kura/org.eclipse.kura.cloudconnection.eclipseiot.mqtt.provider

And to the existing marshalers/unmarshallers like: https://github.com/eclipse/kura/blob/develop/kura/org.eclipse.kura.json.marshaller.unmarshaller.provider/src/main/java/org/eclipse/kura/internal/json/marshaller/unmarshaller/JsonMarshallUnmarshallImpl.java

For c) currently the parsing is done at the level of cloud connection: this means that downstream applications have just to deal with KuraMessages and not have to deal with low level specificities.

Best regards, Matteo

-- Matteo Maiero Senior Software Engineer

direct: +39 0433 485 902 email: matteo.maiero@eurotech.commailto:matteo.maiero@eurotech.com

EUROTECH Imagine.Build.Succeed. HEADQUARTERS EUROTECH S.p.A. – Via Fratelli Solari, 3/a | 33020 Amaro UD | Italy | Tel. +39. 0433 485411 | Fax +39. 0433 485499 | Cap. Soc. 8.878.946,00 € I.V. | P.IVA / C.F.: IT 01791330309 | REA Udine 196115 www.eurotech.comhttp://www.eurotech.com/


Il presente messaggio ed ogni suo allegato sono da intendersi inviati esclusivamente agli effettivi destinatari e potrebbero essere soggetti a restrizioni legali. Se avete ricevuto questo messaggio per errore vi invitiamo a darne immediata notifica al mittente e cancellarlo dal vostro sistema. Qualsiasi altro uso di questo messaggio da parte vostra e' strettamente proibito.

This e-mail, and any file attached to it, is meant only for the intended recipient of the transmission and may be a communication privileged by law. If you have received it in error, please notify the sender immediately and delete the original from your system. Any other use of this e-mail by you is strictly forbidden.

On 1 Apr 2019, at 14:08, Vijay Kumar notifications@github.com<mailto:notifications@github.com> wrote:

Hey @MMaierohttps://github.com/MMaiero Thank you for your support. Quick Questions:

a) Is there a way to get the topic published? in Java code?

Example: sensor send the MQTT data on topic /account_name/client_id/app_id and the last topic is changing like

/account_name/client_id/app_id/SCK -- Smart Citizen Kit /account_name/client_id/app_id/PySense - Data from PySense

Currently, whenever the data arrives, onMessageArrived function provides the Kura Payload. If there's a way, to know the topic like above, we can do specific work on it. The other way maybe is to include the "Source":"SCK" in the metrics field.

b) Is there any documentation on creating "define a specific parser and using it" or how we can use "Cloud connection" to get specific format?

c) By specific Cloud Endpoint, you meant utilizing "org.elclipse.kura.cloudconnection.eclipseiot.mqtt.ConnectionManager" and this https://eclipse.github.io/kura/cloud-api/4-stack-dev-guide.html

The pseudo code for my program is currently like Create a Java program

  1. onMessageArrived, check the topic

  2. If "/account_name/client_id/app_id/SCK" send it to parser_SCK If "/account_name/client_id/app_id/PySense" send it to parser_PySense

  3. parser_SCK/PySense, would convert it to Kura Payload format and send it to Kapua. If anything is already in Kura payload, just send it to Kapua.

If this doesn't work, I will have to write a python code for subscribing to custom topics and then send to Kura in Kura payload. (Want to avoid this, want to have everything running on Kura).

Thanks again for your help and patience with (probably) stupid questions.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHubhttps://github.com/eclipse/kura/issues/2409#issuecomment-478552716, or mute the threadhttps://github.com/notifications/unsubscribe-auth/AJ9s_nxGQMdrUfgYpJx71Z8IgEywbEJfks5vcfa1gaJpZM4cO04c.

MMaiero commented 4 years ago

@bitvijays Can this issue be closed? Thanks

pshashipreetham commented 2 years ago

Hi @MMaiero , following python code is used to to get the data from Kura which is getting published on the data/metrics, able to connect to broker but not able to receive the message.

@bitvijays can you help me out here ? followed this link

import paho.mqtt.client as mqttClient
import time

def on_connect(client, userdata, flags, rc):
    if rc == 0:

        print("Connected to broker")

        global Connected  # Use global variable
        Connected = True  # Signal connection

    else:

        print("Connection failed")

def on_message(client, userdata, message):
    print("Message received: " + message.payload)

Connected = False  # global variable for the state of the connection

broker_address = "127.0.0.1"  # Broker address
port = 1883  # Broker port
user = "kapua-broker"  # Connection username
password = "kapua-password"  # Connection password

client = mqttClient.Client("instance1")  # create new instance
client.username_pw_set(user, password=password)  # set username and password
client.on_connect = on_connect  # attach function to callback
client.on_message = on_message  # attach function to callback

client.connect(broker_address, port=port)  # connect to broker

client.loop_start()  # start the loop

while Connected != True:  # Wait for connection
    time.sleep(0.1)

client.subscribe("Account123/instance1/EXAMPLE_PUBLISHER/data/metrics")

try:
    while True:
        time.sleep(1)

except KeyboardInterrupt:
    print
    "exiting"
    client.disconnect()
    client.loop_stop()