apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Task]: Make beam.Row() more robust for cross-language use #25749

Open ahmedabu98 opened 1 year ago

ahmedabu98 commented 1 year ago

What needs to happen?

TL;DR: There are two main issues here. The first is that there is no way to dynamically create beam.Row() elements for use in a cross-language context. The second is that beam.Row() does not always get type inference and defaults to Any, which is not understood by Java.

StorageWriteToBigQuery is a wrapper for a SchemaAwareExternalTransform that takes beam.Row() elements. The following creates beam.Row() elements from dicts:

output = (p
     | beam.Create([{"num": 1}, {"num": 2}])
     | beam.Map(lambda el: beam.Row(**el))
     | beam.io.StorageWriteToBigQuery(table=table))

But this gives me java.lang.IllegalArgumentException: Unknown Coder URN beam:coder:pickled_python:v1, probably because of the ** operator. Is there another way to create beam.Row() elements dynamically?

I tried setting the args directly:

output = (p
     | beam.Create([{"num": 1}, {"num": 2}])
     | beam.Map(lambda el: beam.Row(num=el['num']))  <----
     | beam.io.StorageWriteToBigQuery(table=table))

But it looks like there is no type inference and the field defaults to Any:

java.lang.IllegalArgumentException: Failed to decode Schema due to an error decoding Field proto:

name: "num"
type {
  nullable: true
  logical_type {
    urn: "beam:logical:pythonsdk_any:v1"
  }
}

The following works fine (explicitly setting int):

output = (p
     | beam.Create([{"num": 1}, {"num": 2}])
     | beam.Map(lambda el: beam.Row(num=int(el['num'])))  <----
     | beam.io.StorageWriteToBigQuery(table=table))

Issue Priority

Priority: 2 (default / most normal work should be filed as P2)

chamikaramj commented 1 year ago

@robertwb looking at https://github.com/apache/beam/pull/11901, it seems like we did add some support for correctly inferring the Row type. Is this broken/incomplete somehow?

Or is it just a matter of a missing type hint or a "with_output_types" hint somewhere?

robertwb commented 1 year ago

In order to infer that lambda el: beam.Row(**el) is of a concrete type, we would have to know that el is a Dict with the given keys and value types (which is feasible, but not done yet). What we do support is Row(num=int(...), ...). Or one can use with_output_types.
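
As an illustration (not from the original discussion), here is a minimal sketch of the with_output_types route using a NamedTuple schema; the NumRow type and the coder registration below are assumptions, not part of this thread:

import typing
import apache_beam as beam

# Hypothetical schema type; any typing.NamedTuple with concrete field
# types can serve as a schema in the Python SDK.
class NumRow(typing.NamedTuple):
    num: int

# Registering RowCoder makes the schema explicit for the runner.
beam.coders.registry.register_coder(NumRow, beam.coders.RowCoder)

output = (p
     | beam.Create([{"num": 1}, {"num": 2}])
     | beam.Map(lambda el: NumRow(**el)).with_output_types(NumRow)
     | beam.io.StorageWriteToBigQuery(table=table))

This declares a concrete schema up front, at the cost of defining a custom type.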

Abacn commented 1 year ago

I think #11901 is working here. That PR added a method to create a schema from a typehint.

In this lambda, num gets a typehint of int:

beam.Map(lambda el: beam.Row(num=int(el['num'])))

so it works.

This lambda does not have a typehint for num, so it gives Any:

 beam.Map(lambda el: beam.Row(num=el['num']))
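
A rough way to check this locally is to ask the SDK's trivial inference directly. This is only a sketch; the exact printed representation may differ across SDK versions:

import apache_beam as beam
from apache_beam import typehints
from apache_beam.typehints import trivial_inference

# With the explicit cast, int(el['num']) is inferred as int, so the Row
# field gets a concrete type.
print(trivial_inference.infer_return_type(
    lambda el: beam.Row(num=int(el['num'])), [typehints.Any]))

# Without the cast, el['num'] stays Any, which the Java SDK cannot decode.
print(trivial_inference.infer_return_type(
    lambda el: beam.Row(num=el['num']), [typehints.Any]))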

chamikaramj commented 1 year ago

Is there a way to use beam.Row in with_output_types for proper type inference, without defining a custom type?

ahmedabu98 commented 1 year ago

Actually, I'm having some luck with RowTypeConstraint. The following works fine:

output = (p
     | beam.Create([{"num": 1, "name": "a"}, {"num": 2, "name": "b"}])
     | beam.Map(lambda el: beam.Row(**el))
          .with_output_types(beam.row_type.RowTypeConstraint.from_fields([
          ('num', int),
          ('name', str)
        ]))
     | beam.io.StorageWriteToBigQuery(table=table))

chamikaramj commented 1 year ago

Thanks. If that works, we should document that.