engine.compose_cgpm with multiprocessing fails with vscgpm #504

Open fsaad opened 7 years ago

fsaad commented 7 years ago

Using A Causal Probabilistic Program & Non-Parametric Bayes To Model Kepler's Law

In this notebook, we are going to write a custom probabilistic program in VentureScript to implement a CGPM which models the conditional distribution of period_minutes given apogee_km and perigee_km. In particular, we are going to (non-parametrically) learn a clustering of satellites, based on the magnitude of the deviation of their actual period_minutes (given their apogee_km and perigee_km) from the theoretical period_minutes implied by Kepler's Law.

Our population of interest is the satellites dataset from the Union of Concerned Scientists.

Prepare the notebook and .bdb file.

%load_ext iventure.magics
%matplotlib inline
The iventure.magics extension is already loaded. To reload it, use:
  %reload_ext iventure.magics
import os;
if os.path.exists('bdbs/satellites_kepler.bdb'):
%bayesdb -j bdbs/satellites_kepler.bdb
u'Loaded: bdbs/satellites_kepler.bdb'

__Create a table satellites_ucs containing satellite data records from "satellites.csv".__

CREATE TABLE satellites_ucs FROM '../../resources/satellites.csv'
.nullify satellites_ucs 'NaN'

Write a VentureScript program to represent the CGPM for period | apogee, perigee. Also expose the latent variables of the program, namely the cluster identity of each satellite and its deviation from the "true" Keplerian period.

// Kepler CGPM.
define kepler = () -> {
  // Kepler's law.
  assume keplers_law = (apogee, perigee) -> {
    let GM = 398600.4418;
    let earth_radius = 6378;
    let a = (abs(apogee) + abs(perigee)) *
        0.5 + earth_radius;
    2 * 3.1415 * sqrt(a**3 / GM) / 60
  };
  // Internal samplers.
  assume crp_alpha = .5;
  assume cluster_sampler = make_crp(crp_alpha);
  assume error_sampler = mem((cluster) ->
        make_nig_normal(1, 1, 1, 1));
  // Output simulators.
  assume sim_cluster_id =
    mem((rowid, apogee, perigee) ~> {
      tag(atom(rowid), atom(1), cluster_sampler())
  });
  assume sim_error =
    mem((rowid, apogee, perigee) ~> {
      let cluster_id = sim_cluster_id(
        rowid, apogee, perigee);
      tag(atom(rowid), atom(2),
        error_sampler(cluster_id)())
  });
  assume sim_period =
    mem((rowid, apogee, perigee) ~> {
      keplers_law(apogee, perigee) +
        sim_error(rowid, apogee, perigee)
  });
  // List of simulators.
  assume simulators = [
    sim_period, sim_cluster_id, sim_error];
};
// Output observers.
define obs_cluster_id =
  (rowid, apogee, perigee, value, label) -> {
    $label: observe sim_cluster_id(
      $rowid, $apogee, $perigee) = atom(value);
};
define obs_error =
  (rowid, apogee, perigee, value, label) -> {
    $label: observe sim_error(
      $rowid, $apogee, $perigee) = value;
};
define obs_period =
  (rowid, apogee, perigee, value, label) -> {
    let theoretical_period = run(
      sample keplers_law($apogee, $perigee));
    // Observing the period is equivalent to observing its residual.
    obs_error(
      rowid, apogee, perigee,
      value - theoretical_period, label);
};
// List of observers.
define observers = [
  obs_period, obs_cluster_id, obs_error];
// List of inputs.
define inputs = ["apogee", "perigee"];
// Transition operator.
define transition = (N) -> {mh(default, one, N)};
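
For readers who do not have Venture installed, here is a rough Python sketch of the deterministic part of the program above: the keplers_law function, and the residual that obs_period hands to obs_error. It is only an illustration, not part of the notebook. The function names are made up for this sketch, it uses math.pi where the VentureScript uses 3.1415, and the "observed" period of 1436.1 minutes is a hypothetical value for a geostationary orbit (altitude about 35786 km).

```python
import math

GM_KM3_S2 = 398600.4418   # Earth's gravitational parameter, as in the VentureScript
EARTH_RADIUS_KM = 6378.0  # same Earth radius as in the VentureScript


def kepler_period_minutes(apogee_km, perigee_km):
    """Theoretical orbital period in minutes (mirrors keplers_law)."""
    # Semi-major axis: mean of apogee and perigee altitudes plus Earth's radius.
    a = (abs(apogee_km) + abs(perigee_km)) * 0.5 + EARTH_RADIUS_KM
    # Kepler's third law gives the period in seconds; divide by 60 for minutes.
    return 2 * math.pi * math.sqrt(a ** 3 / GM_KM3_S2) / 60


def kepler_residual(apogee_km, perigee_km, period_minutes):
    """Deviation of an observed period from the Keplerian period.

    This is the quantity obs_period passes on to obs_error.
    """
    return period_minutes - kepler_period_minutes(apogee_km, perigee_km)


# Hypothetical example: a circular orbit at geostationary altitude.
geo_period = kepler_period_minutes(35786, 35786)
geo_residual = kepler_residual(35786, 35786, 1436.1)
print(geo_period, geo_residual)
```

The clustering in the CGPM is over these residuals: satellites whose periods deviate from Kepler's law by similar amounts should end up in the same cluster.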

MML program to create a hybrid CGPM, which composes crosscat with the kepler VentureScript program defined in the cell above, as well as other CGPMs available in the cgpm library.

CREATE GENERATOR satellites_hybrid FOR satellites WITH BASELINE crosscat(

    GIVEN apogee_km, perigee_km
        kepler_cluster CATEGORICAL,
        kepler_residual NUMERICAL
        venturescript(mode=venture_script, sp=kepler);
)


Initialize a model and run inference transitions.

%mml INITIALIZE 1 MODELS FOR satellites_hybrid;


RuntimeError                              Traceback (most recent call last)

<ipython-input-32-4136f672b2c0> in <module>()
----> 1 get_ipython().magic(u'mml INITIALIZE 1 MODELS FOR satellites_hybrid;')

/scratch/fs/.pyenv2.7.6/local/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2144         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2145         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2146         return self.run_line_magic(magic_name, magic_arg_s)
   2148     #-------------------------------------------------------------------------

/scratch/fs/.pyenv2.7.6/local/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2065                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2066             with self.builtin_trap:
-> 2067                 result = fn(*args,**kwargs)
   2068             return result

/scratch/fs/iventure/iventure/ in logged_cell_wrapper(self, line, cell)
    145             raw = self._retrieve_raw(line, cell)
    146             try:
--> 147                 output = func(self, line, cell)
    148             except:
    149                 exception = traceback.format_exc()

<decorator-gen-127> in mml(self, line, cell)

/scratch/fs/.pyenv2.7.6/local/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    190         if callable(arg):

/scratch/fs/iventure/iventure/ in mml(self, line, cell)
    289             return self._cmd(cmd_q)
    290         if bql_q:
--> 291             return self._bql(bql_q)
    293     @logged_cell

/scratch/fs/iventure/iventure/ in _bql(self, lines)
    320             if out.getvalue() and bql_string_complete_p(out.getvalue()):
    321                 ok = True
--> 322         cursor = self._bdb.execute(out.getvalue())
    323         return bqu.cursor_to_df(cursor)

/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in execute(self, string, bindings)
    213             bindings = ()
    214         return self._maybe_trace(
--> 215             self.tracer, self._do_execute, string, bindings)
    217     def _maybe_trace(self, tracer, meth, string, bindings):

/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in _maybe_trace(self, tracer, meth, string, bindings)
    221         if tracer:
    222             tracer(string, bindings)
--> 223         return meth(string, bindings)
    225     def _qid(self):

/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in _do_execute(self, string, bindings)
    262         else:
    263             raise ValueError('>1 phrase in string')
--> 264         cursor = bql.execute_phrase(self, phrase, bindings)
    265         return self._empty_cursor if cursor is None else cursor

/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in execute_phrase(bdb, phrase, bindings)
    490             # Do metamodel-specific initialization.
    491             metamodel = core.bayesdb_generator_metamodel(bdb, generator_id)
--> 492             metamodel.initialize_models(bdb, generator_id, modelnos)
    493         return empty_cursor(bdb)

/scratch/fs/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/metamodels/ in initialize_models(self, bdb, generator_id, modelnos)
    200             cgpms = [self._initialize_cgpm(bdb, generator_id, cgpm_ext)
    201                 for _ in xrange(n)]
--> 202             engine.compose_cgpm(cgpms, multiprocess=self._multiprocess)
    204         # Store the newly initialized engine.

/scratch/fs/cgpm/cgpm/crosscat/ in compose_cgpm(self, cgpms, multiprocess)
    132                 ())
    133                 for i in xrange(self.num_states())]
--> 134         self.states = mapper(_compose, args)
    136     def logpdf(self, rowid, query, evidence=None, accuracy=None, multiprocess=1):

/scratch/fs/cgpm/cgpm/utils/ in parallel_map(f, l, parallelism)
     97         while 0 < ctr[0]:
     98             j = le32dec(, 4))
---> 99             process_output(fl, ctr, outq[j][0].recv())
    101         # Cancel all the worker processes.

/scratch/fs/cgpm/cgpm/utils/ in process_output(fl, ctr, output)
     60         (i, ok, fx) = output
     61         if not ok:
---> 62             raise RuntimeError('Subprocess failed: %s' % (fx,))
     63         fl[i] = fx
     64         ctr[0] -= 1

RuntimeError: Subprocess failed: Traceback (most recent call last):
  File "/scratch/fs/cgpm/cgpm/utils/", line 55, in process_input
    outq_wr.send((i, ok, fx))
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
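
The last frame shows the worker failing inside outq_wr.send, i.e. while pickling its result to send back to the parent process. A minimal sketch of the same class of error, independent of cgpm and Venture, is below. It assumes (this is a guess, not something the traceback proves) that the composed state holds a reference to a function object that cannot be looked up by name. The names named_worker and try_pickle are hypothetical helpers for this illustration only.

```python
import pickle


def named_worker(x):
    """Module-level function: pickle stores it by reference (module + name)."""
    return x + 1


def try_pickle(obj):
    """Return None if obj pickles, else the name of the exception raised."""
    try:
        pickle.dumps(obj)
        return None
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        # The exact exception type varies across Python versions.
        return type(exc).__name__


# A lambda has no importable name, so the pickler cannot look it up.
lambda_error = try_pickle(lambda x: x + 1)

# The same failure occurs when the function is buried inside a larger result,
# which is roughly the shape of what a worker sends back over a pipe.
nested_error = try_pickle({"state": 0, "callback": lambda x: x})

# A function defined at module level is normally picklable when run as a script.
named_error = try_pickle(named_worker)

print(lambda_error, nested_error, named_error)
```

If that guess is right, the result has to be made picklable before it crosses the process boundary, or the composition has to run without multiprocessing.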