apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Python TypeError when converting Avro `logicalType` `timestamp-millis` to Beam Schema #31656

Open gergely-g opened 2 weeks ago

gergely-g commented 2 weeks ago

What happened?

When importing Avro files that have schemas with the field type:

{"type": "long", "logicalType": "timestamp-millis"}

an attempt to convert the collection to Beam Schemas (beam.Row) will fail with a TypeError in RowCoderImpl of the form:

apache_beam/coders/coder_impl.py:209: in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    ???
apache_beam/coders/coder_impl.py:248: in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
    ???
apache_beam/coders/coder_impl.py:1824: in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
    ???
E   TypeError: an integer is required [while running 'ReadFromAvro/Map(<lambda at avroio.py:633>)']

Avro files with the timestamp-millis logical type are produced, for example, when a BigQuery table with a column of type TIMESTAMP is exported to .avro files.

Unit test to repro the issue:

import datetime
import tempfile
import unittest

import apache_beam as beam
import fastavro

class TestBeamSchemaConversions(unittest.TestCase):
    def test_convert_timestamp_millis(self):
        """Demonstrate bug: Avro-to-Beam schema conversion cannot handle timestamp-millis logical type."""
        avro_schema = {
            "type": "record",
            "name": "Test",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
            ],
        }

        # Write test records into a temp file using fastavro.
        with tempfile.NamedTemporaryFile(delete=True) as input_avro_file:
            fastavro.writer(
                input_avro_file,
                avro_schema,
                [
                    {"name": "Alice", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                    {"name": "Bob", "timestamp": datetime.datetime(2024, 6, 3, 14, 14, 4, 765000, tzinfo=datetime.timezone.utc)},
                ],
                validator=True,
            )
            input_avro_file.flush()

            with self.assertRaises(TypeError) as context:
                with beam.Pipeline() as p:
                    avro_records = p | beam.io.ReadFromAvro(input_avro_file.name, as_rows=True)
                    avro_records | beam.LogElements()

            self.assertTrue("an integer is required" in str(context.exception))
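Until this is fixed, a manual conversion works around the error. The following is an untested sketch only: it reads plain dicts instead of using as_rows=True and converts the datetime.datetime that fastavro returns into a Beam Timestamp before building the Row (input_avro_file refers to the temp file from the test above):

import apache_beam as beam
from apache_beam.utils.timestamp import Timestamp

with beam.Pipeline() as p:
    (
        p
        # Read plain dicts, not Beam Rows.
        | beam.io.ReadFromAvro(input_avro_file.name)
        # fastavro returns timestamp-millis as a tz-aware datetime.datetime;
        # convert it explicitly before constructing the Row.
        | beam.Map(lambda rec: beam.Row(
            name=rec["name"],
            timestamp=Timestamp.of(rec["timestamp"].timestamp())))
        | beam.LogElements()
    )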

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


gergely-g commented 2 weeks ago

Part of the solution could be in avro_value_to_beam_value()

Checking whether the beam_type matches this FieldType:

    logical_type {
        urn: "beam:logical_type:millis_instant:v1"
        representation {
            atomic_type: INT64
        }
    }

The converter could use Timestamp.of to convert from datetime.datetime.
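
A rough, untested sketch of that special case (the helper names below are made up, and the exact structure of avro_value_to_beam_value() in avroio.py is assumed rather than quoted):

import datetime

from apache_beam.portability.api import schema_pb2
from apache_beam.utils.timestamp import Timestamp

MILLIS_INSTANT_URN = "beam:logical_type:millis_instant:v1"

def _timestamp_millis_to_beam(value):
    # fastavro decodes timestamp-millis into a tz-aware datetime.datetime,
    # but the MillisInstant representation is INT64, which is why
    # RowCoderImpl raises "TypeError: an integer is required".
    if isinstance(value, datetime.datetime):
        return Timestamp.of(value.timestamp())
    # If the raw long comes through instead, treat it as epoch milliseconds.
    return Timestamp(micros=int(value) * 1000)

def _converter_for(beam_type: schema_pb2.FieldType):
    # A branch like this inside avro_value_to_beam_value() could catch the
    # millis_instant logical type before the existing atomic-type handling.
    if (beam_type.WhichOneof("type_info") == "logical_type"
            and beam_type.logical_type.urn == MILLIS_INSTANT_URN):
        return _timestamp_millis_to_beam
    raise NotImplementedError("fall through to the existing conversions")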

mls3odp commented 6 days ago

.take-issue
