ZJONSSON / parquetjs

fully asynchronous, pure JavaScript implementation of the Parquet file format
MIT License
34 stars 61 forks

Add openS3 Compatibility for AWS SDK v3 #64

Open entitycs opened 3 years ago

entitycs commented 3 years ago

Allows openS3 to accept the arguments required by version 3 of the AWS SDK S3 client (S3Client, HeadObjectCommand, and GetObjectCommand) without changing the function's parameter list.

Checks whether client.getObject (v2) exists on the client argument before attempting to destructure that argument into an S3Client, HeadObjectCommand, and GetObjectCommand. The actual implementation is pushed down from the ParquetReader class (called by the developer) into the ParquetEnvelopeReader class (called by ParquetReader).

The S3Client should be an instantiated client object, and the command classes should be passed in exactly as they come out of the destructured require statement. Command inputs are read-only in version 3 of the SDK and must be set on each new command, so passing the classes lets the ParquetEnvelopeReader class set a range on each "getObject" call for v3, as it does for v2 S3 clients. It also lets the developer attach middleware to their client if they wish.
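The v2/v3 check and the per-request range described above could be sketched roughly as follows. This is not the PR's actual code: the helper names detectS3Api and buildRangedGet are hypothetical, and the sketch only assumes that a v2 client exposes getObject() and that a v3 caller passes an object of the shape {S3Client, HeadObjectCommand, GetObjectCommand}.

```javascript
// Hypothetical helper (not from the PR): decide which SDK style was passed in.
function detectS3Api(client) {
  // v2 clients expose getObject() directly on the client object.
  if (typeof client.getObject === 'function') {
    return { version: 2, client };
  }
  // Otherwise expect the v3 bundle: an instantiated client plus command classes.
  const { S3Client, HeadObjectCommand, GetObjectCommand } = client;
  if (!S3Client || !HeadObjectCommand || !GetObjectCommand) {
    throw new TypeError(
      'expected a v2 client or {S3Client, HeadObjectCommand, GetObjectCommand}'
    );
  }
  return { version: 3, S3Client, HeadObjectCommand, GetObjectCommand };
}

// Hypothetical helper: v3 command inputs are fixed at construction time,
// so every ranged read builds a fresh command with its own Range.
function buildRangedGet(GetObjectCommand, params, offset, length) {
  // HTTP byte ranges are inclusive on both ends.
  const Range = `bytes=${offset}-${offset + length - 1}`;
  return new GetObjectCommand({ ...params, Range });
}
```

Because the command class itself is passed in, the reader can construct as many ranged commands as it needs and hand each one to S3Client.send().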

The following example was added to README.md:

//v3 example
const {S3Client, HeadObjectCommand, GetObjectCommand} = require('@aws-sdk/client-s3');
const client = new S3Client({region:"us-east-1"});
let reader = await parquet.ParquetReader.openS3(
  {S3Client:client, HeadObjectCommand, GetObjectCommand},
  params
);

thenickdude commented 3 years ago

The readable() loop you use to receive the response from S3 delivers the response buffer as soon as there are no bytes waiting to be read from S3. This truncates the read before the response from S3 has actually finished, so data is lost.

I built a testbed that demonstrates the problem (linked below). It reads a local Parquet file, which succeeds, then reads that same file through an S3 emulator, which fails:

https://github.com/thenickdude/parquetjs-s3v3-test

Logging shows that it asks for the byte range '21578-201998', the first chunk available from the S3 stream is 65209 bytes long, and the read loop terminates here without reading the rest of the reply (the full reply length is 180421).
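The failure mode can be reproduced without S3. The sketch below uses a local Node.js Readable as a stand-in for the response body and a hypothetical drainAvailable helper; it is not the PR's loop, only an illustration that read() returning null means "nothing buffered right now", not "end of stream". The chunk sizes mirror the numbers in the log above.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: read whatever is buffered right now.
// read() returns null when the internal buffer is empty, even if more data is coming.
function drainAvailable(stream) {
  const chunks = [];
  let data;
  while ((data = stream.read()) !== null) {
    chunks.push(data);
  }
  return Buffer.concat(chunks);
}

// Stand-in for the S3 response body; data is pushed in manually.
const body = new Readable({ read() {} });

body.push(Buffer.alloc(65209));          // first chunk arrives
const firstPass = drainAvailable(body);  // loop exits here with a partial reply

body.push(Buffer.alloc(180421 - 65209)); // the remainder arrives later
body.push(null);                         // end of response
const secondPass = drainAvailable(body); // the bytes a one-shot loop would lose
```

A reader that returns firstPass as the whole body would hand back 65209 of the 180421 bytes requested.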

thenickdude commented 3 years ago

The test passes if I replace that routine with:

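        let body = await new Promise((resolve, reject) => {
          let bodyBuffer, data;
          // Resolve only once the stream signals the end of the response.
          getObjectCommand.Body.on('end', function() {
            resolve(bodyBuffer);
          });
          // Reject on stream errors so the promise cannot hang forever.
          getObjectCommand.Body.on('error', reject);
          getObjectCommand.Body.on('readable', function() {
            // read() returns null when nothing is buffered, not at end of stream.
            while ((data = this.read()) !== null) {
              if (bodyBuffer) {
                bodyBuffer = Buffer.concat([bodyBuffer, data]);
              } else {
                bodyBuffer = data;
              }
            }
          });
        });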
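An alternative sketch of the same idea, assuming the response body is a Node.js Readable (which supports async iteration): collect chunks in an array and concatenate once at the end. The readBody name is hypothetical, and the demo stream is a local stand-in rather than a real S3 response.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: collect a Node.js Readable into a single Buffer.
// The for-await loop only finishes when the stream ends, and a stream
// error rejects the returned promise.
async function readBody(stream) {
  const chunks = [];
  for await (const chunk of stream) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
  }
  return Buffer.concat(chunks);
}

// Local stand-in for a response body delivered in two chunks.
const demo = readBody(Readable.from([Buffer.from('abc'), Buffer.from('de')]));
```

Concatenating once avoids re-copying the accumulated buffer on every chunk, which matters for large byte ranges.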