vmware / vmware-go-kcl-v2

vmware-go-kcl is a VMware-originated open-source implementation of the AWS Kinesis Client Library in Go. It has been widely used by many external companies and internally by Carbon Black. vmware-go-kcl-v2 is its companion project built on the AWS Go SDK v2, which introduces many breaking changes. To keep the original repo clean, the v2 work lives in a separate repository, vmware-go-kcl-v2, with an improved Go project structure.
MIT License

Worker Rebalance Fails on Unassigned Shards #50

Open calebstewart opened 1 year ago

calebstewart commented 1 year ago

Describe the bug

I am seeing errors like this when restarting a worker instance:

Error in rebalance: AssignedToNotFoundForShard
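
For reference, the string in that log line appears to be the message of a sentinel error in the checkpoint package. I'd guess the declaration looks roughly like this (a reconstruction from the log message and the code below, not copied from the repo):

// Presumed sentinel in clientlibrary/checkpoint; the actual
// declaration site and wording may differ.
var ErrShardNotAssigned = errors.New("AssignedToNotFoundForShard")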

Investigating the root cause, I found that the DynamoCheckpoint.ListActiveWorkers method returns this error when a shard has no assigned lease owner. The name of the error makes sense conceptually, but is this really an error? Why does ListActiveWorkers need to fail in this scenario? My (possibly naive) assumption is that an unassigned shard should not affect the list of active workers: the method should return whatever set of active workers it finds and ignore unassigned shards. For example:

// ListActiveWorkers returns a map of workers and their shards
func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
    err := checkpointer.syncLeases(shardStatus)
    if err != nil {
        return nil, err
    }

    workers := map[string][]*par.ShardStatus{}
    for _, shard := range shardStatus {
        if shard.GetCheckpoint() == ShardEnd {
            continue
        }

        leaseOwner := shard.GetLeaseOwner()
        if leaseOwner == "" {
            // Original code:
            // checkpointer.log.Debugf("Shard Not Assigned Error. ShardID: %s, WorkerID: %s", shard.ID, checkpointer.kclConfig.WorkerID)
            // return nil, ErrShardNotAssigned
            checkpointer.log.Debugf("ListActiveWorkers: Shard Not Assigned. ShardID: %s, WorkerID: %s", shard.ID, checkpointer.kclConfig.WorkerID)
            continue
        }

        if w, ok := workers[leaseOwner]; ok {
            workers[leaseOwner] = append(w, shard)
        } else {
            workers[leaseOwner] = []*par.ShardStatus{shard}
        }
    }
    return workers, nil
}

Reproduction steps

  1. Start multiple workers.
  2. Restart one of the workers (a minimal setup sketch follows below).
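
For completeness, here is a minimal sketch of the setup I mean, assuming the usual vmware-go-kcl-v2 wiring and standard AWS credentials in the environment; the application/stream names and the no-op processor are illustrative placeholders, not my real code:

package main

import (
    "fmt"
    "os"
    "os/signal"

    cfg "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
    kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
    wk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
)

// noopProcessor discards records; it exists only so the worker can run.
type noopProcessor struct{}

func (p *noopProcessor) Initialize(*kcl.InitializationInput)     {}
func (p *noopProcessor) ProcessRecords(*kcl.ProcessRecordsInput) {}
func (p *noopProcessor) Shutdown(*kcl.ShutdownInput)             {}

type noopFactory struct{}

func (f *noopFactory) CreateProcessor() kcl.IRecordProcessor { return &noopProcessor{} }

func main() {
    // Each instance needs a distinct WorkerID; run several copies of this
    // binary, then kill and restart one of them to trigger the error.
    hostname, _ := os.Hostname()
    workerID := fmt.Sprintf("%s-%d", hostname, os.Getpid())
    config := cfg.NewKinesisClientLibConfig("test-app", "test-stream", "us-east-1", workerID)

    worker := wk.NewWorker(&noopFactory{}, config)
    if err := worker.Start(); err != nil {
        panic(err)
    }
    defer worker.Shutdown()

    // Block until interrupted (Ctrl-C).
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)
    <-sig
}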

Expected behavior

If the intention is to defer rebalancing until all shards have leases, I don't think returning an error from rebalance is appropriate, because this is not really an error condition. At worst I would argue it deserves a warning, but in my honest opinion it should just be handled along these lines in Worker.rebalance():

func (w *Worker) rebalance() error {
  // ... snip ...

  workers, err := w.checkpointer.ListActiveWorkers(w.shardStatus)
  if errors.Is(err, checkpoint.ErrShardNotAssigned) {
    // Not all shards have leases yet, so don't rebalance
    return nil
  } else if err != nil {
    log.Debugf("Error listing workers. workerID: %s. Error: %+v", w.workerID, err)
    return err
  }

  // ... snip ...
}

Additional context

This may be intended functionality, but it seems odd to me, so I figured I'd open a bug report to ask. If it is intended, I'd appreciate some explanation of the error in question. It happens regularly during restarts and has so far always been a red herring rather than a real error, so it gives me a little fright every time I check the logs.

Also, if this is not something you have observed and you think I may be doing something wrong, please let me know. Happy to fix my code and close this issue if need be :smile:

Ghilteras commented 10 months ago

Looks like this repo is not maintained anymore?