eclipse-ditto / ditto

Eclipse Ditto™: Digital Twin framework of Eclipse IoT - main repository
https://eclipse.dev/ditto/
Eclipse Public License 2.0

ditto not sending MQTT messages when a thing is updated #1627

Closed rinelb closed 1 year ago

rinelb commented 1 year ago

I am not receiving updates via MQTT from Ditto. How do I set up the Ditto connection to send MQTT messages when a thing is updated?

I can send data to Ditto via MQTT, and the features of the thing get updated. I followed the example shown at https://github.com/eclipse-ditto/ditto-examples/blob/master/mqtt-bidirectional/README.md

I'm running Eclipse Ditto on Docker on Windows 10. I ran docker compose from the ditto/deployment/docker folder.

I created a thing called "my.test:octopus" and send data via MQTT to update its features, as in the tutorial above.

Here is my CRUD connection template:

{
  "id": "mqtt-example-connection-123",
  "name": "testing",
  "connectionType": "mqtt",
  "connectionStatus": "open",
  "uri": "tcp://test.mosquitto.org:1883",
  "sources": [
    {
      "addresses": [
        "ditto-tutorial/#"
      ],
      "consumerCount": 1,
      "qos": 0,
      "authorizationContext": [
        "nginx:ditto"
      ],
      "headerMapping": {},
      "payloadMapping": [
        "javascript"
      ],
      "replyTarget": {
        "address": "{{header:reply-to}}",
        "headerMapping": {},
        "expectedResponseTypes": [
          "response",
          "error"
        ],
        "enabled": true
      }
    }
  ],
  "targets": [
    {
      "address": "ditto-tutorial/{{ thing:id }}",
      "topics": [
        "_/_/things/twin/events",
        "_/_/things/live/messages"
      ],
      "qos": 0,
      "authorizationContext": [
        "nginx:ditto"
      ],
      "issuedAcknowledgementLabel": "{{connection:id}}:my-custom-ack",
      "headerMapping": {}
    }
  ],
  "clientCount": 1,
  "failoverEnabled": true,
  "validateCertificates": true,
  "processorPoolSize": 1,
  "specificConfig": {
    "lastWillTopic": "my/last/will/topic",
    "lastWillMessage": "my last will message",
    "publisherId": "my-awesome-mqtt-publisher-client-id",
    "clientId": "my-awesome-mqtt-client-id",
    "lastWillRetain": "false",
    "reconnectForRedelivery": "true",
    "reconnectForRedeliveryDelay": "5s",
    "lastWillQos": "1",
    "cleanSession": "false",
    "separatePublisherClient": "true"
  },
  "mappingDefinitions": {
    "javascript": {
      "mappingEngine": "JavaScript",
      "options": {
        "incomingScript": "function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) {\r\n    const jsonString = String.fromCharCode.apply(null, new Uint8Array(bytePayload));\r\n    const jsonData = JSON.parse(jsonString); \r\n    const thingId = jsonData.thingId.split(':'); \r\n    const value = { \r\n        temp_sensor: { \r\n            properties: { \r\n                value: jsonData.temp \r\n            } \r\n        },\r\n        altitude: {            \r\n            properties: {                \r\n                value: jsonData.alt            \r\n            }        \r\n        }    \r\n    };    \r\n    return Ditto.buildDittoProtocolMsg(\r\n        thingId[0], // your namespace \r\n        thingId[1], \r\n        'things', // we deal with a thing\r\n        'twin', // we want to update the twin\r\n        'commands', // create a command to update the twin\r\n        'modify', // modify the twin\r\n        '/features', // modify all features at once\r\n        headers, \r\n        value\r\n    );\r\n}",
        "outgoingScript": ""
      }
    }
  },
  "tags": []
}
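For readability, the escaped "incomingScript" in the template above can be re-expressed as a short Python sketch. This is only an illustration of what the mapping produces, not Ditto's implementation: the function name map_to_ditto_protocol is hypothetical, and the envelope layout (topic, headers, path, value) is assumed to follow the Ditto Protocol format that Ditto.buildDittoProtocolMsg returns.

```python
import json
from typing import Optional


def map_to_ditto_protocol(payload: bytes, headers: Optional[dict] = None) -> dict:
    """Hypothetical Python mirror of the JavaScript incoming mapping above."""
    data = json.loads(payload.decode("utf-8"))
    # "my.test:octopus" -> namespace "my.test", name "octopus"
    namespace, name = data["thingId"].split(":", 1)
    return {
        # namespace/name/group/channel/criterion/action
        "topic": f"{namespace}/{name}/things/twin/commands/modify",
        "headers": headers or {},
        # modify all features at once, as in the original script
        "path": "/features",
        "value": {
            "temp_sensor": {"properties": {"value": data["temp"]}},
            "altitude": {"properties": {"value": data["alt"]}},
        },
    }


if __name__ == "__main__":
    sample = b'{"temp": "17.77", "alt": "344.21", "thingId": "my.test:octopus"}'
    print(json.dumps(map_to_ditto_protocol(sample), indent=2))
```

Note that the sample payload carries "temp" and "alt" as strings, so the feature values stay strings unless the sender or the mapping converts them.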

I'm using a Python script to send data via MQTT to update the my.test:octopus device:

>python send_mqtt_data_test.py
ditto-tutorial/my.test:octopus b'{"temp": "17.77", "alt": "344.21", "thingId": "my.test:octopus"}'
recieved
ditto-tutorial/my.test:octopus b'{"temp": "19.73", "alt": "349.06", "thingId": "my.test:octopus"}'
recieved
ditto-tutorial/my.test:octopus b'{"temp": "19.0", "alt": "317.99", "thingId": "my.test:octopus"}'

I can see in the UI that Ditto updates my.test:octopus whenever it receives the MQTT messages shown above.

I am running a separate Python script that subscribes to the following topics:

topic0 = "my.test:octopus/#"
topic1 = "ditto-tutorial/#"
topic2 = "my/last/will/topic"
topic3 = "_/_/things/live/messages/#"
topic4 = "test/ditto/data"

It receives messages on topic1 only; that subscription is just there to make sure subscribing works.

Note: for topic "test/ditto/data" I added

 "headerMapping": {
        "mqtt.topic": "test/ditto/data",
        "mqtt.qos": 0
      }

to both the "replyTarget" and "targets" entries of the template above, but that didn't work, so I removed it.

Then I went back to the same template above and captured the logs below from the "eclipse/ditto-connectivity:latest" container:

>docker ps
CONTAINER ID   IMAGE                                COMMAND                  CREATED      STATUS                 PORTS                      NAMES
95a275b0097e   nginx:1.21-alpine                    "/docker-entrypoint.…"   8 days ago   Up 5 hours             0.0.0.0:8080->80/tcp       docker-nginx-1
7c6f1be40a78   eclipse/ditto-connectivity:latest    "/usr/bin/tini -- sh…"   8 days ago   Up 5 hours (healthy)   8080/tcp                   docker-connectivity-1
179c2b7060d1   eclipse/ditto-things:latest          "/usr/bin/tini -- sh…"   8 days ago   Up 5 hours (healthy)   8080/tcp                   docker-things-1
04814d55b067   eclipse/ditto-things-search:latest   "/usr/bin/tini -- sh…"   8 days ago   Up 5 hours (healthy)   8080/tcp                   docker-things-search-1
57fd3066e4af   eclipse/ditto-gateway:latest         "/usr/bin/tini -- sh…"   8 days ago   Up 5 hours (healthy)   0.0.0.0:8081->8080/tcp     docker-gateway-1
ffec7a9f9e67   swaggerapi/swagger-ui:v4.14.3        "/docker-entrypoint.…"   8 days ago   Up 5 hours             80/tcp, 8080/tcp           docker-swagger-ui-1
59b155e8ade8   eclipse/ditto-policies:latest        "/usr/bin/tini -- sh…"   8 days ago   Up 5 hours (healthy)   8080/tcp                   docker-policies-1
80a1e94b5821   mongo:5.0                            "docker-entrypoint.s…"   8 days ago   Up 5 hours             0.0.0.0:27017->27017/tcp   docker-mongodb-1

>docker container logs 7c6f1be40a78 -f -t -n 500
2023-04-26T11:38:51.510424852Z 2023-04-26 13:38:51,509 INFO  [] o.e.d.i.u.p.a.SubUpdater akka://ditto-cluster/user/live-signal-aware-sub-supervisor/subUpdater2 - DData is in sync with cluster state
2023-04-26T11:39:36.078581553Z 2023-04-26 13:39:36,077 INFO  [] o.e.d.i.u.p.a.SubUpdater akka://ditto-cluster/user/connectivity-sub-supervisor/subUpdater2 - Start to sync cluster state
2023-04-26T11:39:36.079302723Z 2023-04-26 13:39:36,078 INFO  [] o.e.d.i.u.p.a.SubUpdater akka://ditto-cluster/user/connectivity-sub-supervisor/subUpdater2 - DData is in sync with cluster state
2023-04-26T11:39:45.048817148Z 2023-04-26 13:39:45,047 INFO  [] o.e.d.i.u.p.c.PersistenceCleanupActor akka://ditto-cluster/user/connectivityRoot/persistenceCleanup - Quiet period expired, starting stream from <>
2023-04-26T11:39:45.057702501Z 2023-04-26 13:39:45,057 INFO  [] o.e.d.i.u.p.c.PersistenceCleanupActor akka://ditto-cluster/user/connectivityRoot/persistenceCleanup - Stream complete. Next stream in <PT9M45.3515625S> from start
2023-04-26T11:42:08.517459709Z 2023-04-26 13:42:08,517 INFO  [bca4d2ce-99a8-4c9c-87be-9086b5ce9b3c] o.e.d.c.s.m.p.ConnectionPersistenceActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa - Appending the following tags to event of type <connectivity.events:connectionModified> for connection with ID <mqtt-example-connection-123>: <[always-alive, priority-0]>
2023-04-26T11:42:08.520498993Z 2023-04-26 13:42:08,520 INFO  [bca4d2ce-99a8-4c9c-87be-9086b5ce9b3c] o.e.d.c.s.m.p.ConnectionPersistenceActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa - Successfully persisted Event <connectivity.events:connectionModified> w/ rev: <183>.
2023-04-26T11:42:08.725380276Z 2023-04-26 13:42:08,724 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$tb/c1 - Client <consumer:my-awesome-mqtt-client-id> disconnected by <USER>.
2023-04-26T11:42:08.728267055Z 2023-04-26 13:42:08,727 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$tb/c1 - Client <publisher:my-awesome-mqtt-publisher-client-id> disconnected by <USER>.
2023-04-26T11:42:08.729872717Z 2023-04-26 13:42:08,729 INFO  [bca4d2ce-99a8-4c9c-87be-9086b5ce9b3c] o.e.d.c.s.m.p.ConnectionPersistenceActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa - Starting ClientActor for connection <mqtt-example-connection-123> with <1> clients.
2023-04-26T11:42:08.731452896Z 2023-04-26 13:42:08,730 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$tb/c1 - Actor stopped, stopping clients.
2023-04-26T11:42:08.731560131Z 2023-04-26 13:42:08,730 DEBUG [] a.a.CoordinatedShutdown CoordinatedShutdown(akka://ditto-cluster) - Successfully cancelled CoordinatedShutdown task [service-unbind-akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$tb/c1] from phase [service-unbind].
2023-04-26T11:42:08.736021634Z 2023-04-26 13:42:08,735 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Using default client ID <mqtt-example-connection-123>
2023-04-26T11:42:08.743098788Z 2023-04-26 13:42:08,742 INFO  [] o.e.d.c.s.m.m.l.ConnectionLoggerRegistry  - Invalidating loggers for connection <mqtt-example-connection-123>.
2023-04-26T11:42:08.743355394Z 2023-04-26 13:42:08,743 INFO  [] o.e.d.c.s.m.m.l.ConnectionLoggerRegistry  - Initializing loggers for connection <mqtt-example-connection-123>.
2023-04-26T11:42:08.749055616Z 2023-04-26 13:42:08,748 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Configured for processing messages with the following MessageMapperRegistry: <DefaultMessageMapperRegistry [defaultMapper=WrappingMessageMapper [delegate=DittoMessageMapper [id=default, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json]]], customMappers={javascript=WrappingMessageMapper [delegate=JavaScriptMessageMapperRhino [id=javascript, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[], configuration=ImmutableJavaScriptMessageMapperConfiguration [properties=MergedJsonObjectMap[jsonObject={"incomingScript":"function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) {\r\n    const jsonString = String.fromCharCode.apply(null, new Uint8Array(bytePayload));\r\n    const jsonData = JSON.parse(jsonString); \r\n    const thingId = jsonData.thingId.split(':'); \r\n    const value = { \r\n        temp_sensor: { \r\n            properties: { \r\n                value: jsonData.temp \r\n            } \r\n        },\r\n        altitude: {            \r\n            properties: {                \r\n                value: jsonData.alt            \r\n            }        \r\n        }    \r\n    };    \r\n    return Ditto.buildDittoProtocolMsg(\r\n        thingId[0], // your namespace \r\n        thingId[1], \r\n        'things', // we deal with a thing\r\n        'twin', // we want to update the twin\r\n        'commands', // create a command to update the twin\r\n        'modify', // modify the twin\r\n        '/features', // modify all features at once\r\n        headers, \r\n        value\r\n    
);\r\n}","outgoingScript":""},fallbackObject={}], incomingConditions={}, outgoingConditions={}], incomingMapping=org.eclipse.ditto.connectivity.service.mapping.javascript.ScriptedIncomingMapping@19db9cb0, outgoingMapping=org.eclipse.ditto.connectivity.service.mapping.javascript.DefaultOutgoingMapping@40fb3f02]]}, fallbackMappers={UpdateTwinWithLiveResponse=WrappingMessageMapper [delegate=UpdateTwinWithLiveResponseMessageMapper [id=UpdateTwinWithLiveResponse, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json], dittoHeadersForMerge=ImmutableDittoHeaders [{response-required=false, if-match=*}]]], Normalized=WrappingMessageMapper [delegate=NormalizedMessageMapper [id=Normalized, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[], jsonFieldSelector=null]], Ditto=WrappingMessageMapper [delegate=DittoMessageMapper [id=Ditto, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json]]], CloudEvents=WrappingMessageMapper [delegate=id=CloudEvents, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[]], RawMessage=WrappingMessageMapper [delegate=RawMessageMapper [id=RawMessage, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json], dittoMessageMapper=DittoMessageMapper [id=null, incomingConditions=null, outgoingConditions=null, 
contentTypeBlocklist=null], fallbackOutgoingContentType=ContentType [value=text/plain; charset=utf-8, parsingStrategy=TEXT], incomingMessageHeaders={ditto-message-direction={{header:ditto-message-direction|fn:default('TO')}}, ditto-message-subject={{header:ditto-message-subject}}, ditto-message-feature-id={{header:ditto-message-feature-id}}, content-type={{header:content-type|fn:default('application/octet-stream')}}, ditto-message-thing-id={{header:ditto-message-thing-id}}, status={{header:status}}}]]}]>
2023-04-26T11:42:08.752509902Z 2023-04-26 13:42:08,752 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Configured for processing messages with the following MessageMapperRegistry: <DefaultMessageMapperRegistry [defaultMapper=WrappingMessageMapper [delegate=DittoMessageMapper [id=default, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json]]], customMappers={javascript=WrappingMessageMapper [delegate=JavaScriptMessageMapperRhino [id=javascript, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[], configuration=ImmutableJavaScriptMessageMapperConfiguration [properties=MergedJsonObjectMap[jsonObject={"incomingScript":"function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) {\r\n    const jsonString = String.fromCharCode.apply(null, new Uint8Array(bytePayload));\r\n    const jsonData = JSON.parse(jsonString); \r\n    const thingId = jsonData.thingId.split(':'); \r\n    const value = { \r\n        temp_sensor: { \r\n            properties: { \r\n                value: jsonData.temp \r\n            } \r\n        },\r\n        altitude: {            \r\n            properties: {                \r\n                value: jsonData.alt            \r\n            }        \r\n        }    \r\n    };    \r\n    return Ditto.buildDittoProtocolMsg(\r\n        thingId[0], // your namespace \r\n        thingId[1], \r\n        'things', // we deal with a thing\r\n        'twin', // we want to update the twin\r\n        'commands', // create a command to update the twin\r\n        'modify', // modify the twin\r\n        '/features', // modify all features at once\r\n        headers, \r\n        value\r\n    
);\r\n}","outgoingScript":""},fallbackObject={}], incomingConditions={}, outgoingConditions={}], incomingMapping=org.eclipse.ditto.connectivity.service.mapping.javascript.ScriptedIncomingMapping@e409f44, outgoingMapping=org.eclipse.ditto.connectivity.service.mapping.javascript.DefaultOutgoingMapping@40fb3f02]]}, fallbackMappers={UpdateTwinWithLiveResponse=WrappingMessageMapper [delegate=UpdateTwinWithLiveResponseMessageMapper [id=UpdateTwinWithLiveResponse, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json], dittoHeadersForMerge=ImmutableDittoHeaders [{response-required=false, if-match=*}]]], Normalized=WrappingMessageMapper [delegate=NormalizedMessageMapper [id=Normalized, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[], jsonFieldSelector=null]], Ditto=WrappingMessageMapper [delegate=DittoMessageMapper [id=Ditto, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json]]], CloudEvents=WrappingMessageMapper [delegate=id=CloudEvents, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[]], RawMessage=WrappingMessageMapper [delegate=RawMessageMapper [id=RawMessage, incomingConditions={}, outgoingConditions={}, contentTypeBlocklist=[application/vnd.eclipse-hono-empty-notification, application/vnd.eclipse-hono-device-provisioning-notification, application/vnd.eclipse-hono-dc-notification+json, application/vnd.eclipse-hono-delivery-failure-notification+json], dittoMessageMapper=DittoMessageMapper [id=null, incomingConditions=null, outgoingConditions=null, 
contentTypeBlocklist=null], fallbackOutgoingContentType=ContentType [value=text/plain; charset=utf-8, parsingStrategy=TEXT], incomingMessageHeaders={ditto-message-direction={{header:ditto-message-direction|fn:default('TO')}}, ditto-message-subject={{header:ditto-message-subject}}, ditto-message-feature-id={{header:ditto-message-feature-id}}, content-type={{header:content-type|fn:default('application/octet-stream')}}, ditto-message-thing-id={{header:ditto-message-thing-id}}, status={{header:status}}}]]}]>
2023-04-26T11:42:11.576415015Z 2023-04-26 13:42:11,575 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Connected client <consumer:my-awesome-mqtt-client-id>.
2023-04-26T11:42:11.576679244Z 2023-04-26 13:42:11,576 WARN  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - received unknown/unsupported message ReportConnectionStatusSuccess[] in state CONNECTING - status: unknown: initialized - sender: Actor[akka://ditto-cluster/deadLetters]
2023-04-26T11:42:11.988722076Z 2023-04-26 13:42:11,988 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Connected client <publisher:my-awesome-mqtt-publisher-client-id>.
2023-04-26T11:42:11.989105646Z 2023-04-26 13:42:11,988 WARN  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - received unknown/unsupported message ReportConnectionStatusSuccess[] in state CONNECTING - status: unknown: initialized - sender: Actor[akka://ditto-cluster/deadLetters]
2023-04-26T11:42:11.989193882Z 2023-04-26 13:42:11,988 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Starting publisher and consumers.
2023-04-26T11:42:13.579425788Z 2023-04-26 13:42:13,579 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Client actor registered: <mqtt-example-connection-123>
2023-04-26T11:42:17.005536859Z 2023-04-26 13:42:17,004 INFO  [] o.e.d.c.s.m.m.h.MqttClientActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1 - Initialization of consumers, publisher and subscriptions successful, going to CONNECTED.
2023-04-26T11:42:17.030616472Z 2023-04-26 13:42:17,030 INFO  [01a7852d-5d65-4b0b-805e-0fa3fbb73dfc] o.e.d.c.s.m.ConnectionIdsRetrievalActor akka://ditto-cluster/user/connectivityRoot/connectionIdsRetrieval - Retrieving all connection IDs ...
2023-04-26T11:42:26.783279360Z 2023-04-26 13:42:26,782 INFO  [0732e096-db37-4182-91a1-f84ec3019c9f] o.e.d.c.s.m.InboundDispatchingSink  - onMapped mappedHeaders ImmutableDittoHeaders [{ditto-reply-target=0, ditto-expected-response-types=["response","error"], ditto-origin=mqtt-example-connection-123, ditto-auth-context={"type":"pre-authenticated-connection","subjects":["nginx:ditto"]}, correlation-id=0732e096-db37-4182-91a1-f84ec3019c9f, mqtt.retain=false, mqtt.topic=ditto-tutorial/my.test:octopus, mqtt.qos=0, ditto-entity-id=thing:my.test:octopus, ditto-inbound-payload-mapper=javascript}]
2023-04-26T11:42:26.784468715Z 2023-04-26 13:42:26,784 INFO  [0732e096-db37-4182-91a1-f84ec3019c9f] o.e.d.e.s.d.EdgeCommandForwarderActor akka://ditto-cluster/user/connectivityRoot/edgeCommandForwarder - Forwarding thing signal with ID <my.test:octopus> and type <things.commands:modifyFeatures> to 'things' shard region
2023-04-26T11:42:26.784560623Z 2023-04-26 13:42:26,784 INFO  [0732e096-db37-4182-91a1-f84ec3019c9f] o.e.d.e.s.a.AcknowledgementAggregatorActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1/ackr0-0732e096-db37-4182-91a1-f84ec3019c9f - Starting to wait for all requested acknowledgements <[twin-persisted]> for a maximum duration of <PT1M5S>.
2023-04-26T11:42:29.804689163Z 2023-04-26 13:42:29,803 INFO  [137fe478-789a-4d30-a61d-79a8c746ca55] o.e.d.c.s.m.InboundDispatchingSink  - onMapped mappedHeaders ImmutableDittoHeaders [{ditto-reply-target=0, ditto-expected-response-types=["response","error"], ditto-origin=mqtt-example-connection-123, ditto-auth-context={"type":"pre-authenticated-connection","subjects":["nginx:ditto"]}, correlation-id=137fe478-789a-4d30-a61d-79a8c746ca55, mqtt.retain=false, mqtt.topic=ditto-tutorial/my.test:octopus, mqtt.qos=0, ditto-entity-id=thing:my.test:octopus, ditto-inbound-payload-mapper=javascript}]
2023-04-26T11:42:29.806825936Z 2023-04-26 13:42:29,806 INFO  [137fe478-789a-4d30-a61d-79a8c746ca55] o.e.d.e.s.d.EdgeCommandForwarderActor akka://ditto-cluster/user/connectivityRoot/edgeCommandForwarder - Forwarding thing signal with ID <my.test:octopus> and type <things.commands:modifyFeatures> to 'things' shard region
2023-04-26T11:42:29.806929090Z 2023-04-26 13:42:29,806 INFO  [137fe478-789a-4d30-a61d-79a8c746ca55] o.e.d.e.s.a.AcknowledgementAggregatorActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1/ackr1-137fe478-789a-4d30-a61d-79a8c746ca55 - Starting to wait for all requested acknowledgements <[twin-persisted]> for a maximum duration of <PT1M5S>.
2023-04-26T11:42:32.870336088Z 2023-04-26 13:42:32,869 INFO  [89a18a23-512c-4b28-8f99-7ff0b862b9a0] o.e.d.c.s.m.InboundDispatchingSink  - onMapped mappedHeaders ImmutableDittoHeaders [{ditto-reply-target=0, ditto-expected-response-types=["response","error"], ditto-origin=mqtt-example-connection-123, ditto-auth-context={"type":"pre-authenticated-connection","subjects":["nginx:ditto"]}, correlation-id=89a18a23-512c-4b28-8f99-7ff0b862b9a0, mqtt.retain=false, mqtt.topic=ditto-tutorial/my.test:octopus, mqtt.qos=0, ditto-entity-id=thing:my.test:octopus, ditto-inbound-payload-mapper=javascript}]
2023-04-26T11:42:32.872209118Z 2023-04-26 13:42:32,871 INFO  [89a18a23-512c-4b28-8f99-7ff0b862b9a0] o.e.d.e.s.d.EdgeCommandForwarderActor akka://ditto-cluster/user/connectivityRoot/edgeCommandForwarder - Forwarding thing signal with ID <my.test:octopus> and type <things.commands:modifyFeatures> to 'things' shard region
2023-04-26T11:42:32.872318681Z 2023-04-26 13:42:32,871 INFO  [89a18a23-512c-4b28-8f99-7ff0b862b9a0] o.e.d.e.s.a.AcknowledgementAggregatorActor akka://ditto-cluster/system/sharding/connection/1/mqtt-example-connection-123/pa/$Hb/c1/ackr2-89a18a23-512c-4b28-8f99-7ff0b862b9a0 - Starting to wait for all requested acknowledgements <[twin-persisted]> for a maximum duration of <PT1M5S>.
2023-04-26T11:42:43.677512942Z 2023-04-26 13:42:43,677 INFO  [] o.e.d.i.u.p.a.SubUpdater akka://ditto-cluster/user/thing-event-aware-sub-supervisor/subUpdater2 - Start to sync cluster state
2023-04-26T11:42:43.677945048Z 2023-04-26 13:42:43,677 INFO  [] o.e.d.i.u.p.a.SubUpdater akka://ditto-cluster/user/thing-event-aware-sub-supervisor/subUpdater2 - DData is in sync with cluster state

[Screenshot: dittoScreenShot]

thjaeckle commented 1 year ago

When you use a single connection for source and target, updates to twins which were caused by the source of the connection will not be published to the target - as Ditto makes the assumption that the source already knows of the change of device state. If you need to be informed, you would need to create a second MQTT connection with the target - and the first one would only contain the connection source.

Edit: However, be very careful if you use the same addresses you already have in your config - as your source consumes from MQTT address "ditto-tutorial/#" and you publish to "address": "ditto-tutorial/{{ thing:id }}", the source will consume MQTT messages from your target which could cause a loop.
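The suggestion above (one connection that only has the source, plus a second connection that only has the target) could be sketched in Python as a transformation of the connection JSON. This is a rough sketch under assumptions: the helper name split_connection is hypothetical, and dropping the fixed "clientId"/"publisherId" from the second connection is my own precaution, since an MQTT broker normally disconnects an existing client when another one connects with the same client ID.

```python
import copy
from typing import Tuple


def split_connection(conn: dict, target_conn_id: str, target_address: str) -> Tuple[dict, dict]:
    """Return (source_only, target_only) copies of a combined connection."""
    # First connection: keep the sources, publish nothing.
    source_only = copy.deepcopy(conn)
    source_only["targets"] = []

    # Second connection: new ID, no sources, publish to a different address.
    target_only = copy.deepcopy(conn)
    target_only["id"] = target_conn_id
    target_only["sources"] = []
    for target in target_only.get("targets", []):
        # Use an address the source filter does not cover, to avoid a loop.
        target["address"] = target_address
    # Assumption: avoid reusing the same fixed MQTT client IDs twice.
    specific = target_only.get("specificConfig", {})
    specific.pop("clientId", None)
    specific.pop("publisherId", None)
    return source_only, target_only


if __name__ == "__main__":
    combined = {
        "id": "mqtt-example-connection-123",
        "sources": [{"addresses": ["ditto-tutorial/#"]}],
        "targets": [{"address": "ditto-tutorial/{{ thing:id }}",
                     "topics": ["_/_/things/twin/events"]}],
        "specificConfig": {"clientId": "my-awesome-mqtt-client-id"},
    }
    src, tgt = split_connection(
        combined, "mqtt-example-connection-321", "ditto/data/{{ thing:id }}")
    print(src["targets"], tgt["sources"], tgt["targets"][0]["address"])
```

The input dictionary is left untouched, so the original template can still be compared against the two results before either is sent to Ditto.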

rinelb commented 1 year ago

Hi @thjaeckle, thanks for the quick reply and explanation, much appreciated. Opening a second connection as a target worked for me.

The scenario we need is: devices send data to Ditto via MQTT with topic "ditto-tutorial/{{ thing:id }}" using "MQTT Broker1". When Ditto receives data from a device, it sends the data out to a separate MQTT broker, "MQTT Broker2", with a topic such as "ditto/data/live/{{ thing:id }}", and a master device subscribes on MQTT Broker2 with topic "ditto/data/+".

For now we just want to get it working on one broker.

If anyone is interested, this is the second connection template that worked for me:

{
  "id": "mqtt-example-connection-321",
  "name": null,
  "connectionType": "mqtt",
  "connectionStatus": "open",
  "uri": "tcp://test.mosquitto.org:1883",
  "sources": [],
  "targets": [
    {
      "address": "ditto/data/{{ thing:id }}",
      "topics": [
        "_/_/things/twin/events",
        "_/_/things/live/messages"
      ],
      "qos": 0,
      "authorizationContext": [
        "nginx:ditto"
      ],
      "headerMapping": {}
    }
  ],
  "clientCount": 1,
  "failoverEnabled": true,
  "validateCertificates": true,
  "processorPoolSize": 1,
  "tags": []
}
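The topic filters mentioned in this thread can be checked with a small matcher. The sketch below is a simplified take on MQTT topic-filter matching ("+" matches exactly one level, a trailing "#" matches the remaining levels); the function name topic_matches is hypothetical, and special cases such as "$"-prefixed topics are ignored.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic-filter match ('+' = one level, '#' = the rest)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything below.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    # The first connection's target address falls under its own source filter,
    # which is the loop risk pointed out above.
    print(topic_matches("ditto-tutorial/#", "ditto-tutorial/my.test:octopus"))
    # The second connection's address is outside that filter.
    print(topic_matches("ditto-tutorial/#", "ditto/data/my.test:octopus"))
    # '+' covers one level only.
    print(topic_matches("ditto/data/+", "ditto/data/my.test:octopus"))
    print(topic_matches("ditto/data/+", "ditto/data/live/my.test:octopus"))
```

Under these rules, a subscriber on "ditto/data/+" would see "ditto/data/my.test:octopus" but not a deeper topic such as "ditto/data/live/my.test:octopus"; a filter like "ditto/data/#" or "ditto/data/live/+" would be needed for the latter. Also note that the entries under "topics" in a Ditto target (for example "_/_/things/twin/events") select which Ditto signals are published, while "address" is the MQTT topic, so subscribing to "_/_/things/live/messages/#" on the broker is not expected to receive anything.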