Open petersilva opened 4 years ago
this is part of #392
I thought of a grammar for this. We add a subscribe keyword to the config grammar. for AMQP:
subscribe *broker* *exchange* *topic*
an example would be:
subscribe amqps://hpfx.collab.science.gc.ca xpublic v02.post.*.WXO-DD.bulletins.alphanumeric.#
subscribe amqps://dd.weather.gc.ca xpublic v02.post.bulletins.alphanumeric.#
This would subscribe to different topics on different brokers.
the same verb for MQTT would be missing the exchange... just have a topic.
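A hypothetical MQTT form of the same verb, mirroring the AMQP examples above (broker URL and topic are illustrative only, not a confirmed syntax):

```
subscribe mqtts://hpfx.collab.science.gc.ca v02.post.bulletins.alphanumeric.#
```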
@reidsunderland you mentioned publishing to multiple ... who was interested in multiple subscribes?
actually, we don't need to change anything in the grammar... we just allow multiple exchange and subtopic declarations and relate them (so that each subtopic refers to the last exchange declared...) could do the same thing with broker... just repeat it.
the same way directory & accept work for scoping.
That sounds good to me -- making it work the same way as directory and accept.
I don't know of anyone with an actual need for publishing or subscribing to multiple exchanges/brokers
use case:
Sample use case:
if you're doing HA, then it would be really cool to define two upstream sources, and use duplicate suppression on them. vs:
What we do today: we define two shovels that feed a winnow, and subscribe to the winnow output... so four configurations instead of one... and you need a local broker.
so not absolutely essential... but a heck of a lot simpler.
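A sketch of what such an HA config could look like once multiple upstream brokers are supported in one flow; the syntax is speculative and the broker URLs are placeholders:

```
# speculative: two upstream brokers in one subscriber, duplicates suppressed
nodupe_ttl 1h
broker amqps://anonymous@hpfx.collab.science.gc.ca/
subtopic bulletins.alphanumeric.#
broker amqps://anonymous@dd.weather.gc.ca/
subtopic bulletins.alphanumeric.#
directory /tmp/ha_feed
accept .*
```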
reading the source... gathering from multiple exchanges already works in the python one...
exchange zzz
topicPrefix ...
subtopic A
exchange yyy
subtopic B
will result in bindings of: subtopic B to exchange yyy, and subtopic A to exchange zzz. but it all has to be to a single upstream broker. That was hard to fix in v2, but in sr3, it should be easy.
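A minimal sketch (not sr3's actual parser) of that scoping rule, where each subtopic line triggers a binding to the most recently declared exchange:

```python
# Toy illustration of "each subtopic refers to the last exchange" scoping.
# Not sarracenia's real parser; option names only mimic the config grammar.
def parse_bindings(lines, topic_prefix=("v02", "post")):
    exchange = None
    bindings = []
    for line in lines:
        key, _, value = line.strip().partition(" ")
        if key == "exchange":
            exchange = value          # remember the last exchange seen
        elif key == "subtopic":
            # a subtopic line triggers a binding to the current exchange
            bindings.append((exchange, list(topic_prefix), [value]))
    return bindings

conf = ["exchange zzz", "subtopic A", "exchange yyy", "subtopic B"]
print(parse_bindings(conf))
# → [('zzz', ['v02', 'post'], ['A']), ('yyy', ['v02', 'post'], ['B'])]
```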
I tried to use multiple exchanges from the same broker today, and it didn't work, on v.3.00.52 :(
sr3 show shows:
'exchange': 'xs_wxofeed_marine_source2,
the config:
nodupe_ttl 1h
#nodupe_basis path
broker amqps://username@broker.com/
exchange xs_wxofeed_marine_source1
subtopic #
exchange xs_wxofeed_marine_source2
subtopic #
queueName q_${BROKER_USER}_${PROGRAM}.${CONFIG}.pxatx
post_broker amqps://out-user@outputbroker.com/
post_exchange xs_out-user_winnow_marine_out
instances 1
identity arbitrary
`exchange` is a string option.
I could work around it by manually binding my queue to both exchanges 🤔
Try changing exchange into a list option, and see what happens? I recall trying it long ago... It worked at one point.
I looked into trying this, but there are quite a few places where exchange is expected to be a string.
The thing to test isn't the exchange values, it's what ends up in "bindings". Bindings is what is fed (in AMQP) to the AMQP bind verb... in MQTT it's fed to the "subscribe" verb. Your test should have a line with "exchange", then a line with "subtopic", then a second line with a different exchange; what triggers the binding is the second "subtopic" occurrence.
This already should work and has been this way for a long time. I've not tested it, but I looked at the bindings on the broker, and they are what I expected. Would be worth testing. Do sr3 show component/config |& grep bindings
sample config:
broker amqps://dd.weather.gc.ca/
topicPrefix v02.post
# instances: number of downloading processes to run at once. defaults to 1. Not enough for this case
instances 5
expire 10m
exchange xpublic
subtopic bulletins.alphanumeric.#
exchange xs_anonymous
subtopic bulletins.alphanumeric.#
directory /tmp/dd_amis
It already works... you just need to repeat the subtopic verbs... as long as you stay with the same broker, it should be ok:
SSC-5CD2310S60% sr3 show subscribe/dd_amis.conf |& grep -A2 binding
'bindings': [('xpublic', ['v02', 'post'], ['bulletins.alphanumeric.#']),
('xs_anonymous', ['v02', 'post'], ['bulletins.alphanumeric.#'])],
'broker': 'amqps://anonymous@dd.weather.gc.ca/ None True True False False '
SSC-5CD2310S60%
It works now, using the same config that I tested with unsuccessfully before. I'm not sure what the problem was, but multiple exchanges on the same broker works.
I was using v.3.00.52 and 3.00.53 now, but I don't think there were any relevant changes.
fwiw, I just tested with v2... it works there also:
2024-08-01 21:28:19,926 [INFO] AMQP broker(dd.weather.gc.ca) user(anonymous) vhost(/)
2024-08-01 21:28:19,926 [INFO] Using amqp module (AMQP 0-9-1)
2024-08-01 21:28:20,182 [ERROR] standard queue name based on: prefix=q_anonymous program_name=sr_subscribe exchange_split=False no=-1
2024-08-01 21:28:20,182 [INFO] Binding queue q_anonymous.sr_subscribe.dd_amis.46684231.54951590 with key v02.post.bulletins.alphanumeric.# from exchange xpublic on broker amqps://anonymous@dd.weather.gc.ca/
2024-08-01 21:28:20,182 [INFO] Binding queue q_anonymous.sr_subscribe.dd_amis.46684231.54951590 with key v02.post.bulletins.alphanumeric.# from exchange xs_anonymous on broker amqps://anonymous@dd.weather.gc.ca/
2024-08-01 21:28:20,228 [INFO] declared queue q_anonymous.sr_subscribe.dd_amis.46684231.54951590 (anonymous@dd.weather.gc.ca)
2024-08-01 21:28:20,268 [INFO] reading from to anonymous@dd.weather.gc.ca, exchange: xs_anonymous
2024-08-01 21:28:20,300 [INFO] report_back to anonymous@dd.weather.gc.ca, exchange: xs_anonymous
2024-08-01 21:28:20,301 [INFO] sr_retry on_heartbeat
2024-08-01 21:28:20,301 [INFO] No retry in list
just to recall, the ability to bind to multiple exchanges was assumed from the beginning. This issue is about being able to do the same, but with multiple brokers... meaning multiple connections to different brokers... being active. It's still an outstanding enhancement opportunity.
OK, I have a branch ( https://github.com/MetPX/sarracenia/tree/issue339 ) that does Step 1: ability to set up two brokers as upstream sources to consumers, and pass all flow tests.
Things to worry about:
so looking at queueName becoming a binding-relative option... then it drags along with it:
So all of that has to be added to the binding data structure. hmm...
submitted part2 ( https://github.com/MetPX/sarracenia/tree/issue339_part2_queueNames ) puts all the new settings in a bindings dict. so now one can set different queueNames for different brokers, but the settings don't quite stick because of .qname files, and dealing with state.
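A rough sketch of what such a per-broker bindings dict might hold; the field names and values are illustrative, not the actual sr3 data structure:

```python
# Hypothetical shape: settings that used to be global, now scoped per broker.
bindings = {
    "amqps://anonymous@hpfx.collab.science.gc.ca/": {
        "queueName": "q_anonymous.subscribe.ha_feed.hpfx",
        "exchange": ["xpublic"],
        "subtopic": ["bulletins.alphanumeric.#"],
    },
    "amqps://anonymous@dd.weather.gc.ca/": {
        "queueName": "q_anonymous.subscribe.ha_feed.dd",
        "exchange": ["xpublic"],
        "subtopic": ["bulletins.alphanumeric.#"],
    },
}

# each broker now carries its own queueName and queue settings
for broker, b in bindings.items():
    print(broker, b["queueName"])
```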
part3 requires changing how things are persisted... options:
After part3, the settings for queues should be scoped per binding.
stuff that remains after that (part 4+):
multiple bindings for the same broker should be grouped into a single queue declare. current methods treat every binding as individual... if the settings are the same, and only the subtopic differs, they should be grouped into the same queue declaration.
Elegantly tolerate individual broker failures (keep working with good ones.)
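One way to sketch the grouping idea: bindings that share a broker and identical queue settings collapse into one queue declaration carrying several binding keys (field names here are invented, not sr3's):

```python
# Toy grouping: one queue declare per (broker, queueName, settings) triple,
# collecting all subtopics that only differ by binding key.
def group_declares(bindings):
    declares = {}
    for b in bindings:
        # settings made hashable so identical ones group together
        key = (b["broker"], b["queueName"], frozenset(b.get("settings", {}).items()))
        declares.setdefault(key, []).append(b["subtopic"])
    return declares

bindings = [
    {"broker": "amqps://b1/", "queueName": "q1", "subtopic": "a.#"},
    {"broker": "amqps://b1/", "queueName": "q1", "subtopic": "b.#"},
    {"broker": "amqps://b2/", "queueName": "q2", "subtopic": "a.#"},
]
for (broker, qname, _), subtopics in group_declares(bindings).items():
    print(broker, qname, subtopics)
# two queue declares: (b1, q1) with ['a.#', 'b.#'], and (b2, q2) with ['a.#']
```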
After the above, one could say that this issue is truly resolved. however it gives an opportunity for improvement:
another way for queueNames would be to make the .qname file a JSON dictionary indexed by broker, instead of just a name... call that option 3 for that aspect.
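Under that option, a .qname file that today holds a single queue name might instead look something like this (contents purely illustrative):

```json
{
  "amqps://anonymous@hpfx.collab.science.gc.ca/": "q_anonymous.subscribe.ha_feed.1",
  "amqps://anonymous@dd.weather.gc.ca/": "q_anonymous.subscribe.ha_feed.2"
}
```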
for documentation of existing behaviour:
update: there is a long parallel branch issue339_feature_multibroker_subscriptions and it passes all the tests and works with multiple subscriptions, but:
Will probably do a PR after the cleanup stuff is working, then do a separate PR for the broken broker work-around stuff.
working on the cleanup stuff for now.
abandoned first branch... This is a second branch that is more about #35, but it is a precursor to doing this feature later.
with v3 and use of plugins for gather... can we just subscribe to x different brokers and get messages from all of them with a single flow?
this probably means that the error recovery needs to change so that we retry the broken gather sources, but also come back and use the others...
one could just list multiple brokers, and the subscribe would connect to all of them at once, checking each one in turn.
(implementation for that case would be adding plugin.gather.message multiple times, each with a different broker argument... would require some other changes... perhaps specifying different exchanges per server?)
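A rough sketch of that round-robin gather loop, with broken sources sidelined and retried later; all names here are invented for illustration, sr3's real gather plugins work differently:

```python
import time

# Hypothetical per-broker gather source; in sr3 this role would be played
# by plugin.gather.message entries, one per broker.
class Source:
    def __init__(self, name, gather_fn):
        self.name = name
        self.gather = gather_fn
        self.down_until = 0          # timestamp before which we skip it

def gather_all(sources, retry_delay=30, now=time.time):
    """Check each source in turn; on error, sideline it for retry_delay."""
    messages = []
    for s in sources:
        if now() < s.down_until:
            continue                 # still in its back-off window
        try:
            messages.extend(s.gather())
        except Exception:
            s.down_until = now() + retry_delay   # come back to it later
    return messages

def ok():
    return ["m1"]

def broken():
    raise ConnectionError("broker unreachable")

print(gather_all([Source("b1", ok), Source("b2", broken)]))
# → ['m1']   (broken source is sidelined, good one is still read)
```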
Can also do a winnow in a single subscriber process... (using cache, and two upstream sources.)
would be cool, and not that hard to get done in v3.