eclipse-hono / hono

Eclipse Hono™ Project
https://eclipse.dev/hono
Eclipse Public License 2.0
449 stars 137 forks source link

how to assign parition id in dispatch router #2707

Closed iamrsaravana closed 2 years ago

iamrsaravana commented 3 years ago

I am trying to connect hono with kafka using amqp source connector as mentioned it below

https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-amqp-kafka-source-connector.html

but i am not able to give partition id while pushing data into kafka.

AMQP connector:

apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: camelamqpsourceconnector-excavator labels: strimzi.io/cluster: my-connect-cluster-excavator spec: class: org.apache.camel.kafkaconnector.amqp.CamelAmqpSourceConnector tasksMax: 1

topics: mytopic

config: camel.component.amqp.includeAmqpAnnotations: true camel.component.amqp.connectionFactory: '#class:org.apache.qpid.jms.JmsConnectionFactory' camel.component.amqp.connectionFactory.remoteURI: amqp://10.97.217.229:15672 camel.component.amqp.username: consumer@HONO camel.component.amqp.password: verysecret camel.component.amqp.testConnectionOnStartup: true camel.source.path.destinationType: queue camel.source.path.destinationName: telemetry/TECH_TENANT topics: my-topic-excavator

This is Hono output after i pushed the data: : [{'RTC': '2021-06-07 13:04:56.184441', 'Lat': 16.229851845350204, 'Long': 14.299288860939594, 'M1': 2, 'M2': 2, 'Vr': 105.6324171318401, 'Vy': 104.12052426724877, 'Vb': 98.12062632941522, 'Lr': 103.9471682038583, 'Ly': 97.67147621624122, 'Lb': 100.36805587085357, 'Freq': 14.842135582530009, 'KVA': 16.938037923237218, 'PF': 12.135827625867178, 'KW': 13.253614235906426, 'KWH': 14.40368159762895, 'FwRPM': 16.477093643872117, 'RDT': 14.03796169523524, 'Pr': 18.68899627382937, 'Tmp': 17.753152890844945, 'TW': 16.219815509590156, 'Spd': 16.6851165432929, 'Flow': 17.89677380804127, 'Err': 15.390524409532468}]} 13:04:56.186 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - ... with application properties: {orig_adapter=hono-mqtt, qos=1, device_id=Device139, orig_address=telemetry} 13:04:56.226 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - received telemetry message [device: Device117, content-type: application/octet-stream]: {'Header': [{'ID': 'Device117', 'Type': 'PCR', 'Mvmt': 'rotary', 'Fuel': 'petrol', 'SNo': 'G000123', 'IMEI': 'GNPS235765443', 'ICCID': 'IC200009', 'CCode': 'LnT', 'Ver': 'V01.02.03', 'Volt': 43.44, 'Batt': 78, 'GSM': 55, 'GPS': 94}], 'Data': [{'RTC': '2021-06-07 13:04:56.224198', 'Lat': 14.200613520221506, 'Long': 16.718119207527035, 'M1': 3, 'M2': 4, 'Vr': 102.7726065757604, 'Vy': 100.21367820074356, 'Vb': 99.62674245521268, 'Lr': 104.35108962552408, 'Ly': 106.22230469722051, 'Lb': 102.3081528102651, 'Freq': 18.51147889029693, 'KVA': 19.14883271226492, 'PF': 15.119708361088652, 'KW': 13.78428755456159, 'KWH': 16.54308364963313, 'FwRPM': 14.304083803713866, 'RDT': 15.86268764413181, 'Pr': 12.808742206456916, 'Tmp': 12.197383232205356, 'TW': 18.694375868185514, 'Spd': 19.586982178868997, 'Flow': 14.501278749367838, 'Err': 13.322722342896263}]} 13:04:56.226 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - ... with application properties: {orig_adapter=hono-mqtt, qos=1, device_id=Device117, orig_address=telemetry} 13:04:56.231 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - received telemetry message [device: Device15, content-type: application/octet-stream]: {'Header': [{'ID': 'Device15', 'Type': 'PCR', 'Mvmt': 'rotary', 'Fuel': 'petrol', 'SNo': 'G000123', 'IMEI': 'GNPS235765443', 'ICCID': 'IC200009', 'CCode': 'LnT', 'Ver': 'V01.02.03', 'Volt': 43.44, 'Batt': 78, 'GSM': 55, 'GPS': 94}], 'Data': [{'RTC': '2021-06-07 13:04:56.229054', 'Lat': 14.213487594481148, 'Long': 15.656839607765427, 'M1': 4, 'M2': 4, 'Vr': 101.64447195043797, 'Vy': 100.74106655966625, 'Vb': 96.9494029939884, 'Lr': 102.02008574324184, 'Ly': 101.40364015408419, 'Lb': 102.55915323957592, 'Freq': 13.254616130257865, 'KVA': 16.0898748772033, 'PF': 19.10140052742132, 'KW': 19.011386889350256, 'KWH': 18.65701863122188, 'FwRPM': 12.258014796954857, 'RDT': 15.791687473750056, 'Pr': 12.635892644467365, 'Tmp': 16.458395825935096, 'TW': 16.65175324696994, 'Spd': 16.739598393953074, 'Flow': 13.790263523764974, 'Err': 17.62444544168555}]} 13:04:56.231 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - ... with application properties: {orig_adapter=hono-mqtt, qos=1, device_id=Device15, orig_address=telemetry} 13:04:56.256 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - received telemetry message [device: Device103, content-type: application/octet-stream]: {'Header': [{'ID': 'Device103', 'Type': 'PCR', 'Mvmt': 'rotary', 'Fuel': 'petrol', 'SNo': 'G000123', 'IMEI': 'GNPS235765443', 'ICCID': 'IC200009', 'CCode': 'LnT', 'Ver': 'V01.02.03', 'Volt': 43.44, 'Batt': 78, 'GSM': 55, 'GPS': 94}], 'Data': [{'RTC': '2021-06-07 13:04:56.255146', 'Lat': 18.30207585711315, 'Long': 15.445108953362212, 'M1': 4, 'M2': 1, 'Vr': 104.1702201608763, 'Vy': 100.7138992213212, 'Vb': 107.61993617416675, 'Lr': 102.460382739623, 'Ly': 101.75007264737627, 'Lb': 99.47552998243789, 'Freq': 16.575099392793483, 'KVA': 17.483272935110133, 'PF': 13.848004668468029, 'KW': 15.241877145521133, 'KWH': 13.1603953035658, 'FwRPM': 17.618491973546433, 'RDT': 13.242015786849421, 'Pr': 15.879335974739913, 'Tmp': 12.974281145078216, 'TW': 14.575089980896196, 'Spd': 14.502712612811951, 'Flow': 13.743735222187617, 'Err': 17.205125618085404}]} 13:04:56.256 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - ... with application properties: {orig_adapter=hono-mqtt, qos=1, device_id=Device103, orig_address=telemetry} 13:04:56.652 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - received telemetry message [device: Device29, content-type: application/octet-stream]: {'Header': [{'ID': 'Device29', 'Type': 'PCR', 'Mvmt': 'rotary', 'Fuel': 'petrol', 'SNo': 'G000123', 'IMEI': 'GNPS235765443', 'ICCID': 'IC200009', 'CCode': 'LnT', 'Ver': 'V01.02.03', 'Volt': 43.44, 'Batt': 78, 'GSM': 55, 'GPS': 94}], 'Data': [{'RTC': '2021-06-07 13:04:56.651267', 'Lat': 17.07812406089394, 'Long': 12.634635053184754, 'M1': 3, 'M2': 4, 'Vr': 106.29190025671028, 'Vy': 107.68644918330354, 'Vb': 105.84608542410221, 'Lr': 104.36950964581112, 'Ly': 104.1854258691839, 'Lb': 104.78188259698555, 'Freq': 13.621816476357521, 'KVA': 17.345637931684273, 'PF': 16.679473561974238, 'KW': 17.217610280691478, 'KWH': 19.358898193323075, 'FwRPM': 13.003969532979907, 'RDT': 12.688861115550967, 'Pr': 13.328807914255218, 'Tmp': 15.215659635641002, 'TW': 18.44568800202448, 'Spd': 16.101795620919, 'Flow': 15.529497579953963, 'Err': 17.274688506919443}]} 13:04:56.653 [vert.x-eventloop-thread-0] INFO org.eclipse.hono.cli.app.Receiver - ... with application properties: {orig_adapter=hono-mqtt, qos=1, device_id=Device29, orig_address=telemetry}

sophokles73 commented 3 years ago

Can you explain what you are trying to achieve? Are you trying to forward data consumed from Hono's north bound Telemetry and/or Event API to a kafka cluster?

iamrsaravana commented 3 years ago

Yes. we are trying to send event api to Kafka Cluster. we are using 1.8 version, but i could not find how to send tenent's data into Kafka topics directly.

Could you share document or steps to sending data directly to kafka topics.

sophokles73 commented 3 years ago

I still do not see what you are trying to do in the end. Do you want to

  1. use the AMQP 1.0 Messaging Network and then forward data received from it to kafka, or
  2. use Kafka instead of the AMQP 1.0 Messaging Network within Hono?
iamrsaravana commented 3 years ago

planning to use Kafka instead of AMQP 1.0

sophokles73 commented 3 years ago

Kafka support is still considered experimental in Hono. You might want to try the Hono Helm chart which supports setting up a Hono installation using a single node Kafka cluster. In addition, Hono's Kafka Client Admin Guide describes the configuration properties that you need to set in order to connect Hono's protocol adapters and application clients to the Kafka broker.

b-abel commented 3 years ago

@iamrsaravana I need much more explanation of what you are trying to do, which steps did you take, and which error you are facing to be able to help you. Some background that might help you to better understand the current state: Using Hono with Kafka-based messaging is still under development and considered a preview. An option to deploy Hono preconfigured for Kafka-based messaging together with a minimal example Kafka cluster has just recently been added to the Helm chart with https://github.com/eclipse/packages/pull/237. We are currently in the process to write proper documentation but it might take a bit before everything is finalized and works out of the box. If interested, you can track some of the efforts for example in https://github.com/eclipse/hono/pull/2682 and https://github.com/eclipse/hono/issues/2683.