Closed teh-cmc closed 3 months ago
Temporal batches provide a huge DX and performance gain on the logging side, but once the data gets exploded back into rows and indexed into the store, all the problems stated above still apply.
This is only true so long as we continue to explode things back out into separate rows on ingest. It's not articulated as such in the temporal-batch issue, but if the store itself had end-to-end support for temporal batches (admittedly a lot of work), I believe this would solve many of the issues for real-world happy-path use-cases.
By happy-path, I mean if the temporal batch is pre-sorted on the same timeline as your range query and potentially even non-overlapping with other temporal batches. In this case a store that worked natively with temporal batches should only need to carry overhead of O(batches) rather than O(scalars) and be able to just return direct scalar slices from the original arrow array without ever having needed to explode these batches into separate rows in the first place. I believe this constraint would be acceptable to most users that are really pushing the envelope of large scalar streams.
We don't need to make every possible edge case fast. We need to provide some means for users to follow a happy path that makes specific, constrained edge cases fast.
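The happy path described above can be sketched as follows. This is a minimal illustration, not the store's actual API: `SortedBatch` and `range_query` are hypothetical names, and the batch is assumed to be pre-sorted on the queried timeline. The point is that a range query reduces to two binary searches and a borrowed slice, with no per-row index entries.

```rust
use std::ops::RangeInclusive;

/// Hypothetical temporal batch: one timeline column and one scalar column of
/// equal length, with `times` sorted in ascending order (an assumption).
pub struct SortedBatch {
    pub times: Vec<i64>,
    pub values: Vec<f64>,
}

impl SortedBatch {
    /// Returns the scalars whose timestamp falls within `range`,
    /// as a slice borrowed directly from the batch (no copy, no explosion into rows).
    pub fn range_query(&self, range: RangeInclusive<i64>) -> &[f64] {
        // Index of the first row with `time >= range.start()`.
        let start = self.times.partition_point(|&t| t < *range.start());
        // Index one past the last row with `time <= range.end()`.
        let end = self.times.partition_point(|&t| t <= *range.end());
        // An inverted range yields an empty slice rather than a panic.
        &self.values[start..end.max(start)]
    }
}
```

The cost per query is O(log n) in the batch length, and the bookkeeping is per batch rather than per scalar.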
Good write-up!
I'd like to see a more detailed breakdown of where the overhead is once we start looking at this a little bit closer (i.e. how many bytes is the reverse index, how much is because of `DataCell`, etc.)
Having too many index entries / rows wrecks everything: from logging, to ingestion, to visualization and all the way to the memory footprint. Given this, here's the 1M$ question: how can Rerun help users index fewer rows while still somehow meeting all their needs without any (or very little) added complexity?
Whichever solution we come up with, it should work both with static known-ahead-of-time datasets (e.g. logging a scalar plot from a parquet file) and real-time datasets (e.g. a high-frequency IMU).
I also forgot to mention that the solution should fit nicely with promises too. If one has a million promises and blindly creates 1 million index entries to store them, one is doomed before even resolving anything.
Looking a bit closer on the storage overhead:
In the `DataTable`, for each component path and timeline we bucket the data into a `DataCellColumn`.
Logically this is a `Vec<Option<Vec<T>>>`: for each row, you optionally have a vector of values (one for each instance).
For instance: if you have a point cloud you log multiple times, you would get a `[[positions at one time], [positions at the next time], …]` here.
For scalar plotting this would instead be something like `[[0.1], [0.2], [0.3], …]`.
The actual implementation of `DataCellColumn` is:
pub struct DataCellColumn(pub DataCellOptVec);
pub type DataCellOptVec = VecDeque<Option<DataCell>>;
// 8 bytes
pub struct DataCell {
    // An Arc so we can share the same `DataCellInner` over different timelines,
    // e.g. the same set of point cloud colors.
    pub inner: Arc<DataCellInner>,
}
// 48 bytes
pub struct DataCellInner {
    name: ComponentName,
    size_bytes: u64,
    // Internally, this is most likely a slice of another, larger array (one microbatch per LogMsg),
    // so there is another Arc, offset, and indirection. Oh, and a DataType.
    values: Box<dyn arrow2::array::Array>, // god knows how many extra bytes
}
Obviously this is a horrible way to store small things like scalars. I think in terms of memory overhead, this is where we should focus our attention, and then everything else will hopefully follow (🤞).
A quick test to measure the overhead: allocate 1M f64 scalars on a single timeline. Should be 32B/row (including RowId), i.e. 32MB in total, but actually:
That's just `DataStore` with no secondary indices or caches.
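The 32 B/row baseline quoted above can be checked with a few lines. This is only a sketch of the arithmetic: `IdealScalarRow` is a hypothetical struct, assuming a 16-byte `RowId`, one 8-byte timestamp, and one 8-byte `f64` per row, with nothing else stored.

```rust
use std::mem::size_of;

/// Hypothetical ideal row for one scalar on one timeline.
/// Field sizes are assumptions: 16-byte row id, 8-byte time, 8-byte value.
#[repr(C)]
pub struct IdealScalarRow {
    pub row_id: [u64; 2],
    pub time: i64,
    pub value: f64,
}

/// Bytes needed for `num_rows` rows if nothing but the ideal row were stored.
pub fn ideal_total_bytes(num_rows: usize) -> usize {
    num_rows * size_of::<IdealScalarRow>()
}
```

With one million rows this gives 32,000,000 bytes, which is the figure any measured footprint should be compared against.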
In terms of storage, we should be able to always represent the entire `DataCellColumn` with one arrow data type and two arrow buffers (the flattened data plus the offsets into it), but that would sometimes require duplicating data across timelines (unless they are in sync… which they almost always are!).
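To make the "flattened data plus offsets" idea concrete, here is a minimal sketch using plain `Vec`s instead of Arrow buffers. `FlatColumn` and its methods are hypothetical names; the `validity` vector is an added assumption so that a missing row (`None`) can be told apart from an empty list.

```rust
/// Hypothetical flattened column: the logical `Vec<Option<Vec<f64>>>` stored as
/// one contiguous value buffer plus one offsets buffer.
pub struct FlatColumn {
    /// All instances of all rows, back to back.
    pub values: Vec<f64>,
    /// Row `i` occupies `values[offsets[i]..offsets[i + 1]]`; length is `num_rows + 1`.
    pub offsets: Vec<usize>,
    /// Whether row `i` holds a value at all (assumption, mirrors a validity bitmap).
    pub validity: Vec<bool>,
}

impl FlatColumn {
    /// Flattens the logical row-oriented representation.
    pub fn from_rows(rows: &[Option<Vec<f64>>]) -> Self {
        let mut values = Vec::new();
        let mut offsets = vec![0];
        let mut validity = Vec::with_capacity(rows.len());
        for row in rows {
            if let Some(instances) = row {
                values.extend_from_slice(instances);
            }
            offsets.push(values.len());
            validity.push(row.is_some());
        }
        Self { values, offsets, validity }
    }

    /// Returns the instances of row `index` as a slice into the shared buffer,
    /// or `None` if the row is missing or out of bounds.
    pub fn row(&self, index: usize) -> Option<&[f64]> {
        if !*self.validity.get(index)? {
            return None;
        }
        Some(&self.values[self.offsets[index]..self.offsets[index + 1]])
    }
}
```

For scalars the per-row cost is one offset and one validity flag, instead of one heap-allocated cell per row.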
We have a multi-index problem (we index the data over many timelines), which can lead to a lot of overhead if we are not careful. However, I think we can focus on optimizing for the common cases and still get okay-ish performance for pathological cases.
So that makes me wonder (like Jeremy above) if we should switch from each row/cell being the atomic unit to instead always considering a bucket of rows as the atomic unit. The store subscribers would get a notification for each new bucket, the GC would always drop one bucket at a time, etc.
struct DataStore {
    component_streams: Map<ComponentPath, ComponentStream>,
}
/// One per ComponentPath
struct ComponentStream {
    // Note: buckets are shared across timelines.
    // For each timeline, maps potentially overlapping ranges to buckets.
    timelines: Map<TimeLine, IntervalMap<TimeRange, Arc<Bucket>>>,
}
/// Many for each `ComponentPath`.
/// Stores some number of rows of data.
struct Bucket {
    row_ids: Vec<RowId>,
    // The time columns.
    // The times are optional, because not all rows appear on all timelines.
    // Could also be stored as arrow buffers.
    times: Map<TimeLine, Vec<Option<TimeInt>>>,
    // NOTE: it is not always possible to keep all timelines sorted!
    // If we sort on one, another may get out-of-order.
    // In most cases, they are monotonically increasing together, and we can stay sorted on all timelines.
    // For pathological cases (one timeline increases as another decreases, for instance)
    // we split the buckets so that they are small and do linear scans for unsorted timelines at query-time.
    is_timeline_sorted: Map<TimeLine, bool>,
    // The data column
    components: DataCellColumn, // A type-erased `Vec<Option<Vec<T>>>` backed as raw Arrow data.
}
However, allowing overlapping buckets that are sometimes unsorted is quite complicated. Luckily there is a better way!
Extremely fast inserts, slower queries (requires a query cache). Very low memory overhead even when logging multiple components at a time.
struct DataStore {
    entity_streams: Map<EntityPath, EntityStream>,
}
struct EntityStream {
    // Note: buckets are shared across timelines.
    // For each timeline, maps potentially overlapping ranges to buckets.
    timelines: Map<TimeLine, IntervalMap<TimeRange, BucketId>>,
    /// If we use `Arc<Bucket>` we can return dense query results as `Set<(Arc<Bucket>, Range<usize>)>`
    buckets: Map<BucketId, Arc<Bucket>>,
}
/// Many for each `ComponentPath`.
/// Stores some number of rows of data.
/// Split/new bucket on:
/// * different number of timelines
/// * change of datatype for any of the components
/// * if it is sorted and larger than 1M rows
/// * if it is unsorted and larger than 1k rows
struct Bucket {
    // Always sorted
    row_ids: Vec<RowId>,
    // The time columns.
    timelines: Map<TimeLine, BucketTimeline>,
    // Future improvements: keep a per-timeline shuffle-index in case of unsorted.
    // Each `Box<dyn Array>` is effectively a `Vec<Option<Vec<T>>>` backed as raw Arrow data.
    // The outer option here is so that we can log only `Position` one time, but not a `Color`.
    components: BTreeMap<ComponentName, Box<dyn Array>>,
}
struct BucketTimeline {
    times: Vec<TimeInt>,
    sorted: bool,
    range: TimeRange,
}
Let's add the constraint that each bucket of data has all timelines monotonically increase together (no out-of-order).
What would this give us? This means that for each component and timeline we would store a BTreeMap of time to buckets (just like we do now), with the difference that all timelines point to the same shared buckets (deduplicating data across timelines). Each such bucket would then consist of N rows, sorted by time (for every timeline).
struct DataStore {
    component_streams: Map<ComponentPath, ComponentStream>,
}
/// One per ComponentPath
struct ComponentStream {
    component_name: ComponentName,
    // For each timeline, maps the lowest time in each bucket to that bucket.
    timelines: Map<TimelineName, BTreeMap<TimeInt, BucketId>>,
    // Note: each piece of data is in exactly one bucket (deduplication).
    buckets: Map<BucketId, Bucket>,
}
/// Never empty.
/// All timelines are dense (no holes!).
struct Bucket {
    // This supports having heterogeneous data types for the same ComponentName,
    // as long as we split buckets if the data type changes
    // (e.g. sometimes logging a promise, sometimes logging the actual data).
    data_type: re_arrow2::DataType,
    // Future optimization: RLE
    row_ids: Vec<RowId>,
    // The time columns.
    // We always keep these sorted (or sort-on-demand, but they should all be sortable).
    // Could also be stored as arrow buffers.
    // Future optimization: RLE
    times: Map<TimelineName, Vec<TimeInt>>,
    // The data column
    // A type-erased `Vec<Option<Vec<T>>>` backed as raw Arrow data.
    // Queries return slices into this.
    components: DataCellColumn,
}
For the common case where timelines increase monotonically together (in-order logging) we can have very dense storage of data with very little overhead.
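As a rough illustration of how a query could use the "lowest time in each bucket" map, here is a latest-at lookup on a single timeline. It is a simplified sketch with hypothetical names: buckets are stored directly in the `BTreeMap` (no `BucketId` indirection), each row holds one `f64`, and buckets are assumed not to overlap in time on this timeline.

```rust
use std::collections::BTreeMap;

/// Hypothetical bucket: `times` sorted ascending, one scalar per row.
pub struct Bucket {
    pub times: Vec<i64>,
    pub values: Vec<f64>,
}

/// Hypothetical index for one timeline: lowest time in each bucket -> bucket.
pub struct TimelineIndex {
    pub buckets: BTreeMap<i64, Bucket>,
}

impl TimelineIndex {
    /// Latest-at: the value of the last row with `time <= query_time`, if any.
    pub fn latest_at(&self, query_time: i64) -> Option<f64> {
        // Last bucket whose lowest time is at or before the query time.
        let (_, bucket) = self.buckets.range(..=query_time).next_back()?;
        // Number of rows in that bucket at or before the query time.
        let count = bucket.times.partition_point(|&t| t <= query_time);
        // `count` is zero only if the bucket is empty or mis-keyed.
        count.checked_sub(1).map(|i| bucket.values[i])
    }
}
```

Both steps are logarithmic: one tree lookup over buckets, then one binary search inside the bucket.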
Bucket insertion is similar to what we have today, but with some additional causes for splits:
Pathological cases would degenerate into putting each row into its own bucket (e.g. when `log_time` increases as `frame_nr` decreases, or every other log call to the same component path uses a different data type).
row_id | log_tick | frame_nr | data |
---|---|---|---|
0x0 | 0 | | red |
0x1 | 1 | | orange |
0x2 | 2 | 1 | yellow |
0x3 | 3 | 2 | green |
0x4 | 4 | 3 | blue |
0x5 | 5 | 4 | indigo |
0x6 | 6 | 0 | violet |
(each color here could also be `null`, an empty list, a list of many colors, etc.; it doesn't matter here)
ComponentStream.timelines = Map {
    "log_tick": BTreeMap {
        0: bucket_0,
        2: bucket_1,
        6: bucket_2,
    }
    "frame_nr": BTreeMap {
        0: bucket_2,
        1: bucket_1,
    }
}
bucket_0:
row_id | log_tick | data |
---|---|---|
0x0 | 0 | red |
0x1 | 1 | orange |
(split required because of new timeline)
bucket_1:
row_id | log_tick | frame_nr | data |
---|---|---|---|
0x2 | 2 | 1 | yellow |
0x3 | 3 | 2 | green |
0x4 | 4 | 3 | blue |
0x5 | 5 | 4 | indigo |
(split required because of out-of-order timelines)
bucket_2:
row_id | log_tick | frame_nr | data |
---|---|---|---|
0x6 | 6 | 0 | violet |
Notice how each piece of data is only stored once, and how each bucket is sorted, making queries easy and fast.
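The splitting in this example can be reproduced with a small sketch. All names here (`Row`, `can_append`, `bucketize`, `example_rows`) are hypothetical, and only two of the split causes are modeled: a change in the set of timelines, and a timeline going backwards. Data-type changes and size limits are left out.

```rust
use std::collections::BTreeMap;

/// Hypothetical row: an id, its time on each timeline it appears on, and a payload.
#[derive(Clone, Debug, PartialEq)]
pub struct Row {
    pub row_id: u64,
    pub times: BTreeMap<&'static str, i64>,
    pub data: &'static str,
}

/// True if `next` may follow `last` in the same bucket:
/// same set of timelines, and no timeline goes backwards.
pub fn can_append(last: &Row, next: &Row) -> bool {
    last.times.len() == next.times.len()
        && next.times.iter().all(|(timeline, &time)| {
            last.times.get(timeline).map_or(false, |&prev| prev <= time)
        })
}

/// Groups rows (in insertion order) into buckets, starting a new bucket
/// whenever `can_append` fails.
pub fn bucketize(rows: &[Row]) -> Vec<Vec<Row>> {
    let mut buckets: Vec<Vec<Row>> = Vec::new();
    for row in rows {
        let fits = buckets
            .last()
            .and_then(|bucket| bucket.last())
            .map_or(false, |last| can_append(last, row));
        if fits {
            buckets.last_mut().expect("checked above").push(row.clone());
        } else {
            buckets.push(vec![row.clone()]);
        }
    }
    buckets
}

/// The seven rows of the example table; `frame_nr` is absent on the first two.
pub fn example_rows() -> Vec<Row> {
    let colors = ["red", "orange", "yellow", "green", "blue", "indigo", "violet"];
    let frame_nrs = [None, None, Some(1), Some(2), Some(3), Some(4), Some(0)];
    colors
        .iter()
        .zip(frame_nrs)
        .enumerate()
        .map(|(i, (&data, frame_nr))| {
            let mut times = BTreeMap::from([("log_tick", i as i64)]);
            if let Some(frame_nr) = frame_nr {
                times.insert("frame_nr", frame_nr);
            }
            Row { row_id: i as u64, times, data }
        })
        .collect()
}
```

Running `bucketize(&example_rows())` yields three buckets of 2, 4, and 1 rows, matching `bucket_0`, `bucket_1`, and `bucket_2`.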
Notice also that the amortized overhead is minimal in the happy path (within each bucket): 16 bytes per `RowId` plus 8 bytes per timeline, plus an 8-byte index per component (for the `DataCellColumn`). For an f64 scalar on 1 timeline, that's still a 1.5x memory overhead, which is much better than 1000x. There's also plenty of room to optimize this in the future by compressing the columns. For instance, we could encode each column (row_id/timeline) as a single base value plus an offset per row, where the offset could often be only 2 or 4 bytes.
Obviously there are a lot of details to figure out here, but we should consider this corner of the solution space and what its implications would be.
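The "base value plus an offset per row" idea could look roughly like this. It is a sketch under assumptions, not a proposed format: `BaseOffsetColumn` is a hypothetical name, the base is taken to be the column minimum, and offsets are fixed at 4 bytes.

```rust
/// Hypothetical compressed time column: one base value plus a 4-byte offset per row.
#[derive(Debug, PartialEq)]
pub struct BaseOffsetColumn {
    pub base: i64,
    pub offsets: Vec<u32>,
}

impl BaseOffsetColumn {
    /// Encodes `values` relative to their minimum.
    /// Returns `None` if `values` is empty or if any offset does not fit in 4 bytes.
    pub fn encode(values: &[i64]) -> Option<Self> {
        let base = values.iter().copied().min()?;
        let offsets = values
            .iter()
            .map(|&value| {
                // `base` is the minimum, so the difference is non-negative
                // (or overflows, which `checked_sub` reports as `None`).
                let delta = value.checked_sub(base)?;
                if delta <= i64::from(u32::MAX) {
                    Some(delta as u32)
                } else {
                    None
                }
            })
            .collect::<Option<Vec<u32>>>()?;
        Some(Self { base, offsets })
    }

    /// Reconstructs the original values.
    pub fn decode(&self) -> Vec<i64> {
        self.offsets.iter().map(|&offset| self.base + i64::from(offset)).collect()
    }
}
```

When encoding succeeds, each row costs 4 bytes instead of 8; a column whose spread is too large would simply stay uncompressed.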
After a long meeting, we came up with and decided on Proposal B from above.
Internal slack thread containing real-world, third-party data and ingestion script that will be made much greater when this issue is resolved:
https://rerunio.slack.com/archives/C040X6J573Q/p1720458010590199
This is likely done, but needs some testing to verify. Maybe still blocked on:
This is now all implemented, and will be available in 0.18.
Logging many scalars is now orders of magnitude faster and less memory-intensive.
Solutions
Context
Every `log` call in Rerun results in a `DataRow`: Rerun's atomic unit of change.
`DataRow`s are indexed by the datastore, and that index ultimately powers all the row/time-based features in Rerun, such as latest-at & (visible-time-)range queries, change detection, the time panel and time cursor, garbage collection, etc.
Each entry in this index (i.e. each row) carries a bunch of metadata:
* `RowId`, that uniquely identifies the associated row and defines its global order (!).
* `TimePoint`: a collection of timeline names and timestamps (generally around 3 of each in the common case).
* `DataCell`s, each of which carry some metadata of their own:
  * `Array`), so that we can know whether the cell is:
We also maintain a smaller, but still significant, reverse index, for when we need to fetch information about a specific `RowId`.
Finally, in addition to these primary indices maintained by the datastore itself, many pieces of the app also maintain their own secondary indices via our change detection system (e.g. all the time panel stuff).
All of this easily adds up to a few hundred bytes of overhead per index entry. Whether that becomes a problem or not from a memory standpoint depends on the amount of data that the user actually stores in each of these index entries.
For something like a point cloud or an image, the indexing overhead is dwarfed by the actual data, and everything is fine (for the most part). For something like a scalar, the indexing overhead is orders of magnitude larger than the data itself, which leads to situations like this one (#5904), where 20MiB of scalar data in a parquet file somehow ends up with 20GiB of memory footprint once logged.
Unfortunately the overhead issues don't stop at the memory footprint.
All row/time-based subsystems in the app must spend compute time to deal with these indices, which slows down all aspects of the viewer (including ingestion!) e.g.:
And, of course, the infamous worst of all: range queries that span a lot of indices will have to unpack as many rows, which quickly becomes prohibitively expensive and therefore requires a lot of complex caching machinery to pull off (including the dreaded deserialization cache!). Things once again get even worse from an overhead perspective if it turns out that the contents of the row were actually one single float value all along...
Having too many index entries / rows wrecks everything: from logging, to ingestion, to visualization and all the way to the memory footprint. Given this, here's the 1M$ question: how can Rerun help users index fewer rows while still somehow meeting all their needs without any (or very little) added complexity?
Whichever solution we come up with, it should work both with static known-ahead-of-time datasets (e.g. logging a scalar plot from a parquet file) and real-time datasets (e.g. a high-frequency IMU).
Things that either wouldn't help, or not enough
Temporal batches
Temporal batches provide a huge DX and performance gain on the logging side, but once the data gets exploded back into rows and indexed into the store, all the problems stated above still apply. In fact, temporal batches are just a way to make it easier to create more index entries!
Temporal batches also require the user to know their dataset ahead of time, while we want a solution that works both for AOT and RT use cases.
We do want temporal batches for other reasons, but they won't help with this issue.
Optimizing the memory footprint of index entries
Some possible avenues on that front:
* `log_tick` timeline.
* `arrow::Array`?).
* `RowId` (varints?!).
While those would definitely be welcome improvements, I don't see that kind of optimization getting us anywhere near a viable memory footprint for scalar use cases. This also only helps with the memory footprint; compute is still as much of an issue as before.
Others?
???
Related:
#5904
#4184
#5622
#5303