probcomp / bayeslite

BayesDB on SQLite. A Bayesian database table for querying the probable implications of data as easily as SQL databases query the data itself.
Apache License 2.0
918 stars 64 forks source link

Make Loom backend interrupt safe with ctrl+c by clearing broken cached query server #627

Open fsaad opened 5 years ago

fsaad commented 5 years ago

Interrupting a SIMULATE query using ctrl+c causes an IOError: [Errno 32] Broken pipe, presumably due to the cached server breaking as a result of the interrupt.

The IOError can be repaired by removing the offending query server from the cache:

del bdb.backends['loom']._cache[bdb][1]

We should wrap the line server._predict(reader, num_samples, writer, False), shown in the stack trace below, in a try-except block that catches the IOError and, when the pipe is broken, resets the server by removing it from the cache as shown above.

============ Full stack trace

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in execute(self, string, bindings)
    230             bindings = ()
    231         return self._maybe_trace(
--> 232             self.tracer, self._do_execute, string, bindings)
    234     def _maybe_trace(self, tracer, meth, string, bindings):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in _maybe_trace(self, tracer, meth, string, bindings)
    238         if tracer:
    239             tracer(string, bindings)
--> 240         return meth(string, bindings)
    242     def _qid(self):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in _do_execute(self, string, bindings)
    279         else:
    280             raise ValueError('>1 phrase in string')
--> 281         cursor = bql.execute_phrase(self, phrase, bindings)
    282         return self._empty_cursor if cursor is None else cursor

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in execute_phrase(bdb, phrase, bindings)
     92             ifnotexists = 'IF NOT EXISTS ' if phrase.ifnotexists else ''
     93             out.write('CREATE %sTABLE %s%s AS ' % (temp, ifnotexists, qt))
---> 94             compiler.compile_query(bdb, phrase.query, out)
     95             winders, unwinders = out.getwindings()
     96             with compiler.bayesdb_wind(bdb, winders, unwinders):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in compile_query(bdb, query, out)
    212     :param Output out: output accumulator
    213     """
--> 214     _compile_query(bdb, query, BQLCompiler_None(), out)
    216 def _compile_query(bdb, query, bql_compiler, out):

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in _compile_query(bdb, query, bql_compiler, out)
    252         compile_infer_auto(bdb, query, out)
    253     elif isinstance(query, ast.Simulate):
--> 254         compile_simulate(bdb, query, out)
    255     elif isinstance(query, ast.SimulateModels):
    256         compile_simulate_models(bdb, query, bql_compiler, out)

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in compile_simulate(bdb, simulate, out)
    716                 bdb, population_id, generator_id, modelnos,
    717                 constraints, colnos, numpredictions=nsamples,
--> 718                 accuracy=simulate.accuracy
    719             ):
    720             out.winder(insert_sql, row)

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in bayesdb_simulate(bdb, population_id, generator_id, modelnos, constraints, colnos, numpredictions, accuracy)
    571     else:
    572         counts = []
--> 573     rowses = map(simulate, generator_ids, backends, counts)
    574     all_rows = [row for rows in rowses for row in rows]
    575     assert all(isinstance(row, (tuple, list)) for row in all_rows)

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/ in simulate(generator_id, backend, n)
    547         return backend.simulate_joint(
    548             bdb, generator_id, modelnos, rowid, colnos, constraints,
--> 549             num_samples=n, accuracy=accuracy)
    550     generator_ids = _retrieve_generator_ids(bdb, population_id, generator_id)
    551     backends = [

/scratch/fsaad/bayeslite/build/lib.linux-x86_64-2.7/bayeslite/backends/ in simulate_joint(self, bdb, generator_id, modelnos, rowid, targets, constraints, num_samples, accuracy)
    804         # Obtain the prediction.
--> 805         server._predict(reader, num_samples, writer, False)
    807         # Parse the CSV output.

/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/preql.pyc in _predict(self, reader, count, writer, id_offset)
    331                 to_sample,
    332                 conditioning_row,
--> 333                 count)
    334             for sample in samples:
    335                 sample = self.decode_row(sample, header)

/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/query.pyc in sample(self, to_sample, conditioning_row, sample_count)
    152         request.sample.to_sample.dense[:] = to_sample
    153         request.sample.sample_count = sample_count
--> 154         self.protobuf_server.send(request)
    155         response = self.protobuf_server.receive()
    156         if response.error:

/scratch/fsaad/.pyenv2.7/local/lib/python2.7/site-packages/loom/query.pyc in send(self, request)
    334         request_string = request.SerializeToString()
    335         protobuf_stream_write(request_string, self.proc.stdin)
--> 336         self.proc.stdin.flush()
    338     def receive(self):

IOError: [Errno 32] Broken pipe