rummens closed this issue 5 years ago
You can pass metadata_connection_config (an ml_metadata.proto.ConnectionConfig) as a kwarg when creating the pipeline.Pipeline; it will be forwarded to Airflow and used during metadata initialization.
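A minimal sketch of what the comment above describes, assuming roughly the TFX 0.13-era API; the sqlite path, pipeline name, and pipeline root below are placeholders, not values from this issue:

```python
# Sketch: build an ml_metadata ConnectionConfig and hand the proto itself to
# pipeline.Pipeline via the metadata_connection_config kwarg.
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration import pipeline

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = '/usr/local/airflow/metadata.db'  # hypothetical path

my_pipeline = pipeline.Pipeline(
    pipeline_name='my_pipeline',      # placeholder
    pipeline_root='/tmp/pipeline_root',  # placeholder
    components=[],                    # your components here
    metadata_connection_config=connection_config,
)
```

The key point is that the kwarg takes the ConnectionConfig proto directly; TFX constructs the metadata store from it internally.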
For your DB locking issue: if you use the default SQLite backend, check whether another CLI session is holding a connection to the database.
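One way to check, sketched with only the Python standard library (the DB path here is a stand-in for the actual metadata.db under metadata_db_root): try to take a write lock with a short timeout, and if another connection already holds it, SQLite raises "database is locked".

```python
# Probe whether another connection currently holds a write lock on a SQLite
# database. The path is a placeholder; substitute your metadata.db.
import sqlite3

db_path = "metadata.db"  # stand-in for <metadata_db_root>/metadata.db

try:
    conn = sqlite3.connect(db_path, timeout=1)
    conn.execute("BEGIN IMMEDIATE")  # acquires a reserved (write) lock
    conn.rollback()                  # release the lock immediately
    conn.close()
    locked = False
except sqlite3.OperationalError:     # raised as "database is locked"
    locked = True

print("locked by another connection" if locked else "no writer holds the lock")
```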
How can I check if a CLI is connecting? I have none open, so if TFX is not doing something, I wouldn't know what else could be trying to connect.
In the meantime, I deployed a MySQL pod in the cluster and tried to connect the metadata store to it, but it fails with a memory-leak error. Any idea what could cause this? Is this a bug in TF Metadata?
This is my config (I tried both an IP and a hostname for the MySQL host -- same outcome):
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.mysql.host = 'mysql.airflow'
connection_config.mysql.port = 3306
connection_config.mysql.database = 'tfx'
connection_config.mysql.user = 'tfx'
connection_config.mysql.password = '****'
store = metadata_store.MetadataStore(connection_config)
return pipeline.Pipeline(
    pipeline_name=_pipeline_name,
    pipeline_root=_pipeline_root,
    components=[
        example_gen, statistics_gen, infer_schema, validate_stats, transform,
        trainer, model_analyzer, model_validator, pusher
    ],
    enable_cache=True,
    metadata_db_root=_metadata_db_root,
    additional_pipeline_args={
        'tfx_image': 'airbusregistry.azurecr.io/kubeflow-tfx:latest',
        'logger_args': logger_overrides
    },
    metadata_connection_config=store
)
[2019-06-07 11:20:54,215] {models.py:1359} INFO - Dependencies all met for <TaskInstance: chicago_taxi_simple.CsvExampleGen.chicago_taxi_simple.CsvExampleGen.checkcache 2019-06-07T11:20:15.178412+00:00 [queued]>
[2019-06-07 11:20:54,223] {models.py:1359} INFO - Dependencies all met for <TaskInstance: chicago_taxi_simple.CsvExampleGen.chicago_taxi_simple.CsvExampleGen.checkcache 2019-06-07T11:20:15.178412+00:00 [queued]>
[2019-06-07 11:20:54,223] {models.py:1571} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2019-06-07 11:20:54,242] {models.py:1593} INFO - Executing <Task(BranchPythonOperator): chicago_taxi_simple.CsvExampleGen.checkcache> on 2019-06-07T11:20:15.178412+00:00
[2019-06-07 11:20:54,243] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run chicago_taxi_simple.CsvExampleGen chicago_taxi_simple.CsvExampleGen.checkcache 2019-06-07T11:20:15.178412+00:00 --job_id 4 --raw -sd DAGS_FOLDER/taxi_pipeline_simple.py --cfg_path /tmp/tmp9ekl6svd']
[2019-06-07 11:20:54,721] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache WARNING:root:OSError while attempting to symlink the latest log directory
[2019-06-07 11:20:54,968] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache [2019-06-07 11:20:54,968] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=31
[2019-06-07 11:20:55,509] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache [2019-06-07 11:20:55,508] {__init__.py:51} INFO - Using executor LocalExecutor
[2019-06-07 11:20:55,918] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache [2019-06-07 11:20:55,917] {models.py:273} INFO - Filling up the DagBag from /usr/local/airflow/dags/taxi_pipeline_simple.py
[2019-06-07 11:21:00,955] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache 2019-06-07 11:21:00.955125: E bazel-out/k8-opt/bin/ml_metadata/metadata_store/pywrap_tf_metadata_store_serialized.cc:3165] Created MetadataStore.
[2019-06-07 11:21:00,955] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache /usr/local/airflow/airflow/dags/assets
[2019-06-07 11:21:00,955] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache /usr/local/airflow/airflow/dags/assets/data/simple
[2019-06-07 11:21:00,955] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache /usr/local/airflow/airflow/dags/assets/tfx
[2019-06-07 11:21:00,956] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache True
[2019-06-07 11:21:00,956] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache [2019-06-07 11:21:00,955] {metadata_store.py:50} INFO - MetadataStore initialized
[2019-06-07 11:21:01,048] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache [2019-06-07 11:21:01,048] {cli.py:520} INFO - Running <TaskInstance: chicago_taxi_simple.CsvExampleGen.chicago_taxi_simple.CsvExampleGen.checkcache 2019-06-07T11:20:15.178412+00:00 [running]> on host chicagotaxisimplecsvexamplegen-13c49af8947b46618fb7751f3ee93413
[2019-06-07 11:21:01,076] {python_operator.py:95} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=chicago_taxi_simple.CsvExampleGen
AIRFLOW_CTX_TASK_ID=chicago_taxi_simple.CsvExampleGen.checkcache
AIRFLOW_CTX_EXECUTION_DATE=2019-06-07T11:20:15.178412+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill_2019-06-07T11:20:15.178412+00:00
[2019-06-07 11:21:01,076] {models.py:1788} ERROR - 'MetadataStore' object has no attribute 'SerializeToString'
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1657, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 131, in execute
branch = super(BranchPythonOperator, self).execute(context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 103, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 108, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/lib/python3.6/site-packages/tfx/orchestration/airflow/airflow_adapter.py", line 110, in check_cache_and_maybe_prepare_execution
self._logger) as m:
File "/usr/local/lib/python3.6/site-packages/tfx/orchestration/metadata.py", line 53, in __enter__
self._store = metadata_store.MetadataStore(self._connection_config)
File "/usr/local/lib/python3.6/site-packages/ml_metadata/metadata_store/metadata_store.py", line 48, in __init__
config.SerializeToString())
AttributeError: 'MetadataStore' object has no attribute 'SerializeToString'
[2019-06-07 11:21:01,079] {models.py:1819} INFO - Marking task as FAILED.
[2019-06-07 11:21:01,098] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-06-07 11:21:01,098] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache """)
[2019-06-07 11:21:01,098] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache /usr/local/lib/python3.6/site-packages/apache_beam/__init__.py:84: UserWarning: Running the Apache Beam SDK on Python 3 is not yet fully supported. You may encounter buggy behavior or missing features.
[2019-06-07 11:21:01,098] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache 'Running the Apache Beam SDK on Python 3 is not yet fully supported. '
[2019-06-07 11:21:01,113] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache Traceback (most recent call last):
[... same traceback as above, relayed through the subtask log ...]
[2019-06-07 11:21:01,899] {base_task_runner.py:101} INFO - Job 4: Subtask chicago_taxi_simple.CsvExampleGen.checkcache swig/python detected a memory leak of type 'ml_metadata::MetadataStore *', no destructor found.
[2019-06-07 11:21:04,198] {logging_mixin.py:95} INFO - [2019-06-07 11:21:04,197] {jobs.py:2527} INFO - Task exited with return code 1
For the 'MetadataStore' object has no attribute 'SerializeToString' error above: it looks like you are passing a MetadataStore object where a ConnectionConfig proto is expected.
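To spell that out (a sketch based on the config posted earlier in this thread): pass the ConnectionConfig proto itself, not a MetadataStore built from it -- TFX constructs the store internally.

```python
# The fix for the AttributeError above: pipeline.Pipeline expects the
# ConnectionConfig proto and builds the MetadataStore itself.
from ml_metadata.proto import metadata_store_pb2

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.mysql.host = 'mysql.airflow'
connection_config.mysql.port = 3306
connection_config.mysql.database = 'tfx'
connection_config.mysql.user = 'tfx'
connection_config.mysql.password = '****'

# In the pipeline definition, replace
#     metadata_connection_config=store                # MetadataStore -> AttributeError
# with
#     metadata_connection_config=connection_config    # the proto itself
```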
For the metadata lock issue, could you check whether multiple pipelines are writing to the same DB? It shouldn't happen if there is only a single connection.
Hi rummens -- is this still an issue for you?
We have actually moved away from Airflow on K8s, so I never got around to testing this. I thought you might want to keep this open in case other people hit the same problem, but from my perspective we can close it.
Thanks. Unfortunately, we can't reproduce either issue.
Description
I am running into the following error when executing the taxi example on an Airflow (1.10.2) installation on a K8s cluster.
I am not sure why this happens, since the CSV component is the only one running, so there should be no concurrent hits on the SQLite DB. The underlying PV is ReadWriteMany, so the file system supports multiple writers.
Is this a bug or is there an option to increase the timeout?
Possible Solution
If this is not the case, I guess the only way is to connect MLMD to its own MySQL instance, but I am struggling with that: I do not understand where to add the metadata_store variable in the pipeline definition.
I assume I have to change something in the final return statement of the pipeline (metadata_db_root maybe?). Unfortunately there is no API doc for tfx.orchestration.pipeline, so I have no idea which attribute to use. :-/