Closed DvirDukhan closed 3 years ago
pipeline.py
# Lint as: python2, python3
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TFX taxi template pipeline definition.
This file defines TFX pipeline and various components in the pipeline.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from typing import Optional, Text, List, Dict, Any
import tensorflow_model_analysis as tfma
from ml_metadata.proto import metadata_store_pb2
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer import executor as trainer_executor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.extensions.google_cloud_ai_platform.pusher import executor as ai_platform_pusher_executor
from tfx.extensions.google_cloud_ai_platform.trainer import executor as ai_platform_trainer_executor
from tfx.extensions.google_cloud_big_query.example_gen import component as big_query_example_gen_component # pylint: disable=unused-import
from tfx.orchestration import pipeline
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
from redis_component.component import RedisExampleGen
from redis_proto import redis_config_pb2
from redis_proto import redis_hash_query_pb2
def create_pipeline(
    pipeline_name: Text,
    pipeline_root: Text,
    data_path: Text,
    # TODO(step 7): (Optional) Uncomment here to use BigQuery as a data source.
    # query: Text,
    preprocessing_fn: Text,
    run_fn: Text,
    train_args: trainer_pb2.TrainArgs,
    eval_args: trainer_pb2.EvalArgs,
    eval_accuracy_threshold: float,
    serving_model_dir: Text,
    metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
    beam_pipeline_args: Optional[List[Text]] = None,
    ai_platform_training_args: Optional[Dict[Text, Text]] = None,
    ai_platform_serving_args: Optional[Dict[Text, Any]] = None,
) -> pipeline.Pipeline:
  """Builds the chicago taxi TFX pipeline fed by RedisExampleGen.

  The pipeline chains, in this order: RedisExampleGen, StatisticsGen,
  SchemaGen, ExampleValidator, Transform, Trainer, a latest-blessed-model
  ResolverNode, Evaluator and Pusher.

  Args:
    pipeline_name: Forwarded to `pipeline.Pipeline`.
    pipeline_root: Forwarded to `pipeline.Pipeline`.
    data_path: Not read by this function at the moment; the CsvExampleGen that
      would consume it is commented out in favour of RedisExampleGen.
    preprocessing_fn: Forwarded to the Transform component.
    run_fn: Forwarded to the Trainer component.
    train_args: Forwarded to the Trainer component.
    eval_args: Forwarded to the Trainer component.
    eval_accuracy_threshold: Lower bound placed on the 'BinaryAccuracy' metric
      in the Evaluator's eval config.
    serving_model_dir: Base directory of the Pusher's filesystem destination.
    metadata_connection_config: Forwarded to `pipeline.Pipeline`.
    beam_pipeline_args: Forwarded to `pipeline.Pipeline`.
    ai_platform_training_args: When not None, the Trainer uses the AI Platform
      trainer executor and receives these args through its custom config.
    ai_platform_serving_args: When not None, the Pusher uses the AI Platform
      pusher executor and receives these args through its custom config.

  Returns:
    The assembled `pipeline.Pipeline`.
  """
  # TODO: Hard coded connection config and schema, move to config or change
  # component later.
  redis_conn = redis_config_pb2.RedisConnConfig(
      host="redis_host", port="redis_port", password="secret")
  key_pattern = 'record_*'

  # (field name, field type) for every field read from each Redis hash.
  field_types = (
      ('pickup_community_area', 'integer'),
      ('fare', 'float'),
      ('trip_start_month', 'integer'),
      ('trip_start_hour', 'integer'),
      ('trip_start_day', 'integer'),
      ('trip_start_timestamp', 'integer'),
      ('pickup_latitude', 'float'),
      ('pickup_longitude', 'float'),
      ('dropoff_latitude', 'float'),
      ('dropoff_longitude', 'float'),
      ('trip_miles', 'float'),
      ('pickup_census_tract', 'integer'),
      ('dropoff_census_tract', 'float'),
      ('payment_type', 'string'),
      ('company', 'string'),
      ('trip_seconds', 'float'),
      ('dropoff_community_area', 'float'),
      ('tips', 'float'),
      ('big_tipper', 'integer'),
  )
  hash_schema = [
      {'name': field, 'type': kind} for field, kind in field_types
  ]
  hash_query = redis_hash_query_pb2.RedisHashQuery(
      hash_key_pattern=key_pattern, schema=hash_schema)

  # Brings data into the pipeline. The template's CSV source is kept below for
  # reference only.
  # example_gen = CsvExampleGen(input=external_input(data_path))
  example_gen = RedisExampleGen(conn_config=redis_conn, query=hash_query)
  examples = example_gen.outputs['examples']

  # Statistics over the data, used for visualization and example validation.
  statistics_gen = StatisticsGen(examples=examples)

  # Schema derived from the statistics.
  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=True)

  # Anomaly detection based on the statistics and the schema.
  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])

  # Transformations and feature engineering for training and serving.
  transform = Transform(
      examples=examples,
      schema=schema_gen.outputs['schema'],
      preprocessing_fn=preprocessing_fn)

  # Trains the model with the user-provided run_fn. The generic local executor
  # is the default; it is swapped for the AI Platform one on request.
  trainer_kwargs = {
      'run_fn': run_fn,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args,
      'eval_args': eval_args,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  }
  if ai_platform_training_args is not None:
    trainer_kwargs['custom_executor_spec'] = executor_spec.ExecutorClassSpec(
        ai_platform_trainer_executor.GenericExecutor)
    trainer_kwargs['custom_config'] = {
        ai_platform_trainer_executor.TRAINING_ARGS_KEY:
            ai_platform_training_args,
    }
  trainer = Trainer(**trainer_kwargs)

  # Resolves the latest blessed model, used as the evaluation baseline.
  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))

  # TFMA evaluation config: the candidate model is validated on binary
  # accuracy, both against a fixed lower bound and against the baseline.
  accuracy_threshold = tfma.MetricThreshold(
      value_threshold=tfma.GenericValueThreshold(
          lower_bound={'value': eval_accuracy_threshold}),
      change_threshold=tfma.GenericChangeThreshold(
          direction=tfma.MetricDirection.HIGHER_IS_BETTER,
          absolute={'value': -1e-10}))
  accuracy_metric = tfma.MetricConfig(
      class_name='BinaryAccuracy', threshold=accuracy_threshold)
  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='big_tipper')],
      slicing_specs=[tfma.SlicingSpec()],
      metrics_specs=[tfma.MetricsSpec(metrics=[accuracy_metric])])
  evaluator = Evaluator(
      examples=examples,
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config)

  # Pushes the model to a file destination if it passed validation; the
  # AI Platform pusher executor is used instead when serving args are given.
  pusher_kwargs = {
      'model': trainer.outputs['model'],
      'model_blessing': evaluator.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir)),
  }
  if ai_platform_serving_args is not None:
    pusher_kwargs['custom_executor_spec'] = executor_spec.ExecutorClassSpec(
        ai_platform_pusher_executor.Executor)
    pusher_kwargs['custom_config'] = {
        ai_platform_pusher_executor.SERVING_ARGS_KEY:
            ai_platform_serving_args,
    }
  pusher = Pusher(**pusher_kwargs)

  # Listed in the same order the components were created above.
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      example_validator,
      transform,
      trainer,
      model_resolver,
      evaluator,
      pusher,
  ]

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      # Change this value to control caching of execution results. Default value
      # is `False`.
      # enable_cache=True,
      metadata_connection_config=metadata_connection_config,
      beam_pipeline_args=beam_pipeline_args,
  )
kubeflow_dag_runner.py
# Lint as: python2, python3
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Define KubeflowDagRunner to run the pipeline using Kubeflow."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from absl import logging
from pipeline import configs
from pipeline import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import trainer_pb2
from tfx.utils import telemetry_utils
# Everything the TFX pipeline writes (output files and metadata) is stored
# under OUTPUT_DIR.
OUTPUT_DIR = os.path.join('gs://', configs.GCS_BUCKET_NAME)

# TFX produces two types of outputs, files and metadata. The files are created
# under PIPELINE_ROOT.
PIPELINE_ROOT = os.path.join(
    OUTPUT_DIR, 'tfx_pipeline_output', configs.PIPELINE_NAME)

# "Pusher", the last component of the pipeline, produces the serving model
# under SERVING_MODEL_DIR.
SERVING_MODEL_DIR = os.path.join(PIPELINE_ROOT, 'serving_model')

# Data file directory. In the template, DATA_PATH is a directory containing CSV
# files for CsvExampleGen; by default the data files are in the GCS path
# `gs://{GCS_BUCKET_NAME}/tfx-template/data/`. Using a GCS path is recommended
# for KFP.
#
# A data source located inside the container built by the template can be used
# instead by setting DATA_PATH = 'data'. Note that Dataflow does not currently
# support using a container as a dependency, so CsvExampleGen cannot be used
# with Dataflow in that setup (step 8 in the template notebook).
DATA_PATH = 'gs://{bucket}/tfx-template/data/'.format(
    bucket=configs.GCS_BUCKET_NAME)
def run():
  """Builds the template pipeline and runs it through KubeflowDagRunner."""
  # Metadata config. The defaults work with the installation of KF Pipelines
  # using Kubeflow. If KF Pipelines was installed with the lightweight
  # deployment option, the defaults may need to be overridden. With Kubeflow,
  # metadata is written to the MySQL database inside the Kubeflow cluster.
  default_metadata = kubeflow_dag_runner.get_default_kubeflow_metadata_config()

  # The Kubeflow TFX image is injected automatically when the environment
  # variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx cli tool
  # exports the environment variable to pass it to the pipelines.
  # TODO(b/157598477) Find a better way to pass parameters from CLI handler to
  # pipeline DSL file, instead of using environment vars.
  image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
  dag_runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
      kubeflow_metadata_config=default_metadata, tfx_image=image)

  # Default pod labels plus one identifying the 'tfx-template' SDK environment.
  labels = kubeflow_dag_runner.get_default_pod_labels()
  labels.update({telemetry_utils.LABEL_KFP_SDK_ENV: 'tfx-template'})

  dag_runner = kubeflow_dag_runner.KubeflowDagRunner(
      config=dag_runner_config, pod_labels_to_attach=labels)
  template_pipeline = pipeline.create_pipeline(
      pipeline_name=configs.PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_path=DATA_PATH,
      # TODO(step 7): (Optional) Uncomment below to use BigQueryExampleGen.
      # query=configs.BIG_QUERY_QUERY,
      preprocessing_fn=configs.PREPROCESSING_FN,
      run_fn=configs.RUN_FN,
      train_args=trainer_pb2.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
      eval_args=trainer_pb2.EvalArgs(num_steps=configs.EVAL_NUM_STEPS),
      eval_accuracy_threshold=configs.EVAL_ACCURACY_THRESHOLD,
      serving_model_dir=SERVING_MODEL_DIR,
      # beam_pipeline_args=configs
      # .REDIS_BEAM_PIPELINE_ARGS,
  )
  dag_runner.run(template_pipeline)


if __name__ == '__main__':
  # Log at INFO level when executed as a script.
  logging.set_verbosity(logging.INFO)
  run()
The python code in the image cannot find the redis_component
module.
To debug, try the following:
docker run -it --entrypoint python3 <your image> -c 'import redis_component'
@Ark-kun Thanks for the reply That is exactly the issue.
➜ ~ docker run -it --entrypoint python3 gcr.io/my_bucket/tfx-pipeline:latest -c 'import redis_component'
Traceback (most recent call last):
File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'redis_component'
Does this have to do with the TFX image used during the pipeline build?
# This pipeline automatically injects the Kubeflow TFX image if the
# environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
# cli tool exports the environment variable to pass to the pipelines.
# TODO(b/157598477) Find a better way to pass parameters from CLI handler to
# pipeline DSL file, instead of using environment vars.
tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
kubeflow_metadata_config=metadata_config, tfx_image=tfx_image)
Should I provide my own custom image based upon gcr.io/cloud-ml-pipelines-test/tfx-kubeflow
with my custom component installed?
Should I provide my own custom image based upon gcr.io/cloud-ml-pipelines-test/tfx-kubeflow with my custom component installed?
Your component code needs to be in the image you use for your component. For python-class-based components, it probably needs to be in the image that's set as the default. For components created using create_container_component, the image can be specified during component creation.
Also the container code location in the container image matters since it affects python's ability to find the module.
@DvirDukhan We're using plenty of custom TFX components and have solved this as @Ark-kun suggested by moving it into a docker image based on TFX:
FROM tensorflow/tfx:0.24.0
COPY <your-component-code> /<path-in-docker-image>
ENV PYTHONPATH="/<path-in-docker-image>:${PYTHONPATH}"
Then you just set this image as your base image for the pipeline. This also has the advantage that you can put all your pipeline code in the image you're using which means easier code versioning.
@ConverJens @Ark-kun thanks for your replies, I appreciate it. I have had partial success with this approach, which is good progress.
When using the custom image, should I pass it to the tfx cli as the value of the --build_base_image flag, OR should I specify it as the tfx_image value for KubeflowDagRunnerConfig? Is there a priority order, or any contradiction, between those two arguments/configurations?
I managed to get a successful execution only by modifying the auto-generated Dockerfile, which is created when executing tfx pipeline create. I manually edited it to install my custom component, and then called tfx pipeline update, but in my opinion this is not the right approach.
@DvirDukhan Happy to help!
I'm not using the tfx cli so I can't answer how that should work. Instead, I simply build the image with Docker and compile the pipeline using TFX DSL kubeflow_dag_runner where I specify my own custom image, like this:
# Runner config that uses the custom image as the TFX image and registers
# onprem.mount_pvc(...) as a pipeline operator function.
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=metadata_config,
    tfx_image=custom_tfx_image,
    pipeline_operator_funcs=(
        [
            onprem.mount_pvc(persistent_volume_claim, persistent_volume, persistent_volume_mount),
        ]
    ))
This produces a tar.gz that can be uploaded through the UI or using the kfp python client.
Overall, the other questions you have seem to be about TFX specifically and should probably be posted on the TFX issue page.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Glad we were able to help.
Hi, I'm trying to execute a modification of the TFX Chicago taxi pipeline example with GKE Kubeflow Pipelines (https://www.tensorflow.org/tfx/tutorials/tfx/cloud-ai-platform-pipelines). I have written a custom component (https://github.com/DvirDukhan/redis_example_gen) in order to replace the CsvExampleGen in the original example with an example generator that extracts data from Redis. My pipeline library tree is as extracted from the deployed docker image (I apologize for the verbosity but this is just to show that the component files are there):
I manage to deploy the pipeline to Kubeflow; however, it fails during experiment execution when starting the redisexamplegen component with the following log:
which indicates that my component is not installed on the container/runtime.
The TFX documentation for custom components mentions that, in order to deploy a custom component:
Beside code changes, all the newly added parts (ComponentSpec, Executor, component interface) need to be accessible in pipeline running environment in order to run the pipeline properly.
I tried to supply additional beam configuration arguments for the location of the component's setup.py file:
but with no success (for various values for specifying the setup file location) and, as far as I understand, this is not the case here.
My question is: What else do I need to do in order to install the custom component in the pipeline running environment? I would be happy to supply any additional information required.
Lastly, I am sorry for duplicating this issue from the TFX repository (https://github.com/tensorflow/tfx/issues/2549), but unfortunately, I have no solid answers yet, so I hope to get some directions here. Your answers are highly appreciated. Thanks!