tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0

Is "tfx.extensions.google_cloud_ai_platform.Trainer" component compatible with Vertex Pipelines? #5511

Closed hugoferrero closed 1 year ago

hugoferrero commented 1 year ago

Hi. I'm training a model using Vertex Pipelines with TFX. I have the following components: ExampleGen, StatisticsGen, SchemaGen, Trainer and Pusher. ExampleGen, StatisticsGen and SchemaGen work fine, but Trainer fails. The Trainer component I'm using is "tfx.extensions.google_cloud_ai_platform.Trainer". However, if I use "tfx.components.Trainer" instead, the entire pipeline works fine and all jobs finish OK. Useful info below. Thanks in advance.

Pipeline components: [screenshot]

This is the error in the logs: [screenshot]

This is the Dockerfile:

FROM tensorflow/tfx:1.7.1
WORKDIR /pipeline

COPY . .

ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

ENTRYPOINT ["python", "tfx/scripts/run_executor.py"]

This is the pipeline code:

from typing import Dict, List, Optional

from tfx import v1 as tfx
from tfx.proto import example_gen_pb2

def create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    query_train: str,
    query_test: str,
    run_fn: str,
    serving_model_dir: str,
    ai_platform_training_args: Optional[Dict[str, str]] = None,
    beam_pipeline_args: Optional[List[str]] = None
) -> tfx.dsl.Pipeline:

  # ExampleGen Component

  input_config = example_gen_pb2.Input(splits=[
      example_gen_pb2.Input.Split(name='train', pattern=query_train),
      example_gen_pb2.Input.Split(name='test', pattern=query_test)
  ])

  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      input_config=input_config
      )

  # StatisticsGen Component

  statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])

  # SchemaGen Component

  schema_gen = tfx.components.SchemaGen(statistics=statistics_gen.outputs['statistics'])

  # Trainer Component

  # GCP_AI_PLATFORM_TRAINING_ARGS = {
  #     'project': GOOGLE_CLOUD_PROJECT,
  #     'region': GOOGLE_CLOUD_REGION,
  #     'scaleTier': 'CUSTOM',
  #     'masterType': 'n1-standard-4',
  #     'masterConfig': {
  #         'imageUri': PIPELINE_IMAGE
  #     },
  # }

  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      run_fn=run_fn,
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              ai_platform_training_args
      })

  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      beam_pipeline_args=beam_pipeline_args)
singhniraj08 commented 1 year ago

@hugoferrero,

tfx.extensions.google_cloud_ai_platform.Trainer expects the ai_platform_training_args dict, and it fails if this dict is None. The dict holds the training-job parameters to be passed to Google Cloud AI Platform. For more info on the ai_platform_training_args dict, please refer here.

Could you let me know if you are passing any training-job parameters? If not, you can use tfx.components.Trainer as shown in the example code below.

Example Code:


  trainer_args = {
      'run_fn': run_fn,
      'examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args,
      'eval_args': eval_args,
  }
  if ai_platform_training_args is not None:
    trainer_args['custom_config'] = {
        tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
            ai_platform_training_args,
    }
    trainer = tfx.extensions.google_cloud_ai_platform.Trainer(**trainer_args)
  else:
    trainer = tfx.components.Trainer(**trainer_args)

  components.append(trainer)
hugoferrero commented 1 year ago

Hi @singhniraj08. Thanks for answering. Yes, I'm passing training-job parameters. Here they are:

  # Dict:
  ai_platform_training_args = {
      'project': GOOGLE_CLOUD_PROJECT,
      'region': GOOGLE_CLOUD_REGION,
      'scaleTier': 'CUSTOM',
      'masterType': 'n1-standard-4',
      'masterConfig': {
          'imageUri': PIPELINE_IMAGE
      },
  }
# Trainer component
trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
    run_fn=run_fn,
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=5),
    custom_config={
        tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
            ai_platform_training_args
    })
singhniraj08 commented 1 year ago

@hugoferrero,

I was able to replicate the issue with the above training-job parameters, and it looks like some of the parameters are not valid. I tried the code below and it worked for me. Can you please try it? Thank you!

Setup used:

TensorFlow version: 2.10.1
TFX version: 1.11.0
KFP version: 1.8.16

Trainer Code:


ai_platform_training_args = {
    'project': project_id,
    'worker_pool_specs': [{
        'machine_spec': {
            'machine_type': 'n1-standard-4',
        },
        'replica_count': 1,
        'container_spec': {
            'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
        },
    }],
}
if use_gpu:
  # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
  # for available machine types.
  ai_platform_training_args['worker_pool_specs'][0]['machine_spec'].update({
      'accelerator_type': 'NVIDIA_TESLA_K80',
      'accelerator_count': 1
  })

trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
    module_file=module_file,
    examples=example_gen.outputs['examples'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=5),
    custom_config={
        tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
            True,
        tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
            region,
        tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
            ai_platform_training_args,
        'use_gpu':
            use_gpu,
    })
[screenshot]
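For completeness, the 'use_gpu' entry in custom_config is not, as far as I can tell, consumed by the Trainer extension itself; in the official Vertex training tutorial it is read back inside run_fn to pick a distribution strategy. A minimal sketch of that pattern (assuming the custom_config above is what arrives in fn_args.custom_config, and reusing a _make_keras_model helper like the one later in this thread):

import tensorflow as tf
from tfx import v1 as tfx

def run_fn(fn_args: tfx.components.FnArgs):
  # 'use_gpu' arrives via the Trainer's custom_config shown above.
  if fn_args.custom_config.get('use_gpu', False):
    # One worker with a single GPU, matching accelerator_count above.
    strategy = tf.distribute.MirroredStrategy()
  else:
    strategy = tf.distribute.get_strategy()  # Default (no-op) strategy.

  with strategy.scope():
    model = _make_keras_model()  # Hypothetical model-building helper.

  # ... build the datasets, then call model.fit() and model.save() as usual ...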
hugoferrero commented 1 year ago

@singhniraj08 thanks for the advice. I did the same, but it didn't work, and the logs are not helping. Maybe the problem is elsewhere, beyond the Trainer parameters. Below are the files of my pipeline: configs.py, pipeline.py, model.py, kubeflow_v2_runner.py, plus some extra info as well (logs, Dockerfile, requirements.txt). Thanks in advance.

Pipeline: [screenshot]

Logs: [screenshots]

configs.py:

"""TFX  emplate configurations.
This file defines environments for a TFX pipeline.
"""

import os  # pylint: disable=unused-import

PIPELINE_NAME = 'tfx-mdhc-v10'

# GCP related configs.

# Following code will retrieve your GCP project. You can choose which project
# to use by setting GOOGLE_CLOUD_PROJECT environment variable.
try:
  import google.auth  # pylint: disable=g-import-not-at-top  # pytype: disable=import-error
  try:
    _, GOOGLE_CLOUD_PROJECT = google.auth.default(quota_project_id = 'teco-prod-adam-dev-826c')
  except google.auth.exceptions.DefaultCredentialsError:
    GOOGLE_CLOUD_PROJECT = 'my-project'
except ImportError:
  GOOGLE_CLOUD_PROJECT = 'my-project'

# Specify your GCS bucket name here. You have to use GCS to store output files
# when running a pipeline with Kubeflow Pipeline on GCP or when running a job
# using Dataflow. Default is '<gcp_project_name>-kubeflowpipelines-default'.
# This bucket is created automatically when you deploy KFP from marketplace.

GCS_BUCKET_NAME = 'my-bucket'
GCS_OUTPUTS = 'my-bucket/outputs'
GOOGLE_CLOUD_REGION = 'us-east4'
# BQ Constants
BQ_DATASET_NAME="data_lake_analytics_dev"
BQ_ML_TABLE_NAME="prepago_nosis_scoring_pp_train_total_muestra_gcp"
BQ_URI_SIN_PREFIX = f"{GOOGLE_CLOUD_PROJECT}.{BQ_DATASET_NAME}.{BQ_ML_TABLE_NAME}"
# Following image will be used to run pipeline components run if Kubeflow
# Pipelines used.
# This image will be automatically built by CLI if we use --build-image flag.
PIPELINE_IMAGE = f'gcr.io/{GOOGLE_CLOUD_PROJECT}/{PIPELINE_NAME}'
RUN_FN = 'models.keras_model.model.run_fn'
QUERY_TRAIN = f"""SELECT score, 
               nsepercentil, 
               edad, 
               antiguedadenbancos, 
               mlctc_uva,
               antiguedadlaboral, 
               target FROM `{BQ_URI_SIN_PREFIX}`
               where periodo != 202009"""

QUERY_TEST = f"""SELECT score, 
               nsepercentil, 
               edad, 
               antiguedadenbancos, 
               mlctc_uva,
               antiguedadlaboral, 
               target FROM `{BQ_URI_SIN_PREFIX}`
               where periodo = 202009"""

BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp')
    ]

GCP_AI_PLATFORM_TRAINING_ARGS = {
    'project': GOOGLE_CLOUD_PROJECT,
    'worker_pool_specs': [{
        'machine_spec': {
            'machine_type': 'n1-standard-4',
        },
        'replica_count': 1,
        'container_spec': {
            'image_uri': PIPELINE_IMAGE,
        },
    }],
}

pipeline.py:

from typing import Dict, List, Optional

from tfx import v1 as tfx
from tfx.proto import example_gen_pb2

def create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    query_train: str,
    query_test: str,
    run_fn: str,
    region: str,
    serving_model_dir: str,
    ai_platform_training_args: Optional[Dict[str, str]] = None,
    beam_pipeline_args: Optional[List[str]] = None
) -> tfx.dsl.Pipeline:

  # ExampleGen Component

  input_config = example_gen_pb2.Input(splits=[
      example_gen_pb2.Input.Split(name='train', pattern=query_train),
      example_gen_pb2.Input.Split(name='eval', pattern=query_test)
  ])

  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      input_config=input_config
      )

  # StatisticsGen Component

  statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])

  # SchemaGen Component

  schema_gen = tfx.components.SchemaGen(statistics=statistics_gen.outputs['statistics'])

  # Trainer Component

  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      run_fn=run_fn,
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY: True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY: region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              ai_platform_training_args
      })

  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      beam_pipeline_args=beam_pipeline_args)

model.py:

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import feature_column
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

LABEL_KEY = "target"
TRAIN_BATCH_SIZE = 32
EVAL_BATCH_SIZE = 32

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training. Me genera (X,y).

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=LABEL_KEY),
      schema=schema).repeat() 

def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model.

  Returns:
    A Keras Model.
  """

  ### feature encoding ###
  feature_columns = []

  features_names = ["score", 
                   "nsepercentil", 
                   "edad", 
                   "antiguedadenbancos", 
                   "mlctc_uva",
                   "antiguedadlaboral", 
                   "target"]

  for header in features_names:
    feature_columns.append(feature_column.numeric_column(header))

  feature_layer = tf.keras.layers.DenseFeatures(feature_columns)  

  model = tf.keras.Sequential()
  model.add(feature_layer)
  model.add(tf.keras.layers.Dense(128, activation='relu'))
  model.add(tf.keras.layers.BatchNormalization())
  model.add(tf.keras.layers.Dense(64, activation='relu'))
  model.add(tf.keras.layers.BatchNormalization())
  model.add(tf.keras.layers.Dense(32, activation='relu'))
  model.add(tf.keras.layers.BatchNormalization())
  model.add(tf.keras.layers.Dropout(0.3))
  model.add(tf.keras.layers.Dense(16, activation='relu'))
  model.add(tf.keras.layers.BatchNormalization())
  model.add(tf.keras.layers.Dense(8, activation='relu'))
  model.add(tf.keras.layers.BatchNormalization())
  model.add(tf.keras.layers.Dense(4, activation='relu'))
  model.add(tf.keras.layers.BatchNormalization())
  model.add(tf.keras.layers.Dense(1, activation='sigmoid'))

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1, momentum=0.3)

  model.compile(
              optimizer= optimizer,
              loss=tf.keras.losses.BinaryCrossentropy(),
              metrics=[tf.keras.metrics.AUC(name="auc")]
              )

  model.summary(print_fn=logging.info)
  return model

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by the pipeline author. A schema can also be derived from
  # the TFT graph if a Transform component is used. If both are missing,
  # `schema_from_feature_spec` could be used to generate a schema from a very
  # simple feature_spec, but the schema returned would be very primitive.
  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      epochs=2,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

kubeflow_v2_runner.py:

import os
from absl import logging
from pipeline import configs
from pipeline import pipeline
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

_OUTPUT_DIR = os.path.join('gs://', configs.GCS_OUTPUTS)

_PIPELINE_ROOT = os.path.join(_OUTPUT_DIR, 'tfx_pipeline_output',
                              configs.PIPELINE_NAME)
_SERVING_MODEL_DIR = os.path.join(_PIPELINE_ROOT, 'serving_model')

def run():
  """Define a pipeline to be executed using Kubeflow V2 runner."""

  runner_config = kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
      default_image=configs.PIPELINE_IMAGE)

  dsl_pipeline = pipeline.create_pipeline(
      pipeline_name=configs.PIPELINE_NAME,
      pipeline_root=_PIPELINE_ROOT,
      query_train=configs.QUERY_TRAIN,
      query_test=configs.QUERY_TEST,
      run_fn=configs.RUN_FN,
      region=configs.GOOGLE_CLOUD_REGION,
      serving_model_dir=_SERVING_MODEL_DIR,
      ai_platform_training_args=configs.GCP_AI_PLATFORM_TRAINING_ARGS,
      beam_pipeline_args=configs.BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS
                                         )

  runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=runner_config,
    output_dir=_PIPELINE_ROOT,
    output_filename=configs.PIPELINE_NAME + '_pipeline.json' 
    )

  runner.run(pipeline=dsl_pipeline)

if __name__ == '__main__':
  logging.set_verbosity(logging.INFO)
  run()
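
Note that KubeflowV2DagRunner.run() only compiles the pipeline into the JSON spec named above; the compiled spec still has to be submitted to Vertex AI Pipelines separately. A minimal sketch of that submission step with the google-cloud-aiplatform client (my assumption, not shown in the original files; it reuses the names from configs.py and this runner):

import os

from google.cloud import aiplatform
from pipeline import configs

aiplatform.init(
    project=configs.GOOGLE_CLOUD_PROJECT,
    location=configs.GOOGLE_CLOUD_REGION)

# Path where the runner above is expected to have written the compiled spec.
job = aiplatform.PipelineJob(
    display_name=configs.PIPELINE_NAME,
    template_path=os.path.join(_PIPELINE_ROOT, configs.PIPELINE_NAME + '_pipeline.json'),
    pipeline_root=_PIPELINE_ROOT)

job.submit()  # Or job.run() to block until the pipeline finishes.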

Dockerfile

FROM tensorflow/tfx:1.11.0
WORKDIR /pipeline

COPY requirements.txt .

RUN pip install -U -r requirements.txt

COPY . .

ENV PYTHONPATH="/pipeline:${PYTHONPATH}"

ENTRYPOINT ["python", "tfx/scripts/run_executor.py"]

requirements.txt

tensorflow==2.10.1
kfp==1.8.16
singhniraj08 commented 1 year ago

@hugoferrero,

Hi, I was unable to recreate the issue because I don't have access to the BigQuery dataset you are referring to. Could you please share the complete stack trace to give us an idea of the issue you are facing? You can go to Cloud Logging, expand the particular error, and share it.

I would suggest creating a basic running pipeline from the official documentation by clicking "Run in Google Cloud Vertex AI Workbench" and then integrating your changes to make the pipeline work. Thank you!

hugoferrero commented 1 year ago

@singhniraj08 I was able to run the entire pipeline. I ran the pipeline locally (LocalDagRunner) component by component. The problem was line 87 (in the _make_keras_model() function) in the image below: model.summary(print_fn=logging.info) triggers the following error: "This model has not yet been built. Build the model first by calling build() or calling fit() with some data, or specify an input_shape argument in the first layer(s) for automatic build." So I removed that line of code and the pipeline ran totally fine. The main problem was that the Vertex Pipelines logs were not helpful for detecting the issue. I appreciate your time and support. Thank you. [screenshot]

Entire pipeline: [screenshot]
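
For reference, a minimal sketch of the workaround in run_fn (based on the model.py above): because the Sequential model starts with a DenseFeatures layer, it has no defined input shape until it has seen data, so model.summary() can only be called once the model has been built, e.g. after model.fit():

  model = _make_keras_model()   # No model.summary() call inside the helper anymore.
  model.fit(
      train_dataset,
      epochs=2,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)
  model.summary(print_fn=logging.info)  # The model is built now, so summary() works.
  model.save(fn_args.serving_model_dir, save_format='tf')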

singhniraj08 commented 1 year ago

@hugoferrero,

Thank you for pointing out the root cause of the issue. We will work on this going forward. Please close this issue if it has been resolved.

Thank you!
