harlow / kinesis-consumer

Golang library for consuming Kinesis stream data
MIT License
264 stars 90 forks

Refactor the Scan public interface #75

Closed harlow closed 5 years ago

harlow commented 5 years ago

As the library has evolved to meet the custom needs of its users, the customizations have leaked into the public interface of the scan func. This has led to a somewhat verbose interface built around the concept of ScanStatus, which wraps logic around checkpointing and flow control of the scan, plus errors.

Current scan interface:

// 1. an error here which returns either stream errors, or inner scan error
err := c.Scan(context.TODO(), func(r *consumer.Record) consumer.ScanStatus {
  fmt.Println(string(r.Data))

  // 2. ScanStatus which controls the flow of scan and whether to set checkpoints
  return consumer.ScanStatus{
    StopScan:       false,  // true to stop scan
    SkipCheckpoint: false,  // true to skip checkpoint
  }
})

I'd like to unravel the ScanStatus to simplify the callback func. If people need custom functionality around skipping checkpoints, I'd like to support this, but not force the additional cognitive overhead on all users of the library.

harlow commented 5 years ago

A few loops from stdlib for inspiration

Filepath Walk

https://golang.org/pkg/path/filepath/#Walk

err = filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
  if err != nil {
    fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, err)
    return err
  }
  if info.IsDir() && info.Name() == subDirToSkip {
    fmt.Printf("skipping a dir without errors: %+v \n", info.Name())
    return filepath.SkipDir
  }
  fmt.Printf("visited file or dir: %q\n", path)
  return nil
})

if err != nil {
  fmt.Printf("error walking the path %q: %v\n", tmpDir, err)
  return
}

CSV Read

https://golang.org/pkg/encoding/csv/#Reader

r := csv.NewReader(strings.NewReader(in))

for {
  record, err := r.Read()
  if err == io.EOF {
    break
  }
  if err != nil {
    log.Fatal(err)
  }

  fmt.Println(record)
}

harlow commented 5 years ago

If we were to take a page out of the filepath.Walk book, we could create custom error types for StopScan and SkipCheckpoint as return values:

// SkipCheckpoint is used as a return value from Scan func to indicate that
// the current checkpoint should be skipped. It is not returned
// as an error by any function.
var SkipCheckpoint = errors.New("skip checkpoint")

// StopScan is used as a return value from Scan func to indicate that
// we should stop scanning the current shard. It is not returned
// as an error by any function.
var StopScan = errors.New("stop scanning shard")

err := consumer.Scan(context.TODO(), func(r *consumer.Record) error {
  fmt.Println(string(r.Data))

  // return consumer.StopScan
  // return consumer.SkipCheckpoint
  // return errors.New("this error stops all scans, bubbles up to caller") 

  // nothing to see here, continue scanning
  return nil 
})
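
To make the sentinel-error idea concrete, here is a minimal sketch of how an inner scan loop might interpret the callback's return value. It is not the library's implementation: `Record`, `ScanFunc`, and `scanRecords` are hypothetical stand-ins, and the choice that StopScan ends the scan without writing a checkpoint for the current record is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Record is a hypothetical stand-in for consumer.Record.
type Record struct {
	Data []byte
}

// Sentinel values named as in the proposal above.
var (
	SkipCheckpoint = errors.New("skip checkpoint")
	StopScan       = errors.New("stop scanning shard")
)

// ScanFunc is the simplified callback signature from the proposal.
type ScanFunc func(r *Record) error

// scanRecords is a hypothetical inner loop. It returns how many records
// were checkpointed and any error that should bubble up to the caller.
func scanRecords(records []*Record, fn ScanFunc) (int, error) {
	checkpoints := 0
	for _, r := range records {
		err := fn(r)
		switch {
		case err == nil:
			checkpoints++ // normal case: record the checkpoint
		case errors.Is(err, SkipCheckpoint):
			// keep scanning, but do not checkpoint this record
		case errors.Is(err, StopScan):
			// assumption: stop cleanly, without a checkpoint for this record
			return checkpoints, nil
		default:
			// any other error stops the scan and reaches the caller
			return checkpoints, err
		}
	}
	return checkpoints, nil
}

func main() {
	records := []*Record{
		{Data: []byte("a")},
		{Data: []byte("skip")},
		{Data: []byte("stop")},
		{Data: []byte("d")},
	}
	n, err := scanRecords(records, func(r *Record) error {
		switch string(r.Data) {
		case "skip":
			return SkipCheckpoint
		case "stop":
			return StopScan
		}
		fmt.Println(string(r.Data))
		return nil
	})
	fmt.Println("checkpoints:", n, "err:", err)
}
```

The two sentinels never reach the caller of the loop; only the default branch propagates an error, which mirrors how filepath.Walk treats SkipDir.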

sharonjl commented 5 years ago

Here is an alternative, similar to HTTP handlers.

// consumer.go
type Iterator interface {
    SkipCheckpoint()
    StopScan()
    // Other functions that should interact with the lib
}

// Scan code
err := consumer.Scan(context.TODO(), func(it consumer.Iterator, r *consumer.Record) error {
  fmt.Println(string(r.Data))

  if someCond {
    it.StopScan() 
    return nil
  }
  return nil 
})

sharonjl commented 5 years ago

I think the Iterator interface is a better alternative as it will provide extensibility into other features such as embedded context, for deadlines and timeouts, or even future Kinesis features. I personally believe we should limit the use of error codes as flags for control flow in libraries.

harlow commented 5 years ago

"I personally believe we should limit the use of error codes as flags for control flow in libraries."

Totally agree. Thanks for sharing the idea of an iterator that can be passed in. Def interesting.

After reading your note I looked back into the consumer code to see what flow control is currently supported; it looks like the inner loop already supports cancellation through context which means the StopScan probably isn't needed.

https://github.com/harlow/kinesis-consumer/blob/master/consumer.go#L183-L184
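
As a rough illustration of why a StopScan flag may be redundant, the sketch below stops a scan by cancelling the context from inside the callback. The `scan` loop, `Record`, and `runDemo` are hypothetical and only imitate the pattern of checking the context between records; returning nil on cancellation (rather than `ctx.Err()`) is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Record is a hypothetical stand-in for consumer.Record.
type Record struct {
	Data []byte
}

// scan is a hypothetical inner loop that checks for cancellation before
// handing each record to the callback. It returns the number of records
// processed.
func scan(ctx context.Context, records []*Record, fn func(*Record) error) (int, error) {
	processed := 0
	for _, r := range records {
		select {
		case <-ctx.Done():
			// assumption: cancellation is treated as a clean stop
			return processed, nil
		default:
		}
		if err := fn(r); err != nil {
			return processed, err
		}
		processed++
	}
	return processed, nil
}

// runDemo cancels the context while handling the second record, so the
// third record is never passed to the callback.
func runDemo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	records := []*Record{
		{Data: []byte("a")},
		{Data: []byte("b")},
		{Data: []byte("c")},
	}
	return scan(ctx, records, func(r *Record) error {
		fmt.Println(string(r.Data))
		if string(r.Data) == "b" {
			cancel() // takes the place of a StopScan flag
		}
		return nil
	})
}

func main() {
	n, err := runDemo()
	fmt.Println("processed:", n, "err:", err)
}
```

The record being handled when cancel is called still completes; the loop only notices the cancellation before the next record.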

sharonjl commented 5 years ago

Yes~ I think that works out well for cancellation:

w := it.Context.Done()
<- w

I wanted to propose some minor changes after thinking about this a bit more.

// consumer.go
type Iterator interface {
    ShouldSkipCheckpoint()
    ShouldStopScan()
    // Other functions that should interact with the lib
}

// Scan code
err := consumer.Scan(context.TODO(), func(it consumer.Iterator, r *consumer.Record) error {
  fmt.Println(string(r.Data))

  if someCond {
    it.ShouldSkipCheckpoint() 
    return nil
  }
  if readTooMuch {
    it.ShouldStopScan() 
    return nil
  }
  return nil 
})

I noticed SkipCheckpoint and StopScan are acted upon after the current scan is complete, so I figured for clarity we should prefix them with Should.
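
A minimal sketch of how the library side could honor these deferred requests: the iterator only records flags, and the loop consults them after the callback returns. The concrete `iterator` type, the `scan` loop, and `runDemo` are hypothetical, and treating the two flags as independent (a stop request still checkpoints the current record) is an assumption carried over from the existing ScanStatus fields.

```go
package main

import "fmt"

// Record is a hypothetical stand-in for consumer.Record.
type Record struct {
	Data []byte
}

// Iterator mirrors the interface proposed above.
type Iterator interface {
	ShouldSkipCheckpoint()
	ShouldStopScan()
}

// iterator is a hypothetical implementation that only records requests.
type iterator struct {
	skipCheckpoint bool
	stopScan       bool
}

func (it *iterator) ShouldSkipCheckpoint() { it.skipCheckpoint = true }
func (it *iterator) ShouldStopScan()       { it.stopScan = true }

// scan calls fn for each record and acts on the recorded flags only after
// the callback has returned. It returns the data of checkpointed records.
func scan(records []*Record, fn func(Iterator, *Record) error) ([]string, error) {
	var checkpointed []string
	for _, r := range records {
		it := &iterator{} // fresh flags for every record
		if err := fn(it, r); err != nil {
			return checkpointed, err
		}
		if !it.skipCheckpoint {
			checkpointed = append(checkpointed, string(r.Data))
		}
		if it.stopScan {
			break
		}
	}
	return checkpointed, nil
}

// runDemo skips the checkpoint for one record and stops on another.
func runDemo() ([]string, error) {
	records := []*Record{
		{Data: []byte("a")},
		{Data: []byte("skip")},
		{Data: []byte("stop")},
		{Data: []byte("d")},
	}
	return scan(records, func(it Iterator, r *Record) error {
		switch string(r.Data) {
		case "skip":
			it.ShouldSkipCheckpoint()
		case "stop":
			it.ShouldStopScan()
		}
		return nil
	})
}

func main() {
	checkpointed, err := runDemo()
	fmt.Println("checkpointed:", checkpointed, "err:", err)
}
```

Because the flags are read after the callback returns, calling either method does not interrupt the callback itself, which is the behavior the Should prefix is meant to signal.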