kri5t opened 2 years ago
I found a rather hacky way of doing this:
```typescript
import { Injectable } from '@nestjs/common';
import * as avro from 'avsc';
import * as crc32 from 'buffer-crc32';
import * as snappy from 'snappy';
import { Readable } from 'stream';

@Injectable()
export class AvroSchemaFileExtractorService {
  // Avro's snappy codec stores each block as snappy-compressed data followed
  // by a 4-byte big-endian CRC32 of the uncompressed data.
  snappyDecoder: avro.Codec = (buf, callback) => {
    const checksum = buf.slice(buf.length - 4);
    const payload = buf.slice(0, buf.length - 4);
    let inflated: Buffer;
    try {
      const out = snappy.uncompressSync(payload, {});
      inflated = Buffer.isBuffer(out) ? out : Buffer.from(out);
    } catch (err) {
      callback(err);
      return;
    }
    // The callback must be invoked on every path, otherwise the decoder stalls.
    if (!checksum.equals(crc32(inflated))) {
      callback(new Error('invalid checksum'));
      return;
    }
    callback(null, inflated);
  };

  // Resolves with the writer schema as soon as the header has been decoded,
  // then stops reading so the rest of the file is not consumed.
  get(fileStream: Readable): Promise<string | object> {
    return new Promise((resolve, reject) => {
      const blockDecoder = new avro.streams.BlockDecoder({
        codecs: { snappy: this.snappyDecoder },
      });
      blockDecoder.on('metadata', (_type, _codec, header) => {
        resolve(JSON.parse(header.meta['avro.schema'].toString()));
        fileStream.unpipe(blockDecoder);
        fileStream.destroy();
        blockDecoder.destroy();
      });
      blockDecoder.on('error', reject);
      fileStream.on('error', reject);
      // Only reached if the input ends without a decodable header.
      blockDecoder.on('end', () => reject(new Error('No schema found')));
      // Discard decoded records so that 'end' can fire.
      blockDecoder.resume();
      fileStream.pipe(blockDecoder);
    });
  }
}
```
I still think it would be great to have a more official way of doing this.
Hi @kri5t. Your approach of using a `BlockDecoder` and listening to the `'metadata'` event sounds right. Two quick suggestions:

- `events.once` could simplify the implementation significantly.

I'm open to adding this as a helper function if you're interested in submitting a PR.
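Following the `events.once` suggestion, a minimal sketch of a header-only read might look like the following. It assumes the decoder is an avsc `BlockDecoder` (or anything that emits `'metadata'` with the header as its third argument and `header.meta['avro.schema']` as a `Buffer`); `readSchemaFromHeader` is a hypothetical helper name, not part of avsc. The decoder is passed in so the caller keeps control of the codecs, such as the snappy decoder above.

```typescript
import { once } from 'events';
import { Readable, Writable } from 'stream';

// Hypothetical helper: pipes `input` into `decoder` (assumed to be an avsc
// BlockDecoder), waits for the first 'metadata' event and returns the parsed
// writer schema. The input is destroyed afterwards so the rest of the file is
// not read.
export async function readSchemaFromHeader(input: Readable, decoder: Writable): Promise<unknown> {
  // pipe() does not forward source errors; route them into the decoder so
  // that events.once (which rejects on 'error') sees them.
  input.on('error', (err) => decoder.destroy(err));
  input.pipe(decoder);
  try {
    const [, , header] = await Promise.race([
      once(decoder, 'metadata'),
      // If the input ends before a header was decoded, fail instead of hanging.
      once(decoder, 'finish').then((): never => {
        throw new Error('No schema found');
      }),
    ]);
    return JSON.parse(header.meta['avro.schema'].toString());
  } finally {
    input.unpipe(decoder);
    input.destroy();
    decoder.destroy();
  }
}
```

A call would then look like `readSchemaFromHeader(fileStream, new avro.streams.BlockDecoder({ codecs: { snappy: snappyDecoder } }))`. Because `events.once` rejects when the decoder emits `'error'`, the manual `resolve`/`reject` bookkeeping goes away.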
I've been looking through your library and found `extractFileHeader`, and it works great. My only issue is that we are running in a cloud environment and dealing with rather large Avro files (48 GB). Having to download such a file onto the Docker image just to inspect it is rather inefficient.

I've been trying to modify your method to take a `Readable` instead of the actual `path`, but it turns out `extractFileHeader` relies on too many inner methods for that to be feasible. I've done something like this:

But again, the inner methods are not exposed and I cannot access them. Would it be possible to include a more cloud-friendly version that accepts a stream instead of a path?
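One way to sidestep avsc's internals is to parse the header straight off the stream. Per the Avro specification, an object container file starts with the four magic bytes `Obj\x01`, then the file metadata encoded as an Avro `map<bytes>`, then a 16-byte sync marker. The sketch below assumes that layout and is not an avsc API; `readAvroHeader` and `maxHeaderBytes` are hypothetical names. Reading stops after the chunk that completes the header, so only the start of a large remote object has to be fetched.

```typescript
import { Readable } from 'stream';

export interface AvroHeader {
  meta: Map<string, Buffer>;
  sync: Buffer;
}

const MAGIC = Buffer.from([0x4f, 0x62, 0x6a, 0x01]); // "Obj" + format version 1
const NEED_MORE_DATA = new Error('need more data'); // sentinel, never surfaced

// Parses a complete header from `buf`, or throws NEED_MORE_DATA if it is too short.
function parseHeader(buf: Buffer): AvroHeader {
  let pos = 0;
  const take = (n: number): Buffer => {
    if (n < 0) throw new Error('Corrupt Avro header');
    if (pos + n > buf.length) throw NEED_MORE_DATA;
    const out = buf.subarray(pos, pos + n);
    pos += n;
    return out;
  };
  // Avro longs are zig-zag encoded variable-length integers.
  const readLong = (): number => {
    let n = 0;
    let shift = 0;
    let byte = 0;
    do {
      byte = take(1)[0];
      n += (byte & 0x7f) * 2 ** shift;
      shift += 7;
    } while (byte & 0x80);
    return n % 2 === 0 ? n / 2 : -(n + 1) / 2;
  };

  if (!take(4).equals(MAGIC)) throw new Error('Not an Avro object container file');
  const meta = new Map<string, Buffer>();
  // A map is a series of blocks; a zero count ends it.
  for (let count = readLong(); count !== 0; count = readLong()) {
    if (count < 0) {
      count = -count;
      readLong(); // block size in bytes, not needed here
    }
    for (let i = 0; i < count; i++) {
      const key = take(readLong()).toString('utf8');
      meta.set(key, Buffer.from(take(readLong()))); // copy, so `buf` can be freed
    }
  }
  return { meta, sync: Buffer.from(take(16)) };
}

// Hypothetical helper: reads chunks until the header is complete. Leaving the
// for-await loop early (return/throw) destroys `input`.
export async function readAvroHeader(input: Readable, maxHeaderBytes = 1 << 20): Promise<AvroHeader> {
  let buf: Buffer = Buffer.alloc(0);
  for await (const chunk of input) {
    buf = Buffer.concat([buf, Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)]);
    try {
      return parseHeader(buf);
    } catch (err) {
      if (err !== NEED_MORE_DATA) throw err;
      if (buf.length > maxHeaderBytes) throw new Error('Avro header exceeds maxHeaderBytes');
    }
  }
  throw new Error('Stream ended before a complete Avro header was read');
}
```

With the result, `JSON.parse(header.meta.get('avro.schema')!.toString())` gives the writer schema and `header.meta.get('avro.codec')` the codec name. Re-parsing from the start on each chunk is quadratic in principle, but headers are small, and `maxHeaderBytes` bounds the work on malformed input.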