We are migrating to a system where CEDA ingest drives the asset-generator. i.e.
file deposited -> enters Asset-queue -> processed to build facets for asset -> indexed as an asset
Then we rely on aggregating the metadata contained in all assets with the same itemID to generate items. THEN aggregate all items with the same collectionID to generate collections.
What are the triggers which start the item/collection workflow:
A new asset is indexed
The item-description is changed
It makes sense to use a queue-based system for this.
Workflow 1
new file is indexed -> place message in item Q. -> generate item -> place message in collection Q -> generate collection.
Workflow 2
item-description is changed -> trigger update to assets -> place message in item Q. -> generate item -> place message in collection Q -> generate collection.
For an example of changing configuration files triggering a re-index see:
As assets from similar directories/items are likely to arrive close temporally this will send duplicated messages to trigger item indexing with the same itemID and collectionID.
Assets 1 & 2 have item ID x, They push messages to itemID Q with header:
x-deduplication-header=x
The duplicate message will be removed before adding to the Q, as long as a message hit the exchange with the same header within the x-cache-ttl specified time or the key has not bee ejected due to x-cache-size .
Using the delayed delivery plugin, you can also delay the delivery of the message to the queue until x-delay has passed (probably matching x-cache-ttl so that once a message is delivered, a further receipt of the same header would trigger a new aggregation. This should mean that no assets are missed in the aggregation) further increasing the chance that the aggregation step will catch all possible values.
You could also potentially use q based deduplication and not use the delayed delivery. This would result in more repeat actions but, the aggregation step will not be instant so it will reduce the load on Elasticsearch and avoid wasting compute while still allowing "new" changes to be processed.
We are migrating to a system where CEDA ingest drives the asset-generator. i.e.
file deposited -> enters Asset-queue -> processed to build facets for asset -> indexed as an asset
Then we rely on aggregating the metadata contained in all assets with the same itemID to generate items. THEN aggregate all items with the same collectionID to generate collections.
What are the triggers which start the item/collection workflow:
It makes sense to use a queue-based system for this.
Workflow 1
new file is indexed -> place message in item Q. -> generate item -> place message in collection Q -> generate collection.
Workflow 2
item-description is changed -> trigger update to assets -> place message in item Q. -> generate item -> place message in collection Q -> generate collection.
For an example of changing configuration files triggering a re-index see:
General Notes
As assets from similar directories/items are likely to arrive close temporally this will send duplicated messages to trigger item indexing with the same itemID and collectionID.
This deduplication plugin would allow for exchange based de-duplication. e.g.
Assets 1 & 2 have item ID
x
, They push messages to itemID Q with header:x-deduplication-header=x
The duplicate message will be removed before adding to the Q, as long as a message hit the exchange with the same header within the
x-cache-ttl
specified time or the key has not bee ejected due tox-cache-size
. Using the delayed delivery plugin, you can also delay the delivery of the message to the queue untilx-delay
has passed (probably matchingx-cache-ttl
so that once a message is delivered, a further receipt of the same header would trigger a new aggregation. This should mean that no assets are missed in the aggregation) further increasing the chance that the aggregation step will catch all possible values.You could also potentially use q based deduplication and not use the delayed delivery. This would result in more repeat actions but, the aggregation step will not be instant so it will reduce the load on Elasticsearch and avoid wasting compute while still allowing "new" changes to be processed.