a8m / kinesis-producer

An aggregated records producer for Amazon Kinesis
MIT No Attribution
149 stars 51 forks source link
amazon-kinesis go kinesis kinesis-producer kpl kpl-aggregation producer

Amazon kinesis producer Build status License GoDoc

A KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK
and using the same aggregation format that KPL use.

Useful links

Example

package main

import (
    "time"

    "github.com/sirupsen/logrus"
    "github.com/a8m/kinesis-producer"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
    client := kinesis.New(session.New(aws.NewConfig()))
    pr := producer.New(&producer.Config{
        StreamName:   "test",
        BacklogCount: 2000,
        Client:       client
    })

    pr.Start()

    // Handle failures
    go func() {
        for r := range pr.NotifyFailures() {
            // r contains `Data`, `PartitionKey` and `Error()`
            log.Error(r)
        }
    }()

    go func() {
        for i := 0; i < 5000; i++ {
            err := pr.Put([]byte("foo"), "bar")
            if err != nil {
                log.WithError(err).Fatal("error producing")
            }
        }
    }()

    time.Sleep(3 * time.Second)
    pr.Stop()
}

Specifying logger implementation

producer.Config takes an optional logging.Logger implementation.

Using a custom logger
customLogger := &CustomLogger{}

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       customLogger,
}

Using logrus

import (
    "github.com/sirupsen/logrus"
    producer "github.com/a8m/kinesis-producer"
    "github.com/a8m/kinesis-producer/loggers"
)

log := logrus.New()

&producer.Config{
  StreamName:   "test",
  BacklogCount: 2000,
  Client:       client,
  Logger:       loggers.Logrus(log),
}

kinesis-producer ships with three logger implementations.

License

MIT