eclipse-ditto / ditto

Eclipse Ditto™: Digital Twin framework of Eclipse IoT - main repository
https://eclipse.dev/ditto/
Eclipse Public License 2.0
694 stars 230 forks source link

Apache Kafka Integration with Eclipse Ditto is not working . #1515

Closed rajkraleti closed 2 years ago

rajkraleti commented 2 years ago

I have created a thing in using the Hello world example given here

https://www.eclipse.org/ditto/intro-hello-world.html

then tried to create a thing using the Apache Kafka 2.x example given , I have established connection with the Kafka installed in my local and established connection to eclipse Ditto .

The connection was successful how ever post dropping messages in the Kafka topic - Things are not getting updated or created or modified .

there was no log / no information about what happened with the message that was dropped .

FYR : I followed the message format shown in the ditto protocol example

image

Can we have a demonstration of Apache Kafka Source & target side integration for Eclipse Ditto or its a known issue on which team is still working ?

DerSchwilk commented 2 years ago

Hey @rajkraleti,

you can have a look at the logs of the connection via https://www.eclipse.org/ditto/connectivity-manage-connections.html#retrieve-connection-logs. Be sure to enable them prior with https://www.eclipse.org/ditto/connectivity-manage-connections.html#enable-connection-logs.

If your connection receives the Kafka messages you should see reasons for dropping/ not applying the messages in your connection logs. Thus far we don't know of any issues with the Kafka connectivity.

rajkraleti commented 2 years ago

Hi Thank @DerSchwilk This only shows the conenction status - which was showing only connection status - and it was succesful

Hoe ever there was no update in the thing . This looks like a bug to me.

DerSchwilk commented 2 years ago

@rajkraleti I think you're confusing the connection status endpoint (https://www.eclipse.org/ditto/connectivity-manage-connections.html#retrieve-connection-status) wit the connection log endpoint.

Via the connection log endpoint you should also be able to see the inbound messages of the connection. If that is not the case, your connection is not receiving any messages from your Kafka broker.

rajkraleti commented 2 years ago

Hi I used the URL connections/ad573a5c-94f4-48e3-bd8f-2ce9dbc7096e/logs - As per the link you have supplied

and It gave some content like this

{ "connectionId": "ad573a5c-94f4-48e3-bd8f-2ce9dbc7096e", "connectionLogs": [ { "correlationId": "", "timestamp": "2022-10-20T10:40:16.571876059Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:40:16.567835375Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:40:41.646898837Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:40:41.644587409Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:41:06.713364817Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:41:06.713066544Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:41:31.824751284Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:41:31.822858624Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:41:56.895511664Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:41:56.894799247Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:42:21.988421023Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:42:21.987964576Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:42:27.000635988Z", "category": "connection", "type": "other", "level": "success", "message": "Trying to reconnect" }, { "correlationId": "", "timestamp": "2022-10-20T10:42:33.152119719Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" }, { "correlationId": "", "timestamp": "2022-10-20T10:42:47.214996362Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:42:47.214854282Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:42:52.231805429Z", "category": "connection", "type": "other", "level": "success", "message": "Trying to reconnect" }, { "correlationId": "", "timestamp": "2022-10-20T10:43:00.155319454Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" }, { "correlationId": "", "timestamp": "2022-10-20T10:43:12.274765828Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:43:12.274378950Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:43:17.293342073Z", "category": "connection", "type": "other", "level": "success", "message": "Trying to reconnect" }, { "correlationId": "", "timestamp": "2022-10-20T10:43:24.178775087Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" }, { "correlationId": "", "timestamp": "2022-10-20T10:43:37.366723381Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:43:37.365838271Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:43:42.385538166Z", "category": "connection", "type": "other", "level": "success", "message": "Trying to reconnect" }, { "correlationId": "", "timestamp": "2022-10-20T10:43:48.155042247Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" }, { "correlationId": "", "timestamp": "2022-10-20T10:44:02.436203602Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:44:02.435833492Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-20T10:44:07.451835944Z", "category": "connection", "type": "other", "level": "success", "message": "Trying to reconnect" }, { "correlationId": "", "timestamp": "2022-10-20T10:44:15.148345794Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" } ], "enabledSince": "2022-10-20T10:44:16.098855914Z", "enabledUntil": "2022-10-20T11:44:18.092207891Z" }

After restarting Kafka the connection was succesful .

How ever , it does not show any reason why message of creating / modify thing is not working .

As per the above log status Kafka connection is succesful

I have dropped below message following the Ditto Protocol Structure Defined

{ "topic": "org.eclipse.ditto/fancy-car/things/twin/commands/modify", "headers": { "content-type": "application/merge-patch+json", "correlation-id": "090397" }, "path": "/", "value": { "thingId": "org.eclipse.ditto:fancy-car", "policyId": "org.eclipse.ditto:fancy-car", "definition": "org.eclipse.ditto:SomeModel:1.0.0", "attributes": { "manufacturer": "ACME", "VIN": "0815666337", "VIN2": "081567889" } }, "features": { "transmission": { "properties": { "automatic": true, "mode": "eco", "cur_speed": 90, "gear": 5 } } } }

Ideally this should modify my existing thing with thingId "org.eclipse.ditto:fancy-car" but its not modifying . Also Iam not sure whether ditto consumed the message and failed or Ditto did not consume at all !

Am I still missing something ?

DerSchwilk commented 2 years ago

Since you can't see any messages received in your logs, you can assume that the connection doesn't receive any. This can be either a wrong configuration in the connection or something wrong when sending messages to your broker. I would recommend analyzing in your Apache Kafka broker if the messages are published correctly to the topic you specified in your connection.

thjaeckle commented 2 years ago

{ "correlationId": "", "timestamp": "2022-10-20T10:43:12.274765828Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-20T10:43:12.274378950Z. Will reconnect after PT5S" },

that is from your connection logs - and looks not "healthy"

rajkraleti commented 2 years ago

@thjaeckle && @DerSchwilk
Thanks for the udpate , The older logs have some failures because I have restarted the Kafka broker as this is not working .

How ever the latest timestamp message show { "correlationId": "", "timestamp": "2022-10-20T10:44:15.148345794Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" } ], "enabledSince": "2022-10-20T10:44:16.098855914Z", "enabledUntil": "2022-10-20T11:44:18.092207891Z" }

connection was succesful with no message post restart .

When I manually listen to Kafka - I can see multiple messages in the topic . How ever Ditto is not reading anything . raj@raj:/usr/local/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning { "topic": "org.eclipse.ditto/fancy-car/things/twin/commands/modify", "headers": { "content-type": "application/merge-patch+json", "correlation-id": "090397" }, "path": "/", "value": { "thingId": "org.eclipse.ditto:fancy-car", "policyId": "org.eclipse.ditto:fancy-car", "definition": "org.eclipse.ditto:SomeModel:1.0.0", "attributes": { "manufacturer": "ACME", "VIN": "0815666337", "VIN2": "081567889" } }, "features": { "transmission": { "properties": { "automatic": true, "mode": "eco", "cur_speed": 90, "gear": 5 } } } }


Does it need any policy updates to work with Kafka and read the messages ? because Iam using only default policy created with the helloworld thing creation for the fancy car example .

DerSchwilk commented 2 years ago

The policy is only important when the first step (Receiving the messages in the connection) works. In order to CRUD your things via the connection the connections authorizationContext needs permissions in the things policy. I however doubt that that is the problem here because you would see a corresponding log in your connection.

Like stated earlier your connection doesn't seem to get any inbound messages. If you validated that your Kafka Broker publishes them correctly (If I get that correct you did that by subscribing manually to the topic), the most likely place where there could be a problem is in the connection configuration. (And there likely in the Source configuration.)

Can you check again that you configured the source correctly and followed the guide: https://www.eclipse.org/ditto/connectivity-protocol-bindings-kafka2.html#source-format

rajkraleti commented 2 years ago

I have created connection following the connection source configuration document for source

below is the request response for connection information -

http://localhost:8080/api/2/connections/

[ { "id": "ad573a5c-94f4-48e3-bd8f-2ce9dbc7096e", "name": "KafkatestConnection", "connectionType": "kafka", "connectionStatus": "open", "uri": "tcp://10.0.2.15:9092", "sources": [ { "addresses": [ "testTopic" ], "consumerCount": 1, "qos": 1, "authorizationContext": [ "ditto:inbound-auth-subject" ], "enforcement": { "input": "{{ header:deviceid }}", "filters": [ "{{ entity:id }}" ] }, "acknowledgementRequests": { "includes": [] }, "headerMapping": {}, "payloadMapping": [ "Ditto" ], "replyTarget": { "address": "testTopic", "headerMapping": {}, "expectedResponseTypes": [ "response", "error", "nack" ], "enabled": true } } ], "targets": [ { "address": "events/twin", "topics": [ "/_/things/twin/events" ], "authorizationContext": [ "ditto:outbound-auth-subject" ], "headerMapping": { "message-id": "{{ header:correlation-id }}", "content-type": "application/vnd.eclipse.ditto+json" } } ], "clientCount": 1, "failoverEnabled": true, "validateCertificates": true, "processorPoolSize": 1, "specificConfig": { "saslMechanism": "plain", "bootstrapServers": "10.0.2.15:9092" }, "tags": [] } ]

Connection status for that connection Id looks succesful as well .


{ "type": "connectivity.responses:retrieveConnectionStatus", "status": 200, "connectionId": "ad573a5c-94f4-48e3-bd8f-2ce9dbc7096e", "connectionStatus": "open", "liveStatus": "open", "recoveryStatus": "succeeded", "connectedSince": "2022-10-20T15:04:39.211715677Z", "clientStatus": [ { "type": "client", "client": "9bfdab8c0333", "status": "open", "recoveryStatus": "succeeded", "statusDetails": "CONNECTED", "inStateSince": "2022-10-20T15:04:39.211715677Z" } ], "sourceStatus": [ { "type": "source", "client": "9bfdab8c0333", "address": "testTopic", "status": "open", "statusDetails": "Consumer started.", "inStateSince": "2022-10-20T15:04:33.995469339Z" } ], "targetStatus": [ { "type": "target", "client": "9bfdab8c0333", "address": "events/twin", "status": "open", "statusDetails": "Producer started.", "inStateSince": "2022-10-20T15:04:33.993705568Z" } ] }


DerSchwilk commented 2 years ago

I tried to reproduce it to provide you some more information on what could be the issue. For me it worked just fine and I received messages in my connection:

{ "connectionId": "a1a89b9a-d440-48d6-9d44-9070a42da7bb", "connectionLogs": [ { "correlationId": "", "timestamp": "2022-10-21T05:48:19.597112Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" }, { "correlationId": "", "timestamp": "2022-10-21T05:48:33.821454Z", "category": "connection", "type": "other", "level": "failure", "message": "Connection failed due to: Unexpected consumer failure. - cause : Can't establish connection with kafkaBroker after 1 attempts. At 2022-10-21T05:48:33.821366Z. Will reconnect after PT5S" }, { "correlationId": "", "timestamp": "2022-10-21T05:48:38.845390Z", "category": "connection", "type": "other", "level": "success", "message": "Trying to reconnect" }, { "correlationId": "", "timestamp": "2022-10-21T05:48:46.577822Z", "category": "connection", "type": "other", "level": "success", "message": "Connection successful" }, { "correlationId": "82f77fec-562f-4d0e-8615-5b48a6fcf662", "timestamp": "2022-10-21T06:05:37.348240Z", "category": "response", "type": "published", "level": "success", "message": "Successfully published response. - Message headers: [content-type=application/vnd.eclipse.ditto+json, correlation-id=82f77fec-562f-4d0e-8615-5b48a6fcf662] - Message payload: {\"topic\":\"unknown/unknown/things/twin/errors\",\"headers\":{\"kafka.timestamp\":\"1666332337332\",\"kafka.topic\":\"testTopic\",\"ditto-connection-id\":\"a1a89b9a-d440-48d6-9d44-9070a42da7bb\",\"response-required\":false,\"content-type\":\"application/json\",\"correlation-id\":\"82f77fec-562f-4d0e-8615-5b48a6fcf662\"},\"path\":\"/\",\"value\":{\"status\":400,\"error\":\"placeholder:unresolved\",\"message\":\"The placeholder '{{ header:device_id }}' could not be resolved.\",\"description\":\"Some placeholders could not be resolved. Check the spelling of the placeholder and make sure all required headers are set.\"},\"status\":400}", "address": "_responses" }, { "correlationId": "82f77fec-562f-4d0e-8615-5b48a6fcf662", "timestamp": "2022-10-21T06:05:37.349040Z", "category": "source", "type": "consumed", "level": "failure", "message": "Ran into a failure when parsing an input command: The placeholder '{{ header:device_id }}' could not be resolved. Some placeholders could not be resolved. Check the spelling of the placeholder and make sure all required headers are set. - Message headers: [correlation-id=82f77fec-562f-4d0e-8615-5b48a6fcf662, kafka.timestamp=1666332337338, kafka.topic=testTopic, content-type=application/vnd.eclipse.ditto+json, ditto-expected-response-types=[\"response\",\"error\",\"nack\"], ditto-connection-id=a1a89b9a-d440-48d6-9d44-9070a42da7bb, ditto-reply-target=0] - Message payload: null", "address": "testTopic" }, { "correlationId": "82f77fec-562f-4d0e-8615-5b48a6fcf662", "timestamp": "2022-10-21T06:05:37.349213Z", "category": "response", "type": "dispatched", "level": "failure", "message": "Response was not successful. This may be the case for when a thing could not be found or the authorization subject of the consuming source was not allowed to write a thing. - Message headers: [response-required=false, correlation-id=82f77fec-562f-4d0e-8615-5b48a6fcf662, kafka.timestamp=1666332337338, kafka.topic=testTopic, content-type=application/vnd.eclipse.ditto+json, ditto-expected-response-types=[\"response\",\"error\",\"nack\"], ditto-connection-id=a1a89b9a-d440-48d6-9d44-9070a42da7bb, ditto-reply-target=0] - Message payload: {\"type\":\"things.responses:errorResponse\",\"status\":400,\"thingId\":\"unknown:unknown\",\"payload\":{\"status\":400,\"error\":\"placeholder:unresolved\",\"message\":\"The placeholder '{{ header:device_id }}' could not be resolved.\",\"description\":\"Some placeholders could not be resolved. Check the spelling of the placeholder and make sure all required headers are set.\"}}", "address": "_responses", "entityType": "thing", "entityId": "unknown:unknown" },

  1. Are you sure that the IP-Adress specified in your connection resolves to the correct Kafka broker? (In my case I used localhost:9092 since I run my broker in a local docker container)
  2. Are you sure you created the zookeper topic correctly (Running /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper SERVER-IP:2181 --replication-factor 1 --partitions 1 --topic testTopic in the broker container does that perfectly fine for me)
rajkraleti commented 2 years ago

Yes the IP Address was my local IP and followed the same procedure to install & Create topics with Kafka .

I was able to consume Kafka messages from Listener written in Java from that topic but Ditto is not listening anything .

Does it have any version related constraints to work with Kafka ? I have installed scala kafka_2.13-3.2.3

DerSchwilk commented 2 years ago

There are no incompatibilities with newer versions that I'm aware of. I tried it with version 2.7.0, maybe you can try again with that version. If it works then, maybe there really is a problem.

rajkraleti commented 2 years ago

@DerSchwilk can you please share me the connection info .

Assuming the local Kafka installation with VM has a problem .

I have tried a managed Kafka Connecting with Eclipse Ditto this time.

Below are the steps I have replicated .

  1. Created a org.eclipse.ditto:fancy-car following the example given _----------------------------------------------------------------- https://www.eclipse.org/ditto/intro-hello-world.html

  2. using the policy nginx :ditto I have created a connection to managed confluent kafka (Apache 2.X) as my local Kafka is not working . with below connection { "id": "kafka123", "name": "Kafka Connection", "connectionType": "kafka", "connectionStatus": "open", "uri": "tcp://pkc-6ojv2.us-west4.gcp.confluent.cloud:9092", "sources": [ { "addresses": [ "testTopic" ], "consumerCount": 1, "qos": 1, "authorizationContext": [ "nginx:ditto" ], "acknowledgementRequests": { "includes": [] }, "headerMapping": {}, "payloadMapping": [ "Ditto" ], "replyTarget": { "enabled": false } } ], "targets": [], "clientCount": 1, "failoverEnabled": true, "validateCertificates": true, "processorPoolSize": 1, "specificConfig": { "saslMechanism": "plain", "bootstrapServers": "pkc-6ojv2.us-west4.gcp.confluent.cloud:9092" }, "tags": [] }

these Kafka Topics are unsecured and accessible for next 10 days if you want to replicate .

  1. then published messages in Ditto protocol as explained

{ "topic": "fancycar/2/things/twin/commands/modify", "headers": { "correlation-id": "112211", "content-type": "application/vnd.eclipse.ditto+json" }, "path": "/", "value": { "thingId": "facncycar:2", "policyId": "org.eclipse.ditto:fancy-car", "definition": "org.eclipse.ditto:SomeModel:1.0.0", "attributes": { "location": { "latitude": 44.673856, "longitude": 8.261719 } }, "features": { "accelerometer": { "properties": { "x": 3.141, "y": 2.718, "z": 1, "unit": "g" }, "desiredProperties": { "x": 4 } } } } }


https://www.eclipse.org/ditto/protocol-examples-creatething.html


How ever there was no message / update in the logs apart from below log messages

image

I have tried replicating similar use case with MQTT connection , HTTP Connection - I was able to manage things how ever Kafka is not working due to any reason . Please observe it.

DerSchwilk commented 2 years ago

When trying to connect to your managed Kafka instance, I also get the reconnecting logs in my connection. Observing the connectivity service logs, I can see that the connection fails with TimeoutException: Timeout expired while fetching topic metadata Kafka, which hints to an authentication failure. (https://stackoverflow.com/questions/54254686/timeoutexception-timeout-expired-while-fetching-topic-metadata-kafka)

I thus also created a confluent hosted Kafka instance to validate and get the same issue with an anonymus authentication. and TCP. If I use an API key as credentials and SSL, I get it working and also receive messages in my connection.

thjaeckle commented 2 years ago

Closing due to inactivity to provided feedback, assuming the question has been resolved.

rajkraleti commented 2 years ago

It worked wih SSL - Thank you @thjaeckle , will try some more use cases and post questions . Thanks a lot for the support @DerSchwilk