FIWARE / context.Orion-LD

Context Broker and CEF building block for context data management which supports both the NGSI-LD and the NGSI-v2 APIs
https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.06.01_60/gs_CIM009v010601p.pdf
GNU Affero General Public License v3.0

MQTT subscriptions with virtual host. #1576

Open bgramaje opened 8 months ago

bgramaje commented 8 months ago

Hello everyone.

First of all, thanks for this awesome context broker. Second, I have successfully set up an MQTT subscription with an MQTT URI of the form "mqtt://{username}:{password}@{host}:{port}/{topic}". The issue I am having is adding a vhost to the URI for the connecting user, since I am using RabbitMQ. Typically this is done by specifying the user as "{vhost}:{username}", but when I try to create that subscription I get errors, since there are two ':'. I have also tried to decode it, which returns the following error: "I20240308-11:00:01.560(1)? error | [cronTask]: InvalidResponse: Unexpected error code: 500".

Is it possible to point the URI of the MQTT subscription at a specific RabbitMQ vhost?

Thanks
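
A note on the two-colon problem described above: in generic URI syntax (RFC 3986), a ':' inside the user name is normally written percent-encoded as %3A so that it cannot be confused with the user/password separator. Whether Orion-LD decodes percent-encoded userinfo is not confirmed anywhere in this thread, so the following is only a sketch of what the client side would look like. `percentEncodeUserinfo` and `buildMqttUri` are hypothetical helpers, not part of Orion-LD.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Hypothetical helper: percent-encode every character that is not an
// RFC 3986 "unreserved" character (letters, digits, '-', '.', '_', '~').
std::string percentEncodeUserinfo(const std::string& in)
{
  static const char hex[] = "0123456789ABCDEF";
  std::string       out;

  for (unsigned char c : in)
  {
    if (std::isalnum(c) || c == '-' || c == '.' || c == '_' || c == '~')
      out += static_cast<char>(c);
    else
    {
      out += '%';
      out += hex[c >> 4];
      out += hex[c & 0x0F];
    }
  }

  return out;
}

// Hypothetical helper: build an MQTT URI in the format used in this thread,
// with the RabbitMQ-style "{vhost}:{username}" user encoded so that the only
// literal ':' left in the userinfo is the user/password separator.
std::string buildMqttUri(const std::string& vhost,
                         const std::string& username,
                         const std::string& password,
                         const std::string& host,
                         int                port,
                         const std::string& topic)
{
  assert(port > 0 && port < 65536);

  std::string user = vhost.empty() ? username : vhost + ":" + username;

  return "mqtt://" + percentEncodeUserinfo(user) + ":" + percentEncodeUserinfo(password) +
         "@" + host + ":" + std::to_string(port) + "/" + topic;
}
```

With a vhost "tenant1" and a user "orion", the user part becomes "tenant1%3Aorion". This only helps if the receiving side decodes the userinfo before handing it to the MQTT library.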

kzangeli commented 8 months ago

Hola Borja,

To be honest, I don't have a clue :) But, let's look into this and see how we can fix the problem. So far, I have only tested this with mosquitto and, frankly, haven't spent a whole lot of time on MQTT.

Not gonna be immediate though. Very busy today and next week I'll be out, working perhaps half-time. I'll put this on my ToDo (issue #280)

bgramaje commented 8 months ago

Okay thanks then.

Also, what is "Broker MQTT error -3"? It happens when I publish data. If I do it with mosquitto_pub and mosquitto_sub, it works perfectly fine.

Thanks or gracias!

kzangeli commented 8 months ago

"Broker MQTT error -3" ... is that something you see in the logfile? If so, post the exact text and I'll grep on it in the source code.

bgramaje commented 8 months ago

This is the log from Orion "time=2024-03-08T12:14:46.731Z | lvl=ERROR | corr=N/A | trans= | from=N/A | srv=N/A | subsrv=N/A | comp=Orion | op=mqttNotification.cpp[173]:mqttNotification | msg=MQTT Broker error -3"

kzangeli commented 8 months ago

Found it.

  int  mr = MQTTClient_publishMessage(mqttP->client, topic, &mqttMsg, &mqttToken);
  if (mr != MQTTCLIENT_SUCCESS)
  {
    LM_E(("MQTT Broker error %d", mr));
    // FIXME: Reconnect and try again
    return -1;
  }

And then found this in Paho MQTT Client docs:

#define     MQTTCLIENT_DISCONNECTED   -3

kzangeli commented 8 months ago

Seems like we've lost the connection between the client (Orion-LD) and the MQTT server. And, also, I've not implemented any attempt to reconnect ... I'll add that to my ToDo.
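
As an illustration of the missing "reconnect and try again" step, here is a minimal sketch that is independent of the Paho library. The two callables stand in for the real publish and connect calls, and the constants repeat the result codes quoted above (0 for success, -3 for a disconnected client). `publishWithReconnect` is a hypothetical name, and this is not the broker's implementation.

```cpp
#include <cassert>
#include <functional>

// Result codes as quoted in this thread from the Paho MQTT client docs,
// redefined locally so the sketch compiles without the library.
constexpr int kMqttSuccess      = 0;    // MQTTCLIENT_SUCCESS
constexpr int kMqttDisconnected = -3;   // MQTTCLIENT_DISCONNECTED

// Hypothetical wrapper: try to publish, and if the client reports that it is
// disconnected, reconnect and try again, up to maxAttempts publish attempts.
//   publish   - stand-in for a call such as MQTTClient_publishMessage
//   reconnect - stand-in for re-establishing the connection to the MQTT broker
int publishWithReconnect(const std::function<int()>& publish,
                         const std::function<int()>& reconnect,
                         int                         maxAttempts)
{
  assert(maxAttempts > 0);

  int rc = kMqttDisconnected;

  for (int attempt = 0; attempt < maxAttempts; ++attempt)
  {
    rc = publish();
    if (rc != kMqttDisconnected)
      return rc;   // Success, or an error that reconnecting would not fix

    if (reconnect() != kMqttSuccess)
      return rc;   // MQTT broker unreachable: give up and report the disconnect
  }

  return rc;
}
```

Bounding the number of attempts keeps a notification thread from spinning forever when the MQTT broker is really down.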

bgramaje commented 8 months ago

So do you have any guess as to why it is disconnecting?

bgramaje commented 8 months ago

Also, this comes from

  LM_T(LmtMqtt, ("Sending a notification over MQTT (topic: '%s')", topic));
  int mr = MQTTClient_publishMessage(mqttP->client, topic, &mqttMsg, &mqttToken);
  if (mr != MQTTCLIENT_SUCCESS)
  {
    LM_E(("MQTT Broker error %d", mr));
    // Reconnect and try again
    return -1;
  }

as you mentioned, but I am not seeing the log line from the LM_T call ("Sending a notification over MQTT (topic: '%s')"). Why would that happen?

kzangeli commented 8 months ago

If you start the broker with the proper trace levels, you'll see that one too. -t 0-255 turns on ALL trace levels

kzangeli commented 8 months ago

No clue as to why it's disconnecting. Perhaps a timeout? There's some kind of keep-alive thing in MQTT connections. Sorry, not an expert on MQTT. Just took a library that implemented it for me.

bgramaje commented 8 months ago

Hey, it seems the cause was that I already had a connection with the clientId 'Orion-LD', which was in a kind of standby mode, so the Orion connection to the broker was never successfully established. By the way, this is unrelated to the topic, but when changing the version of Orion-LD, is the data in MongoDB erased? We bumped the version and the data was completely gone. Is that the behaviour we should expect when upgrading?

Thanks for all of this.

Also, just hit me up whenever vhosts are available as a URI parameter for the Orion subscription.

bgramaje commented 8 months ago

By the way, I am still receiving connection errors on the broker:

MQTT disconnecting client "10.24.6.242:59148 -> 10.24.1.123:1884" with duplicate id 'Orion-LD'

Is it possible to assign a different client ID? It seems RabbitMQ does not allow multiple connections with the same client ID; I have one subscription per tenant, and the connection keeps being recreated. Could we parametrize the clientId in the subscription body? Or does Orion save the connection and not recreate it?

Thanks
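
To make the client ID question concrete: the log above shows the MQTT broker dropping a client because a second one connected with the same ID, 'Orion-LD'. One conceivable way around that is to derive a distinct ID per connection. The sketch below assumes a fixed prefix, a tenant name and a per-connection index. `makeClientId` and the naming scheme are hypothetical, and how Orion-LD actually chooses its client ID is not shown in this thread.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: build a client ID that differs per tenant and per
// connection slot, so that two connections from the same Orion-LD instance
// do not present the same ID to the MQTT broker.
std::string makeClientId(const std::string& prefix, const std::string& tenant, unsigned int connectionIx)
{
  assert(!prefix.empty());

  std::string id = prefix;

  if (!tenant.empty())
    id += "-" + tenant;

  id += "-" + std::to_string(connectionIx);

  return id;
}
```

Note that MQTT 3.1.1 only obliges servers to accept client IDs of 1 to 23 alphanumeric characters, so whether longer or hyphenated IDs like these are accepted depends on the MQTT broker.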

bgramaje commented 8 months ago

Also, is there any way of keeping the connection alive? Is there any attribute declared for keeping the connection alive?

kzangeli commented 8 months ago

> MQTT disconnecting client "10.24.6.242:59148 -> 10.24.1.123:1884" with duplicate id 'Orion-LD'

That's probably a bug of mine. It was some time ago I implemented this, but I believe I implemented a way to first look up already established connections before I create a new one.

kzangeli commented 8 months ago

> Also, is there any way of keeping the connection alive? Is there any attribute declared for keeping the connection alive?

The paho mqtt library supposedly does that ... Is it not working?

kzangeli commented 8 months ago

> By the way, this is unrelated to the topic, but when changing the version of Orion-LD, is the data in MongoDB erased? We bumped the version and the data was completely gone. Is that the behaviour we should expect when upgrading?

No! :) That should never happen. It would be a complete disaster if the broker erased all the users' data on upgrades! Not a clue what happened to you, but I can assure you it wasn't the broker. If my broker did something like that, my boss would fire me the very same day :)

bgramaje commented 8 months ago

> > MQTT disconnecting client "10.24.6.242:59148 -> 10.24.1.123:1884" with duplicate id 'Orion-LD'
>
> That's probably a bug of mine. It was some time ago I implemented this, but I believe I implemented a way to first look up already established connections before I create a new one.

Okay, it seems to me that it keeps the connection and does not reconnect if the keep-alive expires.

bgramaje commented 8 months ago

> > Also, is there any way of keeping the connection alive? Is there any attribute declared for keeping the connection alive?
>
> The paho mqtt library supposedly does that ... Is it not working?

What is the default keep-alive interval you have set?

bgramaje commented 8 months ago

> > By the way, this is unrelated to the topic, but when changing the version of Orion-LD, is the data in MongoDB erased? We bumped the version and the data was completely gone. Is that the behaviour we should expect when upgrading?
>
> No! :) That should never happen. It would be a complete disaster if the broker erased all the users' data on upgrades! Not a clue what happened to you, but I can assure you it wasn't the broker. If my broker did something like that, my boss would fire me the very same day :)

Okay, that makes total sense. Do all versions of Orion-LD point to the same MongoDB database name? Just in case there was a change to that. Thanks!

bgramaje commented 8 months ago

Hey, here are some logs from today getting the disconnected status:

time=Tuesday 12 Mar 08:00:48 2024.869Z | lvl=DEBUG | corr=N/A | trans= | from=N/A | srv=N/A | subsrv=N/A | comp=Orion | op=senderThread.cpp[100]:startSenderThread | msg=Sending MQTT Notification for subscription 'urn:ngsi-ld:Subscription:103a41d0-dd4e-11ee-b89c-0242ac120105'
time=Tuesday 12 Mar 08:00:48 2024.869Z | lvl=DEBUG | corr=N/A | trans= | from=N/A | srv=N/A | subsrv=N/A | comp=Orion | op=mqttConnectionLookup.cpp[52]:mqttConnectionLookup | msg=Found the MQTT connection, just, it's not connected!
time=Tuesday 12 Mar 08:00:48 2024.869Z | lvl=DEBUG | corr=N/A | trans= | from=N/A | srv=N/A | subsrv=N/A | comp=Orion | op=mqttNotification.cpp[169]:mqttNotification | msg=Sending a notification over MQTT (topic: 'hwsensors/etra')
time=Tuesday 12 Mar 08:00:48 2024.869Z | lvl=ERROR | corr=N/A | trans= | from=N/A | srv=N/A | subsrv=N/A | comp=Orion | op=mqttNotification.cpp[173]:mqttNotification | msg=MQTT Broker error -3

It seems that it knows that the connection is no longer alive, but I think it still does not try to reconnect. Looking at the code:

  if (mqP->port != port)                                                  continue;
  if (strcmp(host, mqP->host) != 0)                                       continue;
  if ((mqP->username != NULL) && (strcmp(username, mqP->username) != 0))  continue;
  if ((mqP->password != NULL) && (strcmp(password, mqP->password) != 0))  continue;
  if ((mqP->version  != NULL) && (strcmp(version,  mqP->version)  != 0))  continue;

  if (MQTTClient_isConnected(mqP->client) != true)
    LM_T(LmtMqtt, ("Found the MQTT connection, just, it's not connected!"));
  else
    LM_T(LmtMqtt, ("Found the MQTT connection and it's connected"));

  return mqP;

It seems that there is no reconnection attempt, right?

Thanks

bgramaje commented 8 months ago

Having a look at mqttNotify.cpp, at line 168:

MqttConnection*  mqttConnectionP  = mqttConnectionLookup(mqttP->host, mqttP->port, mqttP->username, mqttP->password, mqttP->version);

This looks up the connection in case it is already established. If it returns NULL, the connection is not yet saved in mqttConnectionList, so the broker is connected to and the new connection is then added to the list.

Looking at mqttConnectionLookup.cpp, adding the following lines should solve the problem:

  if (MQTTClient_isConnected(mqP->client) != true)
  {
    LM_T(LmtMqtt, ("Found the MQTT connection, just, it's not connected!"));

    // Remove mqP from the list since it is not connected
    for (int j = ix; j < mqttConnectionListIx - 1; j++)
    {
      mqttConnectionList[j] = mqttConnectionList[j + 1];
    }

    // Decrease the size of the list by one
    --mqttConnectionListIx;

    // Return NULL since it is not connected
    return NULL;
  }

I am not 100% sure that deleting the connection from the list is the correct approach, since I have never programmed in C++.

Returning NULL would then cause mqttNotify.cpp to add the connection again:

  // line 172 of mqttNotify.cpp
  if (mqttConnectionP == NULL)
  {
    mqttConnectionP = mqttConnectionAdd(false, mqttP->username, mqttP->password, mqttP->host, mqttP->port, mqttP->version);
    if (mqttConnectionP == NULL)
    {
      orionldError(OrionldInternalError, "MQTT Broker Problem", "unable to connect to the MQTT broker", 500);
      notificationFailure(cSubP, "Unable to connect to the MQTT broker", notificationTime);
      return -1;
    }
  }

This assumes that the function triggered for an MQTT subscription is mqttNotify, in mqttNotify.cpp.

Let me know what you think about that, and thanks for this awesome project.
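
The proposal above (drop a stale entry from the connection list and return NULL so that the caller creates a fresh connection) can be modelled without the MQTT library. In the sketch below, `FakeConnection`, `connectionList`, `connectionListIx` and `lookupLiveConnection` are hypothetical stand-ins for the broker's real structures, and the `connected` flag replaces the call to MQTTClient_isConnected. It only illustrates the list handling and is not a patch for Orion-LD.

```cpp
#include <cassert>
#include <cstring>

// Stand-in for the real connection struct; only the fields the sketch needs
struct FakeConnection
{
  const char*     host;
  unsigned short  port;
  bool            connected;   // Stand-in for MQTTClient_isConnected(client)
};

const int        kMaxConnections = 8;
FakeConnection*  connectionList[kMaxConnections];
int              connectionListIx = 0;   // Number of used slots in connectionList

// Return a matching connection only if it is still connected.
// A matching but disconnected entry is removed from the list and NULL is
// returned, so that the caller goes on to create a new connection.
FakeConnection* lookupLiveConnection(const char* host, unsigned short port)
{
  assert(host != nullptr);

  for (int ix = 0; ix < connectionListIx; ix++)
  {
    FakeConnection* cP = connectionList[ix];

    if (cP->port != port)                  continue;
    if (std::strcmp(host, cP->host) != 0)  continue;

    if (cP->connected)
      return cP;

    // Stale entry: close the gap by shifting the tail down one slot
    for (int j = ix; j < connectionListIx - 1; j++)
      connectionList[j] = connectionList[j + 1];
    --connectionListIx;

    // Real code would also have to release the stale client here (not shown)
    return nullptr;
  }

  return nullptr;
}
```

Two things the sketch leaves open: the stale client's resources still need to be released somewhere, and if several sender threads can reach the list at once, the removal would need to be protected by a lock.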

bgramaje commented 6 months ago

Hello, have you had any time to review this? Thanks in advance if you did.

bgramaje commented 5 months ago

Hello, any update regarding this? Thanks