HazyResearch / deepdive

DeepDive
deepdive.stanford.edu

FG sharding: process a 100TB FG on one box or many #592

Closed alldefector closed 7 years ago

alldefector commented 7 years ago

Fixes #478 and gets us ready for scale-out (with numbskull).

The factor graph of a typical DD app consists of many connected components (e.g., one per doc), each of which fits in memory. The entire FG, however, is sometimes (much) larger than one box's memory. This change lets a user specify whether and how the FG should be sharded, so that DD can process a 100TB graph on one box with 16GB of RAM, or on a cluster in the near future (say, with numbskull).

To use sharding, only two lines of change are needed (see examples/census):

  1. In app.ddlog, annotate partition key columns of a variable relation with @partition.
  2. In deepdive.conf, specify deepdive.sampler.partitions.

With these changes, here is what DD does differently from before:

  1. Before, the var IDs took the range [0..$num_vars). With sharding, each shard i has its own var ID space [0..$num_vars_for_shard_i). Internally, the dd_variables_... table has a new field dd__part taking value i, and the global var ID takes the value (i << 48) + var_id_for_shard_i (see the sketch after this list). The ID assignment code has been revamped and actually simplified to support this multi-namespace change. Grounding SQL is the same as before (i.e., not broken down per shard), with the factor tables taking the multi-namespace var IDs.
  2. We assume that no factor ever crosses multiple user-specified partition keys (with @partition), so the FG's variables and factors are cleanly partitioned. Each shard shares all the weights, though; for now, we assume that all weights fit in memory. When dumping the FG into the DW binary format, we dump each shard in turn, clearing the upper 16 bits of the 64-bit var IDs (for both the var dump and the factor dump) so that the sampler doesn't need any changes to accommodate sharding. After each shard's inference, when loading the marginal probabilities back into the DB, we add the 16-bit shard ID back.
  3. If deepdive.sampler.partitions = 1 (the default), the learning / inference flow is the same as before. If deepdive.sampler.partitions > 1, right now we simply run learning on each shard in turn (shard k+1 starting with the weights from shard k), and then run inference on each shard in turn (using the weights produced by the last shard during learning). We defer a systematic multi-shard learning algorithm design to the near future (numbskull in particular).
  4. The new ID assignment code now requires the DB to have plpythonu enabled.
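
For concreteness, here's a minimal Python sketch of the ID scheme in items 1 and 2 above; the constants match the description (16-bit shard ID in the upper bits, 48-bit per-shard var ID below it), but the function names are just illustrative, not code from this PR:

```python
SHARD_BITS = 16
LOCAL_BITS = 48
LOCAL_MASK = (1 << LOCAL_BITS) - 1  # lower 48 bits


def pack_var_id(shard, local_id):
    """Build the global 64-bit var ID stored in the grounded factor tables."""
    assert 0 <= shard < (1 << SHARD_BITS)
    assert 0 <= local_id <= LOCAL_MASK
    return (shard << LOCAL_BITS) + local_id


def strip_shard(global_id):
    """Clear the upper 16 bits before dumping a shard into the DW binary
    format, so the sampler only ever sees the compact per-shard ID space."""
    return global_id & LOCAL_MASK


def add_shard_back(shard, sampler_id):
    """Re-attach the 16-bit shard ID when loading marginals back into the DB."""
    return (shard << LOCAL_BITS) + sampler_id
```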

I'm going to do more testing in the next few days. In the meantime, I'd like to hear everyone's feedback on the design and the details.

chrismre commented 7 years ago

Grounding could be a bottleneck. Is the code that would generate a partition on the fly too fragile or something? It seems like being able to generate the partitions on demand (or not) would be helpful. That could be a down-the-road feature, but I'm curious about this design choice (e.g., we could avoid ever touching disk).

alldefector commented 7 years ago

Right, once we have sampler scale-out, the big joins and disk IO can be the bottleneck. It's actually easy to do per-partition on-demand grounding with the current code and never have the factors hit the disk. To make that happen, there are two things we need to change:

  1. Right now, for each inference rule, we materialize the factor table F, which is essentially the rule's conjunctive query joined with the variable ID assignment tables. F is then used in two ways: 1) to generate distinct weight parameters for ID assignment; 2) to generate DW binary-format factors for the sampler (joining F with the weight ID assignment and the category ID assignment). Having these two use cases was the reason we wanted to materialize F. But in hindsight, it probably makes more sense to keep F as a view for the first use case... For the second use case, a view can probably save a lot of IO as well... @netj thoughts?
  2. The current DD execution mode is a self-contained workflow. Almost by definition of "on demand", we need to turn the shell-script-driven learning / inference process into a cleaner model-control architecture where data exchange, scheduling, and callbacks can happen between the DB and sampler services.

alldefector commented 7 years ago

@chrisdesa let me know if this change is in harmony with your partitioning algo.

netj commented 7 years ago

@alldefector This is awesome. Will take a closer look at the code later, but only changing the ID assignment sounds neat. So, the ID assignment keeps each shard's lower 48-bit IDs compact/sequential, right?

  1. I remember that materializing F (or at least the user's input_query) was beneficial even for just the first case, finding all distinct weights, so back when I was switching the default to views, I gave up on turning everything into views except the ones that need ID assignment. However, this may have changed since the flattening (PR #589). Maybe a simple heuristic-based optimizer would work, one that depends on whether F contains a join.
  2. I'm a strong believer in execution plans, so I think we can/should compile a series of commands for the scale-up/out execution as well. This would be a bad idea if the algorithm is going to become completely stochastic or data-driven, but explicitly writing down what the system will do has been much more helpful for understanding and debugging than analyzing the logs and control flow after the fact to figure out what the system was trying to do.

    • We can keep everything as views, define a few key operations for the on-demand partition grounding, and construct a sampler driver in terms of those, e.g., dump or stream V/F/W/D for partition i. The operations can be code-generated by DD as we create the companion views.
    • The scale-up driver will look like a script with a simple for-loop invoking the sampler for each partition (see the sketch at the end of this comment).
    • Scale-out could also look like a for-loop with a slightly expanded body, interspersed with extra ops, such as merging weights and broadcasting them.
    • The low-level scheduling/callbacks can just rely on ssh remote execution as opposed to reinventing a custom protocol. The sampler can simply expose a command-line entrypoint for each step/sub-step of the algorithm, checkpointing/resuming or mmap'ing its inputs. The FG shard and weights could be streamed over ssh as well, or we could just rely on a shared filesystem. This makes it easy to repeat/reproduce a range of epochs and to debug.

    This is how we were planning scale-up/out with the C++ DW in mind, prior to NS. Some parts may not be feasible in Python (e.g., mmap), so long-living processes/services might be a must for holding the loaded FG shard. However, I'd still push for having a layer of abstraction between the algorithm and the low-level bits (reusing as much as possible) and a functional architecture that makes it easy to run/test each step of a distributed setup, which can easily become complicated and awkward to debug. (Sorry if NS is already headed in this direction; I'm not fully following the developments there.)
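
To make the driver idea concrete, here's a rough Python sketch of the scale-up for-loop under the current sequential scheme (learning hands weights from shard k to shard k+1, inference reuses the final weights). All the helper names, file layout, and sampler flags below are made-up placeholders, not the actual sampler interface:

```python
import subprocess


def run_sampler(mode, shard, weights_in, output):
    """Invoke the sampler once on a single FG shard.

    The flags are placeholders for whatever command-line entrypoint the
    sampler ends up exposing; the point is that each step is a plain process
    invocation that could just as well be run remotely over ssh.
    """
    subprocess.check_call([
        "sampler", mode,
        "--variables", f"shard-{shard}/variables.bin",
        "--factors", f"shard-{shard}/factors.bin",
        "--weights", weights_in,
        "--output", output,
    ])


def scale_up_driver(num_shards):
    # Learning: shard k+1 starts from the weights shard k produced.
    weights = "weights-init.bin"
    for shard in range(num_shards):
        learned = f"weights-after-shard-{shard}.bin"
        run_sampler("learn", shard, weights, learned)
        weights = learned

    # Inference: every shard reuses the weights from the last learning pass.
    for shard in range(num_shards):
        run_sampler("infer", shard, weights, f"marginals-shard-{shard}.bin")
```

The scale-out variant would be the same loop with a slightly expanded body, e.g., a weight merge/broadcast step between shards.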

alldefector commented 7 years ago

Yes, 16 bits for shard ID, 48 bits for compact var ID.

Re F materialization, the issue is that when you have a rule like @weight(type) Honest(p) :- PeopleLocation(p, city), CityType(city, type), where PeopleLocation is a huge table and CityType is a small table, finding the distinct weights from the materialized table means scanning a huge table. But if F is a view, the DB might be smart enough to ignore the join and get them from the small table directly. Of course, there are more complicated weight params that would require the join -- in which case we'd evaluate the join twice, and materialization may be more efficient. Maybe we could use EXPLAIN to decide?
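
For the EXPLAIN idea, a first cut could be as simple as asking the planner for its cost estimate of the distinct-weights query against the view and only materializing F when the estimate exceeds some threshold. A minimal sketch, assuming a psycopg2 connection to a backend new enough to support EXPLAIN (FORMAT JSON); the threshold and function names are made up:

```python
import json


def planner_cost(conn, sql):
    """Return the planner's estimated total cost for a query (PostgreSQL)."""
    with conn.cursor() as cur:
        cur.execute("EXPLAIN (FORMAT JSON) " + sql)
        (plan,) = cur.fetchone()
        if isinstance(plan, str):  # some drivers return the JSON as text
            plan = json.loads(plan)
        return plan[0]["Plan"]["Total Cost"]


def should_materialize_f(conn, distinct_weights_sql, threshold=1e6):
    """Keep F as a view (and let the planner prune the join) unless scanning
    it for distinct weights looks expensive."""
    return planner_cost(conn, distinct_weights_sql) > threshold
```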

Re sampler interface, numbskull is actually currently using mmap for data loading: https://github.com/HazyResearch/numbskull/blob/master/numbskull/numbskull.py#L257

alldefector commented 7 years ago

Tested on real-world workloads, and everything works as expected (linearly reduced memory, linearly faster epochs, similar weights, probabilities, and quality numbers).

Piggybacked a bunch of nasty bug fixes (the database getting destroyed, a data-unloading race condition due to text2bin >(pbzip2 ...)).

Chatted briefly with @thodrek yesterday, who's going to make it really fly on multiple hosts. @thodrek, changing this line allows you to avoid materializing the factors: https://github.com/HazyResearch/deepdive/blob/30aeaafc41aaec856bb49e35cbe0fba319a65fef/compiler/compile-config/compile-config-2.01-grounding#L378

thodrek commented 7 years ago

This is awesome! I'll test it with distributed NS and let you know. Once I'm done with the tests, I'll go over the pull request carefully :)

alldefector commented 7 years ago

@thodrek Can I land this so you can build other fancy things on top?

thodrek commented 7 years ago

Yep go ahead :)