apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Feature Request]: In Python how to assign types to multi-output tags? #26493

Open cozos opened 1 year ago

cozos commented 1 year ago

What would you like to happen?

I have a multi-output DoFn:

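    from typing import Any, Dict, Iterable, Union

    import apache_beam as beam
    from apache_beam import pvalue

    class DoFn1(beam.DoFn):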
        def process(self, row) -> Iterable[Union[Dict[str, Any], pvalue.TaggedOutput]]:
            if something:
                yield some_dict(...)
            else:
                yield pvalue.TaggedOutput("bad", ...)

And another DoFn that consumes its outputs:

    class DoFn2(beam.DoFn):
        def process(self, row: Dict[str, Any]) -> Iterable[...]:
            if something:
                yield some_dict(...)
            else:
                yield pvalue.TaggedOutput("bad", ...)

And then when I use it like this:

    pcoll = ...
    pcoll = pcoll | "dofn1" >> beam.ParDo(DoFn1()).with_outputs(
        "bad",
        main="good",
    )
    pcoll["good"] | "dofn2" >> beam.ParDo(DoFn2())

I get an error that looks like this:

    apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 'dofn2': requires Dict[str, Any] but got Union[Dict[str, Any], TaggedOutput] for row

How can I tell Beam type checking that the good tag gives Dict[str, Any], and bad gives TaggedOutput? Am I forced to propagate the tagged output type hints?

I also asked this on Stack Overflow, in case it is not appropriate for GitHub: https://stackoverflow.com/questions/76151787/in-apache-beam-dataflow-multi-output-dofns-how-do-you-assign-type-hints-to-spec

I found a related discussion here: https://github.com/apache/beam/pull/9810

Issue Priority

Priority: 3 (nice-to-have improvement)

tvalentyn commented 1 year ago

Have you tried using .with_input_types? https://github.com/apache/beam/blob/c31243ece6a78e46ea2a25147643143d0ab20d93/sdks/python/apache_beam/transforms/ptransform.py#L871

RXminuS commented 10 months ago

@tvalentyn I don't quite understand what you mean. I think this issue refers more to the fact that it doesn't really seem possible to set the return type of an expand to be either a Tuple or a TaggedPCollection so that you get typed outputs for each of the keys/indexes.

I'm stuck on this as well. It breaks static typing, producing this warning:

    WARNING  | apache_beam.typehints.decorators:strip_pcoll_helper:381 - This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[apache_beam.pvalue.PCollection[XXX], apache_beam.pvalue.PCollection[YYY]] instead.

Nor can I perform runtime type-checking correctly.

tvalentyn commented 10 months ago

In the context of the example in the description, I am asking whether you have tried replacing

    pcoll["good"] | "dofn2" >> beam.ParDo(DoFn2())

with

    pcoll["good"] | "dofn2" >> beam.ParDo(DoFn2()).with_input_types(<correct typehint for elements belonging to good pcoll, which seems to be Dict[str, Any]>)
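
The placeholder above stands for the element type of the "good" PCollection. As a rough sketch of how that hint could be derived from the Union in DoFn1's return annotation, the snippet below strips the tagged-output member using only the standard typing module. The TaggedOutput class here is a stand-in so the snippet runs without Beam installed, and main_output_hint is a hypothetical helper name, not part of the Beam API.

```python
from typing import Any, Dict, Union, get_args, get_origin


class TaggedOutput:
    """Stand-in for apache_beam.pvalue.TaggedOutput (illustration only)."""

    def __init__(self, tag, value):
        self.tag = tag
        self.value = value


def main_output_hint(element_hint, tagged_cls=TaggedOutput):
    """Hypothetical helper: drop tagged_cls from a Union hint.

    Returns the hint that remains for the main (untagged) output.
    Hints that are not a Union are returned unchanged.
    """
    if get_origin(element_hint) is not Union:
        return element_hint
    remaining = tuple(
        member for member in get_args(element_hint) if member is not tagged_cls
    )
    if not remaining:
        raise ValueError("hint contains only tagged outputs")
    # Union over a single member collapses to that member.
    return Union[remaining]


# Element type taken from DoFn1's return annotation in the description.
declared = Union[Dict[str, Any], TaggedOutput]
good_hint = main_output_hint(declared)
```

With Beam installed, good_hint is the kind of value that would go where the placeholder is, i.e. .with_input_types(good_hint). Whether that resolves the TypeCheckError from the description is not confirmed in this thread.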

tvalentyn commented 10 months ago

Also cc: @jrmccluskey, who is an expert on this topic.

lazarillo commented 8 months ago

Any updates on this? I am having trouble with the same thing as the OP.

tvalentyn commented 8 months ago

@lazarillo have you tried using with_input_types on transforms that consume tagged output?

lazarillo commented 8 months ago

Yes, I have. I think you are correct and it will solve the issue. Unfortunately, I have multiple issues going on, so for the moment, I cannot fully confirm that it worked.

robmoore commented 4 months ago

I tried this approach with beam.io.WriteToCsv without success.

tvalentyn commented 4 months ago

@robmoore could you elaborate please?

robmoore commented 4 months ago

Certainly. I tried using beam.io.WriteToCsv(...).with_input_types(...) for one of my tagged outputs, passing in my named tuple subclass as the type. The type hint was apparently ignored.

tvalentyn commented 3 months ago

Hmm, did you get a type hint violation error? Is there a minimal example that you can provide by chance?