I would like to understand the memory usage of my differential-dataflow-based program, including the size of each individual collection, arrangement, history, etc., and how these sizes change over time (basically, whatever it takes to correlate the overall memory footprint of the program to specific nodes in the dataflow graph). Is there currently an API or example code I could use to accomplish this?

A related question: I understand that differential's memory footprint grows with the number of trace entries it stores, and that it now has the ability to compact the trace. When is trace compaction triggered, and is there a way to control it directly or indirectly?
There is not an API, as such, but that might be interesting. We are about to land (I hope) some improved timely logging infrastructure, and this would be a natural thing for the arrange operator to produce on a differential logging channel.
Let's see: compaction happens automatically as new batches are minted and merged, but only if the computation is certain that no observer of the collection can distinguish between the times it would coalesce. There are some details in the k-pg submission, page 5 and to some extent Figure 5c, but informally, as time ticks forward, as long as you are not holding on to a trace handle (i.e. the result of `arrange().trace`), this should happen mostly automatically.
The compaction is designed so that at all times you have at most a 2x overhead, which has kept us from needing to expose a compaction API. Perhaps once we land the logging changes and start producing some data for you, we can learn a bit more about whether differential is doing a bad job at this. :)
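As a hedged illustration of the trace-handle point (assuming an arrangement built with `arrange_by_self`; the variable names are mine):

```rust
use differential_dataflow::operators::arrange::ArrangeBySelf;

// Holding on to the trace handle tells differential that some observer may
// still ask about historical times, which defers compaction:
let trace = collection.arrange_by_self().trace;
// ... share `trace` with other dataflows, if desired ...
// Once the handle is dropped, compaction proceeds as time ticks forward.
drop(trace);
```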
This logging stuff worked out pretty well. I've hacked a bit at a logging differential branch, and instrumented trace maintenance (merging, mainly) with logging events. They are written to a "differential/arrange" logging stream, which you can tap into in a differential program by writing:
// Define a new computational scope, in which to run BFS.
timely::execute_from_args(std::env::args(), move |worker| {

    use differential_dataflow::logging::DifferentialEvent;

    // Tap the "differential/arrange" stream and print each event.
    worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", |_time, data|
        data.iter().for_each(|x| println!("ARRANGE: {:?}", x))
    );

    // more code here ...

}).unwrap();
This will print all of the events to the screen, and they look a bit like this:
Echidnatron% cargo run --example bfs -- 100 100 100 100 no
Compiling differential-dataflow v0.6.0 (file:///Users/mcsherry/Projects/differential-dataflow)
Finished dev [unoptimized + debuginfo] target(s) in 19.29s
Running `target/debug/examples/bfs 100 100 100 100 no`
performing BFS on 100 nodes, 100 edges:
10.083416ms loaded
ARRANGE: (13.206062ms, 0, Batch(BatchEvent { operator: 6, length: 1 }))
ARRANGE: (15.368504ms, 0, Batch(BatchEvent { operator: 7, length: 97 }))
ARRANGE: (16.898892ms, 0, Batch(BatchEvent { operator: 10, length: 2 }))
ARRANGE: (17.379407ms, 0, Batch(BatchEvent { operator: 11, length: 2 }))
ARRANGE: (18.841584ms, 0, Batch(BatchEvent { operator: 6, length: 1 }))
ARRANGE: (19.221636ms, 0, Merge(MergeEvent { operator: 6, scale: 0, length1: 1, length2: 1, complete: None }))
ARRANGE: (19.256074ms, 0, Merge(MergeEvent { operator: 6, scale: 0, length1: 1, length2: 1, complete: Some(2) }))
ARRANGE: (19.945621ms, 0, Batch(BatchEvent { operator: 10, length: 1 }))
ARRANGE: (20.289103ms, 0, Batch(BatchEvent { operator: 11, length: 1 }))
ARRANGE: (21.889896ms, 0, Batch(BatchEvent { operator: 6, length: 1 }))
22.489327ms stable
ARRANGE: (24.118652ms, 0, Batch(BatchEvent { operator: 7, length: 200 }))
ARRANGE: (24.533275ms, 0, Merge(MergeEvent { operator: 7, scale: 8, length1: 200, length2: 97, complete: None }))
ARRANGE: (25.01207ms, 0, Merge(MergeEvent { operator: 7, scale: 8, length1: 97, length2: 200, complete: Some(295) }))
ARRANGE: (25.69055ms, 0, Batch(BatchEvent { operator: 10, length: 2 }))
It is reporting an elapsed duration (from the start of the computation), the worker id (out of the different worker threads), and then either a `BatchEvent` for batch creation or a `MergeEvent`, both of which you can check out in differential's `src/logging.rs`. The rough gist is that there is an operator id (the same as reported in timely logs; I can get you started with those too), the "scale", which is the order of magnitude in the geometric merging, the two lengths of the source batches (number of update tuples), and, if complete, the length of the merged batch.
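For reference, here is roughly the shape those event types appear to have, inferred from the output above (the field types are my guesses; the authoritative definitions live in `src/logging.rs`):

```rust
// Sketch inferred from the printed events; consult src/logging.rs for truth.
pub enum DifferentialEvent {
    Batch(BatchEvent),
    Merge(MergeEvent),
    // (possibly further variants)
}

pub struct BatchEvent {
    pub operator: usize, // operator id, matching timely's logs
    pub length: usize,   // number of update tuples in the newly minted batch
}

pub struct MergeEvent {
    pub operator: usize,
    pub scale: usize,            // order of magnitude in the geometric merging
    pub length1: usize,          // update tuples in the first source batch
    pub length2: usize,          // update tuples in the second source batch
    pub complete: Option<usize>, // Some(length) once the merge has completed
}
```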
If you use a different logger (check out timely's `execute::execute()` in the logging branch to see an example), you can dump this into a socket as typed data and write an analysis program that listens as things run. You can check out an example of this in the timely-viz project, whose `bin/dashboard.rs` example connects to a timely logging stream and presents aggregate information about elapsed times and messages. Doing something similar for differential logging streams (e.g. tracking the total number of tuples as merges start and complete) should be possible.
Awesome, thanks heaps for the pointers and the explanation! I will start playing with the tracing facility. Do you think you could give me a quick pointer to the interfaces I can use to track the number of tuples in differential?
I'll write you an example program either tonight or tomorrow morning. What it will do is consume the stream of Batch and Merge events, which collectively will tell us how many tuples there are at any point in time (Batches create tuples, Merges have the potential to reduce them).
Ok, if you pull the current version of the logging branch, you can do the following in shell number one:
% cargo run --release --example logging-recv -- 1
and then in shell number two:
% TIMELY_WORKER_LOG_ADDR="127.0.0.1:8000" DIFFERENTIAL_LOG_ADDR="127.0.0.1:9000" cargo run --release --example bfs -- 100 100 100 100 no
You should see things start to spill out in both shells, but in the first shell you will see:
% cargo run --release --example logging-recv -- 1
Finished release [optimized + debuginfo] target(s) in 0.07s
Running `target/release/examples/logging-recv 1`
OPERATES: ((0, ([0, 0, 1], "Input")), (Root, 1s), 1)
OPERATES: ((1, ([0, 0, 2], "Input")), (Root, 1s), 1)
OPERATES: ((3, ([0, 0, 3], "Map")), (Root, 1s), 1)
OPERATES: ((6, ([0, 0, 4, 1], "Map")), (Root, 1s), 1)
OPERATES: ((7, ([0, 0, 4, 2], "Feedback")), (Root, 1s), 1)
OPERATES: ((10, ([0, 0, 4, 3], "Concatenate")), (Root, 1s), 1)
OPERATES: ((13, ([0, 0, 4, 4], "Map")), (Root, 1s), 1)
OPERATES: ((16, ([0, 0, 4, 5], "Map")), (Root, 1s), 1)
OPERATES: ((18, ([0, 0, 4, 6], "Arrange")), (Root, 1s), 1)
OPERATES: ((20, ([0, 0, 4, 7], "Arrange")), (Root, 1s), 1)
OPERATES: ((23, ([0, 0, 4, 8], "Join")), (Root, 1s), 1)
OPERATES: ((26, ([0, 0, 4, 9], "Concatenate")), (Root, 1s), 1)
OPERATES: ((28, ([0, 0, 4, 10], "Arrange")), (Root, 1s), 1)
OPERATES: ((30, ([0, 0, 4, 11], "Group")), (Root, 1s), 1)
OPERATES: ((32, ([0, 0, 4, 12], "AsCollection")), (Root, 1s), 1)
OPERATES: ((34, ([0, 0, 4, 13], "MapInPlace")), (Root, 1s), 1)
OPERATES: ((37, ([0, 0, 4, 14], "Concatenate")), (Root, 1s), 1)
OPERATES: ((39, ([0, 0, 4, 15], "Map")), (Root, 1s), 1)
OPERATES: ((43, ([0, 0, 5], "Map")), (Root, 1s), 1)
OPERATES: ((44, ([0, 0, 4], "Subgraph")), (Root, 1s), 1)
OPERATES: ((46, ([0, 0, 6], "Filter")), (Root, 1s), 1)
OPERATES: ((48, ([0, 0, 7], "Map")), (Root, 1s), 1)
OPERATES: ((50, ([0, 0, 8], "Map")), (Root, 1s), 1)
OPERATES: ((52, ([0, 0, 9], "Arrange")), (Root, 1s), 1)
OPERATES: ((54, ([0, 0, 10], "AsCollection")), (Root, 1s), 1)
OPERATES: ((56, ([0, 0, 11], "InspectBatch")), (Root, 1s), 1)
OPERATES: ((58, ([0, 0, 12], "Probe")), (Root, 1s), 1)
MEMORY: (6, (Root, 1s), 4)
MEMORY: (7, (Root, 1s), 498)
MEMORY: (10, (Root, 1s), 50)
MEMORY: (11, (Root, 1s), 204)
((6, ([0, 0, 4, 1], "Map")), (Root, 1s), 4)
((7, ([0, 0, 4, 2], "Feedback")), (Root, 1s), 498)
((10, ([0, 0, 4, 3], "Concatenate")), (Root, 1s), 50)
MEMORY: (6, (Root, 2s), 26)
MEMORY: (7, (Root, 2s), -200)
MEMORY: (10, (Root, 2s), 14)
MEMORY: (11, (Root, 2s), -47)
((6, ([0, 0, 4, 1], "Map")), (Root, 2s), 26)
((7, ([0, 0, 4, 2], "Feedback")), (Root, 2s), -200)
((10, ([0, 0, 4, 3], "Concatenate")), (Root, 2s), 14)
MEMORY: (6, (Root, 3s), -29)
MEMORY: (7, (Root, 3s), 200)
MEMORY: (10, (Root, 3s), 81)
MEMORY: (11, (Root, 3s), -40)
((6, ([0, 0, 4, 1], "Map")), (Root, 3s), -29)
((7, ([0, 0, 4, 2], "Feedback")), (Root, 3s), 200)
((10, ([0, 0, 4, 3], "Concatenate")), (Root, 3s), 81)
%
What this is showing us is first the nodes in the dataflow graph, with address and names, and then the changes related to numbers of allocated tuples in each arrange operator. The intent was that they would both be indexed by operator id, but it turns out the memory stuff is currently indexed by local operator id (the last element of the address). You can kinda see where nodes 6,7,10,11 are, inside the nested scope there (ids 18,20,28,30).
Anyhow, I have it rigged to update once per second; you can change that in `logging-recv.rs`. It was the easiest thing to do, versus rounding up some number of nanoseconds or whatever. You can also turn off the rounding, which is done in each operator in this step:

let ts = Duration::from_secs(ts.as_secs() + 1);

just by commenting it out, but that will produce an obscene amount of output (nanosecond-accurate updates in tuple allocations for each allocating operator).
This is incredible, thanks!
I am re-reading the k-pg paper to better understand what is going on, and will then try applying your logging framework to my code to see if I can use it to explain its memory consumption. I will report how that goes, hopefully by tomorrow.
I just pushed a version that i. uses the correct identifiers (so you should see the right operator names and addresses) and ii. fixes an issue with blocking reads on sockets (process could easily hang while trying to read from two sockets).
Edit: The only changes were in `logging-recv.rs` and timely/differential, not in `bfs.rs`, so there shouldn't be other code you need to change. Fwiw, you should just be able to copy/paste the logging header from `bfs.rs` into your program, and then use the existing `logging-recv.rs`, which should work for all differential programs, I think.
Thanks for the fix! I already ran into this problem, but thought it was an issue in my code.
Yep, just copying the code from `bfs.rs` works. However, now that I know how to intercept `DifferentialEvent`s, I am inclined to do something simpler and maintain the counters inside my application, without going through sockets and building another dataflow for performance events on the other side.
That makes sense too! And I'm glad the infrastructure lets you do that (phew!).
I am not sure I understand the semantics of timestamp bounds in log events. Here is your modified code, where I simply print all `DifferentialEvent`s and increment their timestamps by one second, in the hope that I won't hear from the same operators until one second in the future:
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data| {
    eprintln!("event: {:?}", data);
    for (ts, _, _) in data.iter_mut() {
        *ts = Duration::from_secs(ts.as_secs() + 1);
    }
});
Below is the output I obtain (note: this is not in `bfs.rs`; I copied the code into my program). The first thing that seems a bit surprising is that I get some events without any data. But also, it seems that the lower timestamp bound is ignored: for example, the two events from operator 37 are less than 1s apart.
event: []
event: [(Duration { secs: 0, nanos: 469060051 }, 0, Batch(BatchEvent { operator: 3, length: 10000 })), (Duration { secs: 0, nanos: 659732340 }, 0, Batch(BatchEvent { operator: 5, length: 10000 })), (Duration { secs: 0, nanos: 676640024 }, 0, Batch(BatchEvent { operator: 12, length: 10000 })), (Duration { secs: 0, nanos: 863260139 }, 0, Batch(BatchEvent { operator: 14, length: 10000 }))]
event: [(Duration { secs: 1, nanos: 95285438 }, 0, Batch(BatchEvent { operator: 20, length: 10000 })), (Duration { secs: 1, nanos: 107990887 }, 0, Batch(BatchEvent { operator: 25, length: 10000 }))]
event: []
event: []
event: []
event: []
event: [(Duration { secs: 1, nanos: 195094922 }, 0, Batch(BatchEvent { operator: 37, length: 10000 }))]
event: [(Duration { secs: 1, nanos: 379119983 }, 0, Batch(BatchEvent { operator: 39, length: 10000 }))]
event: []
event: []
event: []
event: []
event: []
event: [(Duration { secs: 1, nanos: 510860247 }, 0, Batch(BatchEvent { operator: 3, length: 0 }))]
event: []
event: [(Duration { secs: 1, nanos: 516784086 }, 0, Batch(BatchEvent { operator: 12, length: 0 }))]
event: []
event: [(Duration { secs: 1, nanos: 520245079 }, 0, Batch(BatchEvent { operator: 20, length: 0 })), (Duration { secs: 1, nanos: 520346732 }, 0, Batch(BatchEvent { operator: 25, length: 0 }))]
event: []
event: [(Duration { secs: 1, nanos: 521167088 }, 0, Batch(BatchEvent { operator: 37, length: 0 }))]
event: []
event: []
event: []
event: []
So, the event ticking, which is what you've tied into, does some things that `logging-recv.rs` cleaned up.
First, it may emit empty event statements as a way of advancing the `time` argument to the closure. This indicates that the logging has moved forward, and if anyone was awaiting logging records they can be unblocked. Otherwise, in the absence of logging records, there would be no indication that everything has indeed been heard up to a certain time. The `time` argument should be a lower bound for all future events.
Second... I'm not sure what you are doing with the timestamps. :) You are printing them out, and then rounding them up by a second. At the end of that closure, those records go away and no one sees them again, so incrementing them post-printout is probably not useful, unless I greatly misunderstand.
I suspect there is a mismatch between what the logging does and what you expect it to do, which makes sense because it isn't documented or anything. :)
That closure gets called for each batch of differential events that are logged. No feedback from that closure returns to the logging subsystem; it will just get called again on the next batch of logging events (either when the buffer is filled, or flushed).
Probably the right thing to do is to keep a shared hashmap indexed by operator id, in which you stash the change in record counts, similar to what `logging-recv.rs` does (increment for batches, record the delta for merges). Then you can peek at the accumulated totals at any time.
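A minimal sketch of that idea, meant to sit inside the worker closure shown earlier (the names `counts`, `counts_for_log`, `last_report_secs`, and the once-per-second reporting are my own additions, not a differential API):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use differential_dataflow::logging::DifferentialEvent;

// Shared per-operator tuple counts; clone the Arc into the logging closure,
// and keep the original around to inspect totals from the application.
let counts: Arc<Mutex<HashMap<usize, isize>>> = Arc::new(Mutex::new(HashMap::new()));
let counts_for_log = counts.clone();
let mut last_report_secs = 0;

worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data| {
    let mut counts = counts_for_log.lock().unwrap();
    for (_ts, _worker_id, event) in data.iter() {
        match event {
            // A new batch introduces `length` update tuples at its operator.
            DifferentialEvent::Batch(b) => {
                *counts.entry(b.operator).or_insert(0) += b.length as isize;
            }
            // A completed merge replaces two batches by one; the net change
            // is the merged length minus the two source lengths.
            DifferentialEvent::Merge(m) => {
                if let Some(merged) = m.complete {
                    *counts.entry(m.operator).or_insert(0) +=
                        merged as isize - (m.length1 + m.length2) as isize;
                }
            }
            _ => {}
        }
    }
    // `time` lower-bounds all future events, and advances even when `data`
    // is empty, so it can drive periodic reporting.
    if time.as_secs() > last_report_secs {
        last_report_secs = time.as_secs();
        println!("tuple counts: {:?}", *counts);
    }
});
```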
Does this make sense? I have a bunch of built in context about the logging, and I'm sure I haven't explained it all well yet.
Makes sense, thanks for the explanation! The following comment in `logging/src/lib.rs` confused me into thinking that I had control over the flow of notifications, but now that I re-read it, it does not actually say anything like that :)
/// The action should respond to a sequence of events with non-decreasing timestamps
/// (Durations) as well as a timestamp that lower bounds the next event that could be
/// seen (likely greater or equal to the timestamp of the last event). The end of a
/// logging stream is indicated only by dropping the associated action, which can be
/// accomplished with `remove` (or a call to insert, though this is not recommended).
I was able to use `DifferentialEvent`s to compute the memory profile of my program (yay! and thanks again for your help).
I ran into a couple of issues, though:
First, I am finding it tricky to correlate operator identifiers with the source code. I am using your method to extract operator names from timely events, but these are just generic names like "Arrange" or "Group". In a small program I am able to match them to the program locations that create those arrangements, based on the order in which they appear in the log, but in a large program with hundreds of arrangements (I am working on some like that) this gets extremely tedious. Ideally, I would like to specify custom names for arrangements or, alternatively, to have an API to obtain the worker-local arrangement ID after I've created an arrangement. Do you think one of those options is feasible? Is there a better way that does not require changing the API?
One thing I learned from looking at the profile is that the `distinct()` operator creates two arrangements: it first arranges the input collection and then groups it, which creates another arrangement. This more than doubles the memory footprint of my program. It gets particularly bad when I use the `Variable` type I stole from your dataflog example. When the variable's `drop()` method is called, it calls `distinct()` on the variable's inner collection. It appears that the variable's inner collection always contains two copies of each element, since its size before grouping is exactly twice the size of the grouping.

I wonder if it is possible to implement distinct using only one arrangement.
Cool, this is good information.
This is a problem we have too. We probably want to add more ability to name things, though that could add a lot of ergonomic noise. One possibility is to name subscopes, and have you create named lexical regions around logically grouped things. At the moment, subscopes can't be named, but that would be an easy thing to fix.
Wrt your suggestions: when you hold an `Arranged`, we should be able to let you know the id of the arrangement. It isn't surfaced at the moment, but it is known, and we should be able to tell you without problems. We could almost certainly also let you name arrangements (that sounds good, like with scopes), but many of these will still be automatically constructed for you if you are using join and group and distinct and such.
2a. You may be interested in `distinct_total`, which comes from `operators/threshold.rs`. It only uses an arrangement for its input. The trade-off is that it only applies to totally ordered timestamps, usually meaning outside of loops (or in loops where the input cannot change).
2b. More generally, it is hard to implement `distinct` without tracking what has previously been produced as output for general timestamps, in particular in the middle of iteration. The mathematics are a bit subtle, and it is more than just "have I seen this before?". If you can distinguish between uses of distinct where `distinct_total` would apply and those where the more expensive version is needed, that could help.
Another random thought: in datafrog, we gave folks the option of defining "indistinct" variables, which would not be subjected to distinct each iteration. It was then up to the user to ensure that any recursive definition would have no cycles of indistinct operators, to ensure convergence, but there were many opportunities to remove the distinctness from e.g. intermediate variables.
I am not sure subscope naming would help that much, as it would still be hard to distinguish arrangements within a scope.
I understand the complications with automatically constructed arrangements. If clean API support is problematic, I will probably implement some hacky scheme; e.g., I can set a global variable before creating an arrangement and read this variable in the log event handler.
I will start using `distinct_total` wherever it is safe. Unfortunately, in the specific program I am trying to optimize, `distinct` blows up inside a loop, and the indistinct optimization does not seem to apply either.
In fact, all I am trying to do is a very basic graph reachability computation. Given a subset of labeled nodes in a graph, we need to compute sets of labels reachable from each graph node. In Datalog terms it is just:
Reachable(x,l) :- Labeled(x,l).
Reachable(y,l) :- Edge(y,x), Reachable(x,l).
With my current implementation, if the size of `Reachable` is N, the `distinct` operator ends up storing 3N records.
The 3N seems not unreasonable, though we could maybe optimize that down to 2N. Generally, differential needs to track the number of derivations of a fact in each round of derivation. A fact that is produced in the third round is different from one produced in the fifth round, and differential needs to track them to know, when a fact goes away, whether it was the important one in the third round or the redundant one in the fifth. Generally, there could be a lot more than 3N records maintained in the input; as many times N as there are rounds of iteration, in the worst case.

As best we know, this is part of the cost of incrementally maintaining these computations, for which there aren't a lot of other options.
There is the potential to implement a version of `distinct` that doesn't maintain an output trace, which I think would reduce the cost from 3N to 2N, if I understand correctly. I'm not sure I understood which arrangement has double the records of the other; I would guess the input arrangement is larger than the output arrangement, but I may have misunderstood. This would be some non-trivial engineering, and I'd rather just buy you some more memory. :)
Maybe another point that could help, there are several types of "optimization" you can do for differential computations. For the case of reachability, you could write it two ways:
roots.iterate(|inner| {
    let edges = edges.enter(&inner.scope());
    edges
        .semijoin(&inner)
        .map(|(_s,d)| d)
        .concat(&inner)
        .distinct()
})
Or
roots.filter(|_| false) // <-- different
     .iterate(|inner| {
         let edges = edges.enter(&inner.scope());
         let roots = roots.enter(&inner.scope());
         edges
             .semijoin(&inner)
             .map(|(_s,d)| d)
             .concat(&roots) // <-- different
             .distinct()
     })
Although they produce the same output, the second one may have a smaller `distinct` footprint, because the input to `distinct` "changes less". The first version feeds every newly derived fact back into the `distinct`, just for good measure, and so probably holds at least two derivations of every fact (the initial derivation, and the round immediately after it).
In the case of `Variable`, this may require a bit of re-thinking of the implementation, so that it doesn't concatenate its own definition in each iteration. It only makes sense for certain types of computations (perhaps monotonically increasing fact sets are one of them), and I don't have the best way to do this on the tip of my brain.
In the "highly experimental" category, if you dive into the definition of Variable
, there is
pub fn from(source: &Collection<Child<'a, G, u64>, D>) -> Variable<'a, G, D> {
    let (feedback, cycle) = source.inner.scope().loop_variable(u64::max_value(), 1);
    let cycle = Collection::new(cycle);
    let mut result = Variable { feedback: Some(feedback), current: cycle.clone(), cycle: cycle };
    result.add(source);
    result
}
which creates a variable. If you change this so that `current` is not initialized to contain `cycle.clone()`, perhaps like
pub fn from(source: &Collection<Child<'a, G, u64>, D>) -> Variable<'a, G, D> {
    let (feedback, cycle) = source.inner.scope().loop_variable(u64::max_value(), 1);
    let cycle = Collection::new(cycle);
    let mut result = Variable { feedback: Some(feedback), current: cycle.clone().filter(|_| false), cycle: cycle };
    result.add(source);
    result
}
I think you might get a reduction in tuples. What this does is not base the next round of iteration on the existing facts, but rather rely on the (assumed) monotonicity of your computation to re-produce the same facts each iteration. The differential execution will make sure that this work is not actually re-done, but in the same sense as above, where we concatenate not `inner` but rather just `roots`, we are writing a rule that says

> After one step, re-introduce only the base facts.

which should be sufficient to drive the computation to fixed point (modulo me being wrong).
Hi Frank,
I am trying to carefully work through your suggestions (differential computation can be confusing!!):
> In some cases, the output of distinct is used as input to e.g. a join. In those cases, the output arrangement can be directly supplied to the join as input, preventing its re-arrangement. This requires a bit of surgery to get the arrangement out of distinct to avoid having it be flattened back into a collection, but it is definitely doable. Do you know if that would often be the case?
I am not sure this happens often, as the join is likely to require a different arrangement.
> Another random thought: in datafrog, we gave folks the option of defining "indistinct" variables, which would not be subjected to distinct each iteration. It was then up to the user to ensure that any recursive definition would have no cycles of indistinct operators, to ensure convergence, but there were many opportunities to remove the distinctness from e.g. intermediate variables.
This sounds very promising. It seems that some form of this might work for my example. Is it possible to keep `Reachable` as a collection and only arrange it when passing it to the join? I could then return the collection from the inner scope and `distinct_total` it in the outer scope. Is this completely wishful thinking, or is there some form of this that could work?
Just to give you some context on why I am so desperately trying to reduce the memory footprint: I am working with potential users who would like to use differential in a very large system. They have a tight CPU and memory budget for the specific task where they want to use differential. They also have a custom implementation that fits in the budget, but they would like to replace it with something that is easier to maintain (programming incremental computations by hand is tricky!), as long as the performance and memory usage are acceptable. It appears that we are already in the right ballpark performance-wise, and should also be good memory-wise if I can solve the `distinct` problem.
I will now try your experimental `Variable` implementation.
Other random optimizations: you can make the iteration count a `u32` and the frequency count an `i32`; that should be a 2x reduction in that part of the state. Plenty of fun optimizations to do! :D
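For concreteness, a hedged sketch of where those two knobs live, based on the `Variable::from` code above; this is not a drop-in change, since the surrounding type signatures that mention `u64` would need to change in concert:

```rust
// Iteration counter as u32 rather than u64: shrink the summand type of the
// loop variable (and the Child<'a, G, u32> scope timestamp along with it).
let (feedback, cycle) = source.inner.scope().loop_variable(u32::max_value(), 1);

// Frequency counts as i32 rather than the default isize: parameterize your
// collections with an explicit difference type, e.g. Collection<G, D, i32>,
// and introduce updates with i32 weights.
```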
Just to confirm: the new `Variable` implementation does reduce the number of records from 3N to 2N. And it does not seem to break anything (my tests still pass).
> I am not sure this happens often, as the join is likely to require a different arrangement.
That's a good point. It comes up for me with `semijoin` and `antijoin`, as those take "set-valued" collections as input. Also, in some cases it makes sense to skip `distinct` and use `group` with the key you plan to join with, and have the per-key logic be "distinct" on the values. It's not the best way to do distinct, but sometimes it makes things better elsewhere due to the sharing.
> This sounds very promising. It seems that some form of this might work for my example. Is it possible to keep `Reachable` as a collection and only arrange it when passing it to the join? I could then return the collection from the inner scope and `distinct_total` it in the outer scope. Is this completely wishful thinking, or is there some form of this that could work?
In the graph reachability case this is probably not going to work. You need the distinct to quiesce the iteration, otherwise the frequencies just keep growing. No, we saw it more with mutually recursive rules, where there was a relatively small "cut set" of relations that would sever the mutual recursion. We made those be distinct, and left the others to just grow. It would probably be a light bit of program analysis to see what collections could be made indistinct (avoiding cycles in the derivation graph).
Also, I may be confused by the specific example, but reachability is exactly a case where you could use sharing between a `group`-based distinct and the next join. If you have
Reachable(x,l) :- Labeled(x,l).
Reachable(y,l) :- Edge(y,x), Reachable(x,l).
then you could write (roughly)
proposals
    .group_arranged(|_key, input, output| {
        for (&source, _count) in input.iter() {
            output.push((source, 1));
        }
    })
    .join_core(edges_arranged, |node, source, neighb| Some((neighb, source)))
This does the distinct first, using a pretty crummy distinct (if you receive one new `(source, whatev)` you will re-evaluate the whole key), but it would be more memory efficient due to the sharing.
> The 3N seems not unreasonable, though we could maybe optimize that down to 2N. Generally, differential needs to track the number of derivations of a fact in each round of derivation. A fact that is produced in the third round is different from one produced in the fifth round, and differential needs to track them to know, when a fact goes away, whether it was the important one in the third round or the redundant one in the fifth. Generally, there could be a lot more than 3N records maintained in the input; as many times N as there are rounds of iteration, in the worst case.
So let's say we are running an iterative computation, keeping track of derivation numbers at every round. This lets differential incrementally update the computation if an update arrives at one of the inputs of the recursive fragment for a timestamp in the past. Now assume that the iterative computation is notified that all its inputs have moved to the next epoch (not sure this is the right terminology, but basically this is the point where no more inputs can arrive for older timestamps). At this point the first of the two arrangements produced by `distinct` becomes redundant, but it still stores at least N records in memory, even after compaction. Is this how it works? (I am not arguing this is easy or possible to fix; I am just still trying to wrap my head around how things work inside differential.)
I'm not sure any updates become redundant in this example.
Let's imagine that some fact becomes true in epoch t, round 3. We would use a timestamp (t,3), and write down something like

(fact, (t,3), +1)

Even if the input advances to t+1 or beyond, we need to hold on to this, because future changes could involve retractions of facts. When retractions happen, we need (or seem to need) to keep the iteration around, so that were it the case that we had

(fact, (t,3), +1)
(fact, (t,5), +1)

and we receive a retraction of fact, we can tell apart whether this is a retraction at (t+1,3) or (t+1,5); the latter will not result in a net retraction of fact, whereas the first might (its changes will cascade forward and could result in a subsequent additional retraction of fact).
I may have misunderstood your question, but I don't see a case where either arrangement is redundant. Perhaps you could say that the second arrangement in a distinct is redundant in that it can be derived from the input arrangement, and I think you could write a more expensive distinct operator that doesn't maintain its output, but .. yeah not yet. :)
Understood, I wasn't thinking about the computation at time t+1 correctly.
> Also, I may be confused by the specific example, but reachability is exactly a case where you could use sharing between a `group`-based distinct and the next join. If you have
>
> Reachable(x,l) :- Labeled(x,l).
> Reachable(y,l) :- Edge(y,x), Reachable(x,l).
>
> then you could write (roughly)
>
> proposals
>     .group_arranged(|_key, input, output| {
>         for (&source, _count) in input.iter() {
>             output.push((source, 1));
>         }
>     })
>     .join_core(edges_arranged, |node, source, neighb| Some((neighb, source)))
>
> This does the distinct first, using a pretty crummy distinct (if you receive one new `(source, whatev)` you will re-evaluate the whole key), but it would be more memory efficient due to the sharing.
Neat! This should work for any recursive variable that is always used with the same arrangement inside the loop, right?
I am observing interesting memory allocation behavior in differential and was wondering if by any chance you know what is going on.
I run the same differential program (the same compiled binary) with the same input on two machines and observe that the peak resident set size (measured with `/usr/bin/time -v`) is 25% larger on the machine with more physical memory (0.5GB vs 0.4GB). This could be caused by different OS paging behavior, but when I look more closely at the memory profile over time, I notice that as soon as I push the first value to differential (which should only take a few bytes), its resident set size increases by ~40MB on the larger machine, but not on the smaller one.

Is differential (or timely) allocating some large memory pools? If so, where in the source code should I look to understand this behavior?
Not that I'm aware, no. It intentionally tries to keep its footprint bounded by the amount of data it thinks it needs to hold, and tries to avoid keeping more than that around. There is some pooling in the network stack, but I don't think you've engaged that (1MB binary slabs for serialization).
DD definitely doesn't pay attention to the available physical memory itself, but it is possible that jemalloc does.
Fair enough. It is most likely something down the stack.
This should be fixed by #115, which is about to land (tests pending).
Thanks, @frankmcsherry, this has been a really helpful discussion. Here is a summary of the things I've done so far to improve the memory footprint:

- Used `distinct_total` where it is safe to do so.
- Switched to the new `Variable` implementation.
- Eliminated `Variable` instances for non-recursive collections used inside subscopes.

There are a couple of things I haven't tried yet, but will soon:

- The version of `distinct` you proposed that can reduce the number of arrangements in some recursive examples.

Finally, if you ever have a chance to implement a version of `distinct` that does not maintain the output collection, that should also be really helpful.
Thanks!
Another random thought, that might help. If you have collections that are used multiple times, even if the distinct output is not the correct arrangement it may help to re-arrange the collection for each "way" the collection is used, and share those arrangements between e.g. joins and groups.
For example, in graph processing we have collections of `(src, dst)` pairs, and use them either indexed by `src`, indexed by `dst`, or indexed by `(src, dst)`, as distinct would produce as output. We do many joins with these relations, but we only maintain three arrangements, no matter how many times and in how many ways we use the edges.

If you have large or often-changing collections that are re-used, you should be able to optimize these down to at most one copy for each way each is used.
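A hedged sketch of that pattern (the collection names, value types, and join closures here are assumptions for illustration; `arrange_by_key` and `join_core` are the relevant operators):

```rust
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::JoinCore;

// Arrange the edges once per direction; every consumer of these handles
// shares the same underlying trace, rather than building its own copy.
let by_src = edges.arrange_by_key();                      // keyed by src
let by_dst = edges.map(|(s, d)| (d, s)).arrange_by_key(); // keyed by dst

// Any number of joins can reuse the same arrangements without new state
// (`needs_src` and `needs_dst` are assumed to be arranged the same way;
// Copy values assumed for brevity):
let out1 = by_src.join_core(&needs_src, |src, dst, x| Some((*dst, *x)));
let out2 = by_dst.join_core(&needs_dst, |dst, src, y| Some((*src, *y)));
```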
Hmm, this makes sense. I already have an optimization that produces an arrangement per type of join per collection. But I guess you are saying that I do not need to produce an additional arrangement for joins that arrange the collection by self, as I can reuse the arrangement computed inside `distinct`. Is this correct?
Erm, I'm not sure I understand well enough to confirm. :)
For each collection, you only need to arrange each keyed by any subset of attributes at most once. And, distinct gives you the "all attributes" arrangement. If you are already doing this, great!
Right, but `distinct()` returns a collection. Is there a way to access its internal arrangement?
Yes, kinda. You would need to crack open the definition and just use its parts. For example, distinct uses `threshold` internally, which is implemented for collections and arrangements as
impl<G: Scope, K: Data+Hashable, R1: Diff> Threshold<G, K, R1> for Collection<G, K, R1>
where G::Timestamp: Lattice+Ord {
    fn threshold<R2: Diff, F: Fn(R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        self.arrange_by_self()
            .group_arranged::<_,_,DefaultKeyTrace<_,_,_>,_>(move |_k,s,t| t.push(((), thresh(s[0].1))))
            .as_collection(|k,_| k.clone())
    }
}

impl<G: Scope, K: Data, T1, R1: Diff> Threshold<G, K, R1> for Arranged<G, K, (), R1, T1>
where
    G::Timestamp: Lattice+Ord,
    T1: TraceReader<K, (), G::Timestamp, R1>+Clone+'static,
    T1::Batch: BatchReader<K, (), G::Timestamp, R1>,
{
    fn threshold<R2: Diff, F: Fn(R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        self.group_arranged::<_,_,DefaultKeyTrace<_,_,_>,_>(move |_k,s,t| t.push(((), thresh(s[0].1))))
            .as_collection(|k,_| k.clone())
    }
}
In each of these, you could just directly call `group_arranged`, which results in the arrangement, and skip the `as_collection()` call, which only flattens arrangements to collections. More ergonomic ways to vary the result type work for me too. :)
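So, as a hedged sketch adapted from the snippet above (imports elided, as there), a "distinct" that keeps its arrangement for downstream use might look like:

```rust
// Like distinct(), but stopping before as_collection(), so the resulting
// arrangement can feed a join_core directly instead of being re-arranged.
let distinct_arranged = collection
    .arrange_by_self()
    .group_arranged::<_,_,DefaultKeyTrace<_,_,_>,_>(move |_k, _s, t| t.push(((), 1)));

// Use `distinct_arranged` in joins; call .as_collection(|k,_| k.clone())
// only where a flat collection is actually needed.
```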
Right, that makes sense.
I'm going to close this out as resolved, but I'm always up for more chatter about understanding memory use in differential.