apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Precision/context loss in schema when writing to BQ with Avro #25567

Open nbali opened 1 year ago

nbali commented 1 year ago

What happened?

When working with BigQueryIO.Write.withAvroFormatFunction(), the function's input is an AvroWriteRequest<T>, which is essentially T paired with an Avro Schema. The problem is how this provided schema gets created and how it is used.
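For reference, here is a rough sketch of how the two hooks fit together (the table name is a placeholder and the wrapper method is made up for illustration; only the BigQueryIO.Write methods themselves are real API):

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class AvroWriteSketch {

    // The format function receives an AvroWriteRequest<T>; the Avro schema inside that
    // request is whatever the schema factory produced from the (lossy) TableSchema.
    static <T> BigQueryIO.Write<T> configureWrite(
            SerializableFunction<AvroWriteRequest<T>, GenericRecord> formatFn,
            SerializableFunction<TableSchema, org.apache.avro.Schema> schemaFactory) {
        return BigQueryIO.<T>write()
                .to("my-project:my_dataset.my_table")  // placeholder destination
                .withAvroFormatFunction(formatFn)      // input: AvroWriteRequest<T>
                .withAvroSchemaFactory(schemaFactory); // input: only a TableSchema
    }
}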

The schema is created by the configurable withAvroSchemaFactory method, but what that accepts is essentially a SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema>, and this is the problem: the TableSchema may already have lost context. For example, Avro distinguishes INT from LONG, while BigQuery makes no such distinction. So the format function may receive a schema that differs from the desired one, with no reversible way to transform it back.
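A minimal sketch of where the 32-bit information disappears, using Beam's public AvroUtils and BigQueryUtils with a made-up single-field schema:

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;

public class SchemaRoundTrip {
    public static void main(String[] args) {
        // A Beam schema with a 32-bit integer field.
        Schema beamSchema = Schema.builder().addInt32Field("intField").build();

        // Beam -> Avro directly keeps the precision: the Avro field type is "int".
        org.apache.avro.Schema avroDirect = AvroUtils.toAvroSchema(beamSchema);
        System.out.println(avroDirect.getField("intField").schema());

        // Beam -> TableSchema collapses INT32 into BigQuery's single integer type ("INT64"),
        // so a schema factory that only sees the TableSchema can no longer tell int from long.
        TableSchema tableSchema = BigQueryUtils.toTableSchema(beamSchema);
        System.out.println(tableSchema.getFields().get(0).getType());
    }
}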

Using our desired schema in the format function doesn't work either, because the DataFileWriter in AvroRowWriter has already been set up with a DatumWriter built from the factory-provided Schema, and that writer can then fail.
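To isolate that writer-side failure, here is a plain-Avro sketch (no Beam involved; the record and field names are made up) of a DatumWriter bound to a long-typed schema rejecting a record whose field holds an Integer:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class WriterSchemaMismatch {
    public static void main(String[] args) throws Exception {
        // The schema the DatumWriter is built with, as derived from the TableSchema: long.
        Schema writerSchema =
                SchemaBuilder.record("root").fields().requiredLong("intField").endRecord();

        // The record we would like to hand over, built against the desired schema: int.
        Schema desiredSchema =
                SchemaBuilder.record("root").fields().requiredInt("intField").endRecord();
        GenericRecord record = new GenericData.Record(desiredSchema);
        record.put("intField", 1234);

        try (DataFileWriter<GenericRecord> fileWriter =
                new DataFileWriter<>(new GenericDatumWriter<>(writerSchema))) {
            fileWriter.create(writerSchema, new ByteArrayOutputStream());
            // Throws ClassCastException (Integer cannot be cast to Long): the writer encodes
            // the field against the long-typed writer schema, regardless of the record's own schema.
            fileWriter.append(record);
        }
    }
}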

To give a more concrete example: I had a POJO with an int field, and I already had methods that provide the Beam Row, Beam Schema, Avro Schema, and GenericRecord for that POJO, since I need those for other purposes. The TableSchema returned by the DynamicDestinations naturally contains long, since that is the only integer type BigQuery supports, so the schema-factory-generated Avro schema contains long as well. Meanwhile, the Row and GenericRecord produced by my code contain int, just like the POJO. And, as you can guess, an exception is thrown when the writer tries to treat that int as a long.

My "methods" actually use toAvroSchema and toGenericRecord from org.apache.beam.sdk.schemas.utils.AvroUtils, so it's not some custom code, but internal Beam code.

So, to sum things up, IMO the SchemaFactory should be able to use more context/information than just a TableSchema. Given how it is called at https://github.com/apache/beam/blob/40838f76447a5250b52645210f26dce3655d7009/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java#L143-L145, passing in the destination might already be helpful (see the rough sketch below).
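A purely hypothetical sketch of what such an extension could look like; the interface name and signature below are made up for illustration and nothing like it exists in Beam today:

import java.io.Serializable;
import org.apache.avro.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;
import com.google.api.services.bigquery.model.TableSchema;

// Hypothetical: in addition to the (lossy) TableSchema, the factory would also receive the
// dynamic destination, so user code could look up its original, more precise schema.
public interface DestinationAwareAvroSchemaFactory<DestinationT> extends Serializable {
    Schema schemaFor(DestinationT destination, @Nullable TableSchema tableSchema);
}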

Code to reproduce:

package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.base.Preconditions;

import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.beam.sdk.io.gcp.bigquery.RowWriterFactory.AvroRowWriterFactory;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class Example {

    public static class Foo {

        public int intField;

        public int getIntField() {
            return intField;
        }

        public void setIntField(int intField) {
            this.intField = intField;
        }
    }

    public static void main(String[] args) throws Exception {

        final JavaBeanSchema beamSchemaProvider = new JavaBeanSchema();
        final TypeDescriptor<Foo> fooTypeDescriptor = TypeDescriptor.of(Foo.class);

        final Schema schema = beamSchemaProvider.schemaFor(fooTypeDescriptor);
        System.out.println(schema.getFields().get(0));
        Preconditions.checkState(schema.getFields().get(0).getType() == FieldType.INT32);

        final SerializableFunction<AvroWriteRequest<Foo>, GenericRecord> avroFormatFunction = req -> {
            final Foo foo = req.getElement();
            final Row beamRow = beamSchemaProvider.toRowFunction(fooTypeDescriptor).apply(foo);

            System.out.println(beamRow.getSchema().getFields().get(0));
            Preconditions.checkState(beamRow.getSchema().getFields().get(0).getType() == FieldType.INT32);
            System.out.println(req.getSchema().getFields().get(0));
            Preconditions.checkState(req.getSchema().getFields().get(0).schema().getType() == Type.LONG); // !!!

            // fails with all
            // return AvroUtils.toGenericRecord(beamRow);
            // return AvroUtils.toGenericRecord(beamRow, req.getSchema());
            // return AvroUtils.toGenericRecord(beamRow, AvroUtils.toAvroSchema(schema));
            return AvroUtils.toGenericRecord(beamRow, AvroUtils.toAvroSchema(beamRow.getSchema()));
        };

        final DynamicDestinations<Foo, String> destinations = new DynamicDestinations<>() {
            @Override
            public String getDestination(ValueInSingleWindow<Foo> element) {
                return "foo";
            }

            @Override
            public TableDestination getTable(String destination) {
                throw new UnsupportedOperationException();
            }

            @Override
            public TableSchema getSchema(String destination) {
                final TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
                System.out.println(tableSchema.getFields().get(0));
                Preconditions.checkState(tableSchema.getFields().get(0).getType().equals(StandardSQLTypeName.INT64.name())); // !!!
                return tableSchema;
            }
        };

        final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>> writerFactory =
                org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.GENERIC_DATUM_WRITER_FACTORY;

        AvroRowWriterFactory<Foo, GenericRecord, String> rowWriterFactory =
                RowWriterFactory.avroRecords(avroFormatFunction, writerFactory);
        rowWriterFactory = rowWriterFactory.prepare(destinations, DEFAULT_AVRO_SCHEMA_FACTORY);

        try (BigQueryRowWriter<Foo> rowWriter = rowWriterFactory.createRowWriter(null, null)) {
            Foo foo = new Foo();
            foo.intField = 1234;
            rowWriter.write(foo); // FAILS HERE: java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.lang.Long
        }
    }

    // copied from BigQueryIO
    private static final SerializableFunction<TableSchema, org.apache.avro.Schema> DEFAULT_AVRO_SCHEMA_FACTORY =
            (SerializableFunction<TableSchema, org.apache.avro.Schema>) input -> BigQueryAvroUtils.toGenericAvroSchema("root", input.getFields());

}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


nbali commented 1 year ago

I could even contribute this myself, if it gets the green light that it is conceptually acceptable.

Abacn commented 1 year ago

I understand the current behavior as promoting/casting INT -> LONG (and possibly other types as well) in order to align the Avro schema with the schema of the final BigQuery destination table.

If the concern is cost (INT uses less space than LONG), the BigQuery Storage API is more promising (though that work is ongoing). CC: @reuvenlax

nbali commented 1 year ago

Using long instead of int in the original class would be an ugly workaround IMO. Developers shouldn't have to account for this loss of information when writing their code. Not to mention that it might be impossible for the developer to change the type of the problematic field. (Remapping to a new class is an even uglier workaround.)

The mentioned example was just that: an example. Other information could be lost as well, namely anything that another schema representation and Avro can both express but that TableSchema cannot.

nbali commented 1 year ago

Does the fact that the labels have changed mean that my idea is viable and I can contribute it, or only that the issue is valid, without implying anything about my proposed solution? I still only want to contribute if it has a chance of being accepted.

nbali commented 1 year ago

(I would prefer to implement this while I still have hours available for OSS contributions, so any feedback on my previous questions would be appreciated.)