spetz opened this issue 10 months ago
Is this still something we want? If yes, may I take a shot?
Yes, this would be a great feature to have!
Here is the high-level idea. Let me know if it makes sense to you.
Since the messages are only partially ordered (i.e. comparing offsets between partitions doesn't make sense), we should do compaction at the partition level. For each partition, we can iterate the messages from the end to the beginning and collect all offsets whose message ID we have already seen. We should still keep these offsets in the segments so that we don't affect other logic too much; however, we would update their message states to `MarkedForDeletion` (or `Unavailable`? Not sure about the intended meaning) and clear the payload. At that point we have a list of updated messages in memory, and with all the updated messages we can call `Persister.overwrite` to overwrite all segment files, including the log and the indices.
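Roughly what I have in mind for that backward pass (the `Message`/`MessageState` shapes below are just illustrative placeholders, not the actual structs):

```rust
use std::collections::HashSet;

// Illustrative stand-ins for the real message types.
#[derive(Clone, Debug, PartialEq)]
enum MessageState {
    Available,
    MarkedForDeletion,
}

#[derive(Clone, Debug)]
struct Message {
    offset: u64,
    id: u128,
    state: MessageState,
    payload: Vec<u8>,
}

/// Walk a partition's messages from newest to oldest, mark every older
/// duplicate of an already-seen ID for deletion, and clear its payload.
/// Returns the offsets that were updated so they can be persisted afterwards.
fn compact(messages: &mut [Message]) -> Vec<u64> {
    let mut seen_ids = HashSet::new();
    let mut updated_offsets = Vec::new();
    for message in messages.iter_mut().rev() {
        if seen_ids.insert(message.id) {
            // Newest occurrence of this ID - keep it untouched.
            continue;
        }
        // Older duplicate: keep the offset in the segment, but mark the
        // message and drop its payload to reclaim space.
        message.state = MessageState::MarkedForDeletion;
        message.payload.clear();
        updated_offsets.push(message.offset);
    }
    updated_offsets
}
```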
I think we can trigger the whole logic in the existing `CleanMessagesExecutor`; otherwise, we may need to handle the synchronization ourselves.
Still need to think about how message polling should handle these "deleted" messages. One option is skipping the non-`Available` messages on the server. The other option is letting clients handle them, since the message state is already part of the SDK's interface.
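For the first option, the server-side skip could be as simple as filtering on the state before returning the polled batch (reusing the illustrative types from the sketch above):

```rust
/// Server-side option: only return messages that are still Available.
fn poll_available(messages: &[Message]) -> Vec<&Message> {
    messages
        .iter()
        .filter(|message| message.state == MessageState::Available)
        .collect()
}
```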
@spetz If this makes sense to you, I can start writing a prototype.
Apologies for the delayed response; I missed the previous notification.
Yes, your idea sounds good! I think we could try to achieve something similar to Kafka, for example: once you append a message with the same ID (in that case provided by the client) and an empty payload (or something else, like a flag, to be discussed), the previous messages using the same ID would be discarded and overwritten by the new value (in the case of an empty payload, simply deleted or marked as deleted).
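Roughly, the semantics I have in mind (the producer-side helpers below are purely hypothetical, not the existing SDK):

```rust
// Purely hypothetical producer-side helpers: a tombstone is just an append
// of the same ID with an empty payload; compaction later drops the old values.
struct OutgoingMessage {
    id: u128,
    payload: Vec<u8>,
}

fn upsert(id: u128, payload: Vec<u8>) -> OutgoingMessage {
    // A newer message with the same ID overrides the previous values
    // once compaction runs.
    OutgoingMessage { id, payload }
}

fn tombstone(id: u128) -> OutgoingMessage {
    // Empty payload (or a dedicated flag, to be discussed): the previous
    // messages with this ID are deleted or marked as deleted.
    OutgoingMessage { id, payload: Vec::new() }
}
```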
The message states were added some time ago for possible future usage, so feel free to consider any state names that would make sense for this feature. Speaking of polling the messages, I was also thinking that the client could decide, e.g. to poll only the available ones, or to also include the others with some different state.
Since we need to overwrite the three segment files (log, index, time_index) while doing compaction, and the segment files are still being read at the same time, we need some synchronization. Here is what I'm thinking for closed segments (sketched below):

1. Add a read-write lock on the `Segment`. During compaction, we write the compacted data to swap files (e.g. `.${offset}.log.swp`) first. After the writing is done, we acquire a write lock. We then rename these new `swp` files to the "normal" files.
2. To recover from a crash, we need some way to know whether the `swp` files are ready. For example, before writing to the `swp` files, we create a file called `${offset}.compacting`. After the writing is done, we remove the `compacting` file first, before renaming. While recovering from a crash, we check if the `compacting` file exists. If yes, we remove all `swp` files. If no, we rename all remaining `swp` files.

It is even more tricky for open segments, since there can be new messages. I think we can skip compaction on open segments to avoid the complexity; the storage saving from open segments is small anyway.
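A rough sketch of that rename protocol, with hypothetical file naming and helpers (the real segment/persister code will look different):

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

const EXTENSIONS: [&str; 3] = ["log", "index", "time_index"];

fn swap_path(dir: &Path, offset: u64, ext: &str) -> PathBuf {
    // Hidden swap-file convention, e.g. ".00000000000000000000.log.swp".
    dir.join(format!(".{offset:020}.{ext}.swp"))
}

fn final_path(dir: &Path, offset: u64, ext: &str) -> PathBuf {
    dir.join(format!("{offset:020}.{ext}"))
}

fn compacting_marker(dir: &Path, offset: u64) -> PathBuf {
    dir.join(format!("{offset:020}.compacting"))
}

/// Called after the compacted data has been fully written to the swap files.
/// The write lock on the segment is assumed to be held by the caller.
fn commit_compaction(dir: &Path, offset: u64) -> io::Result<()> {
    // Removing the marker first means a crash from here on is recoverable
    // by simply renaming whatever swap files remain.
    fs::remove_file(compacting_marker(dir, offset))?;
    for ext in EXTENSIONS {
        fs::rename(swap_path(dir, offset, ext), final_path(dir, offset, ext))?;
    }
    Ok(())
}

/// Called on startup to finish or roll back an interrupted compaction.
fn recover(dir: &Path, offset: u64) -> io::Result<()> {
    let marker = compacting_marker(dir, offset);
    for ext in EXTENSIONS {
        let swp = swap_path(dir, offset, ext);
        if !swp.exists() {
            continue;
        }
        if marker.exists() {
            // Swap files may be incomplete: throw them away.
            fs::remove_file(swp)?;
        } else {
            // Marker already removed, so the swap files are complete: finish the rename.
            fs::rename(swp, final_path(dir, offset, ext))?;
        }
    }
    if marker.exists() {
        fs::remove_file(marker)?;
    }
    Ok(())
}
```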
Good points! I think we could start with the simpler approach, compacting only the closed segments - this way, such an operation could be done in a background job without interfering with the active segments.
As described here, log compaction could be a new kind of retention policy, allowing the old values for the same ID to be overridden (and, for a null/empty payload, even deleted), which could be helpful for both storage optimization and GDPR compliance.
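If it does end up as a retention policy, one possible (purely hypothetical) shape for it could be:

```rust
// Hypothetical shape only - the actual configuration keys are still to be designed.
enum RetentionPolicy {
    DeleteByTime { max_age_secs: u64 },
    DeleteBySize { max_bytes: u64 },
    Compact,
}
```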