apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: AttributeError: 'UnionConstraint' object has no attribute 'inner_type'. Did you mean: 'inner_types'? #30019

Open lazarillo opened 6 months ago

lazarillo commented 6 months ago

What happened?

I am trying to make full use of the type system as recommended, but I ran across an error raised entirely within the typehints package itself.

The error is: AttributeError: 'UnionConstraint' object has no attribute 'inner_type'. Did you mean: 'inner_types'?

The error comes from get_yielded_type(type_hint) in the typehints module.

It is difficult to tell what caused it, because the call stack is almost entirely within transforms and typehints and there is very little information pointing to my code. (My code is only referenced where the pipeline is applied; nothing more specific.)

However, in trying to debug it, I think it is due to my process method being annotated to return either an Iterable of dicts or an Iterable of TaggedOutputs. For instance, I have the following relatively simple AddField DoFn, where I also pipe any failures to a different PCollection (a usage sketch follows the Failure dataclass below):

from collections.abc import Iterable
from typing import Any

from apache_beam import DoFn, pvalue

FAIL_TAG = "fail"  # default failure tag; defined elsewhere in my module


class AddField(DoFn):
    """Add field(s) to an element in a dictionary.

    Failures will be tagged with the provided tag, which defaults to "fail". Normal
    output is not tagged, so it will be captured as the "main" output.
    """

    def __init__(self, fail_tag: str = FAIL_TAG):
        """Add field(s) to an element in a dictionary.

        Failures will be tagged with the provided tag, which defaults to "fail". Normal
        output is not tagged, so it will be captured as the "main" output.
        """
        super().__init__()
        self.fail_tag = fail_tag

    def process(
        self, element: dict[str, Any], **fields
    ) -> Iterable[dict[str, Any]] | Iterable[pvalue.TaggedOutput]:
        """Add field(s) to an element in a dictionary.

        Failures will be tagged with the provided tag, which defaults to "fail". Normal
        output is not tagged, so it will be captured as the "main" output.
        """
        if len(fields) == 0:
            yield element
        else:
            failure = None
            for name, val in fields.items():
                if name in element:
                    failure = Failure(
                        pipeline_step="AddField",
                        element=element,
                        exception=ValueError(
                            f'Field "{name}" already exists in "{element}".'
                        ),
                    )
                    break
                element[name] = val
            if failure is None:
                yield element
            else:
                yield pvalue.TaggedOutput(self.fail_tag, failure)

Failure is a simple dataclass:

@dataclass
class Failure:
    """Object to capture failure information, to route it to another sink."""

    pipeline_step: str
    element: Any
    exception: Exception
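
For context, here is roughly how I wire such a DoFn into a pipeline with with_outputs. The step names and the tiny in-memory input are placeholders for this sketch, not my actual pipeline; as far as I can tell, simply applying the ParDo is enough to hit the error, since the type hints are inspected at pipeline-construction time:

import apache_beam as beam

with beam.Pipeline() as pipe:
    results = (
        pipe
        | beam.Create([{"id": 1}, {"id": 2, "new_field": "already set"}])
        | beam.ParDo(AddField(), new_field="some value").with_outputs(
            FAIL_TAG, main="ok"
        )
    )
    # Untagged yields land on the main ("ok") output; TaggedOutput yields land
    # on the FAIL_TAG output and can be routed to a separate sink.
    results.ok | "log ok" >> beam.Map(print)
    results[FAIL_TAG] | "log failures" >> beam.Map(print)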

I don't know for sure that this particular DoFn is the one failing (or which one specifically is), but it captures the paradigm I am trying to use: the same process method yields either plain dicts or pvalue.TaggedOutputs. It could also be happening in the PTransform that receives the output. All I know for certain is that the typehints function at the bottom of the stack fails because it looks for an inner_type attribute, while UnionConstraint only provides the inner_types() method.
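
From the traceback, strip_iterable() hands the DoFn's output hint to get_yielded_type(), and a return annotation that is a union of two Iterables is represented as a UnionConstraint rather than a single Iterable constraint. Below is a minimal sketch of what I believe triggers the error, built directly on the typehints module; this is my reconstruction, not a verified root cause:

from apache_beam.typehints import typehints

# A union of two Iterables is represented as a UnionConstraint, roughly
# mirroring the Iterable[dict[str, Any]] | Iterable[pvalue.TaggedOutput]
# return annotation on process.
hint = typehints.Union[
    typehints.Iterable[typehints.Dict[str, typehints.Any]],
    typehints.Iterable[str],  # simplified stand-in for Iterable[pvalue.TaggedOutput]
]

# get_yielded_type() accepts the hint as iterable (every union member is an
# Iterable) but then reads .inner_type, which UnionConstraint does not have.
typehints.get_yielded_type(hint)
# AttributeError: 'UnionConstraint' object has no attribute 'inner_type'.

If that is right, annotating process as returning Iterable[dict[str, Any] | pvalue.TaggedOutput] (a single Iterable of a union) might keep the outer hint an Iterable constraint and sidestep this path, though I have not confirmed that.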


Details:

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

lazarillo commented 6 months ago

For the brave of heart, here is the full call stack. But as I said above, I don't think it actually helps much:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/runner/work/phalanx/phalanx/data/dataflow/jobs/aggregatefakeperson/src/aggregatefakeperson/process.py", line 51, in <module>
    run(phxdataflow)
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/aggregatefakeperson/main.py", line 81, in run
    pipeline
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 1110, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 623, in __ror__
    result = p.apply(self, pvalueish, label)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pipeline.py", line 678, in apply
    return self.apply(transform, pvalueish)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pipeline.py", line 731, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 203, in apply
    return self.apply_PTransform(transform, input, options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 207, in apply_PTransform
    return transform.expand(input)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/phxdataflow/functions.py", line 537, in expand
    pval.pipeline
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pipeline.py", line 667, in apply
    return self.apply(
           ^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pipeline.py", line 678, in apply
    return self.apply(transform, pvalueish)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pipeline.py", line 731, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 203, in apply
    return self.apply_PTransform(transform, input, options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/runners/runner.py", line 207, in apply_PTransform
    return transform.expand(input)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 1826, in expand
    _ = pcoll | self._do_transform
        ~~~~~~^~~~~~~~~~~~~~~~~~~~
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/pipeline.py", line 729, in apply
    transform.type_check_inputs(pvalueish)
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py", line 926, in type_check_inputs
    type_hints = self.get_type_hints()
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/typehints/decorators.py", line 524, in get_type_hints
    self.default_type_hints()).with_defaults(
    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 1567, in default_type_hints
    return self.fn.get_type_hints()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/typehints/decorators.py", line 524, in get_type_hints
    self.default_type_hints()).with_defaults(
    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 745, in default_type_hints
    process_type_hints = process_type_hints.strip_iterable()
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/typehints/decorators.py", line 435, in strip_iterable
    yielded_type = typehints.get_yielded_type(output_type)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/hostedtoolcache/Python/3.11.7/x64/lib/python3.11/site-packages/apache_beam/typehints/typehints.py", line 1351, in get_yielded_type
    return type_hint.inner_type
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'UnionConstraint' object has no attribute 'inner_type'. Did you mean: 'inner_types'?
Waiting up to 5 seconds.
Sent all pending logs.
Error: Process completed with exit code 1.