Azure / azure-uamqp-c

AMQP library for C

How to provide receive offset or enqueued time filter? #388

Open jo-me opened 3 years ago

jo-me commented 3 years ago

Hi, I'm looking for a way to filter the messages I receive by providing an offset or an enqueued-time window. As far as I can see, Event Hubs supports this over AMQP (https://azure.github.io/amqpnetlite/articles/azure_eventhubs.html), but I could not find a hint on how to do it with this library.

How can this be done in uAMQP?

My use case is that I want to limit the messages received to only the last 10 minutes as older messages are irrelevant.

Thanks!

jo-me commented 2 years ago

This issue seems to be dead, so I'm answering my own question:

It is possible using AMQP filter sets. This is better documented for other AMQP implementations, and I was able to extend the uAMQP receive example so that it only receives messages that arrive after the receiver is started.

Basically, the simple "source" definition (containing only the AMQP address) needs to be replaced with a more elaborate one: the source also carries a filter set, and the filter set contains a query string (see filterByEnqueuedTime below).

So it works like this:

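// Needs <chrono>, <cstdint>, <string> plus the uAMQP headers used by the receive example.
// Cleanup (amqpvalue_destroy / source_destroy) is omitted in this snippet.

// The filter set is a map from a filter name (symbol) to a described value.
auto filterSet = amqpvalue_create_filter_set(amqpvalue_create_map());

// Current time in milliseconds since the Unix epoch; only messages enqueued after it pass the filter.
std::uint64_t timeNow = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
std::string filterByEnqueuedTime = "amqp.annotation.x-opt-enqueuedtimeutc > " + std::to_string(timeNow);

// The same symbol is used twice: as the descriptor of the described value and as the map key.
auto selectorFilterKey = amqpvalue_create_symbol("apache.org:selector-filter:string");
auto selectorKey = amqpvalue_create_symbol("apache.org:selector-filter:string");
auto filterEntryValue = amqpvalue_create_string(filterByEnqueuedTime.c_str());
auto filterEntry = amqpvalue_create_described(selectorFilterKey, filterEntryValue);
amqpvalue_set_map_value(filterSet, selectorKey, filterEntry);

// Build a source that carries both the address and the filter set.
auto newSource = source_create();
source_set_address(newSource, amqpvalue_create_string("amqps://....."));  // address elided in the original post
source_set_filter(newSource, filterSet);
auto newSourceValue = amqpvalue_create_source(newSource);

// session and target are set up as in the uAMQP receive example.
link = link_create(session, "receiver-link", role_receiver, newSourceValue, target);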
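
The snippet above filters from "now". For the original use case (only the last 10 minutes), the cutoff can be moved back before building the query string. A minimal sketch, assuming the same millisecond-since-epoch format as above; makeEnqueuedTimeFilter is a hypothetical helper name, not part of uAMQP:

```cpp
#include <chrono>
#include <cstdint>
#include <string>

// Hypothetical helper (not a uAMQP API): builds a selector string that
// matches messages enqueued after `now - lookback`.
std::string makeEnqueuedTimeFilter(std::chrono::system_clock::time_point now,
                                   std::chrono::milliseconds lookback)
{
    // Move the cutoff back by the requested window.
    const auto cutoff = now - lookback;

    // Express the cutoff as milliseconds since the Unix epoch.
    const std::int64_t cutoffMs =
        std::chrono::duration_cast<std::chrono::milliseconds>(
            cutoff.time_since_epoch()).count();

    return "amqp.annotation.x-opt-enqueuedtimeutc > " + std::to_string(cutoffMs);
}
```

The result of makeEnqueuedTimeFilter(std::chrono::system_clock::now(), std::chrono::minutes(10)) would then be used in place of filterByEnqueuedTime.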

Reference links:
https://github.com/Azure/azure-event-hubs-c/blob/master/eventhub_client/devdoc/requirement_docs/eventhubreceiver_requirements_ll.md
https://docs.microsoft.com/de-de/azure/iot-hub/iot-hub-amqp-support
https://qpid.apache.org/releases/qpid-proton-0.18.0/proton/cpp/examples/selected_recv.cpp.html