tensorflow / tfx

TFX is an end-to-end platform for deploying production ML pipelines
https://tensorflow.org/tfx
Apache License 2.0
2.12k stars 707 forks source link

TFX Evaluator: Cast string to float is not supported #2920

Closed vr-devil closed 3 years ago

vr-devil commented 3 years ago

Tensorflow Version: 2.3.1 TFX Version: 0.25.0 Python Version: 3.7.3

I encountered a problem: in the Evaluator component, the label value of the raw example is not transformed into the indicator (multi-hot) format.

pipeline:

    # Components are appended in the order they are created below.
    components = []

    # Ingest CSV files found under `data_path`.
    example_gen = CsvExampleGen(input_base=data_path)
    components.append(example_gen)

    # Compute statistics over the ingested examples.
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    components.append(statistics_gen)

    # Infer a schema from the statistics.
    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs['statistics'],
        infer_feature_shape=True)
    components.append(schema_gen)

    # Check the statistics against the inferred schema.
    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema'])
    components.append(example_validator)

    # Apply the user-provided preprocessing function to the raw examples.
    transform = Transform(
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        preprocessing_fn=PREPROCESSING_FN
    )
    components.append(transform)

    # Trains with the user-provided run function (RUN_FN) through the generic
    # Trainer executor, reading the transformed examples and transform graph.
    trainer_args = {
        'run_fn': RUN_FN,
        'transformed_examples': transform.outputs['transformed_examples'],
        'schema': schema_gen.outputs['schema'],
        'transform_graph': transform.outputs['transform_graph'],
        'train_args': trainer_pb2.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
        'eval_args': trainer_pb2.EvalArgs(num_steps=configs.EVAL_NUM_STEPS),
        'custom_executor_spec':
            executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
    }

    trainer = Trainer(**trainer_args)
    components.append(trainer)

    # Get the latest blessed model for model validation.
    model_resolver = ResolverNode(
        instance_name='latest_blessed_model_resolver',
        resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing))
    components.append(model_resolver)

    # Uses TFMA to compute evaluation statistics over features of a model and
    # perform quality validation of a candidate model (compared to a baseline).
    # NOTE(review): label_key="tags" names the raw label feature, and the
    # Evaluator below reads the raw examples from example_gen, whereas the
    # trainer's input function uses the transformed label "tags_xf". The
    # reported "Cast string to float is not supported" error is consistent
    # with the raw string label reaching the Keras metric -- confirm.
    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key="tags")],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=tfma.metrics.specs_from_metrics([
            tfma.metrics.ExampleCount(name='example_count'),
            tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
        ]))
    evaluator = Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        # Change threshold will be ignored if there is no baseline (first run).
        eval_config=eval_config)
    components.append(evaluator)

    # Checks whether the model passed the validation steps and pushes the model
    # to a file destination if check passed.
    pusher_args = {
        'model':
            trainer.outputs['model'],
        'model_blessing':
            evaluator.outputs['blessing'],
        'push_destination':
            pusher_pb2.PushDestination(
                filesystem=pusher_pb2.PushDestination.Filesystem(
                    base_directory=serving_model_dir)),
    }

    pusher = Pusher(**pusher_args)  # pylint: disable=unused-variable
    components.append(pusher)

transformation:

def _fill_in_missing(x):
    """Densify a feature tensor and drop its second dimension.

    A `SparseTensor` input is rebuilt with a dense shape of
    `[x.dense_shape[0], 1]` and converted to dense, with missing entries
    filled by '' for string tensors and 0 otherwise. Any other input is used
    as-is. In both cases axis 1 is squeezed away before returning.

    Args:
    x: A `SparseTensor` of rank 2 whose dense shape has size at most 1 in the
    second dimension, or an already-dense tensor with a squeezable axis 1.

    Returns:
    The dense tensor with axis 1 removed.
    """
    # Dense inputs only need the trailing dimension removed.
    if not isinstance(x, tf.sparse.SparseTensor):
        return tf.squeeze(x, axis=1)

    fill_value = '' if x.dtype == tf.string else 0
    single_column = tf.SparseTensor(
        x.indices, x.values, [x.dense_shape[0], 1])
    return tf.squeeze(
        tf.sparse.to_dense(single_column, fill_value), axis=1)

def _transform_labels(inputs, num_labels=12):
    """Turn the comma-separated label string into a multi-hot indicator.

    The label feature is densified, split on ",", mapped to integer ids with
    a vocabulary stored under `features.LABEL_KEY`, one-hot encoded and
    max-reduced over the token axis.

    Args:
      inputs: Dict of raw feature tensors; only `features.LABEL_KEY` is read.
      num_labels: Width of the indicator vector (the `one_hot` depth).
        Defaults to 12, the value that was previously hard-coded.

    Returns:
      A tensor of shape `[batch, num_labels]` holding 1.0 at the id of every
      label present in the example and 0.0 elsewhere.
    """
    labels = inputs[features.LABEL_KEY]
    print("labels", labels)
    labels_fill = _fill_in_missing(labels)
    print("labels_fill", labels_fill)
    labels_split = tf.compat.v1.strings.split(labels_fill, sep=",")
    print("labels_split", labels_split)
    labels_tokens = tft.compute_and_apply_vocabulary(
        labels_split, vocab_filename=features.LABEL_KEY)
    print("labels_tokens", labels_tokens)
    # Pad ragged rows with -1 instead of the default 0: id 0 is a real
    # vocabulary entry, so 0-padding would switch on class 0 for every example
    # that has fewer labels than the longest row in the batch. `tf.one_hot`
    # emits an all-zero vector for out-of-range indices such as -1.
    dense_tokens = tf.sparse.to_dense(labels_tokens, default_value=-1)
    labels_indicators = tf.reduce_max(
        tf.one_hot(dense_tokens, depth=num_labels), axis=1)
    print("labels_indicators", labels_indicators)
    return labels_indicators

def preprocessing_fn(inputs):
    """Build the transformed feature dict from the raw inputs.

    Args:
      inputs: Dict of raw feature tensors, passed unchanged to the content
        and label transforms.

    Returns:
      A dict with "content_xf" (output of `_transform_contents`) and
      "tags_xf" (output of `_transform_labels`).
    """
    # Contents are transformed before labels, as in the original ordering.
    content_xf = _transform_contents(inputs)
    tags_xf = _transform_labels(inputs)
    return {"content_xf": content_xf, "tags_xf": tags_xf}

trainer:

def _input_fn(file_pattern, data_accessor, tf_transform_output, batch_size=200):
    """Build the batched dataset used for tuning/training.

    Args:
      file_pattern: List of paths or patterns of input tfrecord files.
      data_accessor: DataAccessor for converting input to RecordBatch.
      tf_transform_output: A TFTransformOutput.
      batch_size: Number of consecutive elements of the returned dataset to
        combine in a single batch.

    Returns:
      The dataset produced by `data_accessor.tf_dataset_factory`, configured
      with "tags_xf" as the label key and the transformed-metadata schema.
    """
    options = dataset_options.TensorFlowDatasetOptions(
        batch_size=batch_size,
        label_key="tags_xf")
    transformed_schema = tf_transform_output.transformed_metadata.schema
    return data_accessor.tf_dataset_factory(
        file_pattern, options, transformed_schema)

def _build_model(n_labels=12, dropout_rate=0.3, learning_rate=1e-3):
    """Build and compile the Keras multi-label classifier.

    The network reads the "content_xf" feature through a numeric feature
    column, applies two 128-unit ReLU layers (each followed by dropout) and
    ends in a sigmoid layer with one unit per label. It is compiled with
    binary cross-entropy and the Adam optimizer.

    Args:
      n_labels: Number of output units. Defaults to 12.
      dropout_rate: Dropout rate applied after each hidden layer.
        Defaults to 0.3.
      learning_rate: Learning rate passed to Adam. Defaults to 1e-3.

    Returns:
      The compiled `keras.Model`.
    """
    columns = tf.feature_column.numeric_column("content_xf", shape=features.TOP_K)

    input_layer = {"content_xf": keras.layers.Input(name="content", shape=features.TOP_K)}

    inputs = keras.layers.DenseFeatures(columns)(input_layer)
    x = keras.layers.Dense(128, kernel_initializer='he_uniform', activation='relu')(inputs)
    x = keras.layers.Dropout(dropout_rate)(x)
    x = keras.layers.Dense(128, kernel_initializer='he_uniform', activation='relu')(x)
    x = keras.layers.Dropout(dropout_rate)(x)
    outputs = keras.layers.Dense(n_labels, activation='sigmoid')(x)

    model = keras.Model(input_layer, outputs=outputs)
    model.compile(
        loss='binary_crossentropy',
        # `learning_rate` is the supported keyword; `lr` is a deprecated alias
        # that newer Keras releases no longer accept.
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate))
    model.summary(print_fn=logging.info)

    return model

def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Create the serving function: parse tf.Examples, apply TFT, predict.

    Args:
      model: The Keras model; the transform layer is stored on it as
        `model.tft_layer`.
      tf_transform_output: A TFTransformOutput providing the raw feature spec
        and the transform-features layer.

    Returns:
      A `tf.function` taking serialized tf.Examples and returning the model
      output for them.
    """
    # Stored on the model and used inside the serving function below.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        raw_spec = tf_transform_output.raw_feature_spec()
        # The label is removed from the parsing spec for serving requests.
        raw_spec.pop(features.LABEL_KEY)
        parsed = tf.io.parse_example(serialized_tf_examples, raw_spec)
        return model(model.tft_layer(parsed))

    return serve_tf_examples_fn

def run_fn(fn_args):
    """Train the Keras model and save it with a serving signature.

    Args:
      fn_args: Trainer arguments. This function reads `transform_output`,
        `train_files`, `eval_files`, `data_accessor`, `train_steps`,
        `eval_steps` and `serving_model_dir` from it.
    """
    transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        transform_output,
        constants.TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        transform_output,
        constants.EVAL_BATCH_SIZE)

    # NOTE: no TensorBoard callback is attached to fit() below.

    # Debug output describing the training dataset.
    print(train_dataset)
    print("element_spec", train_dataset.element_spec)

    model = _build_model()
    model.fit(
        train_dataset,
        epochs=10,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    # Export with a 'serving_default' signature that accepts a batch of
    # serialized tf.Example strings.
    serving_fn = _get_serve_tf_examples_fn(model, transform_output)
    examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
    signatures = {
        'serving_default': serving_fn.get_concrete_function(examples_spec),
    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

When evaluating the model, the following exception occurred:

INFO:absl:Running executor for Evaluator
WARNING:absl:"maybe_add_baseline" and "maybe_remove_baseline" are deprecated,
        please use "has_baseline" instead.
INFO:absl:Request was made to ignore the baseline ModelSpec and any change thresholds. This is likely because a baseline model was not provided: updated_config=
model_specs {
  label_key: "tags"
}
slicing_specs {
}
metrics_specs {
  metrics {
    class_name: "ExampleCount"
    config: "{\"name\": \"example_count\"}"
  }
}
metrics_specs {
  metrics {
    class_name: "WeightedExampleCount"
    config: "{\"name\": \"weighted_example_count\"}"
  }
}
metrics_specs {
  metrics {
    class_name: "ExampleCount"
    config: "{\"name\": \"example_count\"}"
  }
}

INFO:absl:Using ./pipeline_output/Trainer/model/218/serving_model_dir as  model.
INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
2020-12-04 18:46:17.286423: W tensorflow/core/framework/op_kernel.cc:1744] OP_REQUIRES failed at cast_op.cc:124 : Unimplemented: Cast string to float is not supported
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1006, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 1035, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/evaluators/metrics_plots_and_validations_evaluator.py", line 341, in add_input
    result = c.add_input(a, get_combiner_input(elements[0], i))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 630, in add_input
    self._process_batch(accumulator)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 594, in _process_batch
    metric.update_state(*inputs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/utils/metrics_utils.py", line 90, in decorated
    update_op = update_state_fn(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 176, in update_state_fn
    return ag_update_state(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 255, in wrapper
    return converted_call(f, args, kwargs, options=options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 457, in converted_call
    return _call_unconverted(f, args, kwargs, options, False)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 339, in _call_unconverted
    return f(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 603, in update_state
    y_true = math_ops.cast(y_true, self._dtype)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/math_ops.py", line 922, in cast
    x = gen_math_ops.cast(x, base_type, name=name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/gen_math_ops.py", line 1858, in cast
    _ops.raise_from_not_ok_status(e, name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.UnimplementedError: Cast string to float is not supported [Op:Cast]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "beam_dag_runner.py", line 30, in <module>
    run_pipeline()
  File "beam_dag_runner.py", line 24, in run_pipeline
    metadata_connection_config=metadata.sqlite_metadata_connection_config(METADATA_PATH)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 165, in run
    absl.logging.info('Component %s is scheduled.', component_id)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 568, in __exit__
    self.result = self.run()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 547, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 346, in run_stages
    bundle_context_manager,
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 534, in _run_stage
    bundle_manager)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 572, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in process_bundle
    element.data)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 84, in process
    self._run_component()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 88, in _run_component
    self._component_launcher.launch()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 209, in launch
    copy.deepcopy(execution_decision.exec_properties))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py", line 72, in _run_executor
    copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tfx/components/evaluator/executor.py", line 259, in Do
    tensor_adapter_config=tensor_adapter_config))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 568, in __exit__
    self.result = self.run()
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/pipeline.py", line 547, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 346, in run_stages
    bundle_context_manager,
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 534, in _run_stage
    bundle_manager)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 572, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in process_bundle
    element.data)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 809, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 818, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1221, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 860, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1069, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 1072, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1069, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 1072, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 155, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1376, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1006, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 1035, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/evaluators/metrics_plots_and_validations_evaluator.py", line 341, in add_input
    result = c.add_input(a, get_combiner_input(elements[0], i))
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 630, in add_input
    self._process_batch(accumulator)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 594, in _process_batch
    metric.update_state(*inputs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/utils/metrics_utils.py", line 90, in decorated
    update_op = update_state_fn(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 176, in update_state_fn
    return ag_update_state(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 255, in wrapper
    return converted_call(f, args, kwargs, options=options)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 457, in converted_call
    return _call_unconverted(f, args, kwargs, options, False)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/autograph/impl/api.py", line 339, in _call_unconverted
    return f(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/keras/metrics.py", line 603, in update_state
    y_true = math_ops.cast(y_true, self._dtype)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/math_ops.py", line 922, in cast
    x = gen_math_ops.cast(x, base_type, name=name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/ops/gen_math_ops.py", line 1858, in cast
    _ops.raise_from_not_ok_status(e, name)
  File "/Users/kaizhong/Library/Python/3.7/lib/python/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
RuntimeError: tensorflow.python.framework.errors_impl.UnimplementedError: Cast string to float is not supported [Op:Cast] [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/EvaluateMetricsAndPlots/ComputeMetricsAndPlots()/ComputePerSlice/ComputeUnsampledMetrics/CombinePerSliceKey/WindowIntoDiscarding']

debug information: image

I have read all the tutorials and guides, and still have no idea how to resolve this problem.

mdreves commented 3 years ago

When a signature_name is not set the keras model is called directly instead. In this case your model has multiple inputs. I think you are passing serialized tf.Examples to the model so what you want to do is call the default serving signature instead. This can be configured by setting the signature_name in the ModelSpec. For example:

eval_config = tfma.EvalConfig(
    # NOTE the addition of signature_name here.
    model_specs=[tfma.ModelSpec(signature_name="serving_default", label_key="tags")],
    ...)
hanneshapke commented 3 years ago

Hi @mdreves & @kai-zhong, I am facing the same issue as @kai-zhong. I am sorry if I misunderstand the issue, but does TFMA actually support transforming labels? In the given example, the raw labels are strings that are transformed into token ids. In _get_serve_tf_examples_fn, the label (raw string) is dropped and never converted to an int. The model predicts the index via the softmax, but for the metric calculation, the label would have to go through the same (TFT) transformations as during preprocessing. In @kai-zhong's case, isn't the model generating tags_xf, while we try to compare it to tags?

mdreves commented 3 years ago

TFT transformations are not yet supported. My hope is that they will be in the next TFMA release, but there is still an outstanding issue to resolve so it might not make it in before the cut off.

hanneshapke commented 3 years ago

Hi @mdreves, Thank you so much for the fast reply! Really amazing. I hope TFT support in TFMA makes it into the next release. If not, I am happy to test it via the nightly versions.

Thank you for all your work and support, Hannes

vr-devil commented 3 years ago

Hi @mdreves, thank you for your support. I set the signature_name to "serving_default", but the problem has not been resolved.

INFO:absl:Running executor for Evaluator
WARNING:absl:"maybe_add_baseline" and "maybe_remove_baseline" are deprecated,
        please use "has_baseline" instead.
INFO:absl:Request was made to ignore the baseline ModelSpec and any change thresholds. This is likely because a baseline model was not provided: updated_config=
model_specs {
  signature_name: "serving_default"
  label_key: "tags"
}
slicing_specs {
}
metrics_specs {
  metrics {
    class_name: "ExampleCount"
    config: "{\"name\": \"example_count\"}"
  }
}
metrics_specs {
  metrics {
    class_name: "WeightedExampleCount"
    config: "{\"name\": \"weighted_example_count\"}"
  }
}
metrics_specs {
  metrics {
    class_name: "ExampleCount"
    config: "{\"name\": \"example_count\"}"
  }
}

INFO:absl:Using ./pipeline_output/Trainer/model/6/serving_model_dir as  model.
INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
2020-12-09 11:03:45.310153: W tensorflow/core/framework/op_kernel.cc:1744] OP_REQUIRES failed at cast_op.cc:124 : Unimplemented: Cast string to float is not supported
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1006, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 1035, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow_model_analysis/evaluators/metrics_plots_and_validations_evaluator.py", line 341, in add_input
    result = c.add_input(a, get_combiner_input(elements[0], i))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 630, in add_input
    self._process_batch(accumulator)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 594, in _process_batch
    metric.update_state(*inputs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/keras/utils/metrics_utils.py", line 90, in decorated
    update_op = update_state_fn(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/keras/metrics.py", line 176, in update_state_fn
    return ag_update_state(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 255, in wrapper
    return converted_call(f, args, kwargs, options=options)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 457, in converted_call
    return _call_unconverted(f, args, kwargs, options, False)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 339, in _call_unconverted
    return f(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/keras/metrics.py", line 603, in update_state
    y_true = math_ops.cast(y_true, self._dtype)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/ops/math_ops.py", line 922, in cast
    x = gen_math_ops.cast(x, base_type, name=name)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/ops/gen_math_ops.py", line 1858, in cast
    _ops.raise_from_not_ok_status(e, name)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.UnimplementedError: Cast string to float is not supported [Op:Cast]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "beam_dag_runner.py", line 30, in <module>
    run_pipeline()
  File "beam_dag_runner.py", line 24, in run_pipeline
    metadata_connection_config=metadata.sqlite_metadata_connection_config(METADATA_PATH)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 165, in run
    absl.logging.info('Component %s is scheduled.', component_id)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 568, in __exit__
    self.result = self.run()
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 547, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 346, in run_stages
    bundle_context_manager,
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 534, in _run_stage
    bundle_manager)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 572, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in process_bundle
    element.data)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 867, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 84, in process
    self._run_component()
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tfx/orchestration/beam/beam_dag_runner.py", line 88, in _run_component
    self._component_launcher.launch()
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tfx/orchestration/launcher/base_component_launcher.py", line 209, in launch
    copy.deepcopy(execution_decision.exec_properties))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py", line 72, in _run_executor
    copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tfx/components/evaluator/executor.py", line 259, in Do
    tensor_adapter_config=tensor_adapter_config))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 568, in __exit__
    self.result = self.run()
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 547, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 176, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 186, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 346, in run_stages
    bundle_context_manager,
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 534, in _run_stage
    bundle_manager)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 572, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 852, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 353, in push
    response = self.worker.do_instruction(request)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 985, in process_bundle
    element.data)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 354, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 809, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 818, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1221, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 860, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1069, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 1072, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1069, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 1072, in apache_beam.runners.worker.operations.FlattenOperation.process
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 155, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1376, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 742, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 865, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1006, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 1035, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow_model_analysis/evaluators/metrics_plots_and_validations_evaluator.py", line 341, in add_input
    result = c.add_input(a, get_combiner_input(elements[0], i))
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 630, in add_input
    self._process_batch(accumulator)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow_model_analysis/metrics/tf_metric_wrapper.py", line 594, in _process_batch
    metric.update_state(*inputs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/keras/utils/metrics_utils.py", line 90, in decorated
    update_op = update_state_fn(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/keras/metrics.py", line 176, in update_state_fn
    return ag_update_state(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 255, in wrapper
    return converted_call(f, args, kwargs, options=options)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 457, in converted_call
    return _call_unconverted(f, args, kwargs, options, False)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 339, in _call_unconverted
    return f(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/keras/metrics.py", line 603, in update_state
    y_true = math_ops.cast(y_true, self._dtype)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/ops/math_ops.py", line 922, in cast
    x = gen_math_ops.cast(x, base_type, name=name)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/ops/gen_math_ops.py", line 1858, in cast
    _ops.raise_from_not_ok_status(e, name)
  File "/Users/kaizhong/Workspace/sjk-python/text_classifier_tfx/venv/lib/python3.7/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
RuntimeError: tensorflow.python.framework.errors_impl.UnimplementedError: Cast string to float is not supported [Op:Cast] [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/EvaluateMetricsAndPlots/ComputeMetricsAndPlots()/ComputePerSlice/ComputeUnsampledMetrics/CombinePerSliceKey/WindowIntoDiscarding']

hi @hanneshapke , thank you too for reply.

You did not misunderstand the issue. Whether or not I drop the label in _get_serve_tf_examples_fn, the raw label is not transformed.

@mdreves I am happy test it, too.

Please forgive my poor English.

mdreves commented 3 years ago

If you can provide an example of what the label 'tags' looks like that will help. In addition if you can provide an example of the prediction output from calling inference on your model.

vr-devil commented 3 years ago

ok , @mdreves .

my model is for multi-label classification.

csv looks like: content tags
word word word word word ... label1,label2
word word word word word ... label3,label6
word word word word word ... label8,label2,label10
word word word word word ... label5

Currently there are 12 classes; there may be more classes in the future.

I want to transform tags to indicator format like [1, 0, 0, 1 ..... ]; the model prediction is in indicator format too, and is eventually transformed back to string labels when calling inference.

hanneshapke commented 3 years ago

@kai-zhong I think you might also need the additional feature in TFMA to use the label transformations from TFT which Mike mentioned.

@mdreves I am happy to test the TFMA + TFT integration for labels. I have a pipeline ready to test the new feature. Happy to compile TFMA versions from a feature branch if that helps. Do you mind posting in this thread when we can test the new feature of TFMA on master or a feature branch? Thank you!

vr-devil commented 3 years ago

@hanneshapke what is the additional feature you are talking about, exactly? I don't get it.

hanneshapke commented 3 years ago

@kai-zhong Currently TFMA can't transform your labels from label1, label2 to one-hot-vector1,one-hot-vector2,... In contrast to the model training, TFMA is using raw data for the model evaluation.

    evaluator = Evaluator(
        examples=example_gen.outputs['examples'],   <---- configured to use raw/untransformed examples

The model predicts a one-hot vector (due to the softmax), but the label TFMA is receiving from the data sets are still strings. @mdreves mentioned that TFMA will support the label transformations in the future. Currently, any pipeline transforming labels as part of the pipeline is running into this issue like us. Seems like we need a feature store :)

I hope the explanation makes sense.

mdreves commented 3 years ago

@hanneshapke thanks for responding, that is what I suspect is the issue as well.

I'll try to post to this thread once we have a workable implementation with keras and TFT.

mdreves commented 3 years ago

@hanneshapke, @kai-zhong: You can give the following a try (should be in 0.26 release):

    eval_config = tfma.EvalConfig(
        # Add "tft_layer" to preprocessing_function_names to get the transformations. 
        # Then update label_key to point to transformed label name.
        model_specs=[tfma.ModelSpec(label_key="xf_tags", preprocessing_function_names="tft_layer")],
        ...))

Let me know if you have issues.

hanneshapke commented 3 years ago

Hi @mdreves ,

Thank you for your message. I was very excited to see the changes on the master branch. I tried to test the rc0 version of 0.26.0, but the pip package seems broken. pip install completes fine, but tfx can't be imported. When I downgrade to 0.25.0, tfx can be imported. I have reproduced the issue on different instances (ubuntu 20.10, python3.8, GCPs deep learning image with TF2.3 and 2.4).

Python 3.8.6 (default, Sep 25 2020, 09:36:53) 
[GCC 10.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import tfx
>>> tfx.__version__
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: module 'tfx' has no attribute '__version__'

@mdreves Just posting this here to let you know that I am trying to test the package, but wasn't able to test tfma so far. I'll investigate the install problem further.

@kai-zhong Were you able to install tfx==0.26.0rc0?

hanneshapke commented 3 years ago

@mdreves quick update from my side:

ValueError: t not found in model signatures: _SignatureMap({'serving_default': <ConcreteFunction signature_wrapper(description) at 0x7F19C394A9D0>}) [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/ExtractTransformedFeatures/Predict']

My exported model contains a signature with the name serving_default.

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['description'] tensor_info:
        dtype: DT_STRING
        shape: unknown_rank
        name: serving_default_description:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['label_xf'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 58)
        name: StatefulPartitionedCall_2:0
  Method name is: tensorflow/serving/predict

I am exporting the model with the following concrete function:

    features_spec = dict(
        description=tf.TensorSpec(shape=(None), dtype=tf.string),
    )

    signatures = {
        'serving_default':
            _get_serve_raw_input_fn(model, tf_transform_output).get_concrete_function(features_spec),
    }

    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

and _get_serve_raw_input_fn is defined as follows:

def _get_serve_raw_input_fn(model, tf_transform_output):
    """Returns a serving function that applies TFT to raw text input.

    The transform-features layer obtained from `tf_transform_output` is stored
    on the model as `model.tft_layer`. The returned `tf.function` takes a
    mapping that is indexed with `features.TEXT_KEY` (no tf.Example parsing
    happens here), reshapes that entry to `[-1, 1]`, runs it through the TFT
    layer and the model, and returns the model output under the key
    `'label_xf'`.
    """
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(description):
        """Transforms the raw text feature and returns the model prediction."""
        # Only the text feature is forwarded to the TFT layer, reshaped to a
        # single-column batch.
        raw_inputs = {
            features.TEXT_KEY: tf.reshape(description[features.TEXT_KEY], [-1, 1]),
        }
        prediction = model(model.tft_layer(raw_inputs))
        return {'label_xf': prediction}

    return serve_tf_examples_fn

My TFMA configuration looks like:

eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(signature_name="serving_default", label_key="label_xf", preprocessing_function_names="tft_layer")],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.1}),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-2})))
        ])
    ]
)

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config
)

Have you seen the error before?

mdreves commented 3 years ago

Sorry I sent you a typo earlier, the preprocessing_function_names is a list not a single value. It is processing the "tft_layer" string as a list of chars which is why "t" is used. The config should be:

eval_config = tfma.EvalConfig(
    # NOTE the use of ["tft_layer"] vs "tft_layer"
    model_specs=[tfma.ModelSpec(signature_name="serving_default", label_key="label_xf", preprocessing_function_names=["tft_layer"])],
    ...
)
hanneshapke commented 3 years ago

@mdreves Thank you for your reply. Your hint resolved the previous issue. I am now getting the following errors:

/opt/conda/lib/python3.7/site-packages/tensorflow_model_analysis/model_util.py in input_specs_to_tensor_representations(input_specs)
    444     else:
    445       tensor_representation.dense_tensor.column_name = name
--> 446       for dim in type_spec.shape[1:] if len(type_spec.shape) > 1 else []:
    447         if dim is None:
    448           raise ValueError(

/opt/conda/lib/python3.7/site-packages/tensorflow/python/framework/tensor_shape.py in __len__(self)
    844     """Returns the rank of this shape, or raises ValueError if unspecified."""
    845     if self._dims is None:
--> 846       raise ValueError("Cannot take the length of shape with unknown rank.")
    847     return len(self._dims)
    848 

ValueError: Cannot take the length of shape with unknown rank. [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/ExtractPredictions/Predict']

I changed the feature specs for the model signature from None to 1, but no luck. The error persists.

    features_spec = dict(
        description=tf.TensorSpec(shape=(1), dtype=tf.string),
    )
vr-devil commented 3 years ago

@mdreves thanks for your great work. so cool. the issue is resolved.

@hanneshapke hi, I have no problem. I provide some information to you. hope that can help you.

this is my _get_serve_tf_examples_fn.

def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Builds the serving function: parse serialized tf.Examples, apply TFT, predict.

    As a side effect, the transform-features layer from `tf_transform_output`
    is stored on the model as `model.tft_layer`.
    """
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Parses the serialized examples, transforms them and returns the model output."""
        # NOTE: the complete raw feature spec is used for parsing; the label
        # (features.LABEL_KEY) is not removed from it in this version.
        raw_examples = tf.io.parse_example(
            serialized_tf_examples, tf_transform_output.raw_feature_spec())
        return model(model.tft_layer(raw_examples))

    return serve_tf_examples_fn

this is my model.

(venv) ➜  text_classifier_tfx git:(tfx) ✗ saved_model_cli show --dir ./pipeline_output/Trainer/model/64/serving_model_dir --tag_set serve --signature_def serving_default
The given SavedModel SignatureDef contains the following input(s):
  inputs['examples'] tensor_info:
      dtype: DT_STRING
      shape: (-1)
      name: serving_default_examples:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['output_0'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 12)
      name: StatefulPartitionedCall_1:0
Method name is: tensorflow/serving/predict
hanneshapke commented 3 years ago

@kai-zhong Thank you for letting me know. I will investigate my implementation. I am super excited about this TFMA/TFT feature.

mdreves commented 3 years ago

@kai-zhong

This still isn't fully working as we expect (i.e. having to include the label in the serving function isn't ideal). If your label is optional then this might be workable for now, but I wouldn't call this a success just yet.

@hanneshapke

Can you paste the output for the following:

model.tft_layer._get_save_spec()

hanneshapke commented 3 years ago

Thank you @mdreves for your reply. The output of model.tft_layer._get_save_spec() is None.

I continued experimenting with the TFT implementation in TFMA. I am unsure whether label_key should be the name of the transformed label or the name of the raw label (in my case, label represents the raw label, e.g. car, tree, while label_xf represents the id of the label produced via tft.compute_and_apply_vocabulary).

In config.proto I read that

  // Label key (single-output model). The key can identify either a transformed
  // feature (see preprocessing_function_names) or a raw input feature. 

If I use the raw label label, then I am getting a RuntimeError

RuntimeError: tensorflow.python.framework.errors_impl.UnimplementedError: Cast string to float is not supported [Op:Cast] [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/EvaluateMetricsAndPlots/ComputeMetricsAndPlots()/ComputePerSlice/ComputeUnsampledMetrics/CombinePerSliceKey/WindowIntoDiscarding']

If I use the name of the transformed label, label_xf (as you mentioned in your message), then I get the following error:

ValueError: no value provided for label: model_name=, output_name=, sub_key=None, aggregation_type=None, StandardMetricInputs=StandardMetricInputs(label=None, prediction=array([0.05338061, 0.02272601, 0.04672484, 0.02584095, 0.01440819,
       0.07280581, 0.02458213, 0.01275045, 0.02832412, 0.05874791,
       0.0069367 , 0.00378417, 0.00272558, 0.00569465, 0.00334701,
       0.00393633, 0.00602159, 0.00551182, 0.00420852, 0.00989084,
       0.00469063, 0.00745677, 0.01901124, 0.00471016, 0.00766135,
       0.10077026, 0.04602642, 0.01656717, 0.00872794, 0.00305044,
       0.0207827 , 0.02860937, 0.00401834, 0.00827207, 0.02177576,
       0.04321518, 0.00406642, 0.00541327, 0.00485998, 0.06251741,
       0.01274851, 0.00903907, 0.00273021, 0.00271059, 0.00720123,
       0.00304619, 0.01558074, 0.00451899, 0.00686545, 0.00558727,
       0.00393828, 0.00385722, 0.00159618, 0.01395822, 0.06245903,
       0.00614076, 0.00206682, 0.0014041 ], dtype=float32), example_weight=None, features=None)

This may be caused by a configuration error (i.e. label, and/or prediction keys were not specified) or an error in the pipeline. [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/EvaluateMetricsAndPlots/ComputeMetricsAndPlots()/ComputePerSlice/ComputeUnsampledMetrics/CombinePerSliceKey/WindowIntoDiscarding']

The error makes sense since the transformed label is the vocab id from the tft.compute_and_apply_vocabulary transformation. The network's last layer is a softmax layer and I calculate the loss from logits with tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True).

Is such a case even supported by TFMA without TFT transformations?

hanneshapke commented 3 years ago

One more test on my side: I changed the labels from indices to one hot encoded vectors and I swapped the SparseCategoricalAccuracy to CategoricalAccuracy. The error remains the same.

mdreves commented 3 years ago

To use transformed labels, you would need to use label_xf in your case. The reason you get the last error is that the TFT transform didn't work, so the label is None. When you called model.tft_layer._get_save_spec(), was this before the model was saved? Can you try loading the model using model = tf.keras.models.load_model(...) and then calling _get_save_spec()?

hanneshapke commented 3 years ago

@mdreves After loading the model, I am getting now:

>> model.tft_layer._get_save_spec()
{'description': TensorSpec(shape=(None, 1), dtype=tf.string, name='description')}
mdreves commented 3 years ago

Check your tft_layer setup's inputs and outputs. Based on the above, the only output from the layer is 'description'. Also see the previous comments about the issue of not being able to pop the label from the feature_spec for the serving function in order for this to work properly at present.

mdreves commented 3 years ago

I believe issue #3016 is different, please see comments there for a possible solution for that error.

mdreves commented 3 years ago

This is the current workaround for having it work both with inference and with TFMA:

def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a serving function that parses serialized tf.Examples and applies TFT.

    Attaches the transform-features layer from `tf_transform_output` to the
    model as `model.tft_layer`, then builds a `tf.function` that parses
    serialized examples with the raw feature spec, runs them through that
    layer and feeds the transformed features to the model.

    Args:
        model: The model to serve; `tft_layer` is set on it as a side effect.
        tf_transform_output: Object providing `transform_features_layer()` and
            `raw_feature_spec()`.

    Returns:
        The `tf.function`-decorated `serve_tf_examples_fn`.
    """

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        feature_spec = tf_transform_output.raw_feature_spec()
        if not model.tft_layer.built:
            # Temporary workaround: Need to call the tft_layer with the label so that
            # it will be included in the layer's input_spec. This is needed so that
            # TFMA can call tft_layer with labels. However, the actual call for
            # inference is done without the label.
            parsed_features_with_label = tf.io.parse_example(
                serialized_tf_examples, feature_spec)
            _ = model.tft_layer(parsed_features_with_label)
        # Drop the label from the spec so that the inference path below parses
        # and transforms the examples without it. This must happen after the
        # label-inclusive call above.
        feature_spec.pop(_LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn

Then in the TFMA config use "tft_layer" for the preprocessing function:


eval_config = tfma.EvalConfig(
    # NOTE the use of ["tft_layer"] vs "tft_layer"
    model_specs=[tfma.ModelSpec(signature_name="serving_default", label_key="label_xf", preprocessing_function_names=["tft_layer"])],
    ...
)
arghyaganguly commented 3 years ago

Closing this based on the above comment thread. Please feel free to reopen.

google-ml-butler[bot] commented 3 years ago

Are you satisfied with the resolution of your issue? Yes No

hanneshapke commented 3 years ago

@arghyaganguly Quick question: I upgraded to TFX 0.29 and get the error when I am using the tft_layer:

ValueError: tft_layer not found in model signatures: _SignatureMap({'serving_default': <ConcreteFunction signature_wrapper(*, examples) at 0x7F3A269CE090>}) [while running 'ExtractEvaluateAndWriteResults/ExtractAndEvaluate/ExtractTransformedFeatures/Predict']

Did something major change in the TFT functionality with TFMA?

My _get_serve_tf_examples_fn function is the one suggested by @mdreves

def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a serving function that parses serialized tf.Examples and applies TFT.

    The transform-features layer from `tf_transform_output` is stored on the
    model as `model.tft_layer`. The returned `tf.function` parses serialized
    examples with the raw feature spec, transforms them with that layer and
    returns the model output.

    Args:
        model: The model to serve; `tft_layer` is set on it as a side effect.
        tf_transform_output: Object providing `transform_features_layer()` and
            `raw_feature_spec()`.

    Returns:
        The `tf.function`-decorated `serve_tf_examples_fn`.
    """

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        feature_spec = tf_transform_output.raw_feature_spec()
        if not model.tft_layer.built:
            # Temporary workaround: Need to call the tft_layer with the label so that
            # it will be included in the layer's input_spec. This is needed so that
            # TFMA can call tft_layer with labels. However, the actual call for
            # inference is done without the label.
            parsed_features_with_label = tf.io.parse_example(
                serialized_tf_examples, feature_spec)
            _ = model.tft_layer(parsed_features_with_label)
        # Remove the label from the spec before the inference-time parse; the
        # order relative to the label-inclusive call above matters.
        feature_spec.pop(LABEL_KEY)
        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn

The trained model is then saved as usual in my run_fn function:

    signatures = {
        "serving_default": _get_serve_tf_examples_fn(
            model, tf_transform_output
        ).get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")
        ),
    }
    model.save(
        fn_args.serving_model_dir, save_format="tf", signatures=signatures
    )

And later I configured TFMA as follows:

eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key="label_xf", preprocessing_function_names=["tft_layer"])],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[...])

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)

I am getting the error when I am executing the evaluator in the pipeline.

Did something change in the original TFT/TFMA example with the version 0.29? Thank you for your reply.

hanneshapke commented 3 years ago

@mdreves @arghyaganguly Let's close this issue.

I noticed that my pipelines failed because the order of the signature functions matters. The function that calls the tft_layer first needs to include the label in order to work with TFMA. My pipelines failed when the serving_default signature function was called first (I remove the label from the TFT preprocessing there).

    signatures = {
        'eval': _get_eval_tf_examples_fn(
            model, tf_transform_output).get_concrete_function(example_input_feature_spec),
        'serving_default': _get_serve_tf_raw_fn_with_post_process(
            model, tf_transform_output).get_concrete_function(raw_input_features_spec),
        'token_ids': _get_lit_tf_token_ids_fn(
            model).get_concrete_function(raw_input_token_id_spec)
    }

@mdreves @arghyaganguly Should this behavior be documented? If so, where do you think is a good place? Happy to provide a PR around it. ...

axeltidemann commented 3 years ago

@hanneshapke I am experiencing the exact same issue. Can you be a bit more specific about what you changed to the order of the signature functions? Did your pipeline call eval somewhere before serving_default?

mdreves commented 3 years ago

Having a shared serving function is no longer recommended. Instead, create a separate signature function for just the preprocessing and then update the EvalConfig to specify this function under preprocessing_function_names leaving the main signature function for inference.

For example:

  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function(
      input_signature=[
          tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
      ])
  def serve_tf_examples_fn(serialized_tf_example):
    """Serving signature: parses label-free examples, applies TFT, runs the model."""
    serving_spec = tf_transform_output.raw_feature_spec()
    # Labels are not available at serving time, so the label feature is
    # dropped from the spec before parsing.
    serving_spec.pop(_LABEL_KEY)
    parsed = tf.io.parse_example(serialized_tf_example, serving_spec)
    return model(model.tft_layer(parsed))

  @tf.function(
      input_signature=[
          tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
      ])
  def transform_features_fn(serialized_tf_example):
    """Preprocessing signature: returns the TFT-transformed features for the evaluator.

    The raw feature spec is used unmodified (nothing is popped from it), so
    every feature it lists is parsed and passed to the TFT layer.
    """
    parsed = tf.io.parse_example(
        serialized_tf_example, tf_transform_output.raw_feature_spec())
    return model.tft_layer(parsed)

  return {
      'serving_default': serve_tf_examples_fn,
      'transform_features': transform_features_fn
  }

Then update the EvalConfig:

# Evaluator configuration that keeps inference and preprocessing separate:
# 'serving_default' is used for predictions, while the 'transform_features'
# signature is run as a preprocessing function so that transformed features
# (including a transformed label) are available to TFMA.
eval_config = tfma.EvalConfig(
      model_specs=[
          tfma.ModelSpec(
              signature_name='serving_default',
              preprocessing_function_names=['transform_features'],
              # Placeholder: set to the label feature name. Per the TFMA
              # config, this may be a transformed feature or a raw input
              # feature.
              label_key='...')
      ],
      # ... remaining EvalConfig arguments (e.g. slicing_specs, metrics_specs)
      )
axeltidemann commented 3 years ago

Awesome, thank you once again @mdreves for your insightful input.

UsharaniPagadala commented 3 years ago

@kai-zhong

Could you please confirm whether this issue can be closed? Thanks.

vr-devil commented 3 years ago

@UsharaniPagadala Yes, it can be closed. Thanks, everyone.

google-ml-butler[bot] commented 3 years ago

Are you satisfied with the resolution of your issue? Yes No