apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Reading from BigQuery provides inconsistent schemas #28151

Open robertwb opened 1 year ago

robertwb commented 1 year ago

What happened?

When doing a BigQuery Read like

p | beam.io.ReadFromBigQuery(
    table='apache-beam-testing:beam_bigquery_io_test.taxi_small',
    output_type='BEAM_ROW')

the TIMESTAMP fields are converted to fields of schema type Field{name=event_timestamp, description=, type=LOGICAL_TYPE<beam:logical_type:micros_instant:v1>, options={{}}} whereas in Java they are converted into (incompatible) fields of schema type Field{name=event_timestamp, description=, type=DATETIME, options={{}}}.

The Python one is probably the one that is wrong here. In addition, one cannot write elements of this type to another BigQuery table as one gets

  File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py", line 261, in process
    writer.write(row)
  File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/io/gcp/bigquery_tools.py", line 1432, in write
    return self._file_handle.write(self._coder.encode(row) + b'\n')
  File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/io/gcp/bigquery_tools.py", line 1379, in encode
    return json.dumps(
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/io/gcp/bigquery_tools.py", line 152, in default_encoder
    raise TypeError(
TypeError: Object of type 'Timestamp' is not JSON serializable [while running 'WriteToBigQueryHandlingErrors/WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']
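The bottom of the trace is the generic `json.dumps` behavior: Python's JSON encoder has no rule for timestamp-like objects, so any row field that reaches the encoder as a `Timestamp` object rather than a plain string or number raises `TypeError`. A minimal stdlib-only sketch of the failure mode and the usual `default`-hook remedy, using `datetime.datetime` as a stand-in for Beam's `Timestamp` type (illustrative only, not Beam's actual fix):

```python
import json
from datetime import datetime, timezone

row = {"fare": 12.5,
       "event_timestamp": datetime(2023, 8, 1, tzinfo=timezone.utc)}

# Plain json.dumps fails, just like in the traceback above.
try:
    json.dumps(row)
except TypeError as e:
    print(e)  # Object of type datetime is not JSON serializable

# A `default` hook that renders timestamps as ISO-8601 strings avoids the error.
def default_encoder(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable")

encoded = json.dumps(row, default=default_encoder)
print(encoded)
```

The actual bug is upstream of this, of course: the reader should not be producing a logical type that the writer's coder does not know how to encode in the first place.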

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


JoeCMoore commented 1 year ago

This occurs when using the BigQuery streaming write API too. My bigquery.Timestamp schema field type is translated to DATETIME, which causes an input schema / output schema mismatch error.

bigquery.Timestamp seems to stem from Python's datetime.datetime, but even when formatting timestamps in this manner it still translated the LOGICAL_TYPE<beam:logical_type:micros_instant:v1> to DATETIME.

The thread here seems to suggest it's been fixed, but that might be in a pre-release rather than the current version (2.49.0 at the time of writing). Do we have an ETA on this fix?

We can use the legacy streaming API, but data in the streaming buffer is not queryable for ~2-3 minutes, which rules out any sort of real-time queries.

Abacn commented 1 year ago

@JoeCMoore For the BigQuery streaming write API use case, please try adding

LogicalType.register_logical_type(MillisInstant)

before pipeline creation. This should solve the issue.

This is because #22679 moved the source of truth for schema translation to the Python side, and logical types that share the same language type (MillisInstant, MicrosInstant) then conflict. This workaround was used to fix tests: https://github.com/apache/beam/commit/b0484e792bc615b7494b5195e3ef3d464ec7e426

I have seen this reported a couple of times. Indeed, we need to figure out a long-term fix.

johnjcasey commented 11 months ago

@Abacn is there a way to fix this long term? it seems like we should just have millisinstant

Abacn commented 11 months ago

> @Abacn is there a way to fix this long term? it seems like we should just have millisinstant

I tried to make it default but it broke many coder unit tests: https://github.com/apache/beam/pull/29182

I do not have a good idea for a long-term fix unless we remove Joda-Time from the Java library, or at least use java.time.Instant in all IOs.