harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License
263 stars 88 forks source link

Slice Callback Function with Batch Interval Functionality #129

Open epociask opened 3 years ago

epociask commented 3 years ago

Addressing issue #128

Added additional functionality for batch interval messages with a separate callback function that returns a slice of kinesis.Record than a single reference to a consumer.Record....

To use batch interval consumption, just specify your callback function as func([]*kinesis.Record) error and pass it as a parameter to ScanBatch

harlow commented 3 years ago

Hi thanks for submitting the PR; I'm struggling a little with the amt of duplication between the two function; my fear is "drift" between the two makes this code path harder to maintain over time.

Taking a step back I'm curious if something like this is possible:

// shape of the code
func Scan(ctx, fn ScanFn)
    items = ScanBatch(ctx, func(items) {
        for each items 
            fn(item)
    })

func ScanBatch(ctx, fn ScanBatchFn)
       // 

so the Scan func passes off to the ScanBatch func (which would eliminate the duplication of the two loops). I'll play around with this a little and see if anything emerges.

Or potentially the inverse where we add a convenience function ScanBatch which uses Scan


func Scan(ctx, fn ScanFn)
    // original function

func ScanBatch(ctx, fn ScanBatchFn)
       c.Scan()
       // do the batching here