mit-pdos / noria

Fast web applications through dynamic, partially-stateful dataflow
Apache License 2.0
4.99k stars 244 forks source link

Initial Snapshotting Implementation #54

Closed ekmartin closed 6 years ago

ekmartin commented 6 years ago

Summary

This is an initial implementation of snapshot based recovery, where snapshots of each materialized node's state is persisted locally at each domain.

Most of this was done before Christmas, but I hadn't gotten around to pulling in the latest controller (ZooKeeper etc.) changes until now. Sorry about the big pull request - this probably should've been done in smaller steps!

Snapshotting

The snapshot process is initiated by a specific packet, TakeSnapshot, tagged with a numeric snapshot_id. The packet is sent to each base node and forwarded through the entire query graph.

Whenever a domain receives a TakeSnapshot packet, it immediately clones the state of all its materialized nodes and its readers. This is gathered up into a PersistSnapshotRequest, which is then sent to the domain's snapshotting worker. The TakeSnapshot packet is also forwarded through each domain's egress nodes.

Each worker pool has a single snapshotting worker, a SnapshotPersister. These are responsible for serializing and persisting each node's cloned state to disk. The serialization is currently done using bincode.

Finally, each SnapshotPersister notifies the controller after completing a snapshot, through a CoordinationMessage. The controller keeps track of the latest snapshot_id for each domain shard, making sure to only consider a snapshot as completed when all shards have taken a snapshot. Whenever this happens, the latest snapshot_id is persisted to a file (this should probably be stored in ZooKeeper though).

Recovery

Recovery is still initiated with a StartRecovery packet, which now also includes the ID of the latest fully completed snapshot - if there is one. Whenever a domain receives such a packet it attempts to read and deserialize the previously snapshotted state of each of its materialized nodes or readers. This packet is then forwarded through the entire graph, instead of just the domains containing base nodes.

Each log entry now also includes the latest snapshot ID of its domain. This is used during recovery to ensure that log entries contained in completed snapshots are ignored. In the future Soup should probably garbage collect these entries somehow, e.g. by creating a new log file for each incremented snapshot ID and deleting old files.

Benchmarks

So far I've primarily benchmarked two areas: the difference in recovery time using snapshots compared to logs, and the impact snapshotting has on write performance. The benchmarks were run on a server with an Intel Xeon E5-2620 CPU (6 physical and 12 logical cores) and 64GB RAM.

Recovery benchmark

vote-recovery populates the database with a set of articles and votes, and then measures the recovery time from scratch. The benchmark relies on either logs or snapshots in isolation, to clearly see the difference between the two methods.

Two numbers are measured: the time it takes for .recover() to complete and the total time until all recovery writes have propagated through the graph. The latter is measured at the point when readers no longer return stale data, by continuously polling using getters. This is far from optimal, and improvement suggestions are more than welcome. I do think the number is interesting to measure in general however, as it highlights the improvement from snapshotting every materialized node in the graph.

Logarithmic graphs measuring initial and total recovery time:

screen shot 2018-01-19 at 3 03 59 pm

The initial results are pretty decent: snapshot recovery spends about 40% of the time blocking .recover() compared to using only logs, while also completely propagating recovery updates 10 times faster than with log based recovery.

Write-throughput benchmark

The existing vote benchmark was used to measure the penalty induced by performing periodical snapshots, with a newly added --snapshot-timeout option. The benchmark was run with --avg --mixers=1 --runtime=60, while increasing the snapshot-timeout value in steps of 5 seconds, from 0 to 30.

Write throughput measured with different snapshot timeouts:

screen shot 2018-01-19 at 3 09 40 pm

As the graph shows, the different snapshot timeouts do not results in wildly different results, with snapshotting in general introducing a ~2% decrease in throughput. This makes sense since the bulk of the snapshotting work is performed by a single snapshotting worker, regardless of the timeout.

To simulate the case where the snapshot worker would have to compete with other threads for CPU cores, I also ran the same benchmark with a gradually increasing amount of parallel writers. This also didn't have too much of an impact on the throughput though, so might need further investigation:

Write throughput measured with increasing parallel writers:

screen shot 2018-01-19 at 3 38 58 pm

Benchmark improvements

I think it would be interesting to look at how long it takes to complete a snapshot, and at what point a single snapshotting worker becomes overloaded: i.e. what combination of data size, query graph node count and snapshot timeout would prevent the snapshotting worker from completing a snapshot before the next one is initialized? I'm sure there are other cases that should be investigated as well - let me know! There are probably also other benchmarks that should be run to better measure the penalty induced by simultaneous snapshots across the graph.

TODOs

As we've talked about before this is mostly a baseline implementation of snapshotting, and probably not the method that will end up being used. Regardless it'd be nice to get this merged so that future work can be done on top of it. There's a few things that should be fixed before that however:

ms705 commented 6 years ago

Great work! :+1:

A couple of questions on first read (will digest this more and respond in more detail later):

  1. What guarantees does the scheme make with regards to consistency of snapshots for different operators? Specifically, since the controller inserts the TakeSnapshot packet at some arbitrary point in the write stream coming into each base and diamonds in the data-flow graph can reorder the TakeSnapshot packet with regards to others, I would assume that it cannot make any guarantees about snapshots being taken at the same point in time. This may be fine, however, because on recover, whatever writes aren't in the snapshot at a base will be replayed from the log, and thus eventually the system reaches a consistent state again. (We also have some changes in the pipeline that will eliminate reordering at diamonds, which might help.)

  2. The performance results are great! However, I'm a little surprised to see that both lines have the same slope -- wouldn't you expect the cost of snapshot recovery to level off at some point as the number of votes increases? The size of the VoteCount aggregation view, for example, would certainly remain constant once all article keys have votes. Maybe what's happening here is that the benchmark measures the end-to-end recovery of the whole vote graph, which includes the Vote base table whose size increases monotonically. The benefits of snapshots might be much clearer if you measure the recovery time for internal operators like the aggregation for VoteCount only.

  3. What do you believe is imposing the 2% overhead? As I understand the implementation, you have a separate snapshot worker thread, and on receiving a TakeSnapshot packet, the domain copies the entire state of an operator and sends the copy to the snapshot worker, who then asynchronously writes it to disk while normal write processing continues. If that is correct, long-term throughput is perhaps not the ideal metric: instead, what matters is the pause time (i.e., latency penalty) imposed by the state copy. A back-of-the-envelope estimate suggests that your pause time in this experiment is probably around 100ms for the 5s snapshot interval, but it's a bit hard to estimate since this is end-to-end throughput, which is affected by the separate pause times at several operators. An interesting, educational graph to plot might be a timeline of end-to-end latencies until writes become visible -- we would expect to see occasional spikes that correspond to snapshots.

I agree that the results with thread contention seem curious -- at 15 parallel writers, the machine should presumably be choking under thread contention. It might be a good plan to take a look at top during this experiment and see which threads are actually hitting the CPU hard, and whether there is any headroom with the data points towards the right of the writer count x-axis.

jonhoo commented 6 years ago

Re @ms705's point number 3 above, the new open-loop benchmark might help with that. In theory, this should show up in 99th percentile batch processing latency and in the 50th or so percentile of sojourn time.

Finally, each SnapshotPersister notifies the controller after completing a snapshot, through a CoordinationMessage.

Does this mean that the controller won't start the next snapshot until the previous one has fully completed? With only one writer per worker pool, I'd worry that it'd be difficult for the controller to satisfy any but the most liberal snapshot timeouts. Anything shorter and the snapshot writers wouldn't be able to keep up. Having more than one might help, but as you say this will also increase demand for cores at the server.

Diamond cases where a domain might receive two snapshot packets need to be handled, as described in this TODO.

I believe this will be resolved with @fintelia's vector-timestamp changes. In particular, I think we should resist the urge to special-case the handling of edge-cases like this one (we already do this kind of thing with unions too), and try to see if we can solve it with a single mechanism like vector times.

ekmartin commented 6 years ago

@ms705:

  1. What guarantees does the scheme make with regards to consistency of snapshots for different operators? Specifically, since the controller inserts the TakeSnapshot packet at some arbitrary point in the write stream coming into each base and diamonds in the data-flow graph can reorder the TakeSnapshot packet with regards to others, I would assume that it cannot make any guarantees about snapshots being taken at the same point in time.

With the case where a single domain receives a TakeSnapshot from multiple parents I was thinking it could be solved by delaying the snapshot until the last marker has been received, but diamonds definitely make this even harder. I'll have to investigate further though and see how the vector-timestamp changes might affect it. Part of the idea behind the marker packet is definitely to be able to create completely consistent snapshots though, even if it's not there yet.

  1. The performance results are great! However, I'm a little surprised to see that both lines have the same slope -- wouldn't you expect the cost of snapshot recovery to level off at some point as the number of votes increases? The size of the VoteCount aggregation view, for example, would certainly remain constant once all article keys have votes.

That's a very good point! I forgot to mention that I also increased the amount of articles in step with the votes (starting at 10,000 articles and 100,000 votes, and doubling both at each step), which in hindsight probably wasn't a good idea. I ran the benchmarks again with a fixed number of articles (100,000), which seems to better show the effect of recovering VoteCount. It'd definitely be interesting to try to look at the recovery time of a single node though - as you're suggesting.

image

  1. What do you believe is imposing the 2% overhead? As I understand the implementation, you have a separate snapshot worker thread, and on receiving a TakeSnapshot packet, the domain copies the entire state of an operator and sends the copy to the snapshot worker, who then asynchronously writes it to disk while normal write processing continues.

I was thinking that the 2% overhead mostly came from the fact that the state of nodes and readers had to be cloned when the packet arrives, which probably isn't completely non-trivial for larger amounts of data. I definitely agree that there's probably other benchmarks that might be more interesting here though - would be useful to try the open-loop benchmark as suggested.

@jonhoo:

Does this mean that the controller won't start the next snapshot until the previous one has fully completed? With only one writer per worker pool, I'd worry that it'd be difficult for the controller to satisfy any but the most liberal snapshot timeouts.

Indeed - a new snapshot is only initialized if all the acknowledgments from the previous one have been received. I was worried of what you're saying as well, and it would probably be useful to somehow make sure that the benchmarks actually produce an expected level of snapshots (as otherwise it'd be quite pointless to test at different snapshot timeouts). I did some anecdotal testing while performing the benchmarks though, and decreasing the snapshot timeout did at least cause the amount of snapshots created to go up. The time it takes for bincode to serialize and deserialize large snapshots is pretty significant, so this is definitely something that should be looked into.

ms705 commented 6 years ago

@ekmartin Do you think we should work towards merging this to avoid continual rebasing onto master, or is it too likely to break things at the current point in time?

ekmartin commented 6 years ago

@ms705: I think it'd be nice to get it reviewed, and possibly merged. The snapshotting is completely turned off by default, so hopefully it shouldn't introduce bugs in the core functionality. As we've discussed previously there's definitely still issues to resolve in terms of diamonds and snapshot consistency, but I'm not sure if that should be a blocker.

I've made a few minor changes since my last comment:

ms705 commented 6 years ago

After discussion, we decided not to merge this for now. However, we'll still want to merge it into a branch under mit-pdos/distributary.

jonhoo commented 6 years ago

Adopted in https://github.com/mit-pdos/distributary/tree/snapshotting