streambed / streambed-rs

Event driven services toolkit
Apache License 2.0

Record compaction #26

Closed. huntc closed this issue 1 year ago.

huntc commented 1 year ago

The "logged" commit log project requires log compaction, and a means for an application to convey what records are to be compacted.

Kafka compaction is limited. Two types of compaction are offered: time-based and topic-based. The time-based approach will remove records created prior to some configurable cutoff. This may result in removing records that are important for sourcing events to build new state. For example, a "name changed" event occurs when an IoT end device is provisioned, but this happens only once in the lifetime of an end device - this earlier event may be removed and the name is therefore lost.

Other data retention policies can be set, such as one for the size of all the records in a topic.

A problem with these approaches in Kafka is that they are an operational concern, and not an application one. Yet, an application best understands how many records for a given key should be retained. For example, in the case of a "meter read" event for an energy metering system, perhaps only the last year's worth of events should be retained. These events may appear in the same topic as other events and so a fine-grained compaction is required to remove them. Topic compaction for a given key is inadequate as it will remove all but the latest record.

We would like a means of compacting a log based on the application conveying which record is the oldest one to be retained for a given key. Compaction should also run with reasonable regularity so as to: a) avoid hundreds of records needing to be scanned to source state, and b) keep the storage requirements of the logged project to a minimum. Logged is able to run on single-core devices running at 512MHz with just 10MB of flash storage, so it is important to uphold these performance goals.

huntc commented 1 year ago

The current behavior is that when a commit log is successfully appended to, it returns the offset at which the record was produced.

Logged could adopt an approach where the produced offset is passed to an application function that reads the commit log up to that produced offset for a given topic and key. This function would typically be invoked on a task separate from the one performing the append, so that writes remain unimpeded.

The application function's goal is to convey the earliest offset to be retained.

The process of compaction would be performed for a number of distinct record keys that are accumulated in a map known as the "compaction map". That number of keys would be decided on by the application as it impacts the amount of memory to be allocated. Compaction could be triggered either by the number of distinct record keys that have been accumulated or by an idle timeout, again configurable by the application.
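
To make that concrete, here is a minimal sketch of what such a compaction map and its trigger conditions might look like. The names (CompactionMap, retain_from, should_compact) and types are illustrative only, not the streambed-rs API:

use std::collections::HashMap;
use std::time::Duration;

// Illustrative only: (topic, key) -> earliest offset to retain for that key.
struct CompactionMap {
    earliest_retained: HashMap<(String, u64), u64>,
    // Chosen by the application, as it bounds the memory to be allocated.
    max_keys: usize,
    // Compact anyway if no new verdicts arrive within this period.
    idle_timeout: Duration,
}

impl CompactionMap {
    // Record the application's latest verdict for a key.
    fn retain_from(&mut self, topic: String, key: u64, offset: u64) {
        self.earliest_retained.insert((topic, key), offset);
    }

    // Compaction is triggered once enough distinct keys have accumulated;
    // the idle timeout would be checked by the owning task.
    fn should_compact(&self) -> bool {
        self.earliest_retained.len() >= self.max_keys
    }
}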

A commit log represents a single topic and is presently held in one file. Going forward, a commit log will still represent one topic but it will be able to be split. Once split, there are two commit log files: an "old" one and a "new" one. The old commit log is immutable and so is able to be safely traversed for the purposes of compaction. The new commit log is available for other threads to continue writing to.

Given a compaction map of topic + key -> offset, the compaction process can split each commit log and walk through the old commit log files, copying records to new files where they: a) do not appear in the aforementioned map, or b) appear in the map but have an offset greater than or equal to the map's offset value. Once the new file has received its cleansed log, the old file is atomically replaced by it.
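
As a sketch of that copy rule (the types are simplified stand-ins, and the file IO plus the atomic replace are elided):

use std::collections::HashMap;

// Illustrative record shape only.
struct Record {
    key: u64,
    offset: u64,
    payload: Vec<u8>,
}

// Keep a record if its key is absent from the compaction map, or if its
// offset is at or beyond the earliest retained offset for that key.
fn survives(record: &Record, earliest_retained: &HashMap<u64, u64>) -> bool {
    match earliest_retained.get(&record.key) {
        None => true,
        Some(&earliest) => record.offset >= earliest,
    }
}

// Walk the immutable "old" log and collect the survivors destined for the
// new file; the real implementation would stream between files instead.
fn compact(old_log: Vec<Record>, earliest_retained: &HashMap<u64, u64>) -> Vec<Record> {
    old_log
        .into_iter()
        .filter(|r| survives(r, earliest_retained))
        .collect()
}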

huntc commented 1 year ago

Would be interested in your thoughts if you have some @breckcs. :-)

stevewillcock commented 1 year ago

This is an interesting problem to solve! Trying to approach this from the perspective of an application developer using logged to build a stateful application and now tasked with writing the specific compaction function for the application, a few thoughts come to mind:

huntc commented 1 year ago

This is an interesting problem to solve! ...

Thanks @stevewillcock! There's stuff to discuss here for sure. Meanwhile, I might need to lay out some other things, some of which you touch on:

So, for a "name changed" event, the record's key would represent both the "name changed" record type AND the entity that it relates to e.g. use the top 8 bits of the u64 to represent NameChanged and the bottom 56 bits to hold 105, where 105 is the id of the entity. We may also have a BatteryLevelRead event for recording that a battery level has been read for entity id 105 and held in the same manner. Both of these records of events would be in the same topic and, given the same partition-id, the two records would appear in the order they are appended.

Thus, if I append another NameChanged-105 event record and my compaction function requires only the latest one, then the previous NameChanged-105 event record will be effectively removed, but not records for other events; not until their next event record is appended.

Here's a stab at what the function could look like (there's a lot of room for improvement):

let key = NAME_CHANGED_EVENT << 56 | entity_id;
cl
    .produce(ProducerRecord {
        key,
        ...
    })
    .and_then(|offset| compactor.send((offset, key, || (), |state, _record| (state, CompactionAction::Discard))))
    .await;

The idea of the above code is to show that a key can be constructed of both a record type and an entity id, and also that an FnOnce(ConsumerRecord) -> (S, CompactionAction) + Send is sent to a compaction task across a channel. The compaction task then invokes the function up to the provided offset, or to the first CompactionAction::Keep, and only where the record_mask matches. In this case, all records of the same event type will be discarded up to, but not including, the newly written offset.

I've included state also so that more sophisticated algorithms could be implemented. In my example, the initial state is unit as nothing sophisticated is required.

Given that the above strategy of "keep latest" might be super common, we could even have a convenience function like this:

    .and_then(|offset| compactor.send(Compactor::with_keep_latest(offset, key)))

stevewillcock commented 1 year ago

I wish to avoid snapshotting for this specific use case as it adds complexity given that you have to deal with events and snapshots, instead of just events (I see snapshotting being more useful for maintaining projections)

Sure, I don't think there needs to be specific support for this in logged, but there's nothing to prevent an application developer doing this if they are trying to limit the size of their log. For example, I could create a SnapshotCreated event in my application code with an associated entity that encapsulates the important state (or some part of the state) of the thing I'm dealing with up to that point, and send that off to logged. Once that has been processed, and I've done some sort of validation to ensure the state is valid and that there were no events in the meantime which would invalidate the state I just saved, then I'm free to delete any earlier events for that "entity id" if I wanted to, without losing any of the state I care about. This wouldn't be appropriate for all use cases but it's something that an application developer may want to implement for some applications. I think the approach you've outlined for compaction would already support this use case.

I might need to lay out some other things...

That all sounds good. Using the event key and entity id as the compaction key is interesting; that should allow for fairly fine-grained deletion. I'm not sure what the state in your example function is - is this the application state for an entity or some internal logged state?

huntc commented 1 year ago

I'm not sure what the state in your example function is - is this the application state for an entity or some internal logged state?

It's something that the function can use for any purpose, e.g. count the number of specific events and retain just the last 10.
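
To illustrate, here's a rough sketch of a "retain just the last 10" function in the (state, record) -> (state, action) shape from the earlier snippet. The ConsumerRecord and CompactionAction types are stand-ins, and it assumes the compactor presents matching records newest-first and applies the returned action to each, which is an assumption for illustration rather than the exact invocation scheme described above:

// Stand-in types for illustration; not the streambed-rs API.
enum CompactionAction {
    Keep,
    Discard,
}

struct ConsumerRecord {
    key: u64,
    offset: u64,
}

// The state is simply a count of matching records seen so far. Under the
// assumed newest-first traversal, the 10 most recent records are kept and
// anything older is discarded.
fn retain_last_10(kept: u32, _record: ConsumerRecord) -> (u32, CompactionAction) {
    if kept < 10 {
        (kept + 1, CompactionAction::Keep)
    } else {
        (kept, CompactionAction::Discard)
    }
}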

huntc commented 1 year ago

Capturing a conversation between Arnold and me...

arnolddevos commented 1 year ago

Elaborating on the 2nd point above:

arnolddevos commented 1 year ago

The concept of a compaction key is derived from the Kafka design. In our design the key is the interface between the application-specific compaction strategy and the log machinery that actually rewrites the log.

Keys are stored in the log alongside events. This is an optimisation. They could be computed from events on-the-fly. The optimisation saves some compute. It would also save IO if we used a separate file for the keys.

There is a disadvantage to storing keys. It weakens our security. Keys are derived in a deterministic way from the unencrypted events. At least part of the event can be learned from the key bypassing the encryption. Moreover, the encryption can in principle be attacked using this knowledge.

huntc commented 1 year ago

Keys are stored in the log alongside events. This is an optimisation. They could be computed from events on-the-fly. The optimisation saves some compute. It would also save IO if we used a separate file for the keys.

I'm unsure how much IO would be saved but accept that there may be some. If we're keen to have a separate index file (I think Kafka has one) then we should measure performance in this respect. Also, the trade-off in complexity of managing an index should be considered. Code size and memory usage are important to us.

There is a disadvantage to storing keys. It weakens our security. Keys are derived in a deterministic way from the unencrypted events. At least part of the event can be learned from the key bypassing the encryption. Moreover, the encryption can in principle be attacked using this knowledge.

This is a good point and we should encourage the user to be careful about exposing an entity id as part of the key (for example), or indeed any other part of a record, given the unencrypted state of a key. That said, exposing the type of record and an entity id is perhaps as reasonable as exposing the topic name (which we do). We are still avoiding exposing the record as a whole.

breckcs commented 1 year ago

Would be interested in your thoughts if you have some @breckcs. :-)

Very interesting discussion. I'm keen to follow along as it intersects with some of my current needs.

In the systems I've worked on, I've never used a log as the backing store for all state changes. For example, as already mentioned, the "name changed" event only happens once, so that would just be persisted in a file or a lightweight database (and considered metadata, not event data). I've used logs for 1) sharing data (e.g., another process subscribes to the log to write the events to an embedded historian) or 2) replicating data (e.g., replicating data to the cloud). There is no need for compaction of these logs and a combination of time-based and size-based retention works well (just delete the oldest file segment in a topic).

I'm curious what it would look like to back all state changes with the log and what compaction would demand in terms of IO, CPU, storage degradation, and predictable performance.

I would also keep in mind what this pattern looks like scaled up. Perhaps it works well for a few events/signals on a single device, but does it scale when taking the same model to a gateway device with thousands of events/signals?

It is probably worth looking at the NATS Key Value Store, built on top of the JetStream, for inspiration and prior art.

huntc commented 1 year ago

Very interesting discussion. I'm keen to follow along as it intersects with some of my current needs.

Thanks! Great to have you along for the ride!

In the systems I've worked on, I've never used a log as the backing store for all state changes. For example, as already mentioned, the "name changed" event only happens once, so that would just be persisted in a file or a lightweight database (and considered metadata, not event data). I've used logs for 1) sharing data (e.g., another process subscribes to the log to write the events to an embedded historian) or 2) replicating data (e.g., replicating data to the cloud). There is no need for compaction of these logs and a combination of time-based and size-based retention works well (just delete the oldest file segment in a topic).

I've always been wary of these other, more simplified approaches to compaction. You may persist snapshots in a store that can typically itself be shared, and so you may now be managing two distributed systems. If not, then you may have an incomplete set of events to source the new state from given, say, a new node starting up and needing to source state. I acknowledge that this may not be a problem in practice given a vast amount of storage.

And then there's the issue of scanning through many events to get to the state you're interested in... I've had errant sensors that continuously attempt to join the network despite the network accepting them each time. I then ended up with thousands of events that weren't useful to have around and that slowed down log consumption when sourcing state at a later point; perhaps the last 10 occurrences would have been useful to retain. This is actually what got me thinking more about compaction when working in Agtech, where resources are less of an issue given the cloud-edge approach I'm now using.

I'm curious what it would look like to back all state changes with the log and what compaction would demand in terms of IO, CPU, storage degradation, and predictable performance.

Great. Me too! In terms of IO at least, I'm thinking that regular compaction should perform well, and most likely with data still cached in memory. I've no hard data on this though. Also, I feel that if I can handle hundreds of events on a tiny MIPS32 processor with slow flash memory then it should scale well. Then, there's the reduced IO from not having to deal with other means of state persistence. Lots of measurements are to be done in this area though.

I would also keep in mind what this pattern looks like scaled up. Perhaps it works well for a few events/signals on a single device, but does it scale when taking the same model to a gateway device with thousands of events/signals?

It may not and the use case is always important, but I think the approach suggested could reproduce the conventional Kafka-style compaction approaches quite easily too. That said, the use-case for logged is most definitely an embedded-edge scenario with limited resources, particularly storage. As its README states, use Kafka when you have it (at least for now!!). :-)

It is probably worth looking at the NATS Key Value Store, built on top of the JetStream, for inspiration and prior art.

Thanks. I see JetStream offers compaction and it looks a lot like Kafka.

arnolddevos commented 1 year ago

The compaction approach we are discussing is almost the same as reducing a log to an in-memory KV store then streaming that back to a log again. In effect we are implementing both sides of a database system (log and index) but persisting only the log.

We like the log because we do event sourcing, even though the time needed to play back the log on startup is an obvious disadvantage.

Less obvious is the write amplification due to compaction cycles. That is, the same data get written and re-written repeatedly. This can be a killer in some applications. My own negative experience in this regard is with early versions of Cassandra.

(Most databases just truncate the log and don't compact. Cassandra is a hybrid.)

huntc commented 1 year ago

An update on progress. Most of the compaction is done!!! Well, I've actually got to write the code around running the compactor, but all of the foundational stuff is done.

How does it look? (I hear you ask!). For many scenarios, once a commit log has been created, it is simply a case of registering a built-in strategy for a given topic:

commit_log.register_compactor(
    MY_TOPIC, 
    KeyBasedRetention::new(MY_TOPIC_NUM_RECORD_TYPES_PER_COMPACTION_RUN)
);

...where MY_TOPIC_NUM_RECORD_TYPES_PER_COMPACTION_RUN isn't the total number of types of records for a given topic, just the number we're going to process per compaction run. If there are more types discovered than this then I plan to run the compactor again until all types have been processed.

We now have the ability to compose compaction strategies and use two off-the-shelf ones: KeyBasedRetention, aka Kafka's approach, and one that I've found a need for over the years: NthKeyBasedRetention, where the latest n records for a key can be retained. I've also provided an example of a custom compaction strategy that is a composition of these two built-in ones: https://github.com/streambed/streambed-rs/pull/28/files#diff-641029faa3314096f0248587bf6278084ee55b58baf122d558e8f76f15e8b025R348-R485.

The changes that permit a commit log to be spread over a read-only history file and a writable file for appending have also been done. Along the way, I've replaced the Tokio async file IO with the std lib's, as non-blocking doesn't get us far and most likely slows things down given the ultimate deferral to blocking IO by Tokio.

So all that is remaining is to write the compactor task logic itself. This should now be straightforward and all things going well, I should be done by mid-next week.

Reviews and comments on the PR are very welcome!

P.S. I also checked out the Linux jffs2 file system we're using just to see whether there's anything we can do to optimise the commit log for flash storage. There isn't and jffs2 appears to take care of everything for us, including compression. We may consider block IO with the flash device in the future and bypass jffs2. We might get improved IO, but I would rather wait to hear of problems there. I think we're already reasonably performant.

huntc commented 1 year ago

Another update. After chatting with @arnolddevos yesterday, the commit log has received a massive overhaul. While many of the changes are to accommodate compaction, the end result should yield faster performance when consuming and producing. Prior to these changes, logged would funnel all writes through a single channel. I think I did that originally to support multiple topics going into one file. Given that we've got one set of files per topic though, we can have a producer task per topic.
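
A hypothetical sketch of that arrangement, with one channel and writer task per topic rather than a single shared channel; the names here are illustrative and the record type is borrowed loosely from the earlier snippet, so don't read this as the actual streambed-rs internals:

use std::collections::HashMap;
use tokio::sync::mpsc;

// Illustrative record and registry types only.
struct ProducerRecord {
    key: u64,
    value: Vec<u8>,
}

struct Producers {
    senders: HashMap<String, mpsc::Sender<ProducerRecord>>,
}

impl Producers {
    // One writer task per topic, each owning that topic's active file.
    fn spawn_topic(&mut self, topic: &str) {
        let (tx, mut rx) = mpsc::channel::<ProducerRecord>(64);
        self.senders.insert(topic.to_string(), tx);
        tokio::spawn(async move {
            while let Some(_record) = rx.recv().await {
                // Append the record to this topic's active file here.
            }
        });
    }
}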

The history file logic has also been enhanced to support two types of history: history representing the last/current compaction, and ancient history representing the history from before the current compaction. We're now ready to finally write the compaction task logic. :-)

huntc commented 1 year ago

Big couple of days. Got most of the compactor logic in place. In fact, some tests aside, all that remains to write is the code that goes through the history files and creates a new one given the offsets to retain.

I'm quite pleased with the approach to scheduling compaction. We have a parameter that indicates a threshold of active commit log file size (active being the one that gets written to). This param equates to the erase size on flash for our hardware (64KB) - I'm thinking that'll be a common size and so it serves as a default. It can be overridden. So, compaction occurs around that threshold. If we're compacting and we reach that threshold again then we put back-pressure on the producer. That's it. :-)
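
As a minimal sketch of that trigger rule (the names and types here are hypothetical, not the streambed-rs API; only the 64KB default mirrors the flash erase size mentioned above):

// Hypothetical sketch of the scheduling rule described above.
const DEFAULT_COMPACTION_THRESHOLD: u64 = 64 * 1024; // flash erase size

enum ProducerDecision {
    // Accept the append immediately.
    Accept,
    // Accept the append and kick off a compaction run.
    AcceptAndCompact,
    // Apply back-pressure: the threshold was hit again mid-compaction.
    Wait,
}

fn on_append(active_file_len: u64, compacting: bool, threshold: u64) -> ProducerDecision {
    if active_file_len < threshold {
        ProducerDecision::Accept
    } else if compacting {
        ProducerDecision::Wait
    } else {
        ProducerDecision::AcceptAndCompact
    }
}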

Lots of performance improvements too. We've now got read and write buffers with the latter being flushed if there are no further writes pending.

I've also been doing some benchmarking and I can pretty much see 1 million events per second being read and written. You can check out the benchmarks on your machine via cargo bench. I feel we're comparable to other commit logs like https://github.com/OpenHFT/Chronicle-Queue, and we take care of disk space also...

huntc commented 1 year ago

2,407 LoC and 3 weeks later, the compactor is functionally complete. :-P There are lots of tests in there, but we now need some real-world testing. I'll embark on that tomorrow. Meanwhile, it is ready for review for those who are brave enough! I'm quite keen to get the PR in now as there is lots of new code and we need to start using it.

One last thing: the IoT example has been updated to use the compactor and I can see we're only taking up 140KiB for the entire logged implementation - happy with that!

huntc commented 1 year ago

Compaction is done. Happy with the testing so far. I've enabled compaction for the example IoT service and documented how it can be observed.

I'm really thrilled to have got this done and am thankful for the help I've received here. I think it is one of the best bits of software I've had the pleasure of putting together. This is a fast and resource-friendly commit log.

Onward!