Closed cboulay closed 7 months ago
@cboulay This one was a doozie. I wish I had made a few more comments for myself here when I wrote this.
Each publisher maintains a number of message buffers -- I believe the default number is 32. When a unit publishes a message, the publisher writes data to the next available buffer, and that buffer is flagged as "in-use" until all currently connected subscribers have been notified/serviced, their coroutines have concluded, and they've all replied back to the publisher RX_ACK
(or disconnected). At that point is that buffer marked as available and can be overwritten.
Imagine now that we have a passthrough unit that subscribes to a stream using zero_copy = True
. It receives a message from an upstream publisher with no additional copy, and returns that same object. This subscriber has now been serviced and the upstream publisher's buffer is now marked as available. Because this passthrough has returned the same object (which is referencing memory in an upstream publisher's buffers), it now passes this reference along to the next subscriber, but the publisher that originally wrote that object could overwrite that buffer at any moment.
In short -- this extra deepcopy is performed if and only if our subscriber is receiving data with zero_copy = True
, and our coroutine is attempting to publish exactly the same object that was received. It is required to enable the upstream publisher the ability to overwrite that buffer without corrupting downstream subscribers.
and our coroutine is attempting to publish exactly the same object that was received
Ah, that's what the and obj is msg
part means. OK that makes sense to me.
What if a subscriber has zero_copy = True
, and its method yields a new AxisArray (obj is msg
should be False), but the .data payload is just a view of the input AxisArray's .data. My intuition is that when the upstream buffer is overwritten, e.g. with a new AxisArray, then the buffer's reference on the ndarray should be invalidated giving ownership to the downstream viewarray. Does that sound right? Maybe I should test it explicitly.
I'll also test some zero_copy = True scenarios when I'm 100% confident the method is not mutating the input.
Yeahhh your intuition is correct here, at the end of the day, zero_copy = True
opens us up to a lot of possible issues with memory invalidation. Admittedly, this little check is really only a half measure to protect from a very specific issue that I thought might come up more frequently if units had passthrough modes, but in practice I don't find myself building passthroughs that often.
I'll also add that originally ezmsg was shared-memory only; that is, there was no concept of local reference storage or a message cache. No matter where our subscribers were, we'd always push to shared memory and zero_copy = True
only ever revealed a read-only buffer view of shared memory to the subscribers; and when they'd publish, there'd always be a guaranteed copy on publish to shared memory.
We needed a bit more performance so I came up with the MessageCache that first stores a reference to the published object in a dictionary for other local (same process) subscribers to access. If a subscriber exists in another process, the MessageCache doesn't have that object, and there's a CacheMiss -- then the publisher pushes the message to shared memory and its reconstituted on the other side from shared memory.
I hadn't been considering downstream effects when I implemented this, but MessageCache stores mutable references and its certainly possible for one subscriber to mutate an object in the MessageCache and for other subscribers to receive mutated data.. I don't believe there's a way to yield a non-mutable reference to an object in the general case.. I only realized this is a possibility while reading through the code last night. I think I need to make a new issue about this behavior and figure out ways to handle message caching more safely.
What is the goal of MessageCache[].put
in this line?
I see the MessageCache is then used later to push
if the pub and sub are on the same node but not the same process.
But what about when they are on the same process, does put
serve a purpose?
Edit: Re-reading your previous post I see that this is probably the "first stores a reference". If it's indeed just a reference then it's probably cheap and it's OK to do this step even if none of the subs are in a different process or node.
Aye, you've got it. I'm a bit terrified about the implications of handing this mutable reference to every subscriber..
I think I understand this enough now that I can close the Question. I'm sure I'll revisit this topic at some point when doing an optimization pass.
In the following code block, we are considering a Unit with both
@ez.subscriber
and@ez.publisher
, and the subscriber decorator 9aszero_copy=True
.https://github.com/iscoe/ezmsg/blob/b6c1c9462d23e976b0ced43dc96c58c36a647024/src/ezmsg/core/backendprocess.py#L314-L320
I believe what's happening is that it is skipping the copy on the input, then on the output -- if the output message is the input message -- it creates a deepcopy of the output. (units that successfully modify the output do not need a copy)
If a unit is operating in passthrough mode for whatever reason, why do we need to make copy before passing the msg? Isn't it the responsibility of the downstream unit's subscriber methods to copy its input (which is done automatically with the default with zero_copy=False) before it modifies in-place?
Overall I think this is quite a minor issue because most units do now have a passthrough mode and if they do the passthrough should be infrequent.
Is there something else that requires the copy? Like network dispatch or shmem usage?