I want to use this library with AWS Lambda but producer cannot be reused after stop.
Below example code occuers Unable to Put record. Producer is already stopped when runs at second times.
package main
import (
"fmt"
"github.com/a8m/kinesis-producer"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"golang.org/x/sync/errgroup"
"os"
)
var pr = producer.New(&producer.Config{
StreamName: os.Getenv("KINESIS_STREAM"),
Client: kinesis.New(session.Must(session.NewSession())),
})
func handle(e events.KinesisEvent) error {
eg := errgroup.Group{}
pr.Start()
eg.Go(func() error {
for r := range pr.NotifyFailures() {
return r
}
return nil
})
for _, r := range e.Records {
// Any logic for each records
if err := pr.Put(r.Kinesis.Data, r.Kinesis.PartitionKey); err != nil {
return err
}
}
pr.Stop()
return eg.Wait()
}
func main() {
lambda.Start(handle)
}
Of course, if I generate Producer every time, it works well but I want to reuse the producer as much as possible.
var kc = kinesis.New(session.Must(session.NewSession()))
func handle(e events.KinesisEvent) error {
var pr = producer.New(&producer.Config{
StreamName: os.Getenv("KINESIS_STREAM"),
Client: kc,
})
eg := errgroup.Group{}
pr.Start()
eg.Go(func() error {
for r := range pr.NotifyFailures() {
return r
}
return nil
})
for _, r := range e.Records {
if err := pr.Put(r.Kinesis.Data, r.Kinesis.PartitionKey); err != nil {
return err
}
}
pr.Stop()
return eg.Wait()
}
Is it possible to make the Producer discretion by making the flush() method of the Producer public or by Stop() and then Start() again?
I want to use this library with AWS Lambda but producer cannot be reused after stop. Below example code occuers
Unable to Put record. Producer is already stopped
when runs at second times.Of course, if I generate Producer every time, it works well but I want to reuse the producer as much as possible.
Is it possible to make the Producer discretion by making the
flush()
method of the Producer public or byStop()
and thenStart()
again?