vmware / vmware-go-kcl-v2

vmware-go-kcl is a vmware originated open-source project for AWS Kinesis Client Library in Go. It has been widely used by many external companies and internally by Carbon Black. vmware-go-kcl-v2 is its companion project by utilizing AWS Go SDK V2 which introduces lots of breaking changes. To keep the repo clean, it is better to have a separated repo vmware-go-kcl-v2 with better golang project structure improvement.
MIT License
23 stars 17 forks source link

Workers Not Leasing All Shards #14

Open calebstewart opened 2 years ago

calebstewart commented 2 years ago

Describe the bug

The current worker implementation will only grab a lease for a single shard per shard sync interval. This seems like a bug. Based on the commit message (e2a945d824043765a899686bcda428bc68c5ea5b), the intent was to "prevent on host tak[ing] more shard[s] than it's configuration allowed". However, the result is that only a single shard is leased per interval. This causes start up times to balloon as more shards are introduced.

Reproduction steps

A basic worker on a stream with multiple shards exhibits this behavior:

package main

import (
  "os"
  "fmt"
  "os/signal"

  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
)

type RecordProcessor struct {}
type RecordProcessorFactory struct {}

func (f *RecordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor {
  return &RecordProcessor{}
}

func (p *RecordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {}
func (p *RecordProcessor) Initialize(input *interfaces.InitializationInput) {}
func (p *RecordProcessor) Shutdown(input *interfaces.ShutdownInput) {}

func main() {

  // Separately, I have no idea why, but the library seems incapable of figuring out the
  // Kinesis service endpoint on it's own. Not specifying it manually results in errors
  // where it seemingly is trying to use an empty string as a service endpoint, but that's
  // probably a problem for a separate issue.
  cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker")
  cfg.KinesisEndpoint = "https://kinesis.us-east-2.amazonaws.com"
  kcl := worker.NewWorker(&RecordProcessorFactory{}, cfg)

  if err := kcl.Start(); err != nil {
    fmt.Printf("[!] failed to start kcl worker: %v\n", err)
    return
  }
  defer kcl.Shutdown()

  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt, os.Kill)
  for range signals {
    break
  }

  return
}

Expected behavior

A worker should lease as many shards as it can up to MaxLeasesPerWorker on every shard sync.

Additional context

I believe the solution is to refactor the lease loop (ref) to look something like this:

// max number of lease has not been reached yet
for _, shard := range w.shardStatus {
  // Don't take out work leases than allowed
  if counter >= w.kclConfig.MaxLeasesForWorker {
    break
  }

  // already owner of the shard
  if shard.GetLeaseOwner() == w.workerID {
    continue
  }

  err := w.checkpointer.FetchCheckpoint(shard)
  if err != nil {
    // checkpoint may not exist yet is not an error condition.
    if err != chk.ErrSequenceIDNotFound {
      log.Warnf("Couldn't fetch checkpoint: %+v", err)
      // move on to next shard
      continue
    }
  }

  // The shard is closed and we have processed all records
  if shard.GetCheckpoint() == chk.ShardEnd {
    continue
  }

  var stealShard bool
  if w.kclConfig.EnableLeaseStealing && shard.ClaimRequest != "" {
    upcomingStealingInterval := time.Now().UTC().Add(time.Duration(w.kclConfig.LeaseStealingIntervalMillis) * time.Millisecond)
    if shard.GetLeaseTimeout().Before(upcomingStealingInterval) && !shard.IsClaimRequestExpired(w.kclConfig) {
      if shard.ClaimRequest == w.workerID {
        stealShard = true
        log.Debugf("Stealing shard: %s", shard.ID)
      } else {
        log.Debugf("Shard being stolen: %s", shard.ID)
        continue
      }
    }
  }

  err = w.checkpointer.GetLease(shard, w.workerID)
  if err != nil {
    // cannot get lease on the shard
    if !errors.As(err, &chk.ErrLeaseNotAcquired{}) {
      log.Errorf("Cannot get lease: %+v", err)
    }
    continue
  }

  if stealShard {
    log.Debugf("Successfully stole shard: %+v", shard.ID)
    w.shardStealInProgress = false
  }

  // log metrics on got lease
  w.mService.LeaseGained(shard.ID)
  w.waitGroup.Add(1)
  go func(shard *par.ShardStatus) {
    defer w.waitGroup.Done()
    if err := w.newShardConsumer(shard).getRecords(); err != nil {
      log.Errorf("Error in getRecords: %+v", err)
    }
  }(shard)

  // Increase the number of leases we have
  counter++
}
theryangeary commented 1 month ago

@calebstewart I'm trying this out for my application and it seems to work much better, any chance of merging such a change?

calebstewart commented 1 month ago

I don't have any issues with that. I had never received any confirmation or response, so I was never certain my perspective was correct.