TimelyDataflow / differential-dataflow

An implementation of differential dataflow using timely dataflow on Rust.
MIT License

Understanding memory footprint of a simple recursive program #151

Open ryzhyk opened 5 years ago

ryzhyk commented 5 years ago

I am trying to understand heap memory usage of a simple differential computation. It uses way more memory than I can explain and I want to figure out if there is a way to reduce its footprint.

Specifically, I have the following trivial Datalog program compiled to DD using DDlog:

Span(entity, label) :- Labeled(entity, label).
Span(parent, label) :- Dependency(child, parent), Span(child, label).

It starts with a set of labeled nodes and propagates labels along Dependency edges. I am not showing DD code, as it is automatically generated and not easy to read. I use the recursive variable that I borrowed from Frank's code some time ago for the recursive Span collection. If my cost model of DD is correct, the resulting dataflow only has three operators that consume memory:

There is also an arrangement of the Dependency relation from the second rule, but it is negligibly small compared to Span.
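For readers who do not know Datalog, the two Span rules above can be read as a fixpoint over sets. The following is a minimal plain-Rust sketch of that meaning only. It is not the DD code generated by DDlog, and the function name `span` and the `u32` ids are assumptions made for illustration.

```rust
use std::collections::HashSet;

/// Computes Span as a set: every (entity, label) reachable by following
/// Dependency edges (child -> parent) from a labeled entity.
fn span(labeled: &[(u32, u32)], dependency: &[(u32, u32)]) -> HashSet<(u32, u32)> {
    // Rule 1: Span(entity, label) :- Labeled(entity, label).
    let mut span: HashSet<(u32, u32)> = labeled.iter().copied().collect();
    let mut frontier: Vec<(u32, u32)> = span.iter().copied().collect();

    // Rule 2: Span(parent, label) :- Dependency(child, parent), Span(child, label).
    while !frontier.is_empty() {
        let mut next = Vec::new();
        for &(child, label) in &frontier {
            for &(c, parent) in dependency {
                // insert() returns true only for facts not seen before,
                // which is what makes the loop terminate on cyclic graphs.
                if c == child && span.insert((parent, label)) {
                    next.push((parent, label));
                }
            }
        }
        frontier = next;
    }
    span
}

fn main() {
    let labeled = [(1u32, 100u32)];
    // (child, parent) edges; this small graph contains a cycle.
    let dependency = [(1u32, 2u32), (2, 3), (3, 1)];
    let result = span(&labeled, &dependency);
    println!("{} Span facts", result.len());
}
```

The set insertion plays the role that distinct() plays inside the recursive scope.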

I use a simple profiler to measure peak size of these operators throughout the run on a customer dataset. Here is what I get:

The size of the data type I use to represent values is 16 bytes. The by-key arrangement has two values (k,v), so, not counting metadata, there are approximately 26,000,000 16-byte values or 400MB of actual data in the program at the peak.

Using massif, I observe that the actual peak amount of memory that DD allocated on the heap is 1.6GB, including

(note, this is the actual memory requested by DD, not counting padding and malloc metadata).

The former is particularly interesting. Assuming my interpretation of the profiling data is correct, DD uses 800MB to store 8.5M records, or ~100 bytes per record, in group_arranged.
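The arithmetic behind these figures can be reproduced with a small sketch. It assumes decimal units (1 MB = 1,000,000 bytes) and uses the rounded numbers quoted above; the helper names are made up for illustration.

```rust
/// Raw payload size: number of values times bytes per value.
fn raw_bytes(values: u64, bytes_per_value: u64) -> u64 {
    values * bytes_per_value
}

/// Average heap bytes attributed to each record.
fn bytes_per_record(heap_bytes: u64, records: u64) -> f64 {
    heap_bytes as f64 / records as f64
}

fn main() {
    // 26,000,000 values of 16 bytes each.
    let raw = raw_bytes(26_000_000, 16);
    println!("raw data: {} MB", raw / 1_000_000);

    // 800MB attributed to 8.5M records.
    println!(
        "group_arranged: {:.0} bytes/record",
        bytes_per_record(800_000_000, 8_500_000)
    );

    // 1.6GB peak heap relative to the raw data.
    println!("overall overhead: {:.1}x", 1_600_000_000.0 / raw as f64);
}
```

With these inputs the raw data comes to 416 MB and the per-record cost to roughly 94 bytes, which is where the "400MB" and "~100 bytes" estimates come from.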

I would appreciate any help in:

Thanks!

frankmcsherry commented 5 years ago

Hi Leo,

The implementations of the storage are available in src/trace/implementations/ord.rs, with the data stored either in

  1. ArrangeByKey: OrdValBatch
  2. ArrangeBySelf: OrdKeyBatch

The two are represented as tries, which you can think of as

struct Trie<K, V, T, R> {
    keys: Vec<K>,
    key_offs: Vec<usize>,  // same length as above
    vals: Vec<V>,
    val_offs: Vec<usize>,  // same length as above
    time_diffs: Vec<(T, R)>,
}

In the ArrangeBySelf arrangement the val* fields do not exist (and the K type is taken to be the record, which may still have a (key,val) structure). In the calls to distinct(), ArrangeBySelf is used in both the input and output arrangement.
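To make the layout concrete, here is a minimal sketch that fills the struct above from sorted updates and adds up the bytes held by its vectors. It is a simplified stand-in, not the real OrdValBatch: the `from_sorted` and `payload_bytes` helpers are hypothetical, and the offsets follow the "same length as above" comments, with one starting offset per key and per value.

```rust
use std::mem::size_of;

/// Simplified stand-in for the trie layout; not the real OrdValBatch.
struct Trie<K, V, T, R> {
    keys: Vec<K>,
    key_offs: Vec<usize>, // key_offs[i]: index in `vals` where key i's values start
    vals: Vec<V>,
    val_offs: Vec<usize>, // val_offs[j]: index in `time_diffs` where val j's updates start
    time_diffs: Vec<(T, R)>,
}

impl<K: PartialEq, V: PartialEq, T, R> Trie<K, V, T, R> {
    /// Builds the trie from updates already sorted by (key, val, time).
    fn from_sorted(updates: Vec<(K, V, T, R)>) -> Self {
        let mut trie = Trie {
            keys: Vec::new(),
            key_offs: Vec::new(),
            vals: Vec::new(),
            val_offs: Vec::new(),
            time_diffs: Vec::new(),
        };
        for (k, v, t, r) in updates {
            // A key is stored once, however many values it has.
            let new_key = trie.keys.last() != Some(&k);
            if new_key {
                trie.keys.push(k);
                trie.key_offs.push(trie.vals.len());
            }
            // A value is stored once per key, however many updates it has.
            if new_key || trie.vals.last() != Some(&v) {
                trie.vals.push(v);
                trie.val_offs.push(trie.time_diffs.len());
            }
            trie.time_diffs.push((t, r));
        }
        trie
    }

    /// Bytes occupied by the elements of the five vectors (ignores spare capacity).
    fn payload_bytes(&self) -> usize {
        self.keys.len() * size_of::<K>()
            + self.key_offs.len() * size_of::<usize>()
            + self.vals.len() * size_of::<V>()
            + self.val_offs.len() * size_of::<usize>()
            + self.time_diffs.len() * size_of::<(T, R)>()
    }
}

fn main() {
    // (key, val, time, diff): key 1 has two values, key 2 has one.
    let updates = vec![(1u64, 10u64, 0u32, 1i32), (1, 11, 0, 1), (2, 10, 0, 1)];
    let trie = Trie::from_sorted(updates);
    println!("{} keys, {} vals, {} updates, {} payload bytes",
        trie.keys.len(), trie.vals.len(), trie.time_diffs.len(), trie.payload_bytes());
}
```

The sketch shows why the number of values per key matters: a key and its offset are paid for once per key, while each value pays for itself, an offset, and at least one (T, R) entry.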

The amount of memory required will depend a bit on the number of values per key, and what you are using for your time (T) and difference (R) types. If you use 16 byte time (8 for each dimension) and 8 byte differences, each of your 16 byte (k, v) pairs could in principle become 64 bytes (due to alignment). If we multiply this by 26M, we get 1.664GB, which is almost what you are seeing. There is a factor of two that lives in the progressive merging logic, because we do not reconstruct data structures for every update, but you usually only see this after some amount of update churn (your footprint may grow with time, by up to 2x, which I would check before deploying!).
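As a rough sketch of that estimate, the function below multiplies the record count by an assumed per-record cost and by a merge factor. The 64-byte figure and the factor of two are taken from the paragraph above; the function name and decimal units are assumptions for illustration.

```rust
/// Estimated heap bytes: records * bytes_per_record * merge_factor.
fn estimated_footprint(records: u64, bytes_per_record: u64, merge_factor: u64) -> u64 {
    records * bytes_per_record * merge_factor
}

fn main() {
    let steady = estimated_footprint(26_000_000, 64, 1);
    let worst = estimated_footprint(26_000_000, 64, 2);
    println!("without merge overhead: {:.3} GB", steady as f64 / 1e9);
    println!("with up to 2x from progressive merging: {:.3} GB", worst as f64 / 1e9);
}
```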

I have a bit of a hard time squaring the idea that one of the three arrangements is consuming half of the memory. And given that the other attributions are definitely not where the other arrangement allocations are made, .. is it possible that massif reports what is on the stack when a process goes to the kernel for more memory? That would report those calls responsible for growing the working set, which is different from those occupying the working set.


For helpful suggestions:

  1. It's hard to know if you need distinct() or not in your computation. If your Dependency relation is as acyclic as the parent and child bindings suggest, you could just skip the distinct() and use a normal differential Variable. This would remove 2/3 of your arrangements. Alternately, you could add a distinct_total outside the loop, which would return one of the arrangements. If Dependency has a cycle in it you should not do either of these, as the computation may not terminate.

    If Dependency has cycles, you could determine its strongly connected components, reduce the graph to the acyclic graph over the components, and then use a distinct()-free propagation on the components.

  2. If you need the distinct, but you have relatively few values per key, you could implement the distinct with a reduce using the key of (key, val), and performing the distinct in the reduction logic. The resulting arrangement, which you could capture, can then be used as a pre-arranged input for the join. This has the downside that each time any value associated with a key is changed, the entire group must be re-reduced for distinctness. But, it removes one of the arrangements at the expense of more computation.

  3. If each entity may have multiple labels then you might save memory by determining the distinct set of "entities of interest", performing the propagation just of "entities of interest" and then afterwards join the labels with the results.

  4. If you are not already doing so, you can use smaller types for time and difference (e.g. 4-byte integers).
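The termination caveat in suggestion 1 can be illustrated with a plain-Rust sketch that propagates label multiplicities round by round with no distinct step. This is an analogy rather than DD code: the function `propagate_counts`, the round limit, and the `u32` ids are assumptions made for the example.

```rust
use std::collections::HashMap;

/// Propagates label multiplicities along (child, parent) edges with no
/// distinct step. Returns Some(round) for the first round in which nothing
/// new is derived, or None if derivations are still being produced after
/// `max_rounds` rounds.
fn propagate_counts(
    labeled: &[(u32, u32)],
    dependency: &[(u32, u32)],
    max_rounds: usize,
) -> Option<usize> {
    // Derivations produced in the current round, with their multiplicities.
    let mut delta: HashMap<(u32, u32), i64> = HashMap::new();
    for &fact in labeled {
        *delta.entry(fact).or_insert(0) += 1;
    }
    for round in 0..max_rounds {
        if delta.is_empty() {
            return Some(round);
        }
        let mut next = HashMap::new();
        for (&(child, label), &count) in &delta {
            for &(c, parent) in dependency {
                if c == child {
                    // Every derivation is kept, even if the fact is already known.
                    *next.entry((parent, label)).or_insert(0) += count;
                }
            }
        }
        delta = next;
    }
    if delta.is_empty() { Some(max_rounds) } else { None }
}

fn main() {
    let labeled = [(1u32, 100u32)];
    let acyclic = [(1u32, 2u32), (2, 3)];
    let cyclic = [(1u32, 2u32), (2, 3), (3, 1)];
    println!("acyclic: {:?}", propagate_counts(&labeled, &acyclic, 10));
    println!("cyclic:  {:?}", propagate_counts(&labeled, &cyclic, 10));
}
```

On the acyclic edges the derivations dry up once the label has reached the last node. On the cycle the same facts are re-derived in every round, so the loop only stops because of the round limit.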

ryzhyk commented 5 years ago

Thanks for the detailed reply, I have much more clarity now!

> The amount of memory required will depend a bit on the number of values per key, and what you are using for your time (T) and difference (R) types. If you use 16 byte time (8 for each dimension) and 8 byte differences, each of your 16 byte (k, v) pairs could in principle become 64 bytes (due to alignment). If we multiply this by 26M, we get 1.664GB, which is almost what you are seeing. There is a factor of two that lives in the progressive merging logic, because we do not reconstruct data structures for every update, but you usually only see this after some amount of update churn (your footprint may grow with time, by up to 2x, which I would check before deploying!).

Makes sense. I currently use: 8 bytes for outer timestamp, 4 for inner timestamp, and 8 for difference. Now that I think about it, I should be able to do much better. Since the diameter of the graph, which, I think, determines the number of inner iterations, is small, 2 bytes should be enough for the inner timestamp. In this particular program, due to using distinct on each iteration differences should be bounded by the maximal degree of a graph node, for which 2 bytes should also be enough. Does this plan make sense?
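One way to check such a plan is to measure the sizes directly. The sketch below assumes updates are stored as (T, R) tuples, as in the time_diffs field described earlier in the thread, and uses nested tuples as a stand-in for the product timestamp. Rust does not formally guarantee tuple layout, so treat the numbers as what current rustc produces rather than a specification.

```rust
use std::mem::size_of;

/// Size in bytes of one (time, diff) entry for the given types.
fn update_bytes<T, R>() -> usize {
    size_of::<(T, R)>()
}

fn main() {
    // Current: 8-byte outer, 4-byte inner timestamp, 8-byte difference.
    println!("(u64, u32) + i64: {}", update_bytes::<(u64, u32), i64>());
    // Planned: 2-byte inner timestamp and 2-byte difference, outer unchanged.
    println!("(u64, u16) + i16: {}", update_bytes::<(u64, u16), i16>());
    // Hypothetical: also shrinking the outer timestamp to 4 bytes.
    println!("(u32, u16) + i16: {}", update_bytes::<(u32, u16), i16>());
}
```

The point to watch is alignment: as long as one 8-byte field remains, each entry is rounded up to a multiple of 8 bytes, so shrinking only the inner timestamp and the difference may save less than the raw byte counts suggest.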

The outer timestamp is the part that I am not quite sure about. The way I use DD is that every time I get a bunch of input records, I advance the timestamp for all input handles by 1, so at any given point in time the current and the previous timestamps are the only ones that matter. So in principle one byte or even one bit should be enough to encode the outer timestamp. But I don't think this will work right now, as things will break once the outer timestamp wraps around to 0. Is this correct? Is there a way to save some memory by using a smaller outer timestamp?
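The wrap-around concern can be seen in isolation with a one-byte counter. This sketch only shows the integer behavior, under the assumption that the dataflow relies on timestamps moving forward in their order; the `advance` helper is hypothetical.

```rust
/// Advances a one-byte epoch, refusing to wrap around.
fn advance(epoch: u8) -> Option<u8> {
    epoch.checked_add(1)
}

fn main() {
    let last = u8::MAX;
    let wrapped = last.wrapping_add(1);
    // After wrapping, the "next" epoch compares as earlier than the current one.
    println!("last = {}, wrapped = {}, wrapped < last: {}", last, wrapped, wrapped < last);
    // checked_add makes the overflow explicit instead of silently wrapping.
    println!("advance(1) = {:?}, advance(255) = {:?}", advance(1), advance(u8::MAX));
}
```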

> I have a bit of a hard time squaring the idea that one of the three arrangements is consuming half of the memory. And given that the other attributions are definitely not where the other arrangement allocations are made, .. is it possible that massif reports what is on the stack when a process goes to the kernel for more memory? That would report those calls responsible for growing the working set, which is different from those occupying the working set.

Massif reports code locations that malloc'd the data currently on the heap. It is part of valgrind, i.e., it executes the program in a VM and should be pretty accurate. So if I understand correctly, Buffer::flush(), Exchange::flush(), Counter::push() are not the functions that allocate data that ends up in arrangements? What would be the correct locations? And why would so much data allocated by these functions end up live on the heap?

> For helpful suggestions:
>
> 1. It's hard to know if you need `distinct()` or not in your computation. If your `Dependency` relation is as acyclic as the `parent` and `child` bindings suggest, you could just skip the `distinct()` and use a normal differential `Variable`. This would remove 2/3 of your arrangements. Alternately, you could add a `distinct_total` outside the loop, which would return one of the arrangements. If `Dependency` has a cycle in it you should not do either of these, as the computation may not terminate.

Unfortunately, it does have cycles.

>    If `Dependency` has cycles, you could determine its [strongly connected components](https://github.com/TimelyDataflow/differential-dataflow/blob/master/src/algorithms/graphs/scc.rs), reduce the graph to the acyclic graph over the components, and then use a `distinct()`-free propagation on the components.

Thanks for bringing this up. SCC is actually one optimization I wanted to implement, but I could not think of a memory-efficient way to compute it in DD. I will study your implementation and see if I can replicate it in DDlog.

> 3. If each `entity` may have multiple `label`s then you might save memory by determining the distinct set of "entities of interest", performing the propagation just of "entities of interest" and then afterwards join the labels with the results.

Unfortunately, this also does not work. There is <=1 label per key initially. I might be able to do something similar if I can identify a special structure in customer graphs.

ryzhyk commented 5 years ago

So I started experimenting with changing difference and timestamp types and discovered that distinct_total() and distinct() insist on returning isize weights, which means that I have to apply explode() to the result if I want to use, e.g., i32 for collections, which seems like a potential waste of cycles.

Edit: My bad, I can just use threshold and threshold_total directly to get collection with a custom difference type.

Another slight inconvenience I ran into is that lattice.rs does not implement Lattice for u16, and since I cannot implement the trait outside of the module, I had to create an awkward wrapper type. Is there a reason you don't want people to use 16-bit timestamps? Likewise, Diff is not implemented for u16.
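For context, the wrapper workaround looks roughly like the sketch below. The `Lattice` trait here is a local stand-in with only join and meet, not the trait from lattice.rs, and `Ts16` is a hypothetical name. In real code the impl would target the library's trait, which is allowed because the wrapper type is defined in the user's own crate.

```rust
use std::mem::size_of;

/// Local stand-in for a lattice trait; NOT the trait defined in lattice.rs.
trait Lattice {
    /// Least upper bound of two elements.
    fn join(&self, other: &Self) -> Self;
    /// Greatest lower bound of two elements.
    fn meet(&self, other: &Self) -> Self;
}

/// Hypothetical newtype around u16, so the trait can be implemented locally.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
struct Ts16(u16);

impl Lattice for Ts16 {
    // For a totally ordered type, join is max and meet is min.
    fn join(&self, other: &Self) -> Self {
        Ts16(self.0.max(other.0))
    }
    fn meet(&self, other: &Self) -> Self {
        Ts16(self.0.min(other.0))
    }
}

fn main() {
    let a = Ts16(3);
    let b = Ts16(7);
    println!("join = {:?}, meet = {:?}", a.join(&b), a.meet(&b));
    // The wrapper adds no storage on top of the u16 it contains.
    println!("size_of::<Ts16>() = {}", size_of::<Ts16>());
}
```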