ironSource / parquetjs

fully asynchronous, pure JavaScript implementation of the Parquet file format
MIT License

Example usage of ParquetWriter.openStream? #76

Open aconanlai opened 5 years ago

aconanlai commented 5 years ago

I saw that @asmuth implemented it here: https://github.com/ironSource/parquetjs/blob/master/lib/writer.js#L52. But I'm wondering if someone has a simple example of how to use it.

I've got it mostly working, but I had to implement a close method on a PassThrough stream, and I feel I might be doing it incorrectly.

I'd be happy to submit a PR with some nice docs as well once I have it all figured out, since I imagine this is a common enough use case.
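
For reference, here is a minimal sketch of what that might look like, assuming ParquetWriter.openStream(schema, outputStream) really does call close() on the supplied stream as described above, so the PassThrough gets a close() shim that forwards to end(). This is a sketch based on this thread, not documented usage.

import { createWriteStream } from "fs";
import { PassThrough } from "stream";
import parquet from "parquetjs";

async function writeViaOpenStream() {
  const schema = new parquet.ParquetSchema({
    name: { type: "UTF8" },
    price: { type: "DOUBLE" }
  });

  // PassThrough has no close(); forward it to end() so the writer can
  // finalize the stream (this is the shim described above).
  const out = new PassThrough();
  (out as any).close = () => out.end();

  // Send the bytes wherever they need to go (a file here; an S3 upload body works too).
  out.pipe(createWriteStream("example.parquet"));

  const writer = await parquet.ParquetWriter.openStream(schema, out);
  await writer.appendRow({ name: "apple", price: 2.5 });
  await writer.appendRow({ name: "banana", price: 1.25 });
  await writer.close();
}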

dogenius01 commented 5 years ago

Similar issue here. How do you write a Parquet file using a stream?

ali-habibzadeh commented 4 years ago

The easier way I have found is not ParquetWriter.openStream but an undocumented ParquetTransformer class. It extends Node's Transform stream and can be used as a step inside a pipe:

import { createReadStream, createWriteStream } from "fs";
import { ParquetSchema, ParquetTransformer } from "parquetjs";
// stream-json has some nice streaming tools for working with JSON
import * as StreamArray from "stream-json/streamers/StreamArray";

const reader = createReadStream("data-json.json"); // contains JSON Array
const destination = createWriteStream("countries.parquet");

const schema = new ParquetSchema({
  value: {
    fields: {
      Country: { type: "UTF8" },
      Indicator: { type: "UTF8" },
      Value: { type: "FLOAT" },
      Year: { type: "INT64" }
    }
  }
});

reader
  .pipe(StreamArray.withParser())
  .pipe(new ParquetTransformer(schema))
  .pipe(destination);

rkbsoftsolutions commented 4 years ago

Can you share a full example? I am reading a stream from MongoDB and want to convert it to Parquet, then upload the result to S3.

aijazkhan81 commented 3 years ago

@staronline1985 did you get this working? I am unable to understand how to push the file to S3.

magno32 commented 1 year ago

Late to the party, example below.

This pipes a CSV file stream into the ParquetTransformer in question:

// Imports assumed for this snippet: csv-parse for parse() and
// stream-transform for transform() (the original snippet did not name the packages).
import { PassThrough, Readable } from 'stream';
import * as zlib from 'zlib';
import { parse } from 'csv-parse';
import { transform } from 'stream-transform';
import { ParquetSchema, ParquetTransformer } from 'parquetjs';

const parquetSchema = new ParquetSchema({
  Foo: { type: 'UTF8' },
  Bar: { type: 'UTF8' }
});

// Going to use output as the source for the S3 upload
const output = new PassThrough();

// `csvObjectResponse` and `event` come from the surrounding handler (not shown).
// Body from a successful GetObjectCommand
(csvObjectResponse.Body as Readable)
  // The CSV in question is gzipped
  .pipe(zlib.createGunzip())
  .pipe(
    parse({
      // CSV options...
      // If the CSV in question has a header row, skip it
      fromLine: event.headerRow ? 2 : 1
    })
  )
  .pipe(
    transform((record, callback) => {
      // From a 2-column CSV file (header [foo, bar])
      const [foo, bar] = record;
      callback(null, {
        Foo: foo,
        Bar: bar
      });
    })
  )
  .pipe(new ParquetTransformer(parquetSchema))
  .pipe(output);

I used https://github.com/aws/aws-sdk-js/issues/2961#issuecomment-868352176 to help me get this part working:

import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';

// ... pipe is above

const multipartUpload = new Upload({
  client: new S3Client({}),
  params: {
    Bucket: 'my_bucket',
    Key: 'prefix/out.parquet',
    // `output` is the PassThrough created above
    Body: output
  }
});
await multipartUpload.done();

This throws a warning:

(node:8) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead.
(Use `node --trace-deprecation ...` to show where the warning was created)

I have not dug any deeper to see where this is coming from, but it works.

vishald2509 commented 1 year ago

I have tried this, but I get the error "StreamArray.withParser is not a function". I am converting a JSON object to a buffer and creating a reader from that.
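
A likely cause, though not confirmed in this thread: stream-json/streamers/StreamArray is a CommonJS module whose export is the class itself, so under TypeScript/Babel ES-module interop a namespace import can leave the class on .default and make StreamArray.withParser undefined. A sketch of the usual workarounds, assuming that is the problem:

// Workaround 1: use a default import (requires esModuleInterop / Babel):
import StreamArray from "stream-json/streamers/StreamArray";
const parser = StreamArray.withParser();

// Workaround 2: keep the namespace import and go through .default:
import * as StreamArrayNS from "stream-json/streamers/StreamArray";
const parserViaNamespace = (StreamArrayNS as any).default.withParser();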

BrunoGabrielGodoi commented 1 year ago

I did this a little differently; I'll leave it here in case it helps someone.

import { Readable, PassThrough } from 'stream'
import parquet, { ParquetTransformer } from 'parquetjs'
// ...
const destination = new PassThrough()

// dataArray = the data you want as Parquet
const reader = Readable.from(dataArray)

const parquetSchema = new parquet.ParquetSchema({
  id: { type: 'UTF8' },
  created_at: { type: 'TIMESTAMP_MILLIS' },
})

reader
  .pipe(new ParquetTransformer(parquetSchema))
  .pipe(destination)

const uploadParams = {
  Bucket: 'bucketName',
  Body: destination,
  ContentEncoding: 'base64',
  Key: 'key',
  ContentType: 'application/x-parquet'
}
return this.s3.upload(uploadParams).promise()

bhekanik commented 4 months ago

Here's how I did it, in case someone is still struggling with this:

import parquet, { ParquetTransformer } from 'parquets'
import { PassThrough, Readable } from 'stream'

export const createParquetFile = async ({ config, events }) => {
  const schema = createSchema(config.schema)

  const passThroughStream = new PassThrough()

  try {
    // events is an array of objects to be written into the parquet
    const reader = Readable.from(events)
    reader.pipe(new ParquetTransformer(schema)).pipe(passThroughStream)
    return passThroughStream
  } catch (error) {
    console.error('Failed to create parquet file:', error)
    return null
  }
}

import { S3Client } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

export async function uploadToS3({ bucketName, objectKey, passThroughStream, s3Client = new S3Client() }) {
  try {
    const uploader = new Upload({
      client: s3Client,
      params: {
        Bucket: bucketName,
        Key: objectKey,
        Body: passThroughStream,
      },
    })

    await uploader.done()
  } catch (err) {
    console.error('Error uploading file:', err)
  }
}
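
For completeness, a sketch of how the two helpers above might be wired together; the bucket name and object key are made-up placeholders:

const passThroughStream = await createParquetFile({ config, events })
if (passThroughStream) {
  await uploadToS3({
    bucketName: 'my-bucket', // placeholder bucket
    objectKey: 'events.parquet', // placeholder key
    passThroughStream,
  })
}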