pescadores / pescador

Stochastic multi-stream sampling for iterative learning
https://pescador.readthedocs.io
ISC License

Should streamers add metadata to exception traces? #135

Closed · bmcfee closed this 9 months ago

bmcfee commented 6 years ago

Pescador can sometimes make systems tricky to debug. As a recent example, we had a setup in which some streamers in a mux were failing due to a data error. This crashed the whole system, but it was not easy to determine which streamer raised the exception.
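For concreteness, here's a rough sketch of the kind of setup involved. The file names and parameter values are placeholders, `data_sampler_2` is a stand-in for our real sampler, and the `Mux` call follows the pescador 1.x API:

```python
import pescador

pump = ...        # placeholder: a pumpp feature extractor
seed = 20180101   # placeholder RNG seed

def data_sampler_2(fname, max_samples, duration, pump, seed):
    # stand-in body: the real sampler yields dicts of feature patches
    for _ in range(max_samples):
        yield {'fname': fname, 'duration': duration}

files = ['000.h5', '001.h5', '002.h5']  # suppose one file is too short

# Fan the same sampler out over a grid of parameters, one streamer per file
streams = [pescador.Streamer(data_sampler_2, fname, 8, 6.0, pump, seed)
           for fname in files]
mux = pescador.Mux(streams, k=2, rate=8)

# A ValueError raised deep inside one stream crashes the whole mux,
# and the traceback never reveals which fname was responsible.
for sample in mux.iterate(max_iter=100):
    pass
```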

Here's how the stack looked:

ValueError                                Traceback (most recent call last)
<ipython-input-40-447dbd8814f3> in <module>()
----> 1 train('/scratch/yw3004/projects/deepunet/pump/000/', 1, 6.0, 8, 32, 100, 512, 10, 20, 10, seed)

<ipython-input-14-d40def4f1c66> in train(working, max_samples, duration, rate, batch_size, epochs, epoch_size, validation_size, early_stopping, reduce_lr, seed)
    105                         validation_data=gen_val,
    106                         validation_steps=validation_size,
--> 107                         callbacks=cb)
    108 
    109 

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/keras/legacy/interfaces.py in wrapper(*args, **kwargs)
     89                 warnings.warn('Update your `' + object_name +
     90                               '` call to the Keras 2 API: ' + signature, stacklevel=2)
---> 91             return func(*args, **kwargs)
     92         wrapper._original_function = func
     93         return wrapper

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/keras/engine/training.py in fit_generator(self, generator, steps_per_epoch, epochs, verbose, callbacks, validation_data, validation_steps, class_weight, max_queue_size, workers, use_multiprocessing, shuffle, initial_epoch)
   2242                                 workers=workers,
   2243                                 use_multiprocessing=use_multiprocessing,
-> 2244                                 max_queue_size=max_queue_size)
   2245                         else:
   2246                             # No need for try/except because

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/keras/legacy/interfaces.py in wrapper(*args, **kwargs)
     89                 warnings.warn('Update your `' + object_name +
     90                               '` call to the Keras 2 API: ' + signature, stacklevel=2)
---> 91             return func(*args, **kwargs)
     92         wrapper._original_function = func
     93         return wrapper

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/keras/engine/training.py in evaluate_generator(self, generator, steps, max_queue_size, workers, use_multiprocessing)
   2360 
   2361             while steps_done < steps:
-> 2362                 generator_output = next(output_generator)
   2363                 if not hasattr(generator_output, '__len__'):
   2364                     raise ValueError('Output of generator should be a tuple '

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/keras/utils/data_utils.py in get(self)
    791             success, value = self.queue.get()
    792             if not success:
--> 793                 six.reraise(value.__class__, value, value.__traceback__)

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/keras/utils/data_utils.py in _data_generator_task(self)
    656                             # => Serialize calls to
    657                             # infinite iterator/generator's next() function
--> 658                             generator_output = next(self._generator)
    659                             self.queue.put((True, generator_output))
    660                         else:

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/pescador/maps.py in keras_tuples(stream, inputs, outputs)
    155                             '`inputs` or `outputs`')
    156 
--> 157     for data in stream:
    158         try:
    159             x = list(data[key] for key in inputs) or None

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/pescador/maps.py in buffer_stream(stream, buffer_size, partial)
     57     n = 0
     58 
---> 59     for x in stream:
     60         data.append(x)
     61         n += 1

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/pescador/mux.py in iterate(self, max_iter)
    496                 try:
    497                     # Then yield the sample
--> 498                     yield six.advance_iterator(active_mux.streams_[idx])
    499 
    500                     # Increment the sample counter

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/pescador/core.py in iterate(self, max_iter)
    197         # Use self as context manager / calls __enter__() => _activate()
    198         with self as active_streamer:
--> 199             for n, obj in enumerate(active_streamer.stream_):
    200                 if max_iter is not None and n >= max_iter:
    201                     break

<ipython-input-39-d3b38cf7815d> in data_sampler_2(fname, max_samples, duration, pump, seed)
      3     sampler = make_sampler(max_samples, duration, pump, seed)
      4 
----> 5     for datum in sampler(load_h5(fname)):
      6         #print(fname)
      7         #print(duration)

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/pumpp/sampler.py in __call__(self, data)
    158             counter = count(0)
    159 
--> 160         for _, start in six.moves.zip(counter, self.indices(data)):
    161             yield self.sample(data, slice(start, start + self.duration))
    162 

~/miniconda3/envs/deepunet/lib/python3.6/site-packages/pumpp/sampler.py in indices(self, data)
    137         while True:
    138             # Generate a sampling interval
--> 139             yield self.rng.randint(0, duration - self.duration + 1)
    140 
    141     def __call__(self, data):

mtrand.pyx in mtrand.RandomState.randint()

ValueError: low >= high

As you can see, the last three levels of the stack (pescador -> custom code -> pumpp) show the signature of the streamer function that caused the error, data_sampler_2(fname, ...), but not the values of the arguments that were passed into it. If you have a mux that fans out a grid of parameters over a common function -- a pretty typical usage pattern -- this information is basically useless for debugging.

This got me thinking: would it be possible for streamers to add metadata to the exception stack? That is, intercept the exception, modify its message so that the streamer's parameters are made explicit, and re-raise it (see the sketch below)? Anyone care to look into the proper way to do this sort of thing?
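For illustration, a rough, untested sketch of what that interception could look like in `Streamer.iterate` (mirroring the pescador/core.py frames above). It assumes the streamer keeps its function and arguments as `self.streamer`, `self.args`, and `self.kwargs`, and that the exception type can be rebuilt from a single message string; it stays 2.7-compatible via six:

```python
import sys
import six

def iterate(self, max_iter=None):
    # Use self as context manager / calls __enter__() => _activate()
    with self as active_streamer:
        try:
            for n, obj in enumerate(active_streamer.stream_):
                if max_iter is not None and n >= max_iter:
                    break
                yield obj
        except Exception:
            tp, value, tb = sys.exc_info()
            # Tag the message with the args/kwargs this streamer was
            # constructed with, then re-raise on the original traceback.
            msg = '{} [streamer: {}(args={}, kwargs={})]'.format(
                value, getattr(self.streamer, '__name__', self.streamer),
                self.args, self.kwargs)
            six.reraise(tp, tp(msg), tb)
```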

cjacoby commented 6 years ago

I have definitely run into this too.

Yeah, I think that should be doable, and desirable. I've been doing something similar with scripts that I run in Docker / AWS ECS: since the log is all I have to go by, I always want to know exactly where a script failed. Python is well suited to this sort of behavior.

bmcfee commented 5 years ago

Circling back: this could be much cleaner after we drop 2.7 (#149), since we could then use the `raise ... from ...` syntax for exception chaining.
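For example, a rough sketch of the Python 3-only version of the idea above, with `PescadorError` (from pescador.exceptions) standing in as a plausible wrapper type:

```python
from pescador.exceptions import PescadorError

def iterate(self, max_iter=None):
    with self as active_streamer:
        try:
            for n, obj in enumerate(active_streamer.stream_):
                if max_iter is not None and n >= max_iter:
                    break
                yield obj
        except Exception as exc:
            # Chaining keeps the original traceback intact, while the new
            # message surfaces which streamer (and which arguments) failed.
            raise PescadorError(
                'Streamer failed: {}(args={}, kwargs={})'.format(
                    self.streamer, self.args, self.kwargs)) from exc
```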

cjacoby commented 5 years ago

Yessssssssssssssss