tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0
2.1k stars 706 forks source link

No module named 'user_module_0' kubeflow using DataflowRunner #4764

Closed feelingsonice closed 1 year ago

feelingsonice commented 2 years ago

If the bug is related to a specific library below, please raise an issue in the respective repo directly:

TensorFlow Data Validation Repo

TensorFlow Model Analysis Repo

TensorFlow Transform Repo

TensorFlow Serving Repo

System information

Describe the current behavior

When using KFP version 1.8.11 on Google Colab and running the pipeline with beam_pipeline_args --runner=DataflowRunner, I get the error "ModuleNotFoundError: No module named 'user_module_0'". The full stack trace is in the attached screenshot.

Describe the expected behavior

Trainer module. This is taken straight from the tutorial with some minor alterations:

from typing import List, Text
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

def preprocessing_fn(inputs):
  """tf.Transform preprocessing callback.

  Applies `tft.scale_to_z_score` to every feature listed in `_FEATURE_KEYS`
  and passes the label through untouched.

  Args:
    inputs: Mapping from raw feature name to tensor.

  Returns:
    Mapping from feature name to transformed tensor, including the label.
  """
  transformed = {
      feature: tft.scale_to_z_score(inputs[feature])
      for feature in _FEATURE_KEYS
  }
  # The tutorial stores the label as strings; here it was imported into
  # BigQuery already as ints, so no vocabulary mapping is needed.
  transformed[_LABEL_KEY] = inputs[_LABEL_KEY]
  return transformed

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Builds the serving function that parses and transforms tf.Examples.

  Args:
    model: The trained Keras model.
    tf_transform_output: `tft.TFTransformOutput` for the Transform graph.

  Returns:
    A `tf.function` taking a 1-D batch of serialized `tf.Example` strings and
    returning the model's outputs on the transformed features.
  """
  # Stored on the model object; presumably so the layer is tracked and
  # exported with the SavedModel — confirm against the TFX tutorial.
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(input_signature=[
      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  ])
  def serve_tf_examples_fn(serialized_tf_examples):
    raw_spec = tf_transform_output.raw_feature_spec()
    # Parse only the model's input features; the label column is skipped.
    input_spec = {
        column: spec for column, spec in raw_spec.items()
        if column in _FEATURE_KEYS
    }
    raw_batch = tf.io.parse_example(serialized_tf_examples, input_spec)
    return model(model.tft_layer(raw_batch))

  return serve_tf_examples_fn

def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Creates an indefinitely repeating dataset of (features, label) batches.

  Raw examples are read using the raw schema recorded by Transform, run
  through the Transform graph, and the transformed label is split off.

  Args:
    file_pattern: Files (or patterns) to read examples from.
    data_accessor: TFX accessor used to build the `tf.data.Dataset`.
    tf_transform_output: Output of the Transform component.
    batch_size: Number of examples per batch.

  Returns:
    A dataset yielding `(features_dict, label)` tuples, repeated forever.
  """
  raw_dataset = data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
      schema=tf_transform_output.raw_metadata.schema)
  tft_layer = tf_transform_output.transform_features_layer()

  def _split_label(raw_batch):
    features = tft_layer(raw_batch)
    label = features.pop(_LABEL_KEY)
    return features, label

  return raw_dataset.map(_split_label).repeat()

def _build_keras_model() -> tf.keras.Model:
  """Builds and compiles a small feed-forward classifier.

  One `(1,)`-shaped input per entry in `_FEATURE_KEYS`, concatenated and fed
  through two ReLU layers of width 8, ending in 3 output logits.

  Returns:
    A compiled `tf.keras.Model`.
  """
  feature_inputs = [
      keras.layers.Input(shape=(1,), name=feature) for feature in _FEATURE_KEYS
  ]
  hidden = keras.layers.concatenate(feature_inputs)
  hidden = keras.layers.Dense(8, activation='relu')(hidden)
  hidden = keras.layers.Dense(8, activation='relu')(hidden)
  logits = keras.layers.Dense(3)(hidden)

  model = keras.Model(inputs=feature_inputs, outputs=logits)
  # Integer class labels against raw logits.
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model

def run_fn(fn_args: tfx.components.FnArgs):
  """Trainer entry point: trains the model and saves it for serving.

  Args:
    fn_args: Trainer arguments (input files, Transform output, step counts,
      and the serving model directory).
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  def _dataset(files, batch_size):
    return _input_fn(
        files,
        fn_args.data_accessor,
        tf_transform_output,
        batch_size=batch_size)

  train_dataset = _dataset(fn_args.train_files, _TRAIN_BATCH_SIZE)
  eval_dataset = _dataset(fn_args.eval_files, _EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  serving_fn = _get_serve_tf_examples_fn(model, tf_transform_output)
  model.save(
      fn_args.serving_model_dir,
      save_format='tf',
      signatures={'serving_default': serving_fn})

The pipeline definition, also taken straight from the tutorial with minimal modifications:

from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool,
                     beam_pipeline_args: Optional[List[str]]) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX.

  The pipeline runs BigQueryExampleGen -> StatisticsGen -> SchemaGen ->
  Transform. It then trains on Vertex AI Training and pushes the model to a
  Vertex AI endpoint.

  Args:
    pipeline_name: Name of the pipeline.
    pipeline_root: Root directory for pipeline outputs.
    query: BigQuery SQL read by BigQueryExampleGen.
    module_file: User module shared by Transform and Trainer.
    endpoint_name: Vertex AI endpoint name used by the Pusher.
    project_id: GCP project for the training job and serving.
    region: Region for Vertex AI training and serving.
    use_gpu: If True, attach one NVIDIA_TESLA_K80 to training and serving and
      use the GPU prediction image.
    beam_pipeline_args: Beam options passed to `tfx.dsl.Pipeline`.

  Returns:
    The assembled `tfx.dsl.Pipeline`.
  """
  gcp = tfx.extensions.google_cloud_ai_platform
  machine_type = 'n1-standard-4'
  gpu_spec = {'accelerator_type': 'NVIDIA_TESLA_K80', 'accelerator_count': 1}

  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query)
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=True)
  transform = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      materialize=False,
      module_file=module_file)

  # Single-replica Vertex AI Training job on the TFX image matching the
  # installed TFX version.
  training_machine = {'machine_type': machine_type}
  if use_gpu:
    training_machine.update(gpu_spec)
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': training_machine,
          'replica_count': 1,
          'container_spec': {
              'image_uri': f'gcr.io/tfx-oss-public/tfx:{tfx.__version__}',
          },
      }],
  }

  trainer = gcp.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          gcp.ENABLE_UCAIP_KEY: True,
          gcp.UCAIP_REGION_KEY: region,
          gcp.TRAINING_ARGS_KEY: vertex_job_spec,
          'use_gpu': use_gpu,
      })

  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      'machine_type': machine_type,
  }
  if use_gpu:
    vertex_serving_spec.update(gpu_spec)
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'
  else:
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'

  pusher = gcp.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          gcp.ENABLE_VERTEX_KEY: True,
          gcp.VERTEX_REGION_KEY: region,
          gcp.VERTEX_CONTAINER_IMAGE_URI_KEY: serving_image,
          gcp.SERVING_ARGS_KEY: vertex_serving_spec,
      })

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen,
          statistics_gen,
          schema_gen,
          transform,
          trainer,
          pusher,
      ],
      beam_pipeline_args=beam_pipeline_args)

import os

from tfx.dsl.components.base import base_component as tfx_base_component

# I queried from a different table that has the labels preprocessed as ints.
# I removed my project information here for privacy reasons.
QUERY = "SELECT * FROM `tfx-oss-public.palmer_penguins.palmer_penguins`"
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
BIG_QUERY_WITH_DF_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    '--runner=DataflowRunner',
    '--region=us-central1',
]

pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    query=QUERY,
    module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
    endpoint_name=ENDPOINT_NAME,
    project_id=GOOGLE_CLOUD_PROJECT,
    region=GOOGLE_CLOUD_REGION,
    use_gpu=False,
    beam_pipeline_args=BIG_QUERY_WITH_DF_RUNNER_BEAM_PIPELINE_ARGS)

# Workaround for "No module named 'user_module_0'" on Dataflow (tfx#4764).
# In the affected TFX releases, KubeflowV2DagRunner.run() does not call
# _resolve_pip_dependencies() on the components, while KubeflowDagRunner
# (V1) does. Without this call the Dataflow workers cannot import the user
# module. Do it here, against the pipeline root, before compiling.
# NOTE(review): presumably harmless once the V2 runner does this itself;
# re-check (and drop this loop) after upgrading TFX.
for component in pipeline.components:
  # TODO(b/187122662): Pass through pip dependencies as a first-class
  # component flag.
  if isinstance(component, tfx_base_component.BaseComponent):
    component._resolve_pip_dependencies(  # pylint: disable=protected-access
        pipeline.pipeline_info.pipeline_root)

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(pipeline)

Standalone code to reproduce the issue

Run it in Google Colab.

Screen Shot 2022-03-23 at 5 47 27 PM


For reference, I see that these two issues are still unresolved: [1, 2]

I tried the suggested solution of setting force_tf_compat_v1=True, but I still got the same error.

It's also worth noting that my module is stored in GCS; module_file is a GCS URI.

In addition, I'm not importing anything the way the other two issues do. I just have one trainer.py, and I'm just trying to run the tutorials.

feelingsonice commented 2 years ago

@pindinagesh is this a known issue? I don't see any other discussions about it, but it happens to me on both 1.6.1 and 1.7.0.

1025KB commented 2 years ago

Is your module file on GCS (accessible by cloud)?

feelingsonice commented 2 years ago

Is your module file on GCS (accessible by cloud)?

You mean the Dataflow job doesn't have access to the GCS bucket?

If so, it's possible, but I also don't see any logs stating that. I'm assuming there'd be obvious permission error logs.

feelingsonice commented 2 years ago

For context, it only happens when I'm running it with DataflowRunner. The direct runner completes just fine, so I don't see why the permission wouldn't be provisioned to the Dataflow runner.

1025KB commented 2 years ago

It does. Your Transform component needs to access the module file; your Dataflow runner doesn't need to access the module file.

1025KB commented 2 years ago

It seems you have the same issue as the one here.

feelingsonice commented 2 years ago

So I tried setting force_tf_compat_v1=True, but it didn't work for me.

feelingsonice commented 2 years ago

It also seems, from the issue you linked, that it happened when the OP used import_utils.import_func_from_source. I'm not using that.

I'm also on 1.7.0. Is this issue still present? It seems like a very common use case, and I'm not doing anything complex.

1025KB commented 2 years ago

So, to confirm: your module file is on GCS, right?

feelingsonice commented 2 years ago

So, to confirm: your module file is on GCS, right?

Yes

1025KB commented 2 years ago

Just curious: is this example (Kubeflow instead of Vertex) working for you?

feelingsonice commented 2 years ago

Can you link me to the Colab? I can try it.

feelingsonice commented 2 years ago

@1025KB I tried the example. It did not work; I got the same error. But it's worth noting that I had to switch the DAG runner from KubeflowDagRunner to KubeflowV2DagRunner. I did it because the V1 runner generates a YAML file that Vertex AI, for some reason, doesn't accept. Maybe there's an easy fix, but I didn't have time to dig into it.

feelingsonice commented 2 years ago

Also, I have a corporate GCP account, and I had to manually push the data root to my own GCS bucket. I don't have permission otherwise.

1025KB commented 2 years ago

KubeflowDagRunner should work because of this.

I created a PR to add that to KubeflowV2DagRunner, but there is currently a bug in PR sync, so the PR status looks weird.

what I did is I added the following to KubeflowV2DagRunner.run():

for component in pipeline.components:
  # TODO(b/187122662): Pass through pip dependencies as a first-class
  # component flag.
  # Skip anything that is not a TFX BaseComponent; resolve the rest against
  # the pipeline root.
  if not isinstance(component, tfx_base_component.BaseComponent):
    continue
  component._resolve_pip_dependencies(  # pylint: disable=protected-access
      pipeline.pipeline_info.pipeline_root)
feelingsonice commented 2 years ago

Hmm. It sounds like I just need to switch KubeflowDagRunner to KubeflowV2DagRunner? Could you give me a quick summary of what's going on here?

Also, I'm not sure whether Vertex AI just doesn't support KubeflowDagRunner or I'm missing something here, but pipelines on Vertex AI only support JSON, and KubeflowDagRunner doesn't seem to produce that.

1025KB commented 2 years ago

KubeflowDagRunner is for Kubeflow; KubeflowV2DagRunner is for Vertex.

KubeflowDagRunner should work, and we found a potential fix [1] for KubeflowV2DagRunner. But you mentioned you saw the same "No module named 'user_module_0'" error on both Kubeflow and Vertex, and I'm not sure what's happening there...

[1] Adding this to KubeflowV2DagRunner.run function:

for component in pipeline.components:
  # TODO(b/187122662): Pass through pip dependencies as a first-class
  # component flag.
  # Only TFX BaseComponent instances get their pip dependencies resolved
  # against the pipeline root; everything else is left alone.
  if not isinstance(component, tfx_base_component.BaseComponent):
    continue
  component._resolve_pip_dependencies(  # pylint: disable=protected-access
      pipeline.pipeline_info.pipeline_root)
feelingsonice commented 2 years ago

But you mentioned you saw the same "No module named 'user_module_0'" error on both Kubeflow and Vertex

I didn't. I'm strictly using vertex here.

1025KB commented 2 years ago

I see, then I misunderstood.

For Vertex, until our PR is in, could you try adding that code to KubeflowV2DagRunner.run and retry?

feelingsonice commented 2 years ago

OK, I can confirm it works. Do you know when the change will be released?

1025KB commented 2 years ago

In the next release, in about a month.

feelingsonice commented 2 years ago

@1025KB So I'm now running into:

RuntimeError: The order of analyzers in your `preprocessing_fn` appears to be non-deterministic. This can be fixed either by changing your `preprocessing_fn` such that tf.Transform analyzers are encountered in a deterministic order or by passing a unique name to each analyzer API call.

It might just be my own code, but I can't find what could possibly be non-deterministic here, and this only appears when I add the --runner=DataflowRunner flag. For reference, my beam_pipeline_args look like this:

# Beam options for the BigQuery-backed pipeline.
# NOTE(review): despite "DIRECT_RUNNER" in its name, this list selects
# DataflowRunner (see '--runner=' below).
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    # Project and staging location.
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    # Runner and region.
    '--runner=DataflowRunner',
    '--region=us-central1',
    # Dataflow service tuning.
    '--experiments=upload_graph',  # upload_graph must be enabled
    '--dataflow_service_options=enable_prime',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
]

And my preprocessing_fn (redacted for conciseness):

_FEATURES = [# list of str
]
_SPECIAL_IMPUTE = {
    'special_foo': 1,
}
HOURS = [1, 2, 3, 4]
TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}

@tf.function
def _divide(a, b):
  """Divides `a` by `b` in float32 using `tf.math.divide_no_nan`.

  Both operands are cast to float32 first; `divide_no_nan` yields 0 where
  the denominator is 0.
  """
  numerator = tf.cast(a, tf.float32)
  denominator = tf.cast(b, tf.float32)
  return tf.math.divide_no_nan(numerator, denominator)

def preprocessing_fn(inputs):
  """tf.Transform preprocessing callback (redacted version from the report).

  The function:
    - casts boolean inputs to int64 and densifies sparse inputs to
      `[None, 1]`;
    - derives `foo` and `bar`;
    - one-hot encodes every column in `TABLE_KEYS`;
    - scales each feature listed in `_FEATURES` with `tft.scale_to_0_1`.

  Args:
    inputs: Mapping from raw feature name to (possibly sparse) tensor.

  Returns:
    Mapping from each name in `_FEATURES` to its scaled tensor.
  """
  x = {}

  # Sorted so graph construction does not depend on dict insertion order.
  for name, tensor in sorted(inputs.items()):
    if tensor.dtype == tf.bool:
      tensor = tf.cast(tensor, tf.int64)

    if isinstance(tensor, tf.sparse.SparseTensor):
      default_value = '' if tensor.dtype == tf.string else 0
      tensor = tft.sparse_tensor_to_dense_with_shape(
          tensor, [None, 1], default_value)

    x[name] = tensor

  x['foo'] = _divide((x['foo1'] - x['foo2']), x['foo_denom'])
  x['bar'] = tf.cast(x['bar'] > 0, tf.int64)

  # NOTE(review): `total` is computed but never stored in `x`, and
  # DEVICE_TYPES is not defined in this snippet; presumably both were lost
  # when the code was redacted — confirm against the real module.
  for hour in HOURS:
    total = tf.constant(0, dtype=tf.int64)
    for device_type in DEVICE_TYPES.keys():
      total = total + x[f'some_device_{device_type}_{hour}h']

  # One-hot encode categorical values through a static vocabulary lookup;
  # values outside the vocabulary look up as -1 (the table's default).
  for name, keys in TABLE_KEYS.items():
    with tf.init_scope():
      initializer = tf.lookup.KeyValueTensorInitializer(
          tf.constant(keys),
          tf.constant(list(range(len(keys)))))
      table = tf.lookup.StaticHashTable(initializer, default_value=-1)

    indices = table.lookup(tf.squeeze(x[name], axis=1))
    one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)

    columns = tf.split(one_hot, num_or_size_splits=len(keys), axis=1)
    for key, column in zip(keys, columns):
      x[f'{name}_{key}'] = column

  # Give every analyzer an explicit, unique name derived from its feature.
  # The tf.Transform "order of analyzers ... appears to be non-deterministic"
  # error recommends this remedy, so analyzer identity no longer relies on
  # auto-generated name scopes.
  # NOTE(review): assumes feature names are valid TF name-scope strings
  # (letters, digits, '_', '.', '-').
  return {
      name: tft.scale_to_0_1(x[name], name=f'scale_to_0_1_{name}')
      for name in _FEATURES
  }
1025KB commented 2 years ago

Could you open a separate issue under tft? Thanks!

feelingsonice commented 2 years ago

@1025KB is this fixed in the latest release?

gaikwadrahul8 commented 1 year ago

Hi, @bli00

Apologies for the delay. I found a similar issue, #1696, where the user found a workaround. It seems to be a version compatibility issue between the Kubeflow Pipelines backend and TFX. Please check the Kubeflow Pipelines Backend and TFX Compatibility Matrix here, and also see Upgrading Kubeflow Pipelines deployment on Google Cloud. For your reference, I have also found a good article, and I hope it helps you resolve your issue.

Could you please try running your TFX pipeline with versions from the Compatibility Matrix and check whether that resolves your issue?

If the issue still persists, please let us know and, if possible, share the error log so we can investigate further and find the root cause.

Thank you!

gaikwadrahul8 commented 1 year ago

Hi, @bli00

Closing this issue due to lack of recent activity for a couple of weeks. Please feel free to reopen the issue or post comments if you need any further assistance or updates. Thank you!

google-ml-butler[bot] commented 1 year ago

Are you satisfied with the resolution of your issue? Yes No