mtth / avsc

Avro for JavaScript :zap:

How to add custom attributes to serialised schema type eg. sqlType #431

Closed buzzware closed 1 year ago

buzzware commented 1 year ago

For avro files destined for BigQuery import, Google suggests defining the type as

{"type": "string", "sqlType": "JSON"}

https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#extract_json_data_from_avro_data

This is very useful because it means Avro files containing JSON fields can be loaded straight into BigQuery using the schema embedded in the file. While Avro doesn't have a native JSON type, storing JSON as strings in the file and loading them into JSON-typed columns in the database is a decent compromise.

However, avsc ignores the sqlType attribute.

How do I maintain that attribute when the buffer schema is written? Or how do I create a Type with additional attributes that will be serialised?

buzzware commented 1 year ago

I see a similar question in #307, but it seems to have been closed without an answer.

buzzware commented 1 year ago

This works with to/fromBuffer, but fails when I use BlockEncoder; I will submit a test case later. It seems I need to override _resolve(), but how do I implement it? I see a lot of test cases using to/fromBuffer, but none using BlockEncoder with logical types.
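
For reference, my understanding of the _resolve() contract (not verified against BlockEncoder yet): it receives another type and should return a function converting that type's values into this logical type's values, or undefined if they're incompatible. A minimal sketch of what could be added to the GoogleJson class below:

    // Sketch only: resolve from plain strings (or another google-json type),
    // reusing the same JSON.parse conversion as _fromValue.
    // (Type here is avsc's Type, imported with the class below.)
    _resolve(type) {
        if (Type.isType(type, 'string', 'logical:google-json')) {
            return (val) => this._fromValue(val);
        }
        // returning undefined means the two types can't be resolved
    }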

It would be nice if I could just:

1) override StringType and add attributes, but only LogicalType supports attributes, OR
2) provide the exact JSON schema I want and have the library write it verbatim, perhaps with a rawSchema flag.

import {Type, types} from "avsc";

const {LogicalType} = types;

class GoogleJson extends LogicalType {

    // Called when the schema is exported; attach the custom attribute.
    _export(attrs) {
        attrs.sqlType = 'JSON';
    }

    _toValue(input) {
        return JSON.stringify(input);
    }

    _fromValue(input) {
        return JSON.parse(input);
    }
}

const schema = {
    name: 'Thing',
    type: 'record',
    fields: [
        {name: 'amount', type: 'int'},
        {name: 'calc', type: {type: 'string', logicalType: 'google-json'}}
    ]
};

const thingAvroType = Type.forSchema(
    //@ts-ignore
    schema,
    {logicalTypes: {'google-json': GoogleJson}}
);

describe('GoogleJson', () => {

    it('buffer', async () => {
        const thing = {
            amount: 32,
            calc: {a: 1, b: 2}
        };
        const buf = thingAvroType.toBuffer(thing);
        const thing2 = thingAvroType.fromBuffer(buf);
        expect(thing2).toMatchObject(thing);
    });
});
mtth commented 1 year ago

Hi @buzzware. The best option with decorated schemas is typically to keep a copy of the schema alongside the generated type and reference it directly when the custom attributes are needed. However, this doesn't work well with BlockEncoders, which currently expect a single schema or type argument. I think it would be worth extending the BlockEncoder API to better support this, for example via an additional schema option. In the meantime, here are a couple of workarounds:

  1. If you don't need any type options when writing records, you can pass the raw schema directly to the BlockEncoder and it will be written as-is (a sketch follows the option 2 snippet below).
  2. If you do need type options, it's a bit trickier, but you can use two encoders where the first only writes the header. Something like:
async function pipedBlockEncoder(type, schema, writable) {
  const syncMarker = crypto.randomBytes(16);
  // Header-only encoder (note the schema argument)
  const prelude = new BlockEncoder(schema, {writeHeader: true, syncMarker});
  prelude.end();
  await pipeline(prelude, writable, {end: false}); // from node:stream/promises
  // Data encoder (we pass in the type here, not the schema)
  const content = new BlockEncoder(type, {writeHeader: false, syncMarker});
  content.pipe(writable);
  return content;
}
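
For option 1, a minimal sketch (hypothetical file name, schema borrowed from the question but without the logical type; per the note above, the decorated schema ends up in the file header as-is, sqlType included, and the JSON value is stored as a plain string):

const avro = require('avsc');
const fs = require('fs');

const schema = {
  name: 'Thing',
  type: 'record',
  fields: [
    {name: 'amount', type: 'int'},
    {name: 'calc', type: {type: 'string', sqlType: 'JSON'}}
  ]
};

const encoder = new avro.streams.BlockEncoder(schema);
encoder.pipe(fs.createWriteStream('things.avro'));
encoder.write({amount: 32, calc: JSON.stringify({a: 1, b: 2})});
encoder.end();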
buzzware commented 1 year ago

Thanks @mtth, I just got it working for the first time with that, including import to BigQuery with an auto-generated JSON column.

import {createFileDecoder, createFileEncoder, Schema, schema, streams, Type, types,} from "avsc";
import fs = require("fs");
import path = require("path");
import BlockEncoder = streams.BlockEncoder;
import {randomBytes} from "crypto";
const { finished, pipeline } = require('node:stream/promises');

describe('GoogleJson', () => {

    async function pipedBlockEncoder(type, schema, writable) {
        const syncMarker = randomBytes(16);
        // Header-only encoder (note the schema argument)
        const prelude = new BlockEncoder(schema, {writeHeader: true, syncMarker});
        prelude.end();
        await pipeline(prelude, writable, {end: false}); // from node:stream/promises
        // Data encoder (we pass in the type here, not the schema)
        const content = new BlockEncoder(type, {writeHeader: false, syncMarker});
        content.pipe(writable);
        return content;
    }

    it('mtth file example', async () => {
        const thing = {
            amount: 32,
            calc: JSON.stringify({a: 1, b: 2})
        };
        const testFile = '/Users/gary/Downloads/avro_test.avro';
        fs.rmSync(testFile,{force: true});

        const schema: Schema = {
            name: 'Thing',
            type: 'record',
            fields: [
                {name: 'amount', type: 'int'},
                {name: 'calc', type: {type: 'string', sqlType: 'JSON'}}
            ]
        };
        const type = Type.forSchema(schema);
        let writeable = fs.createWriteStream(testFile, {encoding: 'binary'});
        let encoder = await pipedBlockEncoder(type, schema, writeable);
        encoder.write(thing);
        encoder.write(thing);
        encoder.write(thing);
        encoder.end();
        await finished(writeable);
        console.log('end');
    });
});
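
To sanity-check that the sqlType attribute really ends up in the file header, the file can be read back. A rough sketch, assuming the decoder's 'metadata' event carries (type, codec, header) and that header.meta['avro.schema'] holds the schema JSON:

// Rough sketch: inspect the header of the file written by the test above.
const decoder = createFileDecoder('/Users/gary/Downloads/avro_test.avro');
decoder.on('metadata', (type, codec, header) => {
    const written = JSON.parse(header.meta['avro.schema'].toString());
    console.log(written.fields[1].type.sqlType); // expected: 'JSON'
});
decoder.on('data', () => {}); // keep the stream flowing so the header gets read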