Open petersilva opened 4 months ago
DMS messages depart from the Sarracenia convention ( #1017 ) in that they insert an extra topic word between the prefix and the path, so you have v02.post.msc-dms-dev.path.
sr3 components, following the algorithm described above, build the outbound topic as v02.post.path. This behaviour is different from v2. Analysts have said they expected the topic to be preserved.
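A minimal sketch of the difference, assuming the outbound topic is simply the prefix followed by the directory part of relPath. The helper name topic_from_path and the sample path are hypothetical and not taken from the sr3 code:

```python
import os

def topic_from_path(topic_prefix, rel_path, separator="."):
    """Join the prefix words with the directory components of rel_path."""
    directory = os.path.dirname(rel_path.lstrip("/"))
    words = list(topic_prefix) + [w for w in directory.split("/") if w]
    return separator.join(words)

# Hypothetical DMS-style message: the inbound topic carries an extra word
# ("msc-dms-dev") that does not appear anywhere in the path.
inbound_topic = "v02.post.msc-dms-dev.data.obs"
rel_path = "/data/obs/file.txt"

derived = topic_from_path(["v02", "post"], rel_path)
print(derived)                   # v02.post.data.obs
print(derived == inbound_topic)  # False: the extra word is lost
```

Anything that rebuilds the topic from the path alone cannot recover the extra word, which is why preserving it needs either an option or a copy of the inbound topic.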
Digging in...
Briefly:
If we want the DMS case to work as-is:
I don't know what to do... I think I'm happy with the way it already works, but if we need to make it work as v2 does, then we can add the topicPreserve switch... that would let sarra work with arbitrary topic schemes. Yeah, topic schemes are one of the things that differ between networks, so that might be OK.
A reason you cannot keep the "topic" whole and just copy it to the end:
So if we want to review the overall topic processing... need to keep these cases in mind.
Another option: build the topic as a new something, where the new somethings only replace the old ones when download=True. That would get "correct" processing without an option... now... new_ what?
A weird thing is that deriveTopic() pays NO attention to the subtopic value in the message, which is strange. It only uses relPath... so subtopic, and the maintenance of its value, is completely useless... hmm...
Change the name from topicPreserve to topicCopy... to match timeCopy and permCopy.
I actually think that the current behaviour is optimal, and what the v2 does is kind of wrong... so I'm thinking maybe topicCopy=False everywhere, but in the v2 converter, we stick a topicCopy=True for compatibility.
One of the things we found bad and confusing in v2 was too many options having different defaults in different components, and people wanting to set everything because they didn't know what the default is here. So I'm thinking default False everywhere.
It would be good to also emphasize this in the docs once implemented.
Maybe even have sr3 convert add a note saying # adding for v2 compatibility, or something of the sort.
topicCopy isn't working. I added a log at the end of deriveTopics. Here's the message by the time deriveTopics returns:
{
"_format": "v02",
"_deleteOnPost": {"_format", "subtopic", "source", "topic"},
"rename": "remove",
"sundew_extension": "removed",
"to_clusters": "ALL",
"x-delay": 180000,
"source": "feeder",
"pubTime": "20240821T174944.945",
"baseUrl": "removed",
"relPath": "/data/dir1/dir2/dir3/dir4/dir5/dir6/202408211749/4012403/dir7/orig/auto",
"subtopic": [
"",
"data",
"dir1",
"dir2",
"dir3",
"dir4",
"dir5",
"dir6",
"202408211749",
"4012403",
"dir7",
"orig",
"auto",
],
"identity": {"method": "arbitrary", "value": "1ef587f960964ad50c80c01cb45110e3"},
"size": 10737418240,
"exchange": "xs_removed",
"topic": "v02.post.dms-stage.dir1.dir2.dir3.dir4.dir5.dir6",
}
The topic is correct (assuming it's supposed to be a string) at this point. The subtopic is wrong, and I don't know where subtopic has come from. When topicCopy is True, deriveTopics doesn't set subtopic in the message.
Something that seems odd is that deriveTopics adds the topic header to be deleted on post.
putNewMessage in moth.amqp deletes all the _deleteOnPost fields, which I assume includes topic. So we're setting topic, then deleting it before using it?
After deleting topic from the deepcopy of the message named body, this is how putNewMessage gets the topic it uses to post:
Then that looks for topic in the message it's passed, and if it's not there, will use the subtopic:
https://github.com/MetPX/sarracenia/blob/development/sarracenia/postformat/__init__.py#L78
I think if we just don't have topic in the _deleteOnPost set, it will probably work.
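To make the sequence concrete, here is a small sketch of what happens to a field listed in _deleteOnPost. The function strip_delete_on_post is a hypothetical stand-in for the clean-up step, not the actual putNewMessage code, and the message is cut down to the fields that matter:

```python
import copy

def strip_delete_on_post(msg):
    """Return a deep copy of msg without the fields named in _deleteOnPost."""
    body = copy.deepcopy(msg)
    for key in msg.get("_deleteOnPost", set()):
        body.pop(key, None)
    return body

msg = {
    "relPath": "/data/dir1/file.txt",
    "topic": "v02.post.dms-stage.dir1",
    "_deleteOnPost": {"topic"},
}
body = strip_delete_on_post(msg)
print("topic" in msg)   # True: still set on the internal message
print("topic" in body)  # False: gone from the copy that gets posted
```

Any topic lookup that is handed body rather than msg will never see the topic that was set earlier.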
But I'm also confused about why we have two different topic derivation methods and wonder if it would be possible to get rid of one?
sarracenia.__init__
https://github.com/MetPX/sarracenia/blame/087c5e52ab5db043cbcb651834dec71e991df367/sarracenia/__init__.py#L521 used when receiving messages
I took some time to look through the code a bit... yeah, it's hard to read. And I tried testing it, and yeah, it doesn't work right.
observations so far:
The messages we receive on the wire are not the same as the messages in the sr3 application. A v02 message is quite different from the sarracenia.Message() dictionary used in sr3. the 'topic' and 'subtopic' fields are not what is literally on the wire, but what is processed, and they aren't literally what is sent either. That's why those fields are tagged for deletion. Sarracenia.Message is the internal sr3 representation of a protocol neutral message.
including them in the published message wouldn't help anyways, because:
The topic recognized by the broker is not a field in the body of the message that is published. msg['topic'] is just a field in the internal application representation. It is used to build the topic for the message to send.
The topic of a message passing protocol message is a property in a header.... so looking at the message body and it not being in there is correct. The topic goes in the header dictionary... not the body. That applies to both AMQP and MQTT messages. The topic is NOT in the body when a message is published. the postformat class creates actual representations of messages to feed to protocol libraries, or decode from them.
the line is: headers = { 'topic': PostFormat.topicDerive(body,options) } ... and then exportMine returns headers as a separate return value. The export returns a tuple, ( body, headers, content_type ), and the TOPIC is in the headers.
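To illustrate the split between body and headers, here is a sketch with a hypothetical export_message function. It only mirrors the shape of the return value described above (body, headers, content type). The JSON encoding, the content type string, and the list of internal-only fields are assumptions for the example, not the actual postformat implementation:

```python
import json

def export_message(msg, topic):
    """Return (body, headers, content_type); the topic travels in headers."""
    internal_only = {"topic", "subtopic", "_deleteOnPost"}  # assumed list
    payload = {k: v for k, v in msg.items() if k not in internal_only}
    body = json.dumps(payload)
    headers = {"topic": topic}
    return body, headers, "application/json"

msg = {
    "relPath": "/data/obs/file.txt",
    "topic": "v02.post.data.obs",
    "subtopic": ["data", "obs"],
}
body, headers, content_type = export_message(msg, msg["topic"])
print(headers["topic"])             # v02.post.data.obs
print("topic" in json.loads(body))  # False
```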
The deriveTopic in the sarracenia.Message is a way of setting the 'topic' and 'subtopic' fields in the internal message representation... based on settings like topicCopy. This is for creating messages' internal representation.
The topicDerive routine in postformat is to help with on the wire representations, what will actually be posted in a message (header) and it uses those 'topic' and 'subtopic' internal representation fields to build the actual topic for the message, as a list of words (topic separator depends on protocol.)
The naming isn't great, and the documentation sucks, but I have not found anything wrong in what I have reviewed so far.
Steps to reproduce:
The topic override seems to work, but I'm not sure if the topicCopy does yet. have to play some more.
I need to do more testing to confirm this, but I think the problem is that the topicDerive method from PostFormat is used to derive the topic that gets posted. topicDerive is passed a message and is supposed to determine the topic to be posted from the message.
The first case in that method is if 'topic' in msg: ... it will use that topic. The problem is that before topicDerive is called, topic has already been deleted from msg. So topicDerive falls through to its next cases, first checking if topic is in the config, then deriving it from the relPath or subtopic fields in the message.
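The fall-through order described above can be sketched as below. The function derive_post_topic and the plain options dictionary are simplified stand-ins (the real topicDerive takes the sr3 options object), and checking subtopic before relPath in the last step is an assumption:

```python
import os

def derive_post_topic(msg, options):
    """Pick the topic to post: message topic, then config topic, then path."""
    prefix = list(options.get("post_topicPrefix", ["v02", "post"]))
    if "topic" in msg:            # case 1: explicit topic in the message
        return msg["topic"]
    if options.get("topic"):      # case 2: topic set in the configuration
        return options["topic"]
    if "subtopic" in msg:         # case 3a: subtopic words (assumed order)
        words = [w for w in msg["subtopic"] if w]
    else:                         # case 3b: directory part of relPath
        words = [w for w in os.path.dirname(msg["relPath"]).split("/") if w]
    return ".".join(prefix + words)

msg = {"relPath": "/data/obs/file.txt", "topic": "v02.post.dms-stage.obs"}
stripped = {k: v for k, v in msg.items() if k != "topic"}

print(derive_post_topic(msg, {}))       # v02.post.dms-stage.obs
print(derive_post_topic(stripped, {}))  # v02.post.data.obs
```

With the topic removed before the call, case 1 can never fire, which matches the symptom reported here.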
ok the logic was messed up. I think I fixed it in issue1023_take2 branch.
anyways... this branch might be ok. have not tested yet myself.
I just tested on dev, but the topic being posted is just v02.post. I think that's actually a step in the right direction :D
- if 'topic' in msg:
- if type(msg['topic']) is list:
- topic = msg['topic']
+ if topicOverride in msg:
+ if topicOverride is list:
+ topic = topicOverride
i think it should be if topicOverride: instead of if topicOverride in msg
i made that change locally and it works!
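For anyone reading along, the difference between the two conditions is plain Python behaviour, assuming topicOverride is a boolean flag (as the later comments suggest) and msg is a dictionary:

```python
msg = {"topic": "v02.post.dms-stage.dir1", "relPath": "/data/dir1/file.txt"}
topicOverride = True

# `in` on a dict tests for a key. There is no key equal to True here,
# so this condition is False even though the flag is set.
print(topicOverride in msg)   # False

# `is list` compares identity with the list type itself, so it is False
# for any flag or value; isinstance() is the usual type check.
print(topicOverride is list)  # False

# Testing the flag directly behaves as intended.
topic = None
if topicOverride:
    topic = msg["topic"]
print(topic)                  # v02.post.dms-stage.dir1
```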
Please push to the branch!
nvmd I did it. submitting PR.
sorry, I had made and tested the change directly on ddsr dev where Git isn't set up for pushing. Then you made the commit faster than me 😄
no worries, I figured it was something like that.
so... please re-test to confirm when you have a chance.
Yeah, I did more testing and confirmed that with messages from DMS, where the incoming topic (routing key) doesn't match the path, the incoming topic is always being used.
Even with topicCopy set to False, in the DMS case, we're hitting this if statement:
msg_topic does not equal path_topic, so it sets topicOverride True.
I think there are only these cases:
1) topicCopy is True: use the incoming topic from the message (this part in the code is fine).
2) topicCopy is False: derive the topic from the relPath. (I don't think we care if the path_topic matches the incoming msg_topic? But maybe there's a reason for checking.)
3) Maybe a third case: topicCopy is False and there is no relPath in the message. This is technically an invalid message because our documentation says relPath is a required field, but we could fall back to using the incoming msg_topic.
It seems like the method could be simplified down to this:
def deriveTopics(msg,o,inbound_topic,separator='.'):
    """
    derive subtopic, topicPrefix, and inbound_topic fields based on message and options.
    This is used on message receipt to set up fields prior to processing.
    """
    # when topicCopy is off derive subtopic from msg, don't need to set full topic
    if 'relPath' in msg and (not ('topicCopy' in o and o['topicCopy'])):
        # i'm not sure if it's better to set subtopic to a string or a list. String works. List might work too.
        msg['subtopic'] = os.path.dirname(msg['relPath'].lstrip('/')).replace('/', separator)
    # topicCopy is ON or relPath not in msg
    # deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017
    else:
        msg['topic'] = inbound_topic
        msg['_deleteOnPost'] |= set( ['topic'] )
The subtopic in the message is being set as expected, but something else later is ignoring it. With topicCopy off, the messages get posted with just topic v02.post. With topicCopy on, the source topic is used like it should be.
I tested this change using the development branch, and with topicCopy off it does work as expected, so whatever is causing the subtopic field in the message to not be processed correctly is in the issue1023_take2 branch.
But I'm still confused about the topic derivation happening in two places.
I wonder if it would be easier to just store the incoming "wire" topic inside the message and do all the topicCopy/topic/subtopic derivation using the topicDerive method in postformat?
In that method, if topicCopy is enabled, we'd use the wire topic field from the message.
The topicCopy really affects when the message is read. without topicCopy, the "topic" field in the internal message should not be set. Instead there should only be a subtopic set.
OK, there are other cases you are not seeing in your experiments:
When someone passes -t v02.post.override on the command line of a post, or sets topic v02.post.override in the configuration file, this sets the topic field in the message. It acts as a topicOverride without necessarily being a topicCopy.
When we receive a message and the beginning of the topic does not match the configured topicPrefix, it is clear that the received topic does not match sarracenia expectations, so it turns on topicOverride and effectively implements topicCopy, to avoid corrupting an invalid one (a Postel's maxim kind of thing). We can't tell where the topic prefix ends, so we can't set the subtopic properly, so we just set a topic override.
Is it possible the DMS, when topicCopy is false, runs into the latter case (a topicPrefix mismatch)?
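The prefix check described above can be sketched roughly like this. The function name split_inbound_topic and its return shape are hypothetical, chosen only to show the two outcomes:

```python
def split_inbound_topic(topic, topic_prefix, separator="."):
    """Return (subtopic, topic_override) for an inbound topic string."""
    words = topic.split(separator)
    n = len(topic_prefix)
    if words[:n] == list(topic_prefix):
        # Prefix matches: everything after it is the subtopic.
        return words[n:], False
    # Prefix does not match: the boundary is unknown, keep the topic whole.
    return None, True

print(split_inbound_topic("v02.post.dms-stage.dir1", ["v02", "post"]))
# (['dms-stage', 'dir1'], False)
print(split_inbound_topic("v03.dms-stage.dir1", ["v02", "post"]))
# (None, True)
```

In this sketch a topic such as v02.post.dms-stage.dir1 passes the prefix check when the configured topicPrefix is v02.post, so whether the DMS case hits the mismatch branch depends on the topicPrefix actually in use.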
In terms of not transforming the topic: sr3 is expected to take an input in AMQP and output in MQTT and vice versa. Just change a broker setting and go. The resulting topic trees are completely different though. As an example, receiving a v02 message in AMQP, and publishing to MQTT results in:
what's going on...
sr3 also supports v02, v03, and perhaps other formats in the future, and their topic conventions are different... (v02 has .post in the normal prefix, where v03 is just v03 ... but in a lot of tests we set the prefix to v03.post just to make testing easier.) so when changing the message format on input and output, the topic also needs to be changed... typically.
It seemed simplest to transform the received topic on input into a neutral format, with only the subtopics as a list, and the topicPrefix separate, and the separator also, and then assemble a correct topic for a correct protocol and postformat at publication time.
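As a rough illustration of that neutral form, the sketch below splits an inbound topic into prefix and subtopic words, then reassembles it for a different prefix and separator. The helper assemble_topic is hypothetical, and pairing the v03 prefix with the MQTT-style "/" separator is only for the example:

```python
def assemble_topic(topic_prefix, subtopic, separator):
    """Join prefix words and subtopic words with a protocol's separator."""
    return separator.join(list(topic_prefix) + list(subtopic))

# Inbound: a v02 topic received over AMQP, where "." separates words.
inbound = "v02.post.data.obs"
in_prefix = ["v02", "post"]
subtopic = inbound.split(".")[len(in_prefix):]  # neutral form: ['data', 'obs']

# Outbound: same subtopic, different prefix and separator.
amqp_topic = assemble_topic(["v02", "post"], subtopic, ".")
mqtt_topic = assemble_topic(["v03"], subtopic, "/")
print(amqp_topic)  # v02.post.data.obs
print(mqtt_topic)  # v03/data/obs
```

Keeping only the subtopic words in the internal message is what lets the same message be published with either convention; a topic copied verbatim from the wire carries the inbound prefix and separator with it.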
I think that's all there is to it. If you have a better way to do it that correctly implements the various cases... that's fine, I have no attachment to how it works currently.
There isn't any "correct" way to handle topics when a client consciously decides to deviate from conventions. We just have trade-offs. In this example, if we receive a message with an invalid topic, the safest thing to do is barf and reject the message. We implemented code to effectively set topicCopy when a bad topic is received in order to "save" it from being discarded.
The right thing to do... is likely to discard it. That would force topicCopy to be set explicitly, and then the intention is clearer.
I guess the thing to do is develop a suite of tests for all the different cases. Also please note that the very common/default use case is for topics to be generated automatically from files downloaded... where the relpath changes, and the topic needs to be updated to match.
developing tests would make it a lot easier to replace.
options ... discussed:
Proposed behaviour:
In sr3 convert, the topicCopy on option will be added for best compatibility with v2, but the default in sr3 will be off, as the sr3 behaviour is likely more correct than what v2 does.
Starting point:
so whenever a message is received/parsed, it is split to have msg['subtopic'] set, and that is combined with the post_topicPrefix on publication.
The above is how it works now.