ironSource / parquetjs

fully asynchronous, pure JavaScript implementation of the Parquet file format
MIT License

Streaming new records into an existing parquet file in S3 #125

Open designreact opened 3 years ago

designreact commented 3 years ago

I'm attempting to aggregate records by id as they are processed from SQS via lambda into S3.

I do get a merged file uploaded into S3 as I can see the filesize increasing each time the lambda runs, but when using parquet-tools to inspect the data I only see one result. I have a feeling this is due to multiple headers being set in the file and parquet-tools is only reading the latest entry.

Can anyone help me figure out a way to adapt my approach using part of the parquetjs library? The aim is to correctly stitch the streams together: I think I need to parse the oldStream chunks into Parquet rows and then write them into a new write stream, but being new to Parquet and parquetjs I don't know where to start.

My approach could well be a poor one, but if possible I'd rather not create / maintain another process, e.g. a cloudwatch scheduled event to aggregate and repartition my data.

I think this may relate to: https://github.com/ironSource/parquetjs/issues/120

Thanks to all the contributors for your hard work; this is a great library from what I've seen so far 👍🏻

My current approach (though a little broken):

Read existing parquet file as stream

const oldStream = new Stream.PassThrough();
getS3FileStream(Bucket, Key) // Key: guid=xyz/year=2021/month=04/2021.04.xyz.parquet
  .pipe(oldStream);
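
getS3FileStream is the author's own helper and isn't shown in the thread; a minimal sketch of what it might look like with the AWS SDK v2 S3 client (helper name and error handling are assumptions):

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Hypothetical helper: returns a readable stream over an existing S3 object.
const getS3FileStream = (Bucket, Key) =>
  s3.getObject({ Bucket, Key }).createReadStream();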

Create new parquet stream from SQS record

const recordStream = new Stream.PassThrough();
createRecordStream(record) // formats SQS Record data in line with the schema
  .pipe(StreamArray.withParser())
  .pipe(new parquet.ParquetTransformer(schema, { useDataPageV2: false }))
  .pipe(recordStream);
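
For context, schema is a parquetjs ParquetSchema; its definition isn't shown in the thread, so the sketch below uses hypothetical field names:

const parquet = require('parquetjs');

// Illustrative schema only; the real field names and types are not in the thread.
const schema = new parquet.ParquetSchema({
  id: { type: 'UTF8' },
  value: { type: 'UTF8' },
  timestamp: { type: 'TIMESTAMP_MILLIS' },
});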

Merge streams together

const combinedStream = new Stream.PassThrough();
mergeStream(oldStream, recordStream)
  .pipe(combinedStream);
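
mergeStream isn't shown either; if it simply concatenates the two byte streams back to back (for example with the multistream package, as sketched below), the output is two complete Parquet files glued together. Since a Parquet reader locates row groups via the footer at the very end of the file, it would only see the metadata of the last file, which is consistent with parquet-tools reporting a single result.

const MultiStream = require('multistream');

// Hypothetical stand-in for mergeStream: emits the given streams one after another.
const mergeStream = (...streams) => new MultiStream(streams);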

Upload to S3

const upload = s3Stream.upload({
  Bucket,
  Key // Key: guid=xyz/year=2021/month=04/2021.04.xyz.parquet
});
combinedStream.pipe(upload);
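
s3Stream looks like the s3-upload-stream package; if so, its setup (not shown above) would be roughly:

const AWS = require('aws-sdk');
const s3UploadStream = require('s3-upload-stream');

// Assumed initialisation for the s3Stream used above; upload() then returns a
// writable stream that performs a multipart upload to S3.
const s3Stream = s3UploadStream(new AWS.S3());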
designreact commented 3 years ago

Using the parquets fork, which adds an openBuffer method, I've managed to get this working with ParquetReader and ParquetWriter, but not via streaming 😔

If my understanding is correct, this means the whole file is loaded into memory to perform updates. Given my S3 Key partitioning and the size of my sqsRecords, this may not be an issue in the short term, but it isn't really the solution I was hoping for.

const upload = s3Stream.upload({ Bucket,  Key });
const existingRecords = await getS3FileBuffer(Bucket, Key);

const reader = await parquet.ParquetReader.openBuffer(existingRecords);
const writer = await parquet.ParquetWriter.openStream(schema, upload);

let cursor = reader.getCursor();
let rec = null;
// appendRow and close are async in parquetjs, so they need to be awaited
while ((rec = await cursor.next())) {
  await writer.appendRow(rec);
}
await writer.appendRow({ value: transformRecord(sqsRecord) });
await writer.close();
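
getS3FileBuffer is another helper that isn't shown; with the AWS SDK v2 it could be sketched roughly like this (helper name is an assumption):

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Hypothetical helper: loads the whole object into memory as a Buffer,
// which is what makes this approach memory-bound.
const getS3FileBuffer = async (Bucket, Key) => {
  const { Body } = await s3.getObject({ Bucket, Key }).promise();
  return Body;
};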
paulocoghi commented 3 years ago

Did you find any alternative solution, @designreact? I also plan to save to S3.

aijazkhan81 commented 2 years ago

@paulocoghi , @designreact , did you find any way to upload the file to s3?

mmuller88 commented 2 years ago

This seems to work:

// Not all imports were shown in the original comment; stream and https are Node
// built-ins, and unzipper, ndjson and parquetjs are assumed npm dependencies.
import * as stream from 'stream';
import * as https from 'https';
import * as unzipper from 'unzipper';
import * as ndjson from 'ndjson';
import * as parquet from 'parquetjs';
import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

const createUploadStream = (
  fileName: string,
): { passStream: stream.PassThrough; parallelUploadS3: Upload } => {
  const passStream = new stream.PassThrough();
  const uploadKeyName = fileName + '.parquet';
  const parallelUploadS3 = new Upload({
    client: new S3Client({}),
    queueSize: 4,
    params: {
      Bucket: downloadBucketName,
      Key: uploadKeyName,
      Body: passStream,
    },
  });

  parallelUploadS3.on('httpUploadProgress', (progress) => {
    console.log(`Uploaded part: ${progress.part}`);
  });

  return { passStream, parallelUploadS3 };
};

export const handler = async (event: DownloadLambdaEvent) => {
  console.log('Processing event: ', JSON.stringify(event, null, 2));
  try {
    const { parallelUploadS3, passStream } = createUploadStream(event.fileName);

    // Download the zip, unzip the first entry, parse NDJSON, transform to Parquet,
    // and stream the result into the upload's PassThrough.
    https.get(`${DownloadUrlPrefix}${event.fileName}.zip`, (fileStream) => {
      fileStream
        .pipe(unzipper.ParseOne())
        .pipe(ndjson.parse())
        .pipe(new parquet.ParquetTransformer(schema, opts))
        .pipe(passStream);
    });
    ...
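
The snippet is cut off before the upload is awaited; presumably the elided part waits for the upload to finish. A possible ending, assuming the same names as above and the done() method of @aws-sdk/lib-storage's Upload:

    // Assumed continuation: wait for the multipart upload to complete before
    // the lambda returns, and surface any failure.
    await parallelUploadS3.done();
  } catch (error) {
    console.error('Failed to convert and upload', error);
    throw error;
  }
};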
maelp commented 6 months ago

Does anyone know how we would do this for a Google Cloud Storage bucket?
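
One possible (untested) approach: @google-cloud/storage exposes a writable stream via File.createWriteStream(), so the parquetjs ParquetTransformer output can be piped into it much like the S3 PassThrough above. A rough sketch with placeholder bucket/object names and a hypothetical schema and record source:

import { Storage } from '@google-cloud/storage';
import * as parquet from 'parquetjs';
import { Readable } from 'stream';

// Hypothetical schema and records, standing in for the thread's schema / recordStream.
const schema = new parquet.ParquetSchema({
  id: { type: 'UTF8' },
  value: { type: 'UTF8' },
});
const records = Readable.from([{ id: 'abc', value: 'hello' }]);

const storage = new Storage();
const gcsWriteStream = storage
  .bucket('my-bucket')              // placeholder bucket name
  .file('path/to/output.parquet')   // placeholder object key
  .createWriteStream();

// Same pattern as the S3 examples: pipe the Parquet output into the
// provider's writable stream.
records
  .pipe(new parquet.ParquetTransformer(schema, { useDataPageV2: false }))
  .pipe(gcsWriteStream)
  .on('finish', () => console.log('upload complete'));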