dhruvesh09 closed this issue 2 years ago
The image is huge 5.5GB! What happened?
I already thought the 0.27 image was huge at 2.5GB.
List of 0.27 things I've been removing:
RUN apt-get clean autoclean
RUN apt-get autoremove --yes
# Remove over 2GB of cache
RUN rm -rf /root/.cache
RUN rm -rf /var/lib/{apt,dpkg}
RUN rm -rf /var/cache/*
I've also been removing the google-cloud-* packages which aren't needed.
What also inflates the image is the source being stored both in /tfx/src and in the usual /usr/local/lib/python3.7/dist-packages/tfx; this takes up space and is confusing if you have to patch something.
Not sure what has caused this more-than-double increase, but I can't easily work with an image of that size on my setup.
> The image is huge 5.5GB! What happened? I already thought the 0.27 image was huge at 2.5GB.
Sorry for the inconvenience here.
The increase actually happened since 0.28, because we are now basing off a verified build which is compatible with GPU out of box.
Note that we still release our python packages which should be installable to other compatible base images (i.e, official 'python' images or 'ubuntu').
Fair enough. I've been using the TFX pre-built image exclusively since adopting Kubeflow and haven't really looked into what the minimal requirements are to build from a base python image. Having all the tfx library friends pre-installed, so that upgrading doesn't require referring to the compatibility chart, is also very useful, as I struggled with that in the past.
Perhaps there could be two images built, one GPU compatible OOB and one more minimal?
> Perhaps there could be two images built, one GPU compatible OOB and one more minimal?
If anyone in the community is willing to maintain this, we are happy to cross link to that.
Our team isn't versed enough to pick the other "minimal" environment here, and our test infra isn't designed to work with multiple images, which would all require testing at release time.
First of all, thanks for releasing TFX 0.30.0!
My previous approach for using TFX was based on this template for the Chicago taxi dataset.
I made some quick adjustments in renaming some things, and now I am using the default docker image that gets automatically built when updating kfp tfx pipelines in 0.30.0.
Within the transform component I pass along the preprocessing function.
transform = Transform(
    examples=example_gen.outputs["examples"],
    schema=schema_gen.outputs["schema"],
    preprocessing_fn=preprocessing_fn,
)
Whereby preprocessing_fn is a function path as defined in the configs.
However, when running the TFX 0.30.0 based pipeline, there seems to be an issue with how the path is supplied to the transform component, and possibly to the train component.
Given that my folder structure is identical to the template, how should I tackle this?
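Since preprocessing_fn is supplied as a dotted path string, one way to narrow the problem down is to check that the path actually resolves inside the container. A minimal sketch of how such a "module.path.to.fn" string is typically resolved (the os.path.join path below is just a stand-in for the project-specific path):

```python
import importlib

def resolve_fn(dotted_path):
    """Split 'pkg.module.fn' into a module part and an attribute, then import."""
    module_name, fn_name = dotted_path.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, fn_name)

# Stand-in for something like "models.preprocessing.preprocessing_fn":
fn = resolve_fn("os.path.join")
print(fn("a", "b"))
```

Running this with the real preprocessing_fn path inside the image quickly shows whether the failure is in the path itself or in how the component consumes it.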
Hi, after recently upgrading to 0.30.0 I'm experiencing the following error with ImportExampleGen:
2021-05-19 14:28:51.382035: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
INFO:absl:Running driver for ImportExampleGen
INFO:absl:MetadataStore with gRPC connection initialized
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/opt/conda/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 367, in <module>
main()
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 360, in main
execution_info = launcher.launch()
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 198, in launch
self._exec_properties)
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 167, in _run_driver
component_info=self._component_info)
File "/opt/conda/lib/python3.7/site-packages/tfx/dsl/components/base/base_driver.py", line 268, in pre_execution
component_info)
File "/opt/conda/lib/python3.7/site-packages/tfx/components/example_gen/driver.py", line 111, in resolve_exec_properties
input_base_uri=input_base)
File "/opt/conda/lib/python3.7/site-packages/tfx/components/example_gen/driver.py", line 82, in get_input_processor
raise NotImplementedError
NotImplementedError
My ImportExampleGen is instantiated as follows:
# TFX expects splits as train and eval, these defaults are used in the downstream components.
input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train', pattern='*train.tfrecord'),
    example_gen_pb2.Input.Split(name='eval', pattern='*test.tfrecord'),
])
# Brings data into the pipeline or otherwise joins/converts training data.
example_gen = ImportExampleGen(
    input_base=data_root,
    input_config=input_config,
    payload_format=example_gen_pb2.FORMAT_TF_EXAMPLE,
)
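As an illustration of how such split patterns behave, the file-to-split matching can be sketched with fnmatch (the file names below are made up):

```python
import fnmatch

# Hypothetical listing of files under input_base.
files = [
    "part-0-train.tfrecord",
    "part-1-train.tfrecord",
    "part-0-test.tfrecord",
]

patterns = {"train": "*train.tfrecord", "eval": "*test.tfrecord"}
matched = {
    split: [f for f in files if fnmatch.fnmatch(f, pattern)]
    for split, pattern in patterns.items()
}
print(matched)  # each split gets the files its pattern selects
```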
This is when attempting to run on KubeFlow pipelines. This pipeline ran successfully in version 0.28.0.
@robinvanschaik Hi, thank you for the report.
Could you clarify what you mean by "default docker image"? The version in the Dockerfile is not updated automatically. I've tried the template with Kubeflow Pipelines and 0.30.0 and it worked well in my case. Could you share more information, like your Dockerfile or file structure? Thanks!
> Hi, after recently upgrading to 0.30.0 I'm experiencing the following error with ImportExampleGen: NotImplementedError [...]
Is ImportExampleGen the one tfx offers? If it's a fully custom ExampleGen (not inheriting from FileBasedExampleGen), could you check whether the correct driver is used?
@1025KB it's the one offered by tfx. I'm importing the component like so: from tfx.components import ImportExampleGen
Hi, @jiyongjung0
No problem, this is what I understand so far.
This works for me:
# Provide a Google container registry url to store the newly built image
tfx_image = configs.GCR_URL
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
kubeflow_metadata_config=metadata_config, tfx_image=tfx_image)
In my case the preceding step creates a docker image AND appends the python path in the docker image: no local setup.py, just copying the directory and configuring the PYTHONPATH. I believe this is similar to the docker image that would be dumped in the directory in versions <0.30.0.
The tfx taxi example has a comment stating the following:
# This pipeline automatically injects the Kubeflow TFX image if the
# environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
# cli tool exports the environment variable to pass to the pipelines.
tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
kubeflow_metadata_config=metadata_config, tfx_image=tfx_image)
If os.environ.get('KUBEFLOW_TFX_IMAGE', None) returns None, it falls back on the following line:
self.tfx_image = tfx_image or DEFAULT_KUBEFLOW_TFX_IMAGE
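The fallback is plain truthiness: an unset environment variable yields None, which is falsy, so `or` picks the default. A small sketch (the default image value is a placeholder, not the real constant):

```python
import os

DEFAULT_KUBEFLOW_TFX_IMAGE = "tensorflow/tfx:some-default"  # placeholder value

def pick_image(tfx_image):
    # Mirrors `self.tfx_image = tfx_image or DEFAULT_KUBEFLOW_TFX_IMAGE`.
    return tfx_image or DEFAULT_KUBEFLOW_TFX_IMAGE

env_image = os.environ.get("KUBEFLOW_TFX_IMAGE", None)  # None when unset
print(pick_image(None))             # falls back to the default image
print(pick_image("gcr.io/my/img"))  # an explicit image wins
```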
Either I completely misunderstood what's going on, or I did something wrong while experimenting.
As far as I can tell there should not be any difference in the Docker images that get generated, no?
@robinvanschaik
Yes. Your solution is perfect.
We thought that using an environment variable to pass image names is quite fragile and hard to understand, so we changed how we pass images in TFX 0.30.0. The new recommended way is passing the image name to KubeflowDagRunnerConfig directly, as you did. We also updated the generated code from the template accordingly.
So what happened here is that we don't set the KUBEFLOW_TFX_IMAGE environment variable any more, so DEFAULT_KUBEFLOW_TFX_IMAGE was used instead of the newly built image, and DEFAULT_KUBEFLOW_TFX_IMAGE doesn't include your code.
We should have let users know in the release notes that they need to replace their kubeflow_runner.py as you did, but I missed that part. I'll add it retrospectively.
Thank you so much for reporting this and debugging it.
@jiyongjung0 Thank you so much!
Glad I was able to help. :)
@1025KB any update on my issue?
Hi, could you check this in your code?
from tfx.v1.components import ImportExampleGen
print(ImportExampleGen)
print(ImportExampleGen.DRIVER_CLASS)
It should be
<class 'tfx.components.example_gen.import_example_gen.component.ImportExampleGen'>
<class 'tfx.components.example_gen.driver.FileBasedDriver'>
Somehow your code calls tfx.components.example_gen.driver.Driver instead of FileBasedDriver.
@1025KB my dockerfile
FROM tensorflow/tfx:0.30.0
WORKDIR /pipeline
COPY ./ ./
ENV PYTHONPATH="/pipeline:${PYTHONPATH}"
RUN /usr/bin/python3 -m pip install --upgrade pip
RUN pip install -r requirements.txt
RUN python -c 'import tensorflow_text as text # Registers the ops.'
So originally I was importing ImportExampleGen from tfx.components, not tfx.v1.components (what's the difference?). Importing from tfx.v1.components I get the following when trying to build the image:
Traceback (most recent call last):
File "kubeflow_runner.py", line 8, in <module>
from pipeline import _create_pipeline
File "/home/sa_105350268277010201544/categorizer_v2/tfx-python/pipeline.py", line 31, in <module>
from tfx.v1.components import ImportExampleGen
ModuleNotFoundError: No module named 'tfx.v1'
Error while running "/opt/conda/bin/python3.7 kubeflow_runner.py"
Importing from tfx.components I get the following:
<class 'tfx.components.example_gen.import_example_gen.component.ImportExampleGen'>
<class 'tfx.components.example_gen.driver.Driver'>
Followed by the error again:
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/opt/conda/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 367, in <module>
main()
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/kubeflow/container_entrypoint.py", line 360, in main
execution_info = launcher.launch()
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 198, in launch
self._exec_properties)
File "/opt/conda/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 167, in _run_driver
component_info=self._component_info)
File "/opt/conda/lib/python3.7/site-packages/tfx/dsl/components/base/base_driver.py", line 268, in pre_execution
component_info)
File "/opt/conda/lib/python3.7/site-packages/tfx/components/example_gen/driver.py", line 111, in resolve_exec_properties
input_base_uri=input_base)
File "/opt/conda/lib/python3.7/site-packages/tfx/components/example_gen/driver.py", line 82, in get_input_processor
raise NotImplementedError
NotImplementedError
NOTE: I've tested this on my machine (with tfx 0.30.0 installed using the pip package) and everything works correctly. This issue seems to be isolated to the tfx 0.30.0 image and thus only appears when I try to run in a Kubeflow pipeline.
This increase in image size has broken our automatic deployment pipelines (in GitHub Actions). We create the docker image in a GitHub Actions runner with 14GB of storage (a good part of which is assigned to other things by default), and post-0.27 the docker image generation process fails.
@AlirezaSadeghi I suggest we move the image size to a different issue to follow up.
w.r.t. the 14GB storage: is it a hard cap? Are you able to use Docker Hub, Google Cloud Container Registry, or some other system that may not have such a cap?
@zhitaoli for GitHub Actions that seems to be a hard cap, based on their docs and some of their issues (like https://github.com/actions/virtual-environments/issues/2840#issuecomment-791177163).
Note that we do use gcr for storing and accessing the images, but they are built in the CI pipeline (in GitHub Actions, where something like the tfx pipeline create command gets called).
The wheel generation portion of the Transform component seems to make some tight assumptions.
Problem: the Transform component fails with ModuleNotFoundError: No module named 'user_module_0'.
More info: the module_file we're providing for the Transform component imports a couple of functions and constants from packages that live elsewhere in the project structure, not in the immediate vicinity of transform.py.
def package_user_module_file(instance_name: Text, module_path: Text,
pipeline_root: Text) -> Tuple[Text, Text]:
...
source_files = []
# Discover all Python source files in this directory for inclusion.
for file_name in os.listdir(user_module_dir):
if file_name.endswith('.py'):
source_files.append(file_name)
module_names = []
for file_name in source_files:
if file_name in (_EPHEMERAL_SETUP_PY_FILE_NAME, '__init__.py'):
continue
module_name = re.sub(r'\.py$', '', file_name)
module_names.append(module_name)
This dependency resolution logic assumes that all that's required is to include the immediate neighbouring Python files. While this can be true, it doesn't cover many use cases.
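The effect can be reproduced without TFX at all: listing only the *.py files in the module's own directory misses anything imported from sibling packages. A sketch with a made-up layout:

```python
import os
import re
import tempfile

# Toy project: transform.py next to a subpackage it imports from.
root = tempfile.mkdtemp()
open(os.path.join(root, "transform.py"), "w").close()
pkg = os.path.join(root, "features")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
open(os.path.join(pkg, "categorical.py"), "w").close()

# Same discovery idea as package_user_module_file: immediate *.py files only.
module_names = [
    re.sub(r"\.py$", "", f)
    for f in os.listdir(root)
    if f.endswith(".py") and f != "__init__.py"
]
print(module_names)  # only 'transform'; features/categorical.py is never packaged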
Our transform.py is constructed like this (its path being A.B.transform.py):
from X.Y.Z import categorical_features
from T.X.Y import transformed_name_generation_logic
...
def preprocessing_fn():
....
And now no matter what, our Transform code fails with dependency resolution errors.
Our current workarounds would be either to replace the udf_utils module in the docker image while building it (in the Dockerfile), or to subclass the Transform component and its executor and change the getPreprocessingFn logic.
Would be great to know your thoughts on it.
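For the second workaround, the general shape is the usual subclass-and-override pattern. A generic sketch (the class and method names here are invented for illustration, not the actual TFX executor API):

```python
class BaseExecutor:
    """Stand-in for the stock Transform executor (names are hypothetical)."""

    def _resolve_preprocessing_fn(self, module_path):
        # Stock behaviour: only sees files next to the module file.
        raise ModuleNotFoundError(module_path)

    def run(self, module_path):
        return self._resolve_preprocessing_fn(module_path)


class PatchedExecutor(BaseExecutor):
    """Overrides just the resolution hook, keeping the rest of the executor."""

    def _resolve_preprocessing_fn(self, module_path):
        # Replacement logic would resolve against the full project on
        # PYTHONPATH; a stub return shows the override is what runs.
        return f"resolved:{module_path}"


print(PatchedExecutor().run("A.B.transform.preprocessing_fn"))
```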
> Importing from tfx.components I get the following:
> <class 'tfx.components.example_gen.import_example_gen.component.ImportExampleGen'>
> <class 'tfx.components.example_gen.driver.Driver'>
This should be driver.FileBasedDriver if you use the latest tfx. It seems your pipeline compile environment (an old tfx version that uses Driver) and your runtime environment (a new tfx version where Driver itself is abstract) use different tfx versions.
I am also having an issue with the visualisations of the different components in Kubeflow pipelines (KFP).
It seems that the pathing for the visualisation does not align with the actual bucket names, but I am not sure if this is a user error on my end. This seemed to work fine in the past, until we updated to 0.30.0.
Once we visualise in KFP we get the following error message in the component. Note I made the gs path fictional, but it illustrates my point.
OSError: Invalid input path gs://kubeflowpipelines-default/pipeline_output/pipeline_version_1/StatisticsGen/statistics/1361/eval/stats_tfrecord.
In reality the folders seem to be named in the following.
OSError: Invalid input path gs://kubeflowpipelines-default/pipeline_output/pipeline_version_1/StatisticsGen/statistics/1361/Split-eval/FeatureStats.pb
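Judging only from the two error messages, the split directory naming gained a Split- prefix around 0.30.0 and the statistics file name changed as well. A tiny sketch of the two path shapes (the layouts are inferred from the errors above, not from the TFX source):

```python
def stats_uri(base, split, new_layout):
    """Illustrative only: path shapes taken from the two error messages."""
    if new_layout:
        return f"{base}/Split-{split}/FeatureStats.pb"
    return f"{base}/{split}/stats_tfrecord"

base = "gs://kubeflowpipelines-default/pipeline_output/StatisticsGen/statistics/1361"
print(stats_uri(base, "eval", new_layout=False))  # path the UI is looking for
print(stats_uri(base, "eval", new_layout=True))   # path actually written by 0.30.0
```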
My gut tells me a couple of things that could be going on.
Importing as from tfx import v1 as tfx rather than import tfx causes some minor difference in behavior.
Or I need to adjust the names somehow, just like I do in the example_gen component.
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=7),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
        example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=1),
    ]))
Any pointers?
We also do see the issue @robinvanschaik just brought up on kubeflow.
> It seems that the pathing for the visualisation does not align with the actual bucket names [...]
This might be a version mismatch issue too, as we changed the path recently.
> Or I need to adjust the names somehow, just like I do in the example_gen component.
It's tfx.v1.proto.SplitConfig now; you can check this example.
I've spent the last 2 days trying to upgrade our pipelines to 0.30.0 and 1.0.0rc1. It's been very infuriating.
The UDF changes break the workaround in https://github.com/tensorflow/tfx/issues/3435 for me when using --worker_harness_container_image=none. This leads to No module named transform, as I believe the extra packages are not installed.
It appears to me that running with Flink is now pretty much impossible again since 0.30.0.
After reading the MR and setting UNSUPPORTED_DO_NOT_PACKAGE_USER_MODULES="true" when building the pipeline, adding it to the pipeline via container_op, and also adding it to the external Flink workers, my pipelines are still broken with a ModuleNotFoundError: No module named 'user_module_0'.
The env variable also doesn't disable the attempt to create the root when KubeflowDagRunner is run to generate the pipeline archive.
This entire setup is currently very cumbersome and hard to debug/grasp:
Setting up TFX with a sensible structure, CI-published pipelines, and Kubernetes with Kubeflow and Flink to even work is extremely hard already, and it's very hard to keep a map of what runs where and how your containers need to be organised. Now there is more code on a bucket, with wheels floating about and weird pickling and pip installation logic that results in problems. On top of that, I've been experiencing regular OOM, grpc, rate limiting and hanging errors that have been reported on Flink/DirectRunner with no real workarounds/solutions presented. Oh, and packages between the tfx image and the python Beam SDK differ, unless you make sure they don't.
This absolute replacement of how module_file works and the addition of module_path should have been flagged as a massive breaking change, and the use case of putting your code in a container (which I thought was the spirit of Kubeflow) should be documented and supported.
I don't mind the frequent deprecations or API changes, and I don't want this to sound like a massive rant, but I just want to communicate how difficult it is to run TFX the advertised MLOps way (https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning) when embracing devops, custom components, kubeflow and beam.
I've been upgrading since 0.21 with hopes of solving large dataset issues (partly with upgraded Beam), but have been running into lots of roadblocks. Looking at the issues, I don't think anyone is running Flink at scale with all the components working properly; this release just made it twice as hard for me to attempt that, image size aside.
@robinvanschaik @AlirezaSadeghi For the visualization issue, https://github.com/kubeflow/pipelines/issues/5638 is the issue in the Kubeflow Pipelines and we need to fix in that end.
Hi @jiyongjung0 ,
Thanks for pointing me to the right Github issue.
I sometimes have trouble finding out whether an issue was already raised, as it could have been filed in the tfx, kfp, or individual tfx component repos.
I will keep an eye out!
We also have another issue on the Evaluator component with our Estimator models that are trained in CAIP.
The evaluator component (the beam job essentially) is failing with the following stacktrace:
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
File "apache_beam/runners/worker/operations.py", line 358, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "dataflow_worker/shuffle_operations.py", line 267, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/trigger.py", line 1220, in process_elements
yield output.with_value(self.phased_combine_fn.apply(output.value))
File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/combiners.py", line 963, in merge_only
return self.combine_fn.merge_accumulators(accumulators)
File "/usr/local/lib/python3.6/dist-packages/apache_beam/transforms/combiners.py", line 752, in merge_accumulators
a in zip(self._combiners, zip(*accumulators))
File "/usr/local/lib/python3.6/dist-packages/apache_beam/transforms/combiners.py", line 752, in <listcomp>
a in zip(self._combiners, zip(*accumulators))
File "/usr/local/lib/python3.6/site-packages/tensorflow_model_analysis/evaluators/eval_saved_model_util.py", line 245, in merge_accumulators
self._maybe_do_batch(result)
File "/usr/local/lib/python3.6/site-packages/tensorflow_model_analysis/evaluators/eval_saved_model_util.py", line 215, in _maybe_do_batch
self._eval_metrics_graph = self._loaded_models[self._model_name]
TypeError: 'NoneType' object is not subscriptable
This wasn't previously a case and has popped up after the upgrade.
I thought maybe it's due to the fact that the new evaluation model gets exported under the Format-TFMA directory, while the old one was exported under eval_dir_model. But running the pipeline with a new name (such that no existing blessed models are available) leads to this exception as well.
Any insights? 🙃
> The evaluator component (the beam job essentially) is failing with TypeError: 'NoneType' object is not subscriptable [...]
We have a tfx_version property set on the artifact to distinguish whether to use Format-TFMA or eval_dir_model. You can check MLMD to see the tfx_version, and maybe print the model loading path in the Evaluator to see if it points to the correct location.
This wasn't previously the case and popped up after the upgrade. I thought maybe it's due to the fact that the new evaluation model gets exported under the Format-TFMA directory, while the old one was exported under eval_dir_model. But running the pipeline with a new name (such that no existing blessed models are available) also leads to this exception. Any insights? 🙃

We have a tfx_version property set on the artifact to distinguish whether to use Format-TFMA or eval_dir_model. You can check MLMD to see the tfx_version, and maybe print the model loading path in the Evaluator to see if it points to the correct location.
I did some experiments on my side; an empty path results in a different error, so it doesn't seem to be a path issue. Does it work in the Beam DirectRunner?
OSError: SavedModel file does not exist at: ..../Format-Servo/{saved_model.pbtxt|saved_model.pb}
OSError: SavedModel file does not exist at: ....
Yes, it actually works in the DirectRunner. And the case is very simple: no blessing, and no old vs. new path (this is also why I believe it's not a path issue, since there's only one path). It's a new pipeline that's training a model for the first time and running the Evaluator, but failing! (All on the newest version of TFX.)
For reference, this gets logged in the execution that results in this failure (kind of showing that it's the simplest case possible):
INFO:absl:Request was made to ignore the baseline ModelSpec and any change thresholds. This is likely because a baseline model was not provided: updated_config=model_specs {
signature_name: "eval"
}
slicing_specs {
}
metrics_specs {
model_names: ""
thresholds {
key: "accuracy"
value {
value_threshold {
upper_bound {
value: 0.65
}
}
}
}
thresholds {
key: "auc"
value {
value_threshold {
lower_bound {
value: 0.64
}
}
}
}
}
So the same code works with DirectRunner but not with DataflowRunner; are there other differences? The log you posted looks fine.
TFMA has a setup() to make sure the loaded model is not None; not sure if it's a Dataflow issue. Could you check whether that is called or not?
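The lifecycle issue can be sketched in plain Python (no Beam dependency; class and attribute names here are illustrative, not TFMA's actual code): the model handle is only populated in setup(), so a runner that never invokes setup() leaves it as None and the merge step fails exactly like the stacktrace above.

```python
# Plain-Python sketch of the CombineFn lifecycle problem: if the
# runner skips setup(), _loaded_models stays None and indexing it
# raises "'NoneType' object is not subscriptable".
class EvalCombineFnSketch:
    def __init__(self):
        self._loaded_models = None  # not populated until setup()
        self._model_name = ""

    def setup(self):
        # Real TFMA loads the EvalSavedModel here; we use a stand-in.
        self._loaded_models = {self._model_name: "stand-in model"}

    def merge_accumulators(self, accumulators):
        # Mirrors TFMA's _maybe_do_batch model lookup; fails with
        # TypeError if setup() was never invoked by the runner.
        model = self._loaded_models[self._model_name]
        return sum(accumulators)


fn = EvalCombineFnSketch()
try:
    fn.merge_accumulators([1, 2])  # setup() skipped -> TypeError
    skipped_ok = True
except TypeError as e:
    skipped_ok = False
    error_message = str(e)

fn.setup()
merged = fn.merge_accumulators([1, 2])  # works once setup() ran
```

This matches the symptom in the thread: the same combine logic succeeds wherever the runner honors the setup() contract and fails where it doesn't.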
I'm not sure how I can check whether that function is called in Dataflow; could you provide some guidance if there's a proper way? Looking at the stacktrace doesn't show it, but that's probably not the proper place to look for setup being called. There's also no entry in the whole Dataflow job showing that setup is called.

As for other differences between the job on the DirectRunner vs. the DataflowRunner: there are no configuration differences, only that we use Python 3.7 locally while Dataflow seems to be using Python 3.6.
Maybe add a print in it and package your own TFMA? Could you also ask on the Beam repo to see if they have any clue?
It might be that the DataflowRunner wasn't updated. Is it possible to check the version?
Can you please elaborate? By DataflowRunner version, do you mean the Beam version the Dataflow service is running, or something else I'm missing? And by "not updated", is it something that users of TFX can do, or something that should happen in TFX/Beam/TFMA/Dataflow?
Sorry, it is the beam version you want to check. The setup() is in the documentation of 2.29 (https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.transforms.core.html#apache_beam.transforms.core.CombineFn.setup).
I saw this: "If you are using Dataflow, you need to enable Dataflow Runner V2 before using this feature." Is that the reason?
That can actually be the reason, we're not using runner_v2 since for one of our jobs, it showed inferior performance, maybe independently for Evaluator we can do so. I'll do it and report back! Thanks @1025KB
Unfortunately we can't use runner_v2 for the Evaluator either. The reason is that the execution on Dataflow stalls and the job never starts. The logs show something like the following:
Info 2021-06-04 13:47:26.027 CEST "Initializing SDKHarness with unbounded number of workers."
Info 2021-06-04 13:47:37.844 CEST "Waiting for 16 SDK Harnesses to register."
Info 2021-06-04 13:47:49.075 CEST "Healthz check of harness at localhost:8081/healthz failed with HTTP code 500"
I don't know if there are any configuration changes I'm missing when moving from 0.27 to 1.0rc 😔
Hi @vaskozl, thank you for your detailed report and feedback. I am the author of the user module packaging change you mention, so I am very sorry for the issues you are facing. The proximal cause is that we do not test the case where a pipeline root is not known at pipeline compilation time, and the override flag meant to be an escape hatch in case of unintended behavior mistakenly did not cover this case.
Let me back up and explain the problem we intend to solve with user code packaging, and some of interactions in different environments this is trying to support.
First, the previous module_file
import mechanism ends up being incompatible with the normal Python import mechanism and, more critically, with the Beam DoFn pickling behavior. If a Beam DoFn ends up referencing a function or object foo
in a user module, in most cases it will be pickled and sent to the worker as a reference to the module path user_module_0.foo
. This works fine for in-process Beam execution because the ephemeral user_module_0
module is already loaded and cached, but during multi-process (or remote) Beam execution, this is a problem, because the unpickling step cannot know how to load user_module_0
from a different process. The packaging change is needed to solve this. This is an immediate problem we needed to solve for the Transform component because as of 0.30, the default for the force_tf_compat_v1
flag was flipped to False
(in https://github.com/tensorflow/tfx/commit/d46461af38ba398fa903a9b82ed1c379b44163b1). The significance here is that when TFv1 compat is off, Tensorflow Transform generates a Beam graph that directly references a function in the user module, which would cause an error like the ModuleNotFoundError
you mentioned if the packaging step is disabled (you may try manually setting force_tf_compat_v1=True
in the Transform component).
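The unpickling failure described above can be reproduced with a minimal stdlib-only sketch. The module name user_module_0 mirrors TFX's ephemeral import; everything else is illustrative:

```python
# A function in a dynamically-loaded module pickles *by reference*
# (as "user_module_0.foo"), so a process that never loaded that
# module (like a remote Beam worker) cannot unpickle it.
import importlib.util
import os
import pickle
import sys
import tempfile

# Write an ephemeral "user module" to a temp directory (not on sys.path).
path = os.path.join(tempfile.mkdtemp(), "user_module_0.py")
with open(path, "w") as f:
    f.write("def foo():\n    return 42\n")

# Load it under the name "user_module_0", as TFX's import mechanism does.
spec = importlib.util.spec_from_file_location("user_module_0", path)
mod = importlib.util.module_from_spec(spec)
sys.modules["user_module_0"] = mod
spec.loader.exec_module(mod)

blob = pickle.dumps(mod.foo)      # succeeds: module is cached in-process
del sys.modules["user_module_0"]  # simulate a fresh worker process

err = None
try:
    pickle.loads(blob)            # must re-import user_module_0 -> fails
except ModuleNotFoundError as e:
    err = e
```

Installing the user module as a wheel (so it is importable by name on the worker) is exactly what makes the final pickle.loads succeed in the packaged flow.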
Let's now discuss the new packaging mechanism. Here, before pipeline execution, a user module (say my_module.py
) is now (1) packaged into a wheel file, (2) ephemerally installed so that it can be loaded directly with import my_module
from within the process and in any subprocesses and (3) added as an --extra_package
for installation on workers before execution with Apache Beam. This way, when Beam deserializes a DoFn that references say my_module.foo
, the native Python import mechanism can locate the correct object.
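Steps (1)-(3) can be sketched roughly as follows. This is a hedged sketch, not TFX's actual implementation; the helper name build_user_module_wheel and the generated setup.py are assumptions for illustration:

```python
# Wrap a single user module into a tiny distribution, build a wheel,
# and hand that wheel to Beam via --extra_package so remote workers
# can import it by name.
import glob
import os
import shutil
import subprocess
import sys
import tempfile


def build_user_module_wheel(module_path: str, out_dir: str) -> str:
    """Package one module file (e.g. my_module.py) into a wheel."""
    name = os.path.splitext(os.path.basename(module_path))[0]
    build_dir = tempfile.mkdtemp()
    shutil.copy(module_path, build_dir)
    with open(os.path.join(build_dir, "setup.py"), "w") as f:
        f.write(
            "from setuptools import setup\n"
            f"setup(name={name!r}, version='0.0.1', py_modules=[{name!r}])\n"
        )
    subprocess.check_call(
        [sys.executable, "-m", "pip", "wheel", "--no-deps",
         "--no-build-isolation", "-q", "-w", out_dir, build_dir]
    )
    return glob.glob(os.path.join(out_dir, name + "*.whl"))[0]

# Usage sketch: beam_args += ["--extra_package=" + wheel_path]
```

Once installed from the wheel (step 2 locally, step 3 on workers), `import my_module` resolves through the normal import machinery and deserialized DoFns find `my_module.foo`.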
The second problem this mechanism is meant to solve is when the component executor does not run on the same machine as the pipeline submission process. For example, some setups end up having this run in 3 parts: (1) on the user's machine, the pipeline is constructed, potentially with reference to a local module file, (2) on a remote machine, the component executor runs (needing access to the user module somehow), and (3) later, the component executor spins up remote Beam workers that do the distributed computation. Previously, there was little real support for passing through the user module from (1) to (2), but we did support just having this module file be stored as a URI pointing to a remote filesystem to be downloaded by (2); this by itself, however, isn't enough if objects in the user module are referenced in the Beam graph, as described above. The approach taken with this change is to reuse the same mechanism as for passing the user module from (2) to (3) (i.e., with a packaged wheel file). This way, the user no longer needs to manually copy their user module to a designated remote path for each run, with the risk of changing the behavior of any existing jobs.
With this approach and for Kubeflow (as you mention), there is now a problem when no pipeline root is specified -- in this case, where can we store any packaged user modules for this pipeline? One solution for doing this without building a new image could be to package it zipped together with the KFP YAML description somehow, but I don't think this mechanism exists yet. Another solution is to build this into the container image when building the image with the TFX CLI. This is the current preferred path forward, but it is not implemented yet.
Now, some questions for you. First, can you try rerunning your job with packaging disabled but passing force_tf_compat_v1=True
for Transform? If this works, would the immediate solution of fixing the override flag for the Kubeflow runner be a suitable workaround short-term? Next, we are still not sure about what the medium-term fate is of execution on Kubeflow without a specified pipeline root -- can you describe your use case for this, and in this scenario what kind of shared filesystem you are using for the pipeline root (since you mention frustration dealing with buckets at pipeline creation time)? Do you build containers for your pipeline runs, and if so, is it using the CLI or a different mechanism? Do you have any outstanding concerns not touched on here?
CC: @dhruvesh09 @zhitaoli @1025KB @jiyongjung0 @neuromage @ruoyu90
Hi, thanks for the detailed response, I have a couple of concerns. So for us as well, the Transform component (and evaluator later on) are breaking.
The reason is that, as I mentioned above in https://github.com/tensorflow/tfx/issues/3761#issuecomment-851438164, the wheel generation code only includes the immediately neighbouring Python files (it doesn't even pick up packages and add them recursively).
I believe maybe the correct solution is to just use the project's whole setup.py file, since you can't know which files are required to create a properly structured package. This whole mechanism, to me, looks like over-engineered magic that only handles a small subset of use cases.
Also, when worker_harness_container_image is set in the default pipeline-level beam_pipeline_args, the extra package is not being added to the Dataflow job (I believe due to the implementation of make_beam_dependency_flags in BaseBeamExecutor). The code should already exist in the image, but the job is failing.
I don't know how it was being handled before 0.27, but our pipelines have been running the Transform component properly with only a module file.
On permissions: you are missing the relationship between the runtime environment's permissions and the compile-time environment's permissions. How we've been deploying pipelines before was basically: compile the pipelines to YAML in staging CI, and then push them to the prod environment. The CI system didn't require any permissions to write to prod. Why? Because it didn't need to find root_pipeline_path and try to write a wheel file to it; CI didn't have access to prod.
But the Kubeflow cluster is running in prod and has access to prod, and pipeline_root will be required either by a Kubeflow entity (which already has access) or by a pipeline entity at execution time (already running in prod with a proper service account). There are a couple more details, but I'll stop here so this doesn't get too long.
It would be great to know your thoughts on these points.
Thanks @AlirezaSadeghi.
For your issue with the Transform component in particular, we flipped the force_tf_compat_v1
flag to False
in this version. With TFv1 compat behavior off, the Transform library when run in non-local Beam environments requires that we package user code into an --extra_package
wheel before execution (or else it will fail with the Beam DoFn unpickling user_module_0
error at runtime). If you turn off the packaging behavior here (constructing without an accessible pipeline_root), could you try also manually passing force_tf_compat_v1=True
into your Transform
component constructor? The fact that without a wheel --extra_package
Beam cannot support remote execution when the pipeline references objects in the user module is an important reason we added this packaging logic (and TF Transform requires such a reference when running in V2 mode [unlike in V1 mode] because of the nature of tf.function
-- this is why these Beam issues may not occur in 0.27).
Your point regarding the current behavior of taking the Python files in the same directory as the target module file for packaging is well-taken. You are absolutely right that this is ultimately not the best long-term solution, and my view is that having user module files as the unit of customizable user code is ultimately brittle because, for example, these files may be part of larger packages (like you point out), and these packages may have complicated build processes, but will suffice for simple cases where the user-module is reasonably self-contained without extra dependencies. The intention for this change is to provide a way for existing usage of module_file
to be compatible with packaging for remote execution (as a wheel) and running on remote Beam runners (as an --extra_package
wheel). The plan is to allow for finer-grained control of dependencies and how this user code is packaged (e.g., whether remote execution should install a user-specified wheel [potentially with syntax like .with_pip_dependency(...)
on the component instance] and import a specified module [say, specified as a Python module path instead of a file path], instead of using this automatic packaging mechanism), but this is not ready in this version. Let me know if you have other suggestions here.
For your Kubeflow deployment of these pipelines, is my understanding correct that the user module is specified as the URI of a stored module file (for example, on GCS or S3)? From what you are saying, your CI does not directly interact with this module_file
file because it doesn't build a custom image for this pipeline, right? (Does it have access or read it at all?) Here, while compiling the pipeline, it is worthwhile to have better first-party support of this type of user code dependency and somehow keep it as a pipeline-level artifact together with the serialized pipeline. This better and more uniformly supports the use case where this user module was specified as a local path not accessible remotely, and even for the case where this is on GCS or S3, this makes pipelines more hermetic (and is probably better CI/CD practice) because a developer pushing a new version to this path will not affect already-running or recurring pipelines. Along with just having a place to put any packaged wheels, this was the rationale of storing the wheel file under the pipeline root. Without a pipeline root, if we are to support user code wheels, where should we put them? The most correct thing to do as I understand it within KFP is to build a custom image, adding these wheels during CI/CD, but I understand that this is not convenient. We could have a kind of "pipeline build root" accessible from CI/CD containing resources that are later used by the pipeline, but this is pretty similar to the pipeline root option. Do you have any suggestions here?
CC: @vaskozl @dhruvesh09 @zhitaoli @1025KB @jiyongjung0 @neuromage @ruoyu90
Thanks @charlesccychen!
So a couple of questions, thoughts and clarifications.
The user_module exists in the repo of the pipeline, locally (so not on a remote S3 or GCS or ...; these can later be provided as RuntimeParams, but the default is always repo-local). When CI runs, it clones the repo (actions/checkout@v2) and then runs KubeflowDagRunner(Pipeline...) to generate the Argo files.

Previously this first step didn't need write access to prod GCS; now it does, since it needs to write out wheel packages. (We can set None for pipeline_root, or we can set pipeline_root to point to a staging bucket by default regardless of the env, but that's less convenient at runtime, since compile time happens once while runtime happens many times.)

I don't actually have any suggestions for this, since the current state makes sense and this is a fairly simple challenge for downstream users of TFX to solve, and we did that. It just needed to be better advertised (e.g., pointing out that write permission to the pipeline_root is from now on required at compile time).
Thanks for the explanations.
What I don't seem to understand is that no matter whether I set the worker_harness_container_image or a setup.py, the Transform Beam job that gets started from Kubeflow Pipelines fails with the user_module_0 error. This is in direct contrast with running the same Transform job on Dataflow from the LocalDagRunner, where the Dataflow job succeeds. At the pipeline level nothing is different, but at the Dataflow job level (from the Dataflow UI), the extra-packages field is empty when the orchestrator is Kubeflow Pipelines.
I'm really puzzled here, for two reasons:

1. setup.py is provided to the Dataflow job anyway, so why is there a need to also provide the extra packages? TFX logs INFO:absl:Nonempty beam arg setup_file already includes dependency, and no extra package ends up in the Dataflow job.
2. For transform.py, I do believe it works because there's a combination of a setup.py that installs everything and that extra package. This again brings me to wonder why extra-packages is even required when everything gets installed through setup.py on the Dataflow workers.

I have attached an image of the two Dataflow jobs from the same pipeline (Local vs Kubeflow) below: Dataflow Config Image
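A hypothetical plain-Python model of the behaviour implied by that log line (this is not TFX's actual code, just a sketch consistent with the empty extra-packages field seen in the Dataflow UI): when the Beam args already carry a --setup_file, the executor assumes the setup.py covers the user code and skips appending its wheel.

```python
# Sketch of the flag-merging behaviour the commenters describe:
# an existing --setup_file suppresses the --extra_package wheel.
def merge_dependency_flags(beam_args, extra_wheel):
    if any(a.startswith("--setup_file") for a in beam_args):
        # corresponds to the log:
        # "Nonempty beam arg setup_file already includes dependency"
        return list(beam_args)
    return list(beam_args) + ["--extra_package=" + extra_wheel]
```

Under this model, a pipeline that passes a setup.py never gets the wheel added, which would explain why the field is empty even though the packaging step ran.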
Just for anyone who might run into this.
I mentioned that the runner_v2 jobs were stuck. It turns out Dataflow's runner_v2 starts n = (number of harness threads) parallel threads to fetch and install dependencies on a worker (unlike the v1 behaviour of installing dependencies just once), and that exceeded the amount of storage we had allocated to our workers (disk_size_gb). Increasing that fixed the issue.
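For anyone hitting the same symptom, the fix amounts to raising the worker disk size in the Beam pipeline arguments (a sketch; the flag names are standard Dataflow pipeline options, and the value is illustrative, not a recommendation):

```python
# Sketch: Beam args as passed to the pipeline (e.g. TFX's beam_pipeline_args).
# runner_v2 installs dependencies once per harness thread, so the worker
# disk needs more headroom than under the v1 runner.
beam_pipeline_args = [
    "--runner=DataflowRunner",
    "--experiments=use_runner_v2",
    "--disk_size_gb=100",  # illustrative: size for n parallel installs
]
```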
Thank you @charlesccychen for the analysis and detailed explanation.
Now, some questions for you. First, can you try rerunning your job with packaging disabled but passing force_tf_compat_v1=True for Transform?
I just tried with the UNSUPPORTED_DO_NOT_PACKAGE_USER_MODULES env var set and force_tf_compat_v1=True for Transform, and the Transform step ran as expected!
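For reference, that combination can be set up roughly like this (a sketch; UNSUPPORTED_DO_NOT_PACKAGE_USER_MODULES and force_tf_compat_v1 are the names used in this thread — check your TFX version before relying on them):

```python
import os

# Opt out of automatic user-module packaging (env var named in this thread).
os.environ["UNSUPPORTED_DO_NOT_PACKAGE_USER_MODULES"] = "1"

# When constructing the pipeline, pass the compat flag to Transform,
# e.g. tfx.components.Transform(..., force_tf_compat_v1=True).
transform_kwargs = {"force_tf_compat_v1": True}
```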
Next, we are still not sure about what the medium-term fate is of execution on Kubeflow without a specified pipeline root -- can you describe your use case for this, and in this scenario what kind of shared filesystem you are using for the pipeline root (since you mention frustration dealing with buckets at pipeline creation time)? Do you build containers for your pipeline runs, and if so, is it using the CLI or a different mechanism? Do you have any outstanding concerns not touched on here?
My setup is almost exactly the same as @AlirezaSadeghi's:
- CI builds a tfx image on every commit, runs tests, and verifies the DAGs compile. This tfx image is pushed to a container registry and can be used by both Kubeflow and the Flink workers. The image build runs setup.py install for the custom components and adds the user modules to PYTHONPATH.
- The Flink workers are started with --worker_pool, i.e. the EXTERNAL configuration of the SDK harness: https://beam.apache.org/documentation/runtime/sdk-harness-config/
I usually don't change the pipeline root manually via Kubeflow. It's set to an S3-compatible bucket, and changing it is usually done via git, letting CI do its thing. The only reason it came up is that access to it was now required to build, and not setting it skips that step, exactly as AlirezaSadeghi mentioned.
I find that there is a lot of value in having the same image that is tested by the CI running in both Kubeflow and the SDK harness. I'm not sure there is another way to do it if you have a custom component (e.g. an ExampleGen).
I appreciate the simplicity and reusability that user_module aims to provide. I find, though, that as soon as you enter custom-component land, or need to patch/rebuild the tfx image for size or to backport Beam fixes, etc., it makes sense to use GitOps and versioned images.
Thank you again for responding so quickly and thoroughly.
After upgrading to 0.30, the YAML file that the pipeline generates is now 2 MB, which exceeds the size Kubeflow allows at run time!
It seems to be including, in every single component, the JSON for the entire pipeline definition in the --tfx_ir parameter. We have 14 components in our pipeline, so it quickly adds up and exceeds the limit.
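A rough back-of-envelope (the component count is from this report; the per-component IR size is an assumed figure for illustration) shows how fast this compounds:

```python
# Hypothetical arithmetic: if the serialized pipeline IR is ~140 KB and is
# embedded verbatim in each component's --tfx_ir argument, then for 14
# components the workflow YAML grows by roughly:
ir_kb = 140       # assumed size of one full-pipeline IR blob (illustrative)
components = 14   # from the report above
total_kb = ir_kb * components
print(total_kb)   # 1960 KB, i.e. close to the observed ~2 MB
```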
Example:
- name: base-model-calibrator
  container:
    args:
    - --pipeline_name
    - my-pipeline
    - --pipeline_root
    - '{{inputs.parameters.pipeline-root}}'
    - --kubeflow_metadata_config
    # ... <snip>
    - --node_id
    - base_model_calibrator
    - --tfx_ir
    - |-
      {
        "pipelineInfo": {
          "id": "my-pipeline"
        },
        "nodes": [
          {
            "pipelineNode": {
              "nodeInfo": {
                "type": {
                  "name": "tfx.extensions.google_cloud_big_query.example_gen.component.BigQueryExampleGen"
                },
                "id": "calibration_bq_example_gen"
    # ... <snip>
- name: latest_blessed_base_model_resolver
  container:
    args:
    - --pipeline_name
    - my-pipeline
    - --pipeline_root
    - '{{inputs.parameters.pipeline-root}}'
    - --kubeflow_metadata_config
    # ... <snip>
    - --node_id
    - latest_blessed_base_model_resolver
    - --tfx_ir
    - |-
      {
        "pipelineInfo": {
          "id": "my-pipeline"
        },
        "nodes": [
          {
            "pipelineNode": {
              "nodeInfo": {
                "type": {
                  "name": "tfx.extensions.google_cloud_big_query.example_gen.component.BigQueryExampleGen"
                },
                "id": "calibration_bq_example_gen"
What can we do to prevent that?
Please comment if you find any issues with TFX 0.30.0
Thanks