tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0

How to write custom component #322

Closed rummens closed 3 years ago

rummens commented 5 years ago

Hello,

I am trying to figure out how to write my own component, but I am struggling to understand all the abstraction concepts like Channel, ChannelParameter, ExecutionParameter, Artifact, etc.

Is there any documentation on this?

In the end I just want to execute my own Python function inside of TFX, basically overwriting the Do function of the Executor, but I cannot get the inputs/outputs to work.

Here is my very first attempt:

from tfx.components.base.base_component import BaseComponent, ComponentSpec, ExecutionParameter, ChannelParameter
from tfx.components.base.base_executor import BaseExecutor
from tfx.components.transform.component import Transform
from tfx.components.pusher.component import Pusher
from typing import Any, Dict, List, Optional, Text
from tfx.utils import types
from tfx.utils import channel

class MyCustomComponentSpec(ComponentSpec):
    COMPONENT_NAME = 'MyCustomComponent'
    PARAMETERS = {}
    INPUTS = {
        'input_examples': ChannelParameter(type_name='ExamplesPath'),
    }
    OUTPUTS = {
        'output_examples': ChannelParameter(type_name='ExamplesPath'),
    }

class MyCustomExecutor(BaseExecutor):

    def Do(self, input_dict: Dict[Text, List[types.TfxArtifact]],
           output_dict: Dict[Text, List[types.TfxArtifact]],
           exec_properties: Dict[Text, Any]) -> None:
        print(input_dict)
        print(output_dict)
        print(exec_properties)

class MyCustomComponent(BaseComponent):
    SPEC_CLASS = MyCustomComponentSpec
    EXECUTOR_CLASS = MyCustomExecutor

    def __init__(self,
                 input_examples: str,
                 output_examples: Optional[channel.Channel] = None,
                 name: Optional[Text] = None):

        input_artifact = types.TfxArtifact('ExamplesPath')
        input_artifact.uri = input_examples

        input_channel = channel.Channel(
                        type_name='ExamplesPath',
                        artifacts=[input_artifact])

        output_channel = channel.Channel(
            type_name='ExamplesPath',
            artifacts=[types.TfxArtifact('ExamplesPath')])

        spec = MyCustomComponentSpec(
            input_examples=channel.as_channel(input_channel),
            output_examples=output_channel)
        super(MyCustomComponent, self).__init__(spec=spec,
                                                name=name)
ruoyu90 commented 5 years ago

Thanks @rummens for bringing this up! We do have example code for building a custom component. We're also working on documentation of the steps to create one. For the time being, could you try to follow the example and see how it goes?

Looking at your code above, it seems you're trying to build your own ExampleGen? I ask because the head component is a little tricky, since it has special driver logic to register external artifacts in ML Metadata so that they can be tracked throughout the pipeline.

rummens commented 5 years ago

Thanks for the hint, I will have a look at the Slack component and start from there. I will report back.

rummens commented 5 years ago

Quick question in the meantime: what is the conceptual difference between input/output params and execution params? It seems to me that execution and input params are available in the same places, so why go through the struggle of using channels if one can just (mis)use the execution parameters?

ruoyu90 commented 5 years ago

@rummens hopefully you'll find the following explanation useful :)

In TFX, we model an execution in several parts: the input artifacts it consumes, the output artifacts it produces, and the execution properties (plain parameters) that configure the run.

By using this data model, we enable lineage tracking of artifacts and executions, which will further enable more advanced features like continuous training (under development). And as you can tell, inputs/outputs are quite different from the other parameters of an execution.
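
As a rough sketch of how those parts surface in a ComponentSpec (mirroring the spec classes earlier in this thread; the 'message' key is illustrative):

from typing import Text
from tfx.components.base.base_component import ComponentSpec, ExecutionParameter, ChannelParameter

class MyComponentSpec(ComponentSpec):
    COMPONENT_NAME = 'MyComponent'
    PARAMETERS = {
        # Plain configuration, recorded as execution properties; no lineage.
        'message': ExecutionParameter(type=Text),
    }
    INPUTS = {
        # Artifact-valued: consumed artifacts, tracked in ML Metadata.
        'examples': ChannelParameter(type_name='ExamplesPath'),
    }
    OUTPUTS = {
        # Artifact-valued: produced artifacts, tracked in ML Metadata.
        'output': ChannelParameter(type_name='ExamplesPath'),
    }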

rummens commented 5 years ago

Is it correct to assume that artifacts are basically some kind of files? The Channel objects then hold a TfxArtifact, which contains a path/URI to these "files". A simplified workflow could look like this:

  1. Load the input "file" from the Channel URI
  2. Manipulate the content with the help of execution parameters
  3. Write the output "file" back to "disk" via the Channel URI
ruoyu90 commented 5 years ago

@rummens yep. One small note is that a Channel may contain more than one artifact.
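
Concretely, that three-step workflow inside an executor's Do method might look like the following minimal sketch (the keys input_examples/output_examples and suffix, and the file name data.txt, are illustrative rather than fixed TFX names):

import os

import tensorflow as tf
from tfx.utils import types

def Do(self, input_dict, output_dict, exec_properties):
    # 1. Load the input "file" from the input Channel's artifact URI.
    input_uri = types.get_single_uri(input_dict['input_examples'])
    with tf.gfile.GFile(os.path.join(input_uri, 'data.txt')) as f:
        data = f.read()

    # 2. Manipulate the content with the help of an execution parameter.
    data += exec_properties['suffix']

    # 3. Write the output "file" back to "disk" via the output Channel's URI.
    output_uri = types.get_single_uri(output_dict['output_examples'])
    with tf.gfile.GFile(os.path.join(output_uri, 'data.txt'), 'w') as f:
        f.write(data)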

rummens commented 5 years ago

Thanks for your help so far. I have been able to get the ExecutionParameter working, but I am struggling with the input params.

First of all, what is the idea behind the type_name of a TfxArtifact? I get it for Channels (to allow a comparison between the ComponentSpec and the Channel), but I don't understand it on the TfxArtifact. Must it be the same as the Channel's type_name?


Also I am struggling with this comment in the init function of TfxArtifact:

When first created, the artifact will have an empty URI (which will be filled by the orchestration system before first usage).

If I don't set the URI of the artifact myself, the checkcache sub-DAG will fail because of the missing URI. I understand that the URI is set automatically, but it does not work for me.


Next problem: the id of the artifact. When not setting the id, publish_execution fails with this:

  File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/metadata.py", line 278, in publish_execution
    'input artifact {} has missing id'.format(single_input))

If I add the id, it fails with this (from which I am not able to retrieve a meaningful error message). Any idea what causes this, or can you point me to a default artifact to start with (is there a function or setting I am missing)?

*** Reading local file: /home/marcel/airflow/logs/custom_slack_test.SlackComponent/custom_slack_test.SlackComponent.publishexec/2019-07-15T15:11:41.651451+00:00/1.log
[2019-07-15 15:12:34,750] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: custom_slack_test.SlackComponent.custom_slack_test.SlackComponent.publishexec 2019-07-15T15:11:41.651451+00:00 [queued]>
[2019-07-15 15:12:34,761] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: custom_slack_test.SlackComponent.custom_slack_test.SlackComponent.publishexec 2019-07-15T15:11:41.651451+00:00 [queued]>
[2019-07-15 15:12:34,761] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2019-07-15 15:12:34,762] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-07-15 15:12:34,766] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2019-07-15 15:12:34,812] {__init__.py:1374} INFO - Executing <Task(PythonOperator): custom_slack_test.SlackComponent.publishexec> on 2019-07-15T15:11:41.651451+00:00
[2019-07-15 15:12:34,812] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'custom_slack_test.SlackComponent', 'custom_slack_test.SlackComponent.publishexec', '2019-07-15T15:11:41.651451+00:00', '--job_id', '180', '--raw', '-sd', 'DAGS_FOLDER/taxi_pipeline_simple.py', '--cfg_path', '/tmp/tmp3_8v6ohb']
[2019-07-15 15:12:36,570] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec [2019-07-15 15:12:36,570] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-07-15 15:12:36,947] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec [2019-07-15 15:12:36,946] {__init__.py:305} INFO - Filling up the DagBag from /home/marcel/airflow/dags/taxi_pipeline_simple.py
[2019-07-15 15:12:40,048] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec /home/marcel/airflow/dags/assets/custom_slack_test
[2019-07-15 15:12:40,048] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec /home/marcel/airflow/dags/assets/custom_slack_test/data/simple
[2019-07-15 15:12:40,049] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec /home/marcel/airflow/dags/assets/custom_slack_test/tfx
[2019-07-15 15:12:40,049] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec True
[2019-07-15 15:12:40,049] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec [Errno 2] No such file or directory: '/home/marcel/airflow/dags/assets/slack_component/component.py'
[2019-07-15 15:12:40,049] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec [2019-07-15 15:12:40,048] {code2flow.py:134} INFO - Deploying local
[2019-07-15 15:12:40,064] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec [2019-07-15 15:12:40,064] {cli.py:517} INFO - Running <TaskInstance: custom_slack_test.SlackComponent.custom_slack_test.SlackComponent.publishexec 2019-07-15T15:11:41.651451+00:00 [running]> on host marcels-vm
[2019-07-15 15:12:40,076] {python_operator.py:104} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=custom_slack_test.SlackComponent
AIRFLOW_CTX_TASK_ID=custom_slack_test.SlackComponent.publishexec
AIRFLOW_CTX_EXECUTION_DATE=2019-07-15T15:11:41.651451+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill_2019-07-15T15:11:41.651451+00:00
[2019-07-15 15:12:40,089] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec 2019-07-15 15:12:40.089313: E bazel-out/k8-opt/bin/ml_metadata/metadata_store/pywrap_tf_metadata_store_serialized.cc:3165] Created MetadataStore.
[2019-07-15 15:12:40,089] {logging_mixin.py:95} INFO - [2019-07-15 15:12:40,089] {metadata_store.py:50} INFO - MetadataStore initialized
[2019-07-15 15:12:40,117] {logging_mixin.py:95} INFO - [2019-07-15 15:12:40,117] {metadata.py:271} INFO - Publishing execution id: 1
type_id: 1
properties {
  key: "channel_id"
  value {
    string_value: "channel"
  }
}
properties {
  key: "slack_token"
  value {
    string_value: "token"
  }
}
properties {
  key: "state"
  value {
    string_value: "complete"
  }
}
properties {
  key: "timeout_sec"
  value {
    string_value: "42"
  }
}
, with inputs {'model_export': [ModelExportPath:/home/marcel/airflow/dags/assets/custom_slack_test.42]} and outputs {}
[2019-07-15 15:12:40,117] {logging_mixin.py:95} WARNING - --- Logging error ---
[2019-07-15 15:12:40,183] {logging_mixin.py:95} WARNING - Traceback (most recent call last):
[2019-07-15 15:12:40,183] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
[2019-07-15 15:12:40,183] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 112, in execute
    return_value = self.execute_callable()
[2019-07-15 15:12:40,183] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 117, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-07-15 15:12:40,183] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/airflow/airflow_adapter.py", line 184, in publish_exec
    final_output = self._publish_execution_to_metadata()
[2019-07-15 15:12:40,183] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/airflow/airflow_adapter.py", line 92, in _publish_execution_to_metadata
    self._output_dict)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/metadata.py", line 302, in publish_execution
    self._store.put_events(events)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/ml_metadata/metadata_store/metadata_store.py", line 285, in put_events
    self._swig_call(metadata_store_serialized.PutEvents, request, response)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/ml_metadata/metadata_store/metadata_store.py", line 76, in _swig_call
    raise _make_exception(error_message, status_code)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING - tensorflow.python.framework.errors_impl.InvalidArgumentError: <unprintable InvalidArgumentError object>
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING - 
During handling of the above exception, another exception occurred:
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING - Traceback (most recent call last):
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 1034, in emit
    msg = self.format(record)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 880, in format
    return fmt.format(record)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 619, in format
    record.message = record.getMessage()
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 378, in getMessage
    msg = str(self.msg)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING - TypeError: __str__ returned non-string (type bytes)
[2019-07-15 15:12:40,184] {logging_mixin.py:95} WARNING - Call stack:
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/bin/airflow", line 32, in <module>
    args.func(args)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
    _run(args, dag, ti)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 442, in _run
    pool=args.pool,
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1493, in _run_raw_task
    self.handle_failure(e, test_mode, context)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1580, in handle_failure
    self.log.exception(error)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 1418, in exception
    self.error(msg, *args, exc_info=exc_info, **kwargs)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 1412, in error
    self._log(ERROR, msg, args, **kwargs)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 1519, in _log
    self.handle(record)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 1529, in handle
    self.callHandlers(record)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 1591, in callHandlers
    hdlr.handle(record)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/logging/__init__.py", line 905, in handle
    self.emit(record)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING -   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/utils/log/file_task_handler.py", line 61, in emit
    self.handler.emit(record)
[2019-07-15 15:12:40,185] {logging_mixin.py:95} WARNING - Message: InvalidArgumentError()
Arguments: ()
[2019-07-15 15:12:40,186] {__init__.py:1611} INFO - Marking task as FAILED.
[2019-07-15 15:12:40,227] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec /home/marcel/anaconda3/lib/python3.7/site-packages/apache_beam/__init__.py:84: UserWarning: Running the Apache Beam SDK on Python 3 is not yet fully supported. You may encounter buggy behavior or missing features.
[2019-07-15 15:12:40,227] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   'Running the Apache Beam SDK on Python 3 is not yet fully supported. '
[2019-07-15 15:12:40,229] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec Traceback (most recent call last):
[2019-07-15 15:12:40,229] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/bin/airflow", line 32, in <module>
[2019-07-15 15:12:40,229] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     args.func(args)
[2019-07-15 15:12:40,229] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-07-15 15:12:40,229] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     return f(*args, **kwargs)
[2019-07-15 15:12:40,229] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     _run(args, dag, ti)
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 442, in _run
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     pool=args.pool,
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     return func(*args, **kwargs)
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     result = task_copy.execute(context=context)
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 112, in execute
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     return_value = self.execute_callable()
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 117, in execute_callable
[2019-07-15 15:12:40,230] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/airflow/airflow_adapter.py", line 184, in publish_exec
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     final_output = self._publish_execution_to_metadata()
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/airflow/airflow_adapter.py", line 92, in _publish_execution_to_metadata
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     self._output_dict)
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/metadata.py", line 302, in publish_execution
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     self._store.put_events(events)
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/ml_metadata/metadata_store/metadata_store.py", line 285, in put_events
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     self._swig_call(metadata_store_serialized.PutEvents, request, response)
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec   File "/home/marcel/anaconda3/lib/python3.7/site-packages/ml_metadata/metadata_store/metadata_store.py", line 76, in _swig_call
[2019-07-15 15:12:40,231] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec     raise _make_exception(error_message, status_code)
[2019-07-15 15:12:40,232] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec tensorflow.python.framework.errors_impl.InvalidArgumentError: <exception str() failed>
[2019-07-15 15:12:40,752] {base_task_runner.py:101} INFO - Job 180: Subtask custom_slack_test.SlackComponent.publishexec swig/python detected a memory leak of type 'ml_metadata::MetadataStore *', no destructor found.
[2019-07-15 15:12:44,746] {logging_mixin.py:95} INFO - [2019-07-15 15:12:44,745] {jobs.py:2562} INFO - Task exited with return code 1

Code

This is my component:

class SlackComponentSpec(base_component.ComponentSpec):
    """ComponentSpec for Custom TFX Slack Component."""

    COMPONENT_NAME = 'SlackComponent'
    PARAMETERS = {
        'slack_token': ExecutionParameter(type=Text),
        'channel_id': ExecutionParameter(type=Text),
        'timeout_sec': ExecutionParameter(type=int),
    }
    INPUTS = {'model_export': ChannelParameter(type_name='ModelExportPath')}
    OUTPUTS = {}

class SlackComponent(base_component.BaseComponent):
    """Custom TFX Slack Component.

    This custom component serves as a bridge between the TFX pipeline and human
    model reviewers to enable a review-and-push workflow in the model development
    cycle. It utilizes the Slack API to send a message to a user-defined Slack
    channel with the model URI info and waits for a go/no-go decision from the
    same Slack channel:
      * To approve the model, a user needs to reply to the thread sent out by
        the bot started by SlackComponent with 'lgtm' or 'approve'.
      * To reject the model, a user needs to reply to the thread sent out by
        the bot started by SlackComponent with 'decline' or 'reject'.
    """

    SPEC_CLASS = SlackComponentSpec
    EXECUTOR_CLASS = executor.Executor

    def __init__(self,
                 model_export: channel.Channel,
                 slack_token: Text,
                 channel_id: Text,
                 timeout_sec: int,
                 name: Optional[Text] = None):
        """Construct a SlackComponent.

        Args:
          model_export: A Channel of 'ModelExportPath' type, usually produced by
            the Trainer component.
          slack_token: A token used for setting up the connection with the Slack
            server.
          channel_id: Slack channel id to communicate on.
          timeout_sec: Seconds to wait for a response before defaulting to reject.
          name: Optional unique name. Necessary if multiple SlackComponents are
            declared in the same pipeline.
        """

        spec = SlackComponentSpec(
            model_export=model_export,
            slack_token=slack_token,
            channel_id=channel_id,
            timeout_sec=timeout_sec)
        super(SlackComponent, self).__init__(spec=spec, name=name)

And my pipeline, configuring the Component:

    input_artifact = TfxArtifact(type_name="ModelExportPath")
    input_artifact.uri = _taxi_root
    input_artifact.id = 42

    input_channel = channel.Channel(artifacts=[input_artifact], type_name="ModelExportPath")

    slack = component.SlackComponent(slack_token="token",
                                     channel_id="channel",
                                     timeout_sec=42,
                                     model_export=input_channel)
ruoyu90 commented 5 years ago

Thanks for your help so far. I have been able to get the ExecutionParameter working, but I am struggling with the input params.

@rummens You're very welcome! Please see reply inline :)

First of all, what is the idea behind the type_name of a TfxArtifact? I get it for Channels (to allow a comparison between the ComponentSpec and the Channel), but I don't understand it on the TfxArtifact. Must it be the same as the Channel's type_name?

In TFX, a Channel stands for a collection of Artifacts that share the same artifact type and (optionally) other properties. Thus, every TfxArtifact in a Channel should have the same type, which also matches the Channel's type_name.
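
For example, a valid Channel where every artifact carries the Channel's type_name could be sketched as:

from tfx.utils import channel
from tfx.utils import types

model_a = types.TfxArtifact(type_name='ModelExportPath')
model_b = types.TfxArtifact(type_name='ModelExportPath')

# Valid: the Channel's type_name matches the type of every artifact it holds.
model_channel = channel.Channel(
    type_name='ModelExportPath',
    artifacts=[model_a, model_b])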

Also I am struggling with this comment in the init function of TfxArtifact:

When first created, the artifact will have an empty URI (which will be filled by the orchestration system before first usage).

The comment does not cover the whole picture. Sorry for the confusion; we'll update the comment. The comment is true for artifacts that are created by TFX. If you want to pass in an artifact manually, it's a different story: you will need to make sure the artifact is already registered in ML Metadata and is valid (i.e., it has a URI and a valid id assigned by MLMD). Basically, we expect the input artifacts to a component to be outputs from other components, except for head components like ExampleGen. For ExampleGen, we use a specific driver to first register the external artifact in MLMD and then refer to it in the executor.
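
For reference, what such a driver does conceptually is roughly the following MLMD registration. This is only a sketch against the ml_metadata API; the sqlite path and model path are illustrative:

from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

# Connect to the same metadata store the pipeline uses (path is illustrative).
config = metadata_store_pb2.ConnectionConfig()
config.sqlite.filename_uri = '/home/user/airflow/tfx/metadata.db'
store = metadata_store.MetadataStore(config)

# Register the artifact type once, then the external artifact itself.
# put_artifacts returns the MLMD-assigned ids, which make the artifact valid.
artifact_type = metadata_store_pb2.ArtifactType(name='ModelExportPath')
type_id = store.put_artifact_type(artifact_type)

artifact = metadata_store_pb2.Artifact()
artifact.type_id = type_id
artifact.uri = '/path/to/external/model'
[artifact_id] = store.put_artifacts([artifact])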

If I don't set the URI of the artifact myself, the checkcache sub-DAG will fail because of the missing URI. I understand that the URI is set automatically, but it does not work for me.

Next problem: the id of the artifact. When not setting the id, publish_execution fails with this:

  File "/home/marcel/anaconda3/lib/python3.7/site-packages/tfx/orchestration/metadata.py", line 278, in publish_execution
    'input artifact {} has missing id'.format(single_input))

If I add the id, it fails with this (from which I am not able to retrieve a meaningful error message). Any idea what causes this, or can you point me to a default artifact to start with (is there a function or setting I am missing)?

Your component code looks good actually.

And my pipeline, configuring the Component:

    input_artifact = TfxArtifact(type_name="ModelExportPath")
    input_artifact.uri = _taxi_root
    input_artifact.id = 42

    input_channel = channel.Channel(artifacts=[input_artifact], type_name="ModelExportPath")

    slack = component.SlackComponent(slack_token="token",
                                     channel_id="channel",
                                     timeout_sec=42,
                                     model_export=input_channel)

See my previous comment: the input artifact here is created manually, without being registered in MLMD, so it is not regarded as valid. If you use the output of the Trainer as the model_export, I think your pipeline will run fine.
rummens commented 5 years ago

Thanks, the driver hint was the key. Everything runs fine now :-)

The only question I have is regarding the file name for the output artifact. I cannot figure out how you share it between different components, so I just changed the URI of the output artifact to match the filename that I created. Is there a better way, or do upstream components need to know the filename via documentation etc.?

import os
from typing import Any, Dict, List, Text

import tensorflow as tf

from tfx.components.base import base_executor
from tfx.utils import io_utils
from tfx.utils import types

# Illustrative file name; the original snippet referenced a module-level constant.
_DEFAULT_FILE_NAME = 'data.txt'

class Executor(base_executor.BaseExecutor):
    """Executor for Slack component."""

    def Do(self, input_dict: Dict[Text, List[types.TfxArtifact]],
           output_dict: Dict[Text, List[types.TfxArtifact]],
           exec_properties: Dict[Text, Any]) -> None:
        """Get a human review result on a model through a Slack channel.

        Args:
          input_dict: Input dict from input key to a list of artifacts, including:
            - input_example: an example for an input
          output_dict: Output dict from key to a list of artifacts, including:
            - output_example: an example for an output
          exec_properties: A dict of execution properties, including:
            - string_execution_parameter: a string execution parameter (only used
              here, not persisted or shared upstream)
            - integer_execution_parameter: an integer execution parameter (only
              used here, not persisted or shared upstream)
            - input_config: not of concern here, only relevant for the Driver
            - output_config: not of concern here, only relevant for the Driver

        Returns:
          None
        """
        self._log_startup(input_dict, output_dict, exec_properties)

        # Fetch execution properties from the exec_properties dict.
        string_parameter = exec_properties['string_execution_parameter']
        integer_parameter = exec_properties['integer_execution_parameter']

        # Fetch input URIs from input_dict.
        input_example_uri = types.get_single_uri(input_dict['input_example'])

        # Fetch the output artifact from output_dict.
        output_example = types.get_single_instance(output_dict['output_example'])

        print("I AM RUNNING!")
        print(string_parameter)
        print(integer_parameter)
        print(input_example_uri)
        print(output_example)

        input_data = ""

        # Load the input.
        if tf.gfile.Exists(input_example_uri):
            with tf.gfile.GFile(input_example_uri, "r") as f:
                input_data = f.read()

        # Make some changes.
        output_data = input_data + " changed by an awesome custom executor!"

        # Update the output URI so upstream components know the filename.
        output_example.uri = os.path.join(output_example.uri, _DEFAULT_FILE_NAME)

        # Write the changes back to the output.
        io_utils.write_string_file(output_example.uri, output_data)

        # Optionally, set custom properties so that checks in upstream
        # components can be made more quickly.
        output_example.set_string_custom_property('stringProperty', "Awesome")
        output_example.set_int_custom_property('intProperty', 42)
ruoyu90 commented 5 years ago

@rummens The URI is auto-assigned by TFX through the driver. If you only want to run the pipeline, you don't need to worry about the file URIs. If you want to look into the output of a specific component, you can use an ML Metadata API call to get the URI of the artifact produced by a component execution. You can refer to the example here. We're also working on a more user-friendly visualization solution that will cover this functionality and much more.
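
For example, a lookup like this sketch against the ml_metadata API prints the URIs of all artifacts of a given type (the database path is illustrative):

from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

config = metadata_store_pb2.ConnectionConfig()
config.sqlite.filename_uri = '/home/user/airflow/tfx/metadata.db'
store = metadata_store.MetadataStore(config)

# List every 'ModelExportPath' artifact MLMD knows about, with its URI.
for artifact in store.get_artifacts_by_type('ModelExportPath'):
    print(artifact.id, artifact.uri)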

rummens commented 5 years ago

But I have to tell MLMD at some point the name of the file(s) I am writing to, right? If I don't overwrite the URI, it will only contain the path to the actual folder, but not the file. A downstream component might not know which file to use if multiple exist. The workaround above works fine for me, but I was wondering if this is the intended way?

ruoyu90 commented 5 years ago

@rummens you don't have to worry about the URI, which is auto-assigned by the TFX framework. The downstream component will know the URI as long as things are wired up correctly (the downstream component instance uses the upstream output as its input). As you can see in our Chicago taxi example, no URI is set for any specific output.
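
As a sketch of that wiring (assuming a trainer instance configured as in the taxi example; the exact output attribute name is an assumption and may differ across TFX versions):

# trainer = Trainer(module_file=..., examples=..., ...)  # as in the taxi example
slack = component.SlackComponent(
    slack_token='token',
    channel_id='channel',
    timeout_sec=42,
    model_export=trainer.outputs.output)  # upstream output used directly as input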

rummens commented 5 years ago

Thank you very much @ruoyu90, you really helped me out. Everything is working now ;-)

zhitaoli commented 5 years ago

Hi @rummens, glad things worked out. If your component will end up in open source, please feel free to send us a link and we are happy to cross link it in our docs.

rummens commented 5 years ago

Hi @zhitaoli, my summary was hosted on our internal git, but I put the interesting part in a separate repo. Feel free to propose changes :-)

robertlugg commented 5 years ago

Thanks @rummens. I had a question (happy to open another issue, but it's about Channel). One is instantiated like this: input_channel = channel.Channel(artifacts=[input_artifact], type_name="ModelExportPath"). My question is why type_name must be specified. Each artifact has its type, so Channel can get it from there; all it needs to do is make sure they are all the same. Correct?

rummens commented 5 years ago

That is a good question that I stumbled across as well. I am not sure why this is, but I believe the design decision was made for a reason.

Maybe @ruoyu90 can help with this?

robertlugg commented 5 years ago

I see you can use the convenience function in the channel module:

output_channel = channel.as_channel([output_instance])

and that's fine.

I also believe we may be using 'upstream' and 'downstream' differently. I consider a component which is 'downstream' to be the one which consumes some output from its 'upstream' "node". I believe this is the more common convention.

ruoyu90 commented 5 years ago

I see you can use the convenience function in the channel module:

output_channel = channel.as_channel([output_instance])

Yes, this is the convenience function for that purpose. Actually, we might consider removing the optional artifacts parameter from the Channel init function and merging the as_channel function into channel.py.

I also believe we may be using 'upstream' and 'downstream' differently. I consider a component which is 'downstream' to be the one which consumes some output from its 'upstream' "node". I believe this is the more common convention.

Do you mean the upstream_nodes and downstream_nodes defined in BaseNode / BaseComponent? I believe they have the same meaning as you suggested.

gowthamkpr commented 4 years ago

@robertlugg Can you please respond to the above question? Thanks!

robertlugg commented 4 years ago

Please ignore my "upstream/downstream" comment from before. I reviewed this issue thread and everything seems as I'd expect.

rmothukuru commented 3 years ago

@rummens, detailed documentation about custom components is now available on the TensorFlow.org site. The respective links are listed below:

  1. Understanding Custom Components
  2. Python based Custom Components
  3. Container based Components
  4. Building Fully Custom Components

Hope this answers your question. Thanks!

rmothukuru commented 3 years ago

@rummens, can you please confirm that we can close this issue? Thanks!

rummens commented 3 years ago

yes, thank you very much!