Open KanaHayama opened 1 year ago
One approach for getting to the bottom of this is to use Pipeline Diagnostics. Create your pipeline with enableDiagnostics set to true, persist the Diagnostics stream to a store, and then visualize it in PsiStudio. You'll be able to see all stream connections and quickly pinpoint, e.g., whether you're missing a LatestMessage delivery policy anywhere. You can also inspect all the delivery queue sizes to see exactly where things are filling up. Take a look and let us know if that reveals anything.
Thank you. Missing a LatestMessage might be a reason. I will reply with my findings.
I think I have an idea of what might be going on. I suspect the images are getting queued inside the Join component. In order to perform synchronization, Join internally holds its own queues with the messages it receives on both the primary and secondary streams (you can take a look at the implementation here). This internal queue is inside the component and is different from the normal psi delivery queue on each stream (which is controlled by the delivery policy). It is necessary in order to synchronize correctly based on originating times.
Imagine you have images (your secondary stream) arriving with originating times 0, 1, 2, 3, ... 100, and the primary stream (the clock, in your case) has originating times 0, 10, 20, 30, ... 100. After first pairing (0, 0), Join will internally queue the images with originating times 1, 2, 3, and so on until it receives the next clock message. That would be the clock message with originating time 10, so Join needs to hold on to messages 1, 2, 3, etc., since it doesn't know which clock message will arrive next. In theory, the clock message with originating time 10 could arrive even later than "time 10" because of latencies. Join has to queue and operate this way in order to synchronize correctly. It releases secondary messages from its internal queue only once it can prove that they will never be needed to synchronize with any other primary message.
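To make the queueing behavior described above concrete, here is a toy Python model of an exact-match join. It is only a sketch of the idea, not the actual psi implementation: the class name, method names, and the simplification that messages are bare originating times are all assumptions made for illustration, and both streams are assumed to arrive in originating-time order.

```python
from collections import deque


class ExactJoinModel:
    """Toy model of an exact-match join (hypothetical, not the psi code)."""

    def __init__(self):
        self.primary = deque()    # primary originating times awaiting a match
        self.secondary = deque()  # secondary originating times held internally
        self.output = []          # emitted (primary_time, secondary_time) pairs

    def post_primary(self, t):
        self.primary.append(t)
        self._drain()

    def post_secondary(self, t):
        self.secondary.append(t)
        self._drain()

    def _drain(self):
        while self.primary:
            p = self.primary[0]
            # Secondary messages older than the oldest pending primary
            # can never be matched again, so they can be released.
            while self.secondary and self.secondary[0] < p:
                self.secondary.popleft()
            if not self.secondary:
                return  # a match for p may still arrive on the secondary
            if self.secondary[0] == p:
                self.output.append((p, self.secondary.popleft()))
            # Otherwise the secondary has moved past p: no exact match exists.
            self.primary.popleft()


join = ExactJoinModel()
join.post_secondary(0)
join.post_primary(0)                      # pairs (0, 0)
for t in range(1, 10):
    join.post_secondary(t)                # images 1..9, no clock message yet
buffered_before = len(join.secondary)     # images held while waiting
join.post_secondary(10)
join.post_primary(10)                     # clock message 10 finally arrives
buffered_after = len(join.secondary)
print(buffered_before, buffered_after, join.output)
```

In this model the buffer grows with every image until the next primary message shows up, which matches the memory growth described in the question; a LatestMessage delivery policy on the receivers would not affect this internal buffer.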
Now, as to how to change your code to address the growing memory usage, my suggestion would be to use a dense clock stream. (As a more general comment, you usually want to join dense streams.) From your comments, it looks like your clock stream is a sparse stream, perhaps signaling that a face was detected. Instead, can you construct a dense boolean stream (false when no face, true when a face is detected), do the join, and then use a Where() operator to filter out the resulting tuples that do not correspond to a face?
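The effect of this suggestion can be sketched with a small Python simulation. This is a simplified model under stated assumptions, not psi code: `run_join` is a hypothetical stand-in for an exact-match join, the face detection times are made up, and the final list comprehension plays the role of the Where() filter.

```python
from collections import deque


def run_join(arrivals):
    """Toy exact-match join over (stream, time, value) tuples in arrival order.

    Returns the emitted (time, primary_value, secondary_value) tuples and the
    peak number of secondary messages held internally.
    """
    primary, secondary = deque(), deque()
    pairs, peak = [], 0
    for stream, t, value in arrivals:
        (primary if stream == "primary" else secondary).append((t, value))
        while primary:
            p_time, p_value = primary[0]
            # Release secondary messages that can no longer match anything.
            while secondary and secondary[0][0] < p_time:
                secondary.popleft()
            if not secondary:
                break  # wait for more secondary messages
            if secondary[0][0] == p_time:
                pairs.append((p_time, p_value, secondary.popleft()[1]))
            primary.popleft()
        peak = max(peak, len(secondary))
    return pairs, peak


face_times = {30, 70}   # hypothetical times at which a face is detected
frames = range(100)     # one image per originating time 0..99

# Sparse primary: a message is posted only when a face is detected.
sparse = []
for t in frames:
    sparse.append(("secondary", t, f"image{t}"))
    if t in face_times:
        sparse.append(("primary", t, True))

# Dense primary: a boolean is posted for every frame.
dense = []
for t in frames:
    dense.append(("secondary", t, f"image{t}"))
    dense.append(("primary", t, t in face_times))

sparse_pairs, sparse_peak = run_join(sparse)
dense_pairs, dense_peak = run_join(dense)
dense_faces = [p for p in dense_pairs if p[1]]  # the "Where" step

print(sparse_peak, dense_peak, dense_faces == sparse_pairs)
```

Both variants end up with the same face tuples, but in this model the dense primary keeps the internal buffer at a single message, while the sparse primary lets it grow for as long as no face is detected.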
Another alternative, which loses the exact-match synchronization based on originating times, is the Pair operator, or a Fuse with the Available.LastOrDefault interpolator. You can read more about it here, if you look for the explanations of Pair and Available.LastOrDefault. In this case the secondary messages are not queued internally; only the last secondary message is remembered. However, that also means the pairing is no longer a guaranteed synchronization on originating times, as the results will depend on the wall-clock arrival time (and hence latency) of the messages.
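The trade-off can be illustrated with another toy Python model. Again, this is a sketch and not the psi implementation: `PairModel` and `run_pair` are hypothetical names, messages are reduced to their originating times, and dropping primary messages that arrive before any secondary message is a simplifying assumption of the sketch.

```python
class PairModel:
    """Toy model of pairing with the last available secondary message."""

    def __init__(self):
        self.last_secondary = None  # only one secondary message is kept
        self.output = []

    def post_secondary(self, t):
        self.last_secondary = t     # overwrite, never queue

    def post_primary(self, t):
        if self.last_secondary is not None:
            self.output.append((t, self.last_secondary))


def run_pair(arrivals):
    """Feed (stream, originating_time) tuples in wall-clock arrival order."""
    pair = PairModel()
    for stream, t in arrivals:
        if stream == "primary":
            pair.post_primary(t)
        else:
            pair.post_secondary(t)
    return pair.output


# Same messages, different arrival order: the primary message with
# originating time 10 is delayed in the second run.
on_time = [("secondary", 9), ("secondary", 10), ("primary", 10), ("secondary", 11)]
delayed = [("secondary", 9), ("secondary", 10), ("secondary", 11), ("primary", 10)]

print(run_pair(on_time))
print(run_pair(delayed))
```

Memory stays constant because only one secondary message is ever held, but the delayed run pairs primary 10 with secondary 11, which shows how the result depends on arrival order rather than on originating times.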
Hope this helps, but let us know what you find.
Hello, I am writing a composite component. The memory usage of the shared memory pool grows when a Join operator does not receive data from its primary stream.
The secondary stream of that Join operator is a video stream. I set delivery policies to LatestMessage so nothing should be queued at receivers' queues. The growth of memory usage stops when a new data frame is available on the primary stream. That means images on the secondary stream are queued, and the shared image pool is requesting more memory from the operating system. I do not know why this happens, please help. Here is my code:
Thank you