Closed mrutid closed 1 year ago
Last Data current feature: https://fiware-cygnus.readthedocs.io/en/latest/cygnus-ngsi/flume_extensions_catalogue/last_data_function.html Cygnus is capable to perform a upsert operation on the following Sinks:
PostgisSink
PostgreSQLSink
MySQLSink
In order to perform this operation Cygnus needs five keys.
last_data_mode This is the mode of operation: upsert, insert or both. Default is insert mode
last_data_table_suffix This is the suffix that will be added to the table name to perform the upsert operation.
last_data_unique_key This is the reference to indicate to the database engine which is the reference key (or list of keys separed by a comma) to perform the upsert.
last_data_timestamp_key This is the timestamp reference to know which record is the newest.
last_data_sql_timestamp_format This is the timestamp format to indicate to the database how to cast the text timestamp to know if the stored record is older than the one trying to insert.
The upsert mode performs a transaction where runs upsert querys, if any one of them fails, then the other one is rollbacked. This means they have to be run successfully by Cygnus to store on the database.
New Attr received in subscription should not be stored, just used by lastData sinks
Example of orion subscritpion with alteratinoType entityDelete:
{
"subject": {
"entities": [
{
"idPattern" : ".*",
"type": "T"
}
],
"condition": {
"alterationTypes": [ "entityDelete" ]
}
},
"notification": {
"http": {
"url": "http://127.0.0.1:'${LISTENER_PORT}'/notify"
},
"attrs": [ "alterationType", "*" ]
}
}
Example of event received by cygnus with an entityDelete (after delete a entity in orion):
time=2023-05-09T07:57:19.210Z | lvl=INFO | corr=bd678eff-bd84-4bd4-aa24-f76ff9e35508; cbnotif=1 | trans=ac681c4d-943c-4e9c-b04f-c7ebcf92d0e4 | srv=smartcity | subsrv=/ | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[327] : [NGSIRestHandler] Received data ({"subscriptionId":"6458d7f796c36cf539052b5a","data":[{"id":"thing:dispAB","type":"thing","TimeInstant":{"type":"DateTime","value":"2023-05-09T07:56:48.640Z","metadata":{}},"a":{"type":"num","value":null,"metadata":{}},"alterationType":{"type":"Text","value":"entityDelete","metadata":{}}}]})
{
"subscriptionId":"6458d7f796c36cf539052b5a",
"data":[
{
"id":"thing:dispAB",
"type":"thing",
"TimeInstant":{
"type":"DateTime",
"value":"2023-05-09T07:56:48.640Z",
"metadata":{}
},
"a":{
"type":"num",
"value":null,
"metadata":{}
},
"alterationType":{
"type":"Text",
"value":"entityDelete",
"metadata":{}
}
}
]
}
alterationType is adding as another attribute of entity and should be processed by cygnus before sent to sink upsert (lastdata) with the logic related to alterationType (create, update, delete...)
Issue in Orion repo: https://github.com/telefonicaid/fiware-orion/issues/4325
PR https://github.com/telefonicaid/fiware-cygnus/pull/2237
As a side note, Orion issue is also fixed in its repo
Maybe with a flag at LastData SINK config, provided the NGSIv2 sub gives information regarding Entity Deletion it should be propagated to SQL repositories so Entities and registers remain synchronized