crccheck / kinesis-streams

AWS Kinesis readable and writable streams
Apache License 2.0
8 stars 6 forks

Option for setting custom partitionkey? #34

Open plingampally opened 6 years ago

plingampally commented 6 years ago

First, I just wanted to say thanks for this module! It's short, simple, and very usable. It looks like the partitionKey is currently hard-coded to 0. Could you add an option to set it to a user-defined string or function?

Thanks!
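For context on why a fixed key matters: Kinesis hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value, so a constant key sends every record to the same shard. The sketch below illustrates that mapping. The helper names `hashKey` and `shardIndex` are made up for this example, and the even split of the hash range across shards is an assumption (real streams can have uneven ranges after resharding).

```javascript
const crypto = require('crypto')

// Hypothetical helper: the 128-bit integer Kinesis derives from a partition key (MD5 of the key).
function hashKey (partitionKey) {
  const hex = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex')
  return BigInt('0x' + hex)
}

// Hypothetical helper: which shard a key lands on, ASSUMING `shardCount` shards
// that divide the 128-bit hash key space into equal ranges.
function shardIndex (partitionKey, shardCount) {
  const count = BigInt(shardCount)
  const rangeSize = (1n << 128n) / count
  const index = hashKey(partitionKey) / rangeSize
  // Clamp in case the range size does not divide the key space exactly.
  return Number(index >= count ? count - 1n : index)
}
```

Calling `shardIndex('0', 4)` returns the same shard every time, while keys derived from the data (for example a user id) spread across shards.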

crccheck commented 6 years ago

My original intent was that you'd do it by adding it to the instance or subclassing, but I didn't give it much thought or documentation since I don't have a use case for it myself yet.

The original logic was inspired by https://github.com/voldern/kinesis-write-stream#partition-key

You can do it now with this modified example from the README:

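const AWS = require('aws-sdk')
const { KinesisWritable } = require('kinesis-streams')
const client = new AWS.Kinesis()
// `options` is whatever options object you already pass to the constructor
const writable = new KinesisWritable(client, 'streamName', options)
// Override the default on the instance; return a string partition key for each record
writable.getPartitionKey = (data) => data.foo.substr(5)
// `inputStream` is any readable stream of records
inputStream.pipe(writable)

crccheck commented 6 years ago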

With esnext object destructuring, the constructor is kinda heavier than I'd like right now. I could see your own partition key function being an option maybe:

const writable = new KinesisWritable(client, 'streamName', {
  getPartitionKey: (data) => data.foo.substr(5)
})
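
A rough sketch of how such an option could be wired up, in case it helps the discussion. This is not the library's code: `SketchWritable` is a made-up stand-in built on Node's `stream.Writable`, it sends one record per write for brevity, and it assumes an AWS SDK v2 style client where `putRecords(params).promise()` returns a promise.

```javascript
const { Writable } = require('stream')

// Hypothetical stand-in for a Kinesis-backed writable; NOT the kinesis-streams implementation.
class SketchWritable extends Writable {
  constructor (client, streamName, options = {}) {
    // Pull our own option out before handing the rest to stream.Writable.
    const { getPartitionKey, ...streamOptions } = options
    super({ ...streamOptions, objectMode: true })
    this.client = client
    this.streamName = streamName
    // Only replace the default when the caller supplied a function.
    if (typeof getPartitionKey === 'function') {
      this.getPartitionKey = getPartitionKey
    }
  }

  // Default: a fixed key, like the hard-coded value this issue is about.
  getPartitionKey (data) {
    return '0'
  }

  _write (data, encoding, callback) {
    const params = {
      StreamName: this.streamName,
      Records: [{
        Data: JSON.stringify(data),
        PartitionKey: this.getPartitionKey(data)
      }]
    }
    // Assumes an SDK v2 style request object with .promise().
    this.client.putRecords(params).promise().then(() => callback(), callback)
  }
}
```

Because the option only overrides an instance property, assigning `writable.getPartitionKey` after construction or subclassing would keep working alongside it.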

plingampally commented 6 years ago

Thanks for the prompt reply, adding it to the instance worked for me! On a similar note, is it possible to get the entire record from the readable Kinesis stream, e.g. the sequenceNumber, partitionKey, and data? Right now, I'm only able to get the data.

crccheck commented 6 years ago

Not if you just listen to the stream. Getting that metadata is outside the scope. Maybe it would make sense if there were a RawReadableStream. I'm not doing an .emit() when I get a record, but that could be one way to do it.

source: https://github.com/crccheck/kinesis-streams/blob/master/lib/readable.js#L110
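
To make the "raw" idea concrete, here is a minimal sketch of an object-mode stream that passes whole records through instead of only their data. It uses object mode rather than the `.emit()` approach mentioned above. `RawRecordReadable` and `toRawRecord` are hypothetical names, not part of kinesis-streams, and the input is assumed to be an array shaped like the `Records` entries of a Kinesis GetRecords response (`SequenceNumber`, `PartitionKey`, `Data`); fetching those records from a shard is left out.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: keep the metadata next to the payload.
function toRawRecord (record) {
  return {
    sequenceNumber: record.SequenceNumber,
    partitionKey: record.PartitionKey,
    data: record.Data // left as-is (a Buffer when it comes from the AWS SDK)
  }
}

// Hypothetical object-mode stream fed from an in-memory array of records.
class RawRecordReadable extends Readable {
  constructor (records) {
    super({ objectMode: true })
    this.records = records.slice()
  }

  _read () {
    const record = this.records.shift()
    // Pushing null signals the end of the stream once the array is drained.
    this.push(record === undefined ? null : toRawRecord(record))
  }
}
```

A consumer would then receive `{ sequenceNumber, partitionKey, data }` objects in its `'data'` handler and could decode `data` itself.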

technicallyfeasible commented 5 years ago

For me, the partitionKey is part of the data, so I subclassed like this:

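import { KinesisWritable } from 'kinesis-streams'

// Assumed shape of each record written to the stream (the original type was not shown)
interface PartitionedRecord {
  data: string
  partitionKey: string
}

export default class PartitionedWritable extends KinesisWritable {
  private _prepRecord(data: PartitionedRecord) {
    return {
      Data: data.data,
      PartitionKey: data.partitionKey,
    };
  }
}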