
[Feature Request]: Less strict numeric conversions writing to BigQuery with Storage API #25228

Open bastewart opened 1 year ago

bastewart commented 1 year ago

What would you like to happen?

Currently a number of type conversions in TableRowToStorageApiProto are very strict, leading to potentially unnecessary loss of data.

As an example, the INT64 conversion will fail if the value arriving is a Java Double, even if that double could be cast to an integer with no loss of data/precision. Below, the double 36183.0 fails to convert to an INT64 even though a more optimistic conversion would succeed:

Exception: org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto$SchemaDoesntMatchException: Unexpected value :36183.0, type: class java.lang.Double. Table field name: <snip>, type: INT64'}

(The colon is misleading, it was a typo that's been fixed in a newer commit.)

We could have used Number.longValue (or Math.round) here and verified there was no loss of precision by checking equality afterwards.

Code here for this example: https://github.com/apache/beam/blob/634b0453469b66ee4c135aca48b02d2425916f36/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L691-L712
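
For illustration, here is a minimal sketch of the optimistic conversion I have in mind (this is not the Beam code, and the helper name is made up):

```java
// Minimal sketch, not the Beam implementation: optimistically convert a Number
// to a long and fail only if precision is actually lost. The helper name is
// hypothetical.
static long toInt64OrThrow(Number value) {
  long converted = value.longValue();
  // For floating-point inputs, compare against the round-tripped value: if it
  // differs, the input had a fractional part or was too large to fit in a long.
  if ((value instanceof Double || value instanceof Float)
      && (double) converted != value.doubleValue()) {
    throw new IllegalArgumentException("Lossy conversion to INT64: " + value);
  }
  return converted;
}
```

With this, 36183.0 converts cleanly to 36183, while 1.5 (or a double too large to fit in a long) still fails.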

In the current implementation I don't think NUMERIC and BIGNUMERIC accept BigInteger at the moment either. I think both my issue and this one would be solved with a fall-through which optimistically converts all Numbers to a long and checks for loss of precision afterwards.

In general, this coupled with failing rows that contain any invalid values (#25227) means it's quite easy for a lot of data to fail to write.

I'd be happy to open a PR here, but have never contributed here and Java isn't my primary language, so it may be a little slow...

Issue Priority

Priority: 3 (nice-to-have improvement)


reuvenlax commented 1 year ago

This is an interesting request. Even in the above example, not all INT64 values can be converted to Java double. e.g. the following code:

```java
long longValue = XXXX;
double doubleValue = (double) longValue;
assert(longValue == (long) doubleValue);
```

will often fail, especially for large numbers (e.g. try it out for 499999999000000001L).

I guess we could convert optimistically and fail if the specific number happens to lose precision; however, I'm uncomfortable with that. That's a recipe for a pipeline that works for a while and then suddenly starts behaving poorly as the input data changes, which is very confusing. I think understandable and easily-predictable behavior is better.

reuvenlax commented 1 year ago

It also should be easy for any user to write a ParDo (or MapElements) that does their own numeric conversions before calling BigQueryIO.
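
For example, something along these lines (a rough sketch; the column name and pipeline wiring are hypothetical):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Sketch only: coerce whole-valued doubles to longs for a known INT64 column
// before handing rows to BigQueryIO. "count" is a hypothetical column name.
PCollection<TableRow> normalized =
    rows.apply(
        MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(
                (TableRow row) -> {
                  TableRow copy = row.clone();
                  Object v = copy.get("count");
                  if (v instanceof Double && (Double) v == Math.rint((Double) v)) {
                    copy.set("count", ((Double) v).longValue());
                  }
                  return copy;
                }));
```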

bastewart commented 1 year ago

Sorry, I had missed your reply!

The summary of this message is that I think it's very hard for us to do the conversion ourselves, unfortunately. I also think it's reasonable to treat 1.0 as an integer (JSON Schema does, for example), and that this might be a regression over the Streaming API implementation (at the least, this combined with #25227 and #25233 means there is a regression).

Details

> Even in the above example, not all INT64 values can be converted to Java double. e.g. the following code:

Sorry, I may be misunderstanding, but that's not the way round we have the issue.

Our issue is Java doubles arriving and needing to be converted to an INT64 for BigQuery. In that case - by my understanding (see my fix commit) - it's trivial to optimistically convert it to a long and fail if you've lost precision.

> It also should be easy for any user to write a ParDo (or MapElements) that does their own numeric conversions before calling BigQueryIO.

Unfortunately in our case this is very-much non-trivial 😅

We write to 1000+ tables which have dynamic schemas*. This means we're relying on Beam to load the schema and convert the values for us. I think we'd have to re-implement, or use directly, most of StorageApiDynamicDestinations to grab schema info, and then do our own traversal of all data before handing it over to BigQueryIO.

*I am aware that automatic schema reloading in the Beam Storage Writes API is not yet released...

We also don't have direct/easy control over the types as they arrive. We're consuming JSON blobs off Kafka and bundling the data into BigQuery. Those JSON blobs have been validated against the BigQuery schema using JSON Schema. JSON Schema validates a number with a zero fractional part as an integer, so this would be hard for us to guard against.

More generally, and personally, I feel like 1.0 should be treated as an integer. It's extremely easy for that kind of mis-conversion to occur, and being overly strict just leads to unexpected consequences. This issue coupled with #25227 and #25233 means that a lot of rows can be dropped "silently", and a lot of ours were after switching from the Streaming API to the Storage Write API.

Streaming API Behaviour

I need to check, but I am reasonably sure the Streaming API allows this type of coercion to occur. I'll double-check and get back to you.

bastewart commented 1 year ago

Yeah, I have experimented and the Streaming API is much more lenient.

In general, extraneous (and "missing") decimal portions are accepted for all the numeric types: values both with and without a .0 decimal portion are valid JSON representations in the Streaming API for all of INTEGER (INT64), NUMERIC, BIGNUMERIC and FLOAT (FLOAT64).

Any non-zero decimal portion is not valid for an INTEGER though, so values with a genuine fractional part are valid only for everything else.
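
To illustrate with hypothetical values of my own (these are not taken from the BigQuery docs, and the column names are made up):

```java
import com.google.api.services.bigquery.model.TableRow;

// Illustrative only: hypothetical values matching the behaviour described above.
TableRow acceptedByStreamingInserts = new TableRow()
    .set("int64_col", "36183.0")   // zero fractional part: accepted even for INT64
    .set("float64_col", "36183");  // "missing" decimal portion: accepted for FLOAT64

TableRow rejectedForInt64Only = new TableRow()
    .set("int64_col", 1.5);        // non-zero fractional part: rejected for INT64
```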

This does mean that, in addition to my fix for double/float -> long, the current handling of String values when an INT64 is desired will fail for something like "1.0", where the Streaming API would accept it: https://github.com/apache/beam/blob/dcfadddf52ae68a8d3d605e3a072c5d611b1fe14/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L693-L695
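
A minimal sketch of a more lenient string parse, assuming we want "1.0" to be accepted for INT64 but "1.5" to still fail (this is not the current Beam code):

```java
import java.math.BigDecimal;

// Sketch only, not the Beam implementation: parse a string destined for an
// INT64 field via BigDecimal so that "1.0" is accepted but "1.5" is rejected.
static long parseLenientInt64(String value) {
  // longValueExact() throws ArithmeticException if the value has a non-zero
  // fractional part or does not fit in a long.
  return new BigDecimal(value).longValueExact();
}
```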