tensorflow / probability

Probabilistic reasoning and statistical analysis in TensorFlow
https://www.tensorflow.org/probability/
Apache License 2.0

TypeError: __init__() missing 1 required positional argument: 'distribution' #1711

Open bumie-e opened 1 year ago

bumie-e commented 1 year ago

I tried to use Ray Tune with tfp.mcmc.NoUTurnSampler, but I got this error: `TypeError: __init__() missing 1 required positional argument: 'distribution'`. I tried it with HMC and VI and got the same error. It seems TFP can't pick up the values Ray Tune samples for the specified hyperparameter.

trial_space = {'num_burnin_steps': tune.randint(30, 100)}

def train_model(config):
    # ... preceding code here ...
    num_burnin_steps = config["num_burnin_steps"]
    sampler = tfp.mcmc.TransformedTransitionKernel(
        tfp.mcmc.NoUTurnSampler(
            target_log_prob_fn=pinned_model.unnormalized_log_prob,
            step_size=0.1),
        bijector=constraining_bijector)

    # ... remaining code here ...

    return {'score': score}

# With Ray Tune imported and the trial parameters defined, run the tuner:

trainable_with_resources = tune.with_resources(train_model, {"cpu": 1})
tuner = tune.Tuner(
    trainable_with_resources,
    param_space=trial_space,
    tune_config=tune.TuneConfig(num_samples=10)
)
results = tuner.fit()

TensorFlow Probability version: 0.19.0
Ray Tune version: 2.3.1

brianwa84 commented 1 year ago

What is in constraining_bijector? Consider using tfp.experimental.mcmc.windowed_adaptive_nuts(..) instead. It's not clear how to further debug this without a stack trace or more code.
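For example, a rough sketch of that API (`joint_model` and `observed_data` are placeholder names for the full joint distribution and the observed counts; the draw and adaptation counts are arbitrary):

import tensorflow_probability as tfp

# Rough sketch only: windowed_adaptive_nuts pins the observed variable by name,
# builds the constraining bijector, and adapts the step size internally, so the
# TransformedTransitionKernel / DualAveragingStepSizeAdaptation wiring is not needed.
samples, sampler_stats = tfp.experimental.mcmc.windowed_adaptive_nuts(
    500,                        # post-warmup draws
    joint_model,                # placeholder: the un-pinned joint distribution
    num_adaptation_steps=200,   # warmup steps
    observed_counts=observed_data)  # placeholder: pins the observed node by name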

bumie-e commented 1 year ago

Hi,

Thank you for the quick response.

Here's the full code

  import time

  import tensorflow as tf
  import tensorflow_probability as tfp
  from ray import tune

  tfb = tfp.bijectors
  tfd = tfp.distributions
  sts = tfp.sts

  def build_model(observed_time_series, data):

      smooth_seasonal = sts.SmoothSeasonal(
          period=365.25,  # Annual period
          observed_time_series=observed_time_series,
          frequency_multipliers=[1, 2, 3],
          name='smooth_seasonal'
      )
      autoregressive = tfp.sts.Autoregressive(
          order=7,
          observed_time_series=observed_time_series,
          coefficients_prior=None,
          level_scale_prior=None,
          initial_state_prior=None,
          name='autoregressive')

      model_components = [
          smooth_seasonal,
          autoregressive,
      ]

      model = tfp.sts.Sum(model_components,
                      observed_time_series=observed_time_series)
      return model

  sales_model = build_model(sales_y_data, sales_data)

  positive_bijector = tfb.Softplus()
  approximate_unconstrained_rates = positive_bijector.inverse(
      tf.convert_to_tensor(sales_y_data) + 0.01)

  def sts_with_poisson_likelihood_model():

    param_vals = []
    for param in sales_model.parameters:
      param_val = yield param.prior
      param_vals.append(param_val)

    unconstrained_rate = yield sales_model.make_state_space_model(
        14, param_vals)
    rate = positive_bijector.forward(unconstrained_rate[..., 0])
    observed_counts = yield tfd.Poisson(rate, name='observed_counts')

  model = tfd.JointDistributionCoroutineAutoBatched(sts_with_poisson_likelihood_model)

  def train_model(config):

      start_time = time.time()

      num_burnin_steps = config["num_burnin_steps"]
      sampler = tfp.mcmc.TransformedTransitionKernel(
          tfp.mcmc.NoUTurnSampler(
              target_log_prob_fn=pinned_model.unnormalized_log_prob,
              step_size=0.1),
          bijector=constraining_bijector)

      adaptive_sampler = tfp.mcmc.DualAveragingStepSizeAdaptation(
          inner_kernel=sampler,
          num_adaptation_steps=int(0.8 * num_burnin_steps),
          target_accept_prob=0.75)

      initial_state = constraining_bijector.forward(
          type(pinned_model.event_shape)(
              *(tf.random.normal(part_shape)
                for part_shape in constraining_bijector.inverse_event_shape(
                    pinned_model.event_shape))))

      num_results = 14
      @tf.function(autograph=False, jit_compile=True)
      def do_sampling():
        return tfp.mcmc.sample_chain(
            kernel=adaptive_sampler,
            current_state=initial_state,
            num_results=num_results,
            num_burnin_steps=num_burnin_steps,
            trace_fn=None)

      t0 = time.time()
      samples = do_sampling()
      t1 = time.time()

      return {"time": t1-t0}

  # Define trial parameters 
  trial_space = {
      "num_burnin_steps": tune.randint(30, 100),
  }
  trainable_with_resources = tune.with_resources(train_model, {"cpu": 1})
  tuner = tune.Tuner(
      trainable_with_resources,
      param_space=trial_space,
      tune_config=tune.TuneConfig(num_samples=10)
  )
  results = tuner.fit()

brianwa84 commented 1 year ago

Looks OK to me (apart from the Ray Tune part, which I dropped), which suggests the problem is actually in Tune. constraining_bijector was still undefined, so I defined it. https://colab.research.google.com/gist/brianwa84/3c0c6859b07607416380a1e83be5e430/untitled47.ipynb
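For reference, one typical way to define the two names the snippet leaves out, reusing the `model` and `sales_y_data` from the previous comment (a sketch; the gist may set this up differently):

# Pin the observed node of the joint model, then derive the default
# constraining bijector for the remaining (unobserved) parameters.
pinned_model = model.experimental_pin(observed_counts=sales_y_data)
constraining_bijector = pinned_model.experimental_default_event_space_bijector()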

bumie-e commented 1 year ago

Oh, I see. Perhaps the error is with Ray Tune then. Thank you!

bumie-e commented 1 year ago

Hi,

It seems the error happens when I try to use distributed training with TFP. I replaced Ray Tune with Optuna, ran the model on a Dask cluster with 2 GPUs and 2 CPUs, and got the same error. I tried to investigate the distributed-training issue by following an example implementation of sharded distributions with JAX, but got stuck trying to shard the training data across devices. How do I resolve this?

Here's the link to the JAX implementation: https://www.tensorflow.org/probability/examples/TensorFlow_Probability_on_JAX
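Roughly, the data-splitting step I'm stuck on looks like this (a simplified sketch; `data` is just a placeholder array standing in for the sales series):

import jax
import jax.numpy as jnp

# Simplified sketch: split an array across the local devices before a pmap-ed
# computation. The real code would shard the observed sales series instead.
data = jnp.arange(96.)                   # placeholder data
n_devices = jax.local_device_count()
per_device = data.shape[0] // n_devices  # assumes the length divides evenly
sharded = data[:n_devices * per_device].reshape(n_devices, per_device)

# jax.pmap maps over the leading (device) axis, so each device sees one shard.
per_shard_sums = jax.pmap(jnp.sum)(sharded)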

brianwa84 commented 1 year ago

Do you get a stack trace with the exception?

bumie-e commented 1 year ago

Yes. Here's the stack trace with Ray Tune and Optuna as the search algorithm.

I updated the notebook you sent me with the code.

https://colab.research.google.com/gist/bumie-e/4af2d03e9c446d2de374ebb03ccd41dd/untitled47.ipynb

/usr/local/lib/python3.9/dist-packages/ray/tune/search/optuna/optuna_search.py:685: FutureWarning: IntUniformDistribution has been deprecated in v3.0.0. This feature will be removed in v6.0.0. See https://github.com/optuna/optuna/releases/tag/v3.0.0. Use :class:`~optuna.distributions.IntDistribution` instead.
  return ot.distributions.IntUniformDistribution(
[I 2023-04-20 15:35:06,736] A new study created in memory with name: optuna
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-6-7f2989bce142> in <cell line: 15>()
     13     )
     14 )
---> 15 results = tuner.fit()

14 frames
/usr/local/lib/python3.9/dist-packages/ray/tune/tuner.py in fit(self)
    290         if not self._is_ray_client:
    291             try:
--> 292                 return self._local_tuner.fit()
    293             except TuneError as e:
    294                 raise TuneError(

/usr/local/lib/python3.9/dist-packages/ray/tune/impl/tuner_internal.py in fit(self)
    453         if not self._is_restored:
    454             param_space = copy.deepcopy(self._param_space)
--> 455             analysis = self._fit_internal(trainable, param_space)
    456         else:
    457             analysis = self._fit_resume(trainable)

/usr/local/lib/python3.9/dist-packages/ray/tune/impl/tuner_internal.py in _fit_internal(self, trainable, param_space)
    570             **self._tuner_kwargs,
    571         }
--> 572         analysis = run(
    573             **args,
    574         )

/usr/local/lib/python3.9/dist-packages/ray/tune/tune.py in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, search_alg, scheduler, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, chdir_to_trial_dir, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, reuse_actors, raise_on_failed_trial, callbacks, max_concurrent_trials, trial_executor, _experiment_checkpoint_dir, _remote, _remote_string_queue)
    754     )
    755     while not runner.is_finished() and not experiment_interrupted_event.is_set():
--> 756         runner.step()
    757         if has_verbosity(Verbosity.V1_EXPERIMENT):
    758             _report_progress(runner, progress_reporter)

/usr/local/lib/python3.9/dist-packages/ray/tune/execution/trial_runner.py in step(self)
    951             )
    952 
--> 953         next_trial = self._update_trial_queue_and_get_next_trial()
    954         if next_trial:
    955             logger.debug(f"Got new trial to run: {next_trial}")

/usr/local/lib/python3.9/dist-packages/ray/tune/execution/trial_runner.py in _update_trial_queue_and_get_next_trial(self)
    887             # Create pending trials until it fails.
    888             while num_pending_trials < self._max_pending_trials:
--> 889                 if not self._update_trial_queue(blocking=wait_for_trial):
    890                     break
    891                 wait_for_trial = False  # wait at most one trial

/usr/local/lib/python3.9/dist-packages/ray/tune/execution/trial_runner.py in _update_trial_queue(self, blocking, timeout)
   1473             Boolean indicating if a new trial was created or not.
   1474         """
-> 1475         trial = self._search_alg.next_trial()
   1476         if blocking and not trial:
   1477             start = time.time()

/usr/local/lib/python3.9/dist-packages/ray/tune/search/search_generator.py in next_trial(self)
     98         """
     99         if not self.is_finished():
--> 100             return self.create_trial_if_possible(
    101                 self._experiment.spec, self._experiment.dir_name
    102             )

/usr/local/lib/python3.9/dist-packages/ray/tune/search/search_generator.py in create_trial_if_possible(self, experiment_spec, output_path)
    123         self._counter += 1
    124         tag = "{0}_{1}".format(str(self._counter), format_vars(flattened_config))
--> 125         trial = _create_trial_from_spec(
    126             spec,
    127             output_path,

/usr/local/lib/python3.9/dist-packages/ray/tune/experiment/config_parser.py in _create_trial_from_spec(spec, output_path, parser, **trial_kwargs)
    227     checkpoint_config = spec.get("checkpoint_config", CheckpointConfig())
    228 
--> 229     return Trial(
    230         # Submitting trial via server in py2.7 creates Unicode, which does not
    231         # convert to string in a straightforward manner.

/usr/local/lib/python3.9/dist-packages/ray/tune/experiment/trial.py in __init__(self, trainable_name, config, trial_id, local_dir, evaluated_params, experiment_tag, resources, placement_group_factory, stopping_criterion, experiment_dir_name, sync_config, checkpoint_config, export_formats, restore_path, trial_name_creator, trial_dirname_creator, log_to_file, max_failures, stub, _setup_default_resource)
    300         self.experiment_tag = experiment_tag
    301         self.location = _Location()
--> 302         trainable_cls = self.get_trainable_cls()
    303         if trainable_cls and _setup_default_resource:
    304             default_resources = trainable_cls.default_resource_request(self.config)

/usr/local/lib/python3.9/dist-packages/ray/tune/experiment/trial.py in get_trainable_cls(self)
    868         if self.stub:
    869             return None
--> 870         return get_trainable_cls(self.trainable_name)
    871 
    872     def get_trial_checkpoints(self) -> List[_TrackedCheckpoint]:

/usr/local/lib/python3.9/dist-packages/ray/tune/registry.py in get_trainable_cls(trainable_name)
     44 def get_trainable_cls(trainable_name):
     45     validate_trainable(trainable_name)
---> 46     return _global_registry.get(TRAINABLE_CLASS, trainable_name)
     47 
     48 

/usr/local/lib/python3.9/dist-packages/ray/tune/registry.py in get(self, category, key)
    213                     "Registry value for {}/{} doesn't exist.".format(category, key)
    214                 )
--> 215             return pickle.loads(value)
    216         else:
    217             return pickle.loads(self._to_flush[(category, key)])

/usr/local/lib/python3.9/dist-packages/tensorflow_probability/python/distributions/sample.py in __new__(cls, *args, **kwargs)
    368 
    369       if not isinstance(distribution, tf.__internal__.CompositeTensor):
--> 370         return _Sample(*args, **kwargs)
    371     return super(Sample, cls).__new__(cls)
    372 

TypeError: __init__() missing 1 required positional argument: 'distribution'