tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0
2.11k stars 706 forks source link

TFX trainer component running in Kubeflow fails although it was successful in the Interactive Context #6525

Closed crbl1122 closed 9 months ago

crbl1122 commented 9 months ago

If the bug is related to a specific library below, please raise an issue in the respective repo directly:

TensorFlow Data Validation Repo

TensorFlow Model Analysis Repo

TensorFlow Transform Repo

TensorFlow Serving Repo

System information

Describe the current behavior

In GCP I run a Kubeflow ML pipeline with TFX components using a custom service account. The pipeline reads data from BigQuery and it has the following components: components = [example_gen, statistics_gen, schema_gen, transform, trainer, pusher]

The main problem is that it fails at the last "trainer" step, although I tested each step in the interactive context and all were OK. The secondary problem is that I cannot display log messages for the trainer module execution code in the main GCP pipeline dashboard (in the logs area). This complicates my debugging attempts. I can only view the logs from Logs Explorer but I cannot display the messages for the python trainer module code, seem only to be the framework messages. In those messages I view only one type of error message. I identified that this operation uses the default service account (not the custom one) and it might not have all permissions needed. I tried to set the trainer component to use the custom SA but it does not use it. How can I set it properly for the custom SA?

com.google.cloud.ai.platform.common.errors.AiPlatformException: code=ALREADY_EXISTS, message=Schema with name projects/2385..../locations/..../metadataStores/default/metadataSchemas/f14caf7e-4234-473d-ac31-.... and version 0.0.1 already exists., cause=null

Please view details: https://stackoverflow.com/questions/77652732/tfx-trainer-component-running-in-kubeflow-fails-although-it-was-successful-in-th

Describe the expected behavior

Being able to succesfully run the TFX Trainer component with Kubeflow pipeline.

Standalone code to reproduce the issue

Providing a bare minimum test case or step(s) to reproduce the problem will greatly help us to debug the issue. If possible, please share a link to Colab/Jupyter/any notebook.

Name of your Organization (Optional)

Other info / logs

Include any logs or source code that would be helpful to diagnose the problem. If including tracebacks, please include the full traceback. Large logs and files should be attached.

crbl1122 commented 9 months ago

https://stackoverflow.com/questions/77652732/tfx-trainer-component-running-in-kubeflow-fails-although-it-was-successful-in-th

singhniraj08 commented 9 months ago

@crbl1122,

Can you also share the complete logs from logs explorer for this pipeline or a minimal pipeline code to recreate this issue.

For the error which you mentioned in issues, the pipeline is saving the same artifacts in same location which is causing this error. Complete logs will help us understand which artifact is causing this issue and how we can resolve it. Thanks.

crbl1122 commented 9 months ago

code updated

@singhniraj08 Thank you for the quick answer. I couldn't identify where the schema artifact (?) is written twice. Here is the code:

%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'F1', 'F2', 'F3'
]
_LABEL_KEY = 'F_ASIG_FLAG'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()

def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  # outputs = keras.layers.Dense(3)(d)
    outputs = keras.layers.Dense(1, activation='sigmoid')(d) # binary classification

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      # loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      loss=tf.keras.losses.BinaryCrossentropy,
      # metrics=[keras.metrics.SparseCategoricalAccuracy()]
      metrics=[keras.metrics.BinaryAccuracy()])

  model.summary(print_fn=logging.info)
  return model

# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
######
# upload trainer module to GCS
!gsutil cp {_trainer_module_file_v2} {MODULE_ROOT}/

######
from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, # data_root: str,
                     query: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, 
                     use_gpu: bool,
                     beam_pipeline_args: Optional[List[str]]) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  # example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(query=query)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  # pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
  #     model=trainer.outputs['model'],
  #     custom_config={
  #         tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
  #             True,
  #         tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
  #             region,
  #         tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
  #             serving_image,
  #         tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
  #           vertex_serving_spec,
  #     })

  components = [
      example_gen,
      trainer,
      # pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      #NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)

#### setting constants ###
import uuid
import time

# Get the current time in UTC as a str
current_time_utc = str(int(time.time()))

PIPELINE_NAME = 'test-ppline' + '-' + str(uuid.uuid4()) + '-' + current_time_utc
print('PIPELINE_NAME', PIPELINE_NAME)

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('MODULE_ROOT',MODULE_ROOT)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# S-ar putea sa fie cauza de eroare in prima versiune
# # This is the path where your model will be pushed for serving.
# SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
#     GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

#######
# docs_infra: no_execute
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   '--region=europe-west3'
   ]

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)

_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        # data_root=DATA_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS, # for BigQuery
        # We will use CPUs only for now.
        use_gpu=False))

######
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME,
                                enable_caching=False)
job.submit(service_account=service_account)
singhniraj08 commented 9 months ago

@crbl1122,

I tried running the pipeline code but there where lot of errors in the pipeline code which needs to be fixed, so I was not able to replicate the issue using the example code. I tried running the TFX BigQuery tutorial and it ran without any issues. Can you try running the example notebook and see if that works for you and share the pipeline code which can replicate the error. Thank you!

crbl1122 commented 9 months ago

@singhniraj08 My biggest problem is that I cannot view in the Vertex interface or in LogsExplorer, the logs from the component execution, therefore it is almost impossible to debug them blindly. In other tests where I used only Kubeflow components instead of TFX, I was able to view them. Do you have any idea why it happens and what can I do to visualize the components logs? I ensured that the service account used to run this pipeline has the log writer/viewer privileges with no effect.

Regarding this TFX pipeline, I follow the following two tutorials:

  1. Read data from BigQuery [https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_bq](Read from BigQuery)
  2. create and run a TFX pipeline which trains an ML model using Vertex AI Training service and publishes it to Vertex AI for serving [https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_vertex_training]( create and run a TFX pipeline which trains an ML model using Vertex AI Training service and publishes it to Vertex AI for serving)

I can run successfully, both of them independently. But when I combine them as per the above code, the second component (Trainer) fails. These are the differences between the code from tutorial #2 and my code:

def _make_keras_model() -> tf.keras.Model: """Creates a DNN Keras model for classifying penguin data.

Returns: A Keras Model. """

# The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  # outputs = keras.layers.Dense(3)(d)
    outputs = keras.layers.Dense(1, activation='sigmoid')(d) # binary classification #<--New

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      # loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), 
      loss=tf.keras.losses.BinaryCrossentropy, #<--new
      # metrics=[keras.metrics.SparseCategoricalAccuracy()]
      metrics=[keras.metrics.BinaryAccuracy()]) #<-- new

I will update the code in the issue to match the modified code

crbl1122 commented 9 months ago

@singhniraj08 Hi, Even without logging, I managed to identify the coding problems in the trainer module and it works now. It is important to know how to resolve the problem of missing visualization of the components logs. If not possible to answer here, I will close this issue and open another one.

singhniraj08 commented 9 months ago

@crbl1122, Normally the component logs should appear in logs explorer, but I will try to verify it with the team and update this issue. Since this issue has been resolved you, requesting you to close this issue. Thank you!

github-actions[bot] commented 9 months ago

Are you satisfied with the resolution of your issue? Yes No