tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0

Example for Pusher component that uses TF Serving as destinations #4186

Closed dineshvelmuruga closed 1 year ago

dineshvelmuruga commented 2 years ago

URL(s) with the issue:

https://www.tensorflow.org/tfx/guide/pusher

Description of issue (what needs changing):

There is no example of a Pusher component that uses TF Serving as a destination.

Clear description:

There is no example of a Pusher component that uses TF Serving as a destination.

ConverJens commented 2 years ago

@dineshvelmuruga TF Serving isn't a destination, it's an application. You decide where TF Serving is to look for the model, be it S3, GCS, local disk or a PVC in Kubernetes. Hence, just configure Pusher to push wherever your TF Serving instance is looking.
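
For reference, a minimal sketch of a Pusher pointed at the directory a TF Serving instance watches (serving_model_dir, trainer and evaluator are placeholders from a typical pipeline):

from tfx import v1 as tfx

# Location your TF Serving instance is configured to poll for new model versions,
# e.g. a local folder, a PVC mount point, or an S3/GCS path.
serving_model_dir = '/serving_models/my_model'  # placeholder

pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory=serving_model_dir)))

A tensorflow_model_server started with --model_base_path pointing at the same directory will then pick up each newly pushed version.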

dineshvelmuruga commented 2 years ago

> @dineshvelmuruga TF Serving isn't a destination, it's an application. You decide where TF Serving is to look for the model, be it S3, GCS, local disk or a PVC in Kubernetes. Hence, just configure Pusher to push wherever your TF Serving instance is looking.

@ConverJens does TFX support Azure Blob Storage / ADLS? This information is critical for deciding whether to use TFX for one of our clients.

ConverJens commented 2 years ago

@dineshvelmuruga I can't say for sure, I haven't tested it, but it seems like it: https://www.tensorflow.org/io/tutorials/azure. TFX itself doesn't have explicit storage support but rather relies on TensorFlow's filesystem support. At least that's how I have understood it.

Anyway, I'm using S3 through MinIO and that's only a matter of setting the correct environment variables in your pipeline, which is quite easily done.

dineshvelmuruga commented 2 years ago

@ConverJens Can you give some pointers on using S3 through MinIO in the pipeline?

ConverJens commented 2 years ago

@dineshvelmuruga There are just two things that need to be done:

  1. Set the required env vars
  2. Point to an S3 location

For 1. you need to set:

S3_ENDPOINT: "my.minio.server:9000/"  # just localhost:9000 if you are running locally
AWS_ACCESS_KEY_ID: "my-minio-access-key"
AWS_SECRET_ACCESS_KEY: "my-minio-secret-key"
# Or set these to 0 if you want to disable SSL
S3_USE_HTTPS: "1"
S3_VERIFY_SSL: "1"

And for 2. you just need to provide an S3 path: "s3://my-bucket/my-folder". Note that the bucket must exist with appropriate permissions a priori but any folders will be created.
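
As a concrete illustration for a local or notebook run (a sketch; the endpoint, credentials and paths are placeholders for your own MinIO setup), the same settings can be exported from Python before constructing the pipeline:

import os

# MinIO endpoint and credentials -- placeholder values.
os.environ['S3_ENDPOINT'] = 'localhost:9000'
os.environ['AWS_ACCESS_KEY_ID'] = 'my-minio-access-key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'my-minio-secret-key'
# Running MinIO without TLS here, so disable HTTPS and SSL verification.
os.environ['S3_USE_HTTPS'] = '0'
os.environ['S3_VERIFY_SSL'] = '0'

# Any TFX location can then be an S3 path inside the existing bucket, e.g.:
serving_model_dir = 's3://my-bucket/my-folder/serving_model'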

Useful links:
https://aws.amazon.com/blogs/compute/tensorflow-serving-on-kubernetes-spot-instances/
https://www.kubeflow.org/docs/external-add-ons/serving/tfserving_new/

dineshvelmuruga commented 2 years ago

@ConverJens Thank you. I will try this out

dineshvelmuruga commented 2 years ago

@ConverJens, I want to read the image from Azure Blob in a TFX pipeline (ExampleGen). Do you have examples of the same in S3 using MinIO?

ConverJens commented 2 years ago

@dineshvelmuruga What do you mean "read the image"? Do you mean read data with ExampleGen from S3?

If you set the env vars on your pipeline as I specified, then all non-beam TFX components can read from S3. For beam components you also need to pass the appropriate beam args:

beam_args = [f'--s3_endpoint_url=https://my.minio.server:9000/',
             f'--s3_access_key_id=my-minio-access-key',
             # f'--s3_secret_access_key=my-minio-secret-key',
             f'--s3_verify=1',  # or a path to a CA cert bundle
             '--s3_disable_ssl=False']

Note that this will leak your minio access key in clear text to the pipeline.yaml. I have an open PR that fixes this by allowing one to read in beam args from env vars, which allows one to use k8s secrets. The PR is still under review but if you also need this functionality you can leave a thumbs up to indicate importance: https://github.com/tensorflow/tfx/pull/4147.
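
These args are then attached either per component or for the whole pipeline, e.g. (a sketch reusing the placeholder names above):

from tfx.components import CsvExampleGen
from tfx.orchestration import pipeline

# Per component:
example_gen = CsvExampleGen(
    input_base='s3://my-bucket/my-folder/data').with_beam_pipeline_args(beam_args)

# Or for every Beam-based component in the pipeline:
my_pipeline = pipeline.Pipeline(
    pipeline_name='my_pipeline',
    pipeline_root='s3://my-bucket/my-folder/pipeline_root',
    components=[example_gen],
    beam_pipeline_args=beam_args)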

dineshvelmuruga commented 2 years ago

@ConverJens How to set env variable in a tfx pipeline

ConverJens commented 2 years ago

@dineshvelmuruga Depends on how you run it. If running in KFP, set the appropriate pipeline_operator_funcs in your KubeflowDagRunnerConfig when compiling your pipeline.

dineshvelmuruga commented 2 years ago

@ConverJens I am running it in my local.

  1. Deployed MinIO and created a bucket tfx-kfp, uploaded data.csv to tfx-kfp/data/data.csv
  2. Using an interactive notebook, set the params below

Code:

os.environ['S3_ENDPOINT'] = "127.0.0.1:9000"
os.environ['AWS_ACCESS_KEY_ID'] = "AKIAIOSFODNN7EXAMPLED"
os.environ['AWS_SECRET_ACCESS_KEY'] = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEYD'
os.environ['AWS_ENDPOINT_URL'] = "http://127.0.0.1:9000/"
os.environ['S3_USE_HTTPS'] = "1"
os.environ['S3_VERIFY_SSL'] = "0"

beam_args = [f'--s3_endpoint_url=http://127.0.0.1:9000/',
             f'--s3_access_key_id=AKIAIOSFODNN7EXAMPLED',
             f'--s3_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEYD',
             f'--s3_verify=0',
             '--s3_disable_ssl=False']

data_path = "s3://tfx-kfp/data/" example_gen = tfx.components.CsvExampleGen(input_base=data_path).with_beam_pipeline_args(beam_args) context.run(example_gen)

Error:

/mnt/d/miniconda_ubuntu/envs/tfx_workout/lib/python3.8/site-packages/tfx/components/example_gen/csv_example_gen/executor.py in _CsvToExample(pipeline, exec_properties, split_pattern)
    128   csv_files = fileio.glob(csv_pattern)
    129   if not csv_files:
--> 130     raise RuntimeError(
    131         'Split pattern {} does not match any files.'.format(csv_pattern))
    132

RuntimeError: Split pattern s3://tfx-kfp/data/* does not match any files.

ConverJens commented 2 years ago

@dineshvelmuruga Your setup looks ok. The only thing you should probably do is set your env var S3_USE_HTTPS=0 since you aren't using HTTPS, and you should also specify the runner for Beam. The default is DirectRunner I believe, but in order to pass s3 info to the DirectRunner you also need to specify it explicitly:

f'--direct_running_mode=multi_processing',
f'--direct_num_workers=0'

The error indicates there is no data at the specified location; have you verified that there is? You can test your S3 connection using a package like boto3 or s3fs and see if you can list files from your notebook.
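
For example, a quick listing check with boto3 against the same endpoint and bucket (a sketch reusing the values from the snippet above):

import boto3

# Point boto3 at the MinIO endpoint the pipeline uses (values from the snippet above).
s3 = boto3.client(
    's3',
    endpoint_url='http://127.0.0.1:9000',
    aws_access_key_id='AKIAIOSFODNN7EXAMPLED',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEYD')

# List what the split pattern s3://tfx-kfp/data/* should match.
response = s3.list_objects_v2(Bucket='tfx-kfp', Prefix='data/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])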

wyljpn commented 2 years ago

> @dineshvelmuruga What do you mean "read the image"? Do you mean read data with ExampleGen from S3? If you set the env vars on your pipeline as I specified, then all non-beam TFX components can read from S3. For beam components you also need to pass the appropriate beam args: [...]

Hi @ConverJens, recently I have been working on TFX v0.28.0 with MinIO. I wanted to specify a MinIO bucket as the pipeline root. I read some documents and issues on the TF, TFX, and MinIO GitHub, for example this issue and the issue posted by you on the MinIO GitHub: https://github.com/minio/minio/issues/11481

Based on the description and my environment, I wrote the code below.

from tfx.orchestration import pipeline
import os

os.environ['S3_ENDPOINT'] = 'minio-service.kubeflow:9000'
os.environ['BUCKET_NAME'] = 'tfx'
os.environ['AWS_ACCESS_KEY_ID'] = "minio"
os.environ['AWS_SECRET_ACCESS_KEY'] = 'minio123'
os.environ['AWS_ENDPOINT_URL'] = "http://minio-service.kubeflow:9000/"
os.environ['S3_USE_HTTPS'] = "0"
os.environ['S3_VERIFY_SSL'] = "0"

beam_arg = [
  '--direct_num_workers=4',
  '--direct_running_mode=multi_processing',
  '--s3_endpoint_url=http://minio-service.kubeflow:9000/',
  '--s3_access_key_id=minio',
  '--s3_secret_access_key=minio123',
  '--s3_verify=False',
  '--s3_disable_ssl=True'
]

k_pipeline = pipeline.Pipeline(
  pipeline_name=pipeline_name,
  pipeline_root='s3://tfx',
  components=components,
  beam_pipeline_args=beam_arg,
)

It got an error saying "Couldn't connect to server" at CsvExampleGen, which loads the dataset from local. Logs are shown below.

INFO:absl:Running driver for CsvExampleGen
INFO:absl:MetadataStore with gRPC connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:Adding KFP pod name kiba-minio-pipeline-cv42z-681655262 to execution
/opt/conda/lib/python3.7/site-packages/ray/autoscaler/_private/cli_logger.py:61: FutureWarning: Not all Ray CLI dependencies were found. In Ray 1.4+, the Ray CLI, autoscaler, and dashboard will only be usable via `pip install 'ray[default]'`. Please update your install command.
  "update your install command.", FutureWarning)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/opt/conda/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 360, in <module>
    main()
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 353, in main
    execution_info = launcher.launch()
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 198, in launch
    self._exec_properties)
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 167, in _run_driver
    component_info=self._component_info)
  File "/opt/conda/lib/python3.7/site-packages/tfx/dsl/components/base/base_driver.py", line 311, in pre_execution
    component_info=component_info)
  File "/opt/conda/lib/python3.7/site-packages/tfx/components/example_gen/driver.py", line 144, in _prepare_output_artifacts
    base_driver._prepare_output_paths(example_artifact)  # pylint: disable=protected-access
  File "/opt/conda/lib/python3.7/site-packages/tfx/dsl/components/base/base_driver.py", line 65, in _prepare_output_paths
    fileio.makedirs(artifact_dir)
  File "/opt/conda/lib/python3.7/site-packages/tfx/dsl/io/fileio.py", line 83, in makedirs
    _get_filesystem(path).makedirs(path)
  File "/opt/conda/lib/python3.7/site-packages/tfx/dsl/io/plugins/tensorflow_gfile.py", line 76, in makedirs
    tf.io.gfile.makedirs(path)
  File "/opt/conda/lib/python3.7/site-packages/tensorflow/python/lib/io/file_io.py", line 483, in recursive_create_dir_v2
    _pywrap_file_io.RecursivelyCreateDir(compat.path_to_bytes(path))
tensorflow.python.framework.errors_impl.AbortedError: All 10 retry attempts failed. The last failure: Unknown: : curlCode: 7, Couldn't connect to server

And if the CsvExampleGen loads the dataset from an s3 bucket, it gets a similar error to the one @dineshvelmuruga met.

Error:
/mnt/d/miniconda_ubuntu/envs/tfx_workout/lib/python3.8/site-packages/tfx/components/example_gen/csv_example_gen/executor.py in _CsvToExample(pipeline, exec_properties, split_pattern)
128 csv_files = fileio.glob(csv_pattern)
129 if not csv_files:
--> 130 raise RuntimeError(
131 'Split pattern {} does not match any files.'.format(csv_pattern))
132

RuntimeError: Split pattern s3://tfx-kfp/data/* does not match any files.

Could you kindly tell me what I should do to fix it?

ConverJens commented 2 years ago

@dineshvelmuruga @wyljpn I believe that you are both setting your env vars incorrectly. While the values are likely correct, they need to be set at container runtime, not at pipeline compile time as it seems you are doing now. The correct way (if you are using KFP at least) is to use pipeline_operator_funcs like this:

runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=metadata_config,
    tfx_image=pipeline_base_image,
    pipeline_operator_funcs=(
            pipeline_operators
    ))
kubeflow_tfx_compiler = kubeflow_dag_runner.KubeflowDagRunner(config=runner_config)
kubeflow_tfx_compiler.run(pipeline)

where pipeline_operators is a list of k8s operations where you set the env vars:

from kubernetes import client as k8s_client

def env_params(name, value):
    def _env_params(task):
        return task.add_env_variable(k8s_client.V1EnvVar(name=name, value=value))

    return _env_params

pipeline_operators = [env_params('S3_ENDPOINT', <your-s3-endpoint>),
                      env_params('other-env-bar', 'other-value')]
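
For the credential values specifically, a possible variant (a sketch, assuming a pre-created Kubernetes Secret named minio-credentials with keys accesskey and secretkey) is to reference the secret instead of hard-coding the value, so it never appears in the compiled pipeline.yaml:

def secret_env_param(name, secret_name, secret_key):
    # Injects an env var whose value is read from a k8s Secret at runtime.
    def _secret_env_param(task):
        return task.add_env_variable(
            k8s_client.V1EnvVar(
                name=name,
                value_from=k8s_client.V1EnvVarSource(
                    secret_key_ref=k8s_client.V1SecretKeySelector(
                        name=secret_name, key=secret_key))))

    return _secret_env_param

pipeline_operators = [env_params('S3_ENDPOINT', <your-s3-endpoint>),
                      secret_env_param('AWS_ACCESS_KEY_ID', 'minio-credentials', 'accesskey'),
                      secret_env_param('AWS_SECRET_ACCESS_KEY', 'minio-credentials', 'secretkey')]
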
ConverJens commented 2 years ago

@wyljpn See my other comment about how to set env vars.

> Recently, I have been working on TFX v0.28.0 with MinIO. I wanted to specify a MinIO bucket as the pipeline root. [...]

What do you mean "load from local"? From a local s3? Executed how?

> It got an error saying "Couldn't connect to server" at CsvExampleGen, which loads the dataset from local. [...]
> tensorflow.python.framework.errors_impl.AbortedError: All 10 retry attempts failed. The last failure: Unknown: : curlCode: 7, Couldn't connect to server

Again, where is this executed (KFP, GCS, local etc) and what is your s3 source and where does that reside?

> And if the CsvExampleGen loads the dataset from an s3 bucket, it gets a similar error to the one @dineshvelmuruga met: RuntimeError: Split pattern s3://tfx-kfp/data/* does not match any files.

From the last log it looks like you may have gotten the s3 setup to work, but have you verified that the bucket exists, has the right permissions and that data is actually available there?

Also, what does your CsvExampleGen code look like?

wyljpn commented 2 years ago

> What do you mean "load from local"? From a local s3? Executed how?
>
> Again, where is this executed (KFP, GCS, local etc) and what is your s3 source and where does that reside?
>
> From the last log it looks like you may have gotten the s3 setup to work, but have you verified that the bucket exists, has the right permissions and that data is actually available there?
>
> Also, what does your CsvExampleGen code look like?

Sorry, I didn't explain it clearly. When I said "load from local", I meant loading the dataset from a PVC, just for testing. And once I can use S3 successfully, I want to make the CsvExampleGen load the dataset from MinIO in another pod in the same Kubeflow cluster.

My current CsvExampleGen loads the dataset from a PVC:

data_dir = os.path.join(persistent_volume_mount, namespace, pipeline_name, "data")

example_gen = CsvExampleGen(
    input_base=data_dir, ...)

If we want to make CsvExampleGen load the dataset from MinIO, we only need to change the input_base, right?

data_dir = "s3://dataset/foo/"

example_gen = CsvExampleGen(
    input_base=data_dir, ...)

wyljpn commented 2 years ago

> @dineshvelmuruga @wyljpn I believe that you are both setting your env vars incorrectly. While the values are likely correct, they need to be set at container runtime, not at pipeline compile time as it seems you are doing now. The correct way (if you are using KFP at least) is to use pipeline_operator_funcs like this: [...]

Hi, @ConverJens Thank you for your kind response.

Based on the description, I modified my code as shown below.

def env_params(name, value):
    def _env_params(task):
        return task.add_env_variable(k8s_client.V1EnvVar(name=name, value=value))

    return _env_params

pipeline_operators = [env_params('S3_ENDPOINT', s3_path),
                      env_params('BUCKET_NAME', s3_bucket),
                      env_params('AWS_ACCESS_KEY_ID', 'minio'),
                      env_params('AWS_SECRET_ACCESS_KEY', 'minio123'),
                      env_params('AWS_ENDPOINT_URL', 'http://minio-service.kubeflow.svc.cluster.local:9000/'),
                      env_params('S3_USE_HTTPS', "0"),
                      env_params('S3_VERIFY_SSL', "False")]

runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=metadata_config,
    tfx_image=tfx_image,
    pipeline_operator_funcs=(
        [
            onprem.mount_pvc(
                persistent_volume_claim,
                persistent_volume,
                persistent_volume_mount,
            )
        ]
        + pipeline_operators
    ),
)

It seems that it could connect to MinIO, because I found that some folders were created in the bucket.

However, the folders didn't contain anything, and an error happened with CsvExampleGen: S3ClientError(message, code): The AWS Access Key Id you provided does not exist in our records.

Error logs:

INFO:apache_beam.typehints.native_type_compatibility:Using Any for unsupported type: typing.MutableMapping[str, typing.Any]
[... the line above is repeated ~30 times ...]
INFO:absl:tensorflow_text is not available: No module named 'tensorflow_text'
INFO:absl:Running driver for CsvExampleGen
INFO:absl:MetadataStore with gRPC connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:Glob pattern for split : /rflow-tfx-pvc/rflow/aman/data/export-*/*
INFO:absl:Regex pattern for split : /rflow-tfx-pvc/rflow/aman/data/export-(?P<span>.*)/[^/]*
INFO:absl:latest span and version = (0, None)
INFO:absl:Adding KFP pod name aman-k6xv8-4253229490 to execution
INFO:absl:Running executor for CsvExampleGen
INFO:absl:Attempting to infer TFX Python dependency for beam
INFO:absl:Copying all content from install dir /opt/conda/lib/python3.7/site-packages/tfx to temp dir /tmp/tmplwuwpl51/build/tfx
INFO:absl:Generating README.ml-pipelines-sdk.md temp setup file at /tmp/tmplwuwpl51/build/tfx/setup.py
INFO:absl:Creating temporary sdist package, logs available at /tmp/tmplwuwpl51/build/tfx/setup.log
INFO:absl:Added --extra_package=/tmp/tmplwuwpl51/build/tfx/dist/tfx_ephemeral-7.1.5.tar.gz to beam args
INFO:absl:Generating examples.
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
WARNING:absl:If direct_num_workers is not equal to 1, direct_running_mode should be `multi_processing` or `multi_threading` instead of `in_memory` in order for it to have the desired worker parallelism effect.
INFO:absl:Processing input csv data /rflow-tfx-pvc/rflow/aman/data/export-0/* to TFExample.
[... apache_beam fn_api_runner translation, state cache and worker handler INFO lines omitted ...]
INFO:botocore.credentials:Found credentials in environment variables.
ERROR:root:Error in _start_upload while inserting file s3://tfx/CsvExampleGen/examples/1340/eval/beam-temp-data_tfrecord-36dec87c74d911ec8c110242ac110040/e5e663cb-6ae7-48ca-86f7-768734a42822.data_tfrecord.gz: Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/clients/s3/boto3_client.py", line 171, in create_multipart_upload
    ContentType=request.mime_type)
  File "/opt/conda/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/conda/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidAccessKeyId) when calling the CreateMultipartUpload operation: The AWS Access Key Id you provided does not exist in our records.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 566, in _start_upload
    response = self._client.create_multipart_upload(request)
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/clients/s3/boto3_client.py", line 176, in create_multipart_upload
    raise messages.S3ClientError(message, code)
apache_beam.io.aws.clients.s3.messages.S3ClientError: ('The AWS Access Key Id you provided does not exist in our records.', 403)

[... the same InvalidAccessKeyId traceback is repeated for each temp file the write attempts ...]

/opt/conda/lib/python3.7/site-packages/ray/autoscaler/_private/cli_logger.py:61: FutureWarning: Not all Ray CLI dependencies were found. In Ray 1.4+, the Ray CLI, autoscaler, and dashboard will only be usable via `pip install 'ray[default]'`. Please update your install command.
329
  "update your install command.", FutureWarning)
330
Traceback (most recent call last):
331
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/clients/s3/boto3_client.py", line 171, in create_multipart_upload
332
    ContentType=request.mime_type)
333
  File "/opt/conda/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
334
    return self._make_api_call(operation_name, kwargs)
335
  File "/opt/conda/lib/python3.7/site-packages/botocore/client.py", line 708, in _make_api_call
336
    raise error_class(parsed_response, operation_name)
337
botocore.exceptions.ClientError: An error occurred (InvalidAccessKeyId) when calling the CreateMultipartUpload operation: The AWS Access Key Id you provided does not exist in our records.
338

339
During handling of the above exception, another exception occurred:
340

341
Traceback (most recent call last):
342
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
343
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
344
  File "apache_beam/runners/common.py", line 893, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
345
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1131, in process
346
    self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
347
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 200, in _f
348
    return fnc(self, *args, **kwargs)
349
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 196, in open_writer
350
    return FileBasedSinkWriter(self, writer_path)
351
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 417, in __init__
352
    self.temp_handle = self.sink.open(temp_shard_path)
353
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 200, in _f
354
    return fnc(self, *args, **kwargs)
355
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 138, in open
356
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
357
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 229, in create
358
    return filesystem.create(path, mime_type, compression_type)
359
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3filesystem.py", line 171, in create
360
    return self._path_open(path, 'wb', mime_type, compression_type)
361
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3filesystem.py", line 152, in _path_open
362
    path, mode, mime_type=mime_type)
363
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 99, in open
364
    uploader = S3Uploader(self.client, filename, mime_type)
365
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 555, in __init__
366
    self._start_upload()
367
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 260, in wrapper
368
    return fun(*args, **kwargs)
369
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 574, in _start_upload
370
    raise e
371
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 566, in _start_upload
372
    response = self._client.create_multipart_upload(request)
373
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/clients/s3/boto3_client.py", line 176, in create_multipart_upload
374
    raise messages.S3ClientError(message, code)
375
apache_beam.io.aws.clients.s3.messages.S3ClientError: ('The AWS Access Key Id you provided does not exist in our records.', 403)
376

377
During handling of the above exception, another exception occurred:
378

379
Traceback (most recent call last):
380
  File "/opt/conda/lib/python3.7/runpy.py", line 193, in _run_module_as_main
381
    "__main__", mod_spec)
382
  File "/opt/conda/lib/python3.7/runpy.py", line 85, in _run_code
383
    exec(code, run_globals)
384
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 360, in <module>
385
    main()
386
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 353, in main
387
    execution_info = launcher.launch()
388
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 209, in launch
389
    copy.deepcopy(execution_decision.exec_properties))
390
  File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py", line 72, in _run_executor
391
    copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
392
  File "/opt/conda/lib/python3.7/site-packages/tfx/components/example_gen/base_example_gen_executor.py", line 324, in Do
393
    split_name)))
394
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/pipeline.py", line 580, in __exit__
395
    self.result = self.run()
396
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/pipeline.py", line 559, in run
397
    return self.runner.run_pipeline(self, self._options)
398
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 133, in run_pipeline
399
    return runner.run_pipeline(pipeline, options)
400
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
401
    pipeline.to_runner_api(default_environment=self._default_environment))
402
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
403
    return self.run_stages(stage_context, stages)
404
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 359, in run_stages
405
    bundle_context_manager,
406
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 555, in _run_stage
407
    bundle_manager)
408
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 595, in _run_bundle
409
    data_input, data_output, input_timers, expected_timer_output)
410
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1004, in process_bundle
411
    timer_inputs)):
412
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
413
    yield fs.pop().result()
414
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 435, in result
415
    return self.__get_result()
416
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
417
    raise self._exception
418
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
419
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
420
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1000, in execute
421
    dry_run)
422
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 896, in process_bundle
423
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
424
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
425
    response = self.worker.do_instruction(request)
426
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
427
    getattr(request, request_type), request.instruction_id)
428
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
429
    bundle_processor.process_bundle(instruction_id))
430
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
431
    element.data)
432
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
433
    self.output(decoded_value)
434
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
435
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
436
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
437
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
438
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
439
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
440
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
441
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
442
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
443
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
444
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
445
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
446
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
447
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
448
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
449
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
450
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
451
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
452
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
453
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
454
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
455
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
456
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
457
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
458
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
459
  File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
460
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
461
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
462
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
463
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
464
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
465
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
466
  File "/opt/conda/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
467
    raise exc.with_traceback(traceback)
468
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
469
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
470
  File "apache_beam/runners/common.py", line 893, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
471
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1131, in process
472
    self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
473
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 200, in _f
474
    return fnc(self, *args, **kwargs)
475
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 196, in open_writer
476
    return FileBasedSinkWriter(self, writer_path)
477
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 417, in __init__
478
    self.temp_handle = self.sink.open(temp_shard_path)
479
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 200, in _f
480
    return fnc(self, *args, **kwargs)
481
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 138, in open
482
    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
483
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 229, in create
484
    return filesystem.create(path, mime_type, compression_type)
485
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3filesystem.py", line 171, in create
486
    return self._path_open(path, 'wb', mime_type, compression_type)
487
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3filesystem.py", line 152, in _path_open
488
    path, mode, mime_type=mime_type)
489
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 99, in open
490
    uploader = S3Uploader(self.client, filename, mime_type)
491
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 555, in __init__
492
    self._start_upload()
493
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 260, in wrapper
494
    return fun(*args, **kwargs)
495
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 574, in _start_upload
496
    raise e
497
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/s3io.py", line 566, in _start_upload
498
    response = self._client.create_multipart_upload(request)
499
  File "/opt/conda/lib/python3.7/site-packages/apache_beam/io/aws/clients/s3/boto3_client.py", line 176, in create_multipart_upload
500
    raise messages.S3ClientError(message, code)
501
apache_beam.io.aws.clients.s3.messages.S3ClientError: ("The AWS Access Key Id you provided does not exist in our records. [while running 'WriteSplit[eval]/Write/Write/WriteImpl/WriteBundles']", 403)

Because the folder was created, I think I configured AWS_ACCESS_KEY_ID and the other parameters correctly. What caused the error, and how can I fix it? Do you have any ideas?

ConverJens commented 2 years ago

@wyljpn

So I think that you have done everything correctly up till this point.

If I'm reading this correctly, you are now reading data from your PVC and CsvExampleGen is supposed to output to S3, right?

Since the directory is created in S3, this means that you have set the env vars on your pipeline correctly: TFX can access the specified pipeline root and has created the output dir for your artifact.

Beam seems to be able to read and process the data, but when it's done it fails to upload it. As you can see in the log, it also states that it has found S3 credentials, so they are probably passed to Beam correctly.

I believe the failure is because you haven't explicitly specified that you are using the DirectRunner, which means that Beam takes a slightly different code path, resulting in env vars not being passed on to the subprocess. You can fix this by adding "--runner=DirectRunner" to your specified beam args.
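For reference, a minimal sketch of that change (only the runner flag is new here; the --s3_* endpoint/credential flags you'll see further down in this thread still apply):

beam_arg = [
    "--runner=DirectRunner",  # explicitly select the DirectRunner code path
    f"--direct_num_workers={direct_num_workers}",
    # plus the --s3_* flags shown later in this thread
]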

If this doesn't work, please post your entire code file where you specify your pipeline, env vars, beam args etc.

wyljpn commented 2 years ago

@ConverJens

If I'm reading this correctly, you are now reading data from your PVC and CsvExampleGen is supposed to output to S3, right?

Yes.

Adding "--runner=DirectRunner" still didn't work; I got the same error:

apache_beam.io.aws.clients.s3.messages.S3ClientError: ("The AWS Access Key Id you provided does not exist in our records. [while running 'WriteSplit[train]/Write/Write/WriteImpl/WriteBundles']", 403)

Related code:

s3_path = 'minio-service.kubeflow.svc.cluster.local:9000'
s3_bucket = 'tfx'
output_base = os.path.join("s3://", s3_bucket)

def init_components(data_dir, module_file, serving_model_dir):
    example_gen = CsvExampleGen(
        input_base=data_dir,...)

    # All components
    components = [
        example_gen,
    …
    ]
    return components

def init_kubeflow_pipeline(
    components, pipeline_root, direct_num_workers):

    beam_arg = [
        f"--direct_num_workers={direct_num_workers}",
        "--runner=DirectRunner"
        ]

    k_pipeline = pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        beam_pipeline_args=beam_arg,
    )
    return k_pipeline

if __name__ == "__main__":

    metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()

    metadata_config.mysql_db_service_host.value = 'mysql.kubeflow'
    metadata_config.mysql_db_service_port.value = "3306"
    metadata_config.mysql_db_name.value = "metadb"
    metadata_config.mysql_db_user.value = "root"
    metadata_config.mysql_db_password.value = ""
    metadata_config.grpc_config.grpc_service_host.value ='metadata-grpc-service.kubeflow'
    metadata_config.grpc_config.grpc_service_port.value ='8080'

    tfx_image = os.environ.get(
        "KUBEFLOW_TFX_IMAGE",
        "wyljpn/tfx:0.28.0",
    )

    components = init_components(
        data_dir,
        module_file,
        serving_model_dir=serving_model_dir
    )

    def env_params(name, value):
        def _env_params(task):
            return task.add_env_variable(k8s_client.V1EnvVar(name=name, value=value))
        return _env_params

    pipeline_operators = [env_params('S3_ENDPOINT', s3_path),
                          env_params('BUCKET_NAME', s3_bucket),
                          env_params('AWS_ACCESS_KEY_ID', 'minio'),
                          env_params('AWS_SECRET_ACCESS_KEY', 'minio123'),
                          env_params('AWS_ENDPOINT_URL', 'http://minio-service.kubeflow.svc.cluster.local:9000/'),
                          env_params('S3_USE_HTTPS', "0"),
                          env_params('S3_VERIFY_SSL', "False")]

    runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
        kubeflow_metadata_config=metadata_config,
        tfx_image=tfx_image,

        pipeline_operator_funcs=(
            kubeflow_dag_runner.get_default_pipeline_operator_funcs()
            + [
                onprem.mount_pvc(
                    persistent_volume_claim,
                    persistent_volume,
                    persistent_volume_mount,
                )
            ]
            + pipeline_operators

        ),
    )

    k_pipeline = init_kubeflow_pipeline(components, output_base, direct_num_workers=0)
    output_filename = f"{pipeline_name}.yaml"

    kubeflow_dag_runner.KubeflowDagRunner(
        config=runner_config,
        output_dir=output_dir,
        output_filename=output_filename,
    ).run(k_pipeline)
ConverJens commented 2 years ago

@wyljpn I don't know if there was a misunderstanding, but now you are missing all the S3-related beam args that you previously posted. These also need to be present:

beam_arg = [
  '--direct_num_workers=4',
  '--direct_running_mode=multi_processing',
  '--s3_endpoint_url=http://minio-service.kubeflow:9000/',
  '--s3_access_key_id=minio',
  '--s3_secret_access_key=minio123',
  '--s3_verify=False',
  '--s3_disable_ssl=True'
]

So your full list of beam args should look more like this:

beam_arg = [
    f"--direct_num_workers={direct_num_workers}",
    "--runner=DirectRunner",
    '--direct_running_mode=multi_processing',
    '--s3_endpoint_url=http://minio-service.kubeflow:9000/',
    '--s3_access_key_id=minio',
    '--s3_secret_access_key=minio123',
    '--s3_verify=False',
    '--s3_disable_ssl=True'
]

Also note that you need to explicitly specify multi_processing for the DirectRunner; otherwise it will use in_memory, which is just a single process regardless of the direct_num_workers argument.
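To isolate just the parallelism-related flags (the values here are illustrative):

beam_arg = [
    "--runner=DirectRunner",
    "--direct_running_mode=multi_processing",  # the default, in_memory, runs a single process
    "--direct_num_workers=4",  # per the note above, only effective together with multi_processing
]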

wyljpn commented 2 years ago

@ConverJens Sorry, I thought we only needed to configure parameters for pipeline_operator_funcs, so I had deleted the beam args.

After I passed the parameters in the beam args, the outputs of the TFX components could be saved in MinIO. @ConverJens Thank you for your detailed explanation! Without your help, I couldn't have done it.


The related code:

s3_path = 'minio-service.kubeflow.svc.cluster.local:9000'
s3_bucket = 'tfx'
output_base = os.path.join("s3://", s3_bucket)

def init_components(data_dir, module_file, serving_model_dir):
    example_gen = CsvExampleGen(
        input_base=data_dir,...)

    # All components
    components = [
        example_gen,
    …
    ]
    return components

def init_kubeflow_pipeline(
    components, pipeline_root, direct_num_workers):

    beam_arg = [
        f"--direct_num_workers={direct_num_workers}",
        "--runner=DirectRunner",
        '--direct_running_mode=multi_processing',
        '--s3_endpoint_url=http://minio-service.kubeflow.svc.cluster.local:9000/',
        '--s3_access_key_id=minio',
        '--s3_secret_access_key=minio123',
        # '--s3_verify=False',   # no need to pass
        # '--s3_disable_ssl=True'  # no need to pass
        ]

    k_pipeline = pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        beam_pipeline_args=beam_arg,
    )
    return k_pipeline

if __name__ == "__main__":

    metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()

    metadata_config.mysql_db_service_host.value = 'mysql.kubeflow'
    metadata_config.mysql_db_service_port.value = "3306"
    metadata_config.mysql_db_name.value = "metadb"
    metadata_config.mysql_db_user.value = "root"
    metadata_config.mysql_db_password.value = ""
    metadata_config.grpc_config.grpc_service_host.value ='metadata-grpc-service.kubeflow'
    metadata_config.grpc_config.grpc_service_port.value ='8080'

    tfx_image = os.environ.get(
        "KUBEFLOW_TFX_IMAGE",
        "wyljpn/tfx:0.28.0",
    )

    components = init_components(
        data_dir,
        module_file,
        serving_model_dir=serving_model_dir
    )

    def env_params(name, value):
        def _env_params(task):
            return task.add_env_variable(k8s_client.V1EnvVar(name=name, value=value))
        return _env_params

    pipeline_operators = [env_params('S3_ENDPOINT', s3_path),
                          env_params('BUCKET_NAME', s3_bucket),
                          env_params('AWS_ACCESS_KEY_ID', 'minio'),
                          env_params('AWS_SECRET_ACCESS_KEY', 'minio123'),
                          env_params('AWS_ENDPOINT_URL', 'http://minio-service.kubeflow.svc.cluster.local:9000/'),
                          env_params('S3_USE_HTTPS', "0"),
                          env_params('S3_VERIFY_SSL', "False")]

    runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
        kubeflow_metadata_config=metadata_config,
        tfx_image=tfx_image,

        pipeline_operator_funcs=(
            kubeflow_dag_runner.get_default_pipeline_operator_funcs()
            + [
                onprem.mount_pvc(
                    persistent_volume_claim,
                    persistent_volume,
                    persistent_volume_mount,
                )
            ]
            + pipeline_operators

        ),
    )

    k_pipeline = init_kubeflow_pipeline(components, output_base, direct_num_workers=0)
    output_filename = f"{pipeline_name}.yaml"

    kubeflow_dag_runner.KubeflowDagRunner(
        config=runner_config,
        output_dir=output_dir,
        output_filename=output_filename,
    ).run(k_pipeline)
ConverJens commented 2 years ago

@wyljpn No worries, happy to help :) Great that you got it working! When I started using TFX with Minio 2 years ago there wasn't a single comprehensive post about it, so hopefully this can serve as a guide to others encountering the same thing :)

wyljpn commented 2 years ago

@ConverJens Yes, thank you for your contributions!

singhniraj08 commented 1 year ago

@dineshvelmuruga,

As mentioned in the comments above, you need to configure Pusher to push the model to whatever directory your TF Serving instance is watching.
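For illustration, here is a minimal sketch of such a Pusher configuration. The trainer/evaluator components and the base_directory are placeholders; point base_directory at whatever location your TF Serving instance actually monitors (an S3/MinIO prefix, a mounted PVC, GCS, etc.):

from tfx.components import Pusher
from tfx.proto import pusher_pb2

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            # hypothetical path watched by TF Serving
            base_directory='s3://tfx/serving_model')))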

For more details, please refer to the official documentation on serving a model with TF Serving and the example of running TFX on Cloud AI Platform.

Thank you!

singhniraj08 commented 1 year ago

Closing this due to inactivity. Please take a look at the answers provided above, and feel free to reopen and post your comments if you still have queries on this. Thank you!