vmware / vmware-go-kcl-v2

vmware-go-kcl is a VMware-originated open-source implementation of the AWS Kinesis Client Library (KCL) in Go. It has been widely adopted by external companies and is used internally by Carbon Black. vmware-go-kcl-v2 is its companion project built on the AWS Go SDK v2, which introduces many breaking changes. To keep the original repository clean, the v2 work lives in this separate repository, vmware-go-kcl-v2, with an improved Go project structure.
MIT License

Closed Shards Prevent Startup #12

Open calebstewart opened 2 years ago

calebstewart commented 2 years ago

Describe the bug

To preface, I am new to Kinesis and the KCL, and this could be a configuration issue (either with Kinesis itself or with the KCL). If I'm doing something incredibly wrong, please let me know. I'd be happy if this was "user error". :stuck_out_tongue:

As a basic example, if I have a Kinesis stream with a single shard, everything works as expected. When the worker starts up, it takes a lease for that shard, and starts processing records. However, if I re-shard the stream to instead have two shards, then AWS closes the first shard, and creates two new shards.

The result is that the KCL worker repeatedly takes out a lease on the closed shard and then immediately closes it. Because the worker only starts a job for a single shard per shard sync, and always iterates over shards ordered by ID (meaning older shards first), it never actually starts jobs for the other (valid) shards. It grabs a lease for the closed shard, the polling job exits immediately since the shard is closed, and on the next sync it grabs a lease for the closed shard again, repeating forever. Since it never stops visiting the closed shard and only looks at one shard per sync, it never starts jobs for the two valid shards.

Reproduction steps

  1. Create a Kinesis Data Stream with a single shard.
  2. Re-shard the data stream to have 2 shards (which will close the original shard).
  3. Start a KCL worker.

The worker is as simple as it gets. I observe the same behavior when using enhanced fan-out or polling.

package main

import (
  "os"
  "fmt"
  "os/signal"

  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
  "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
)

type RecordProcessor struct {}
type RecordProcessorFactory struct {}

func (f *RecordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor {
  return &RecordProcessor{}
}

func (p *RecordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {}
func (p *RecordProcessor) Initialize(input *interfaces.InitializationInput) {}
func (p *RecordProcessor) Shutdown(input *interfaces.ShutdownInput) {}

func main() {

  // Separately, I have no idea why, but the library seems incapable of figuring out the
  // Kinesis service endpoint on its own. Not specifying it manually results in errors
  // where it seemingly is trying to use an empty string as a service endpoint, but that's
  // probably a problem for a separate issue.
  cfg := config.NewKinesisClientLibConfig("test", "caleb-testing", "us-east-2", "worker")
  cfg.KinesisEndpoint = "https://kinesis.us-east-2.amazonaws.com"
  kcl := worker.NewWorker(&RecordProcessorFactory{}, cfg)

  if err := kcl.Start(); err != nil {
    fmt.Printf("[!] failed to start kcl worker: %v\n", err)
    return
  }
  defer kcl.Shutdown()

  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt) // os.Kill cannot be caught, so only listen for interrupt
  <-signals
}

Running the above example with a single shard, before re-sharding, looks like this: [screenshot of worker logs]

Running after re-sharding (where shardId-000000000000 is now closed) looks like this: [screenshot of worker logs]

As you can see, no polling shard consumers are running, so no records are being processed: the worker only attempted to start a single consumer, which immediately exited. When a shard sync triggered ~60 seconds later, it tried to pick up the same closed shard and still did not start processing records.

Expected behavior

The worker starts up, and takes leases up to MaxLeasesForWorker, and starts a polling (or enhanced fan-out) job for each of the leased shards immediately.

Additional context

There are multiple things going on here that don't make sense to me.

  1. Why does the KCL worker only start a job for a single shard on each sync? According to the documentation, it should take out leases up to MaxLeasesForWorker and start jobs for each shard that it leases, but in practice it only takes out one lease per sync. If you have many shards and use the default ShardSyncInterval (1 minute), it will take a very long time for the worker to start up and listen on all the expected shards. Even with a small number of shards, say 10 shards and 2 workers, it would take at least 5 full minutes for a single worker to finish starting up, assuming each worker leases 5 shards (these numbers are made up to illustrate the point, not a realistic production configuration).
  2. Why does the KCL worker attempt to start jobs for closed shards? Because it iterates over shards in ID order, and because of the behavior above of starting only one shard job per sync, the worker gets stuck attempting to lease the closed shard forever and never progresses to the other shards.

Regarding the "only starting one job per sync" question, it seems this happens because of the break statement here. I'm struggling to understand why this happens at all, to be honest.
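
To illustrate the effect, here is a toy model of that sync loop. It is not the library's actual code (the shard type and syncShards function are made up for this sketch), and in practice lease expiry and some randomness eventually let other shards through, as noted in the follow-up comments below.

package main

import "fmt"

// shard is a stand-in for the worker's view of a Kinesis shard.
type shard struct {
  ID     string
  Closed bool
  Leased bool
}

// syncShards visits shards in ID order and starts at most one new consumer per
// pass, mirroring the break statement mentioned above.
func syncShards(shards []*shard) {
  for _, s := range shards {
    if s.Leased {
      continue
    }
    s.Leased = true
    fmt.Printf("sync: took lease and started consumer for %s\n", s.ID)
    if s.Closed {
      // A consumer for a closed shard exits immediately and gives up its
      // lease, so the next sync can pick the same shard again.
      s.Leased = false
      fmt.Printf("sync: consumer for %s exited immediately (shard closed)\n", s.ID)
    }
    break // only one new consumer per sync; remaining shards wait for the next sync
  }
}

func main() {
  shards := []*shard{
    {ID: "shardId-000000000000", Closed: true}, // parent shard, closed by the re-shard
    {ID: "shardId-000000000001"},
    {ID: "shardId-000000000002"},
  }
  for i := 0; i < 3; i++ {
    syncShards(shards) // every pass picks the closed shard first and gets stuck
  }
}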

Is it normal to take an extremely long time to start up a worker? Is this a bug? Do people just set the shard sync interval much lower than default in practice? I can't find anything in the upstream AWS Java KCL documentation that mentions this. It all seems to indicate that on startup, a worker will take out leases up to the max lease limit, and start the appropriate processing jobs.

calebstewart commented 2 years ago

It does appear that eventually the worker will pick up the other shards, but it also keeps attempting to start consumers for the closed shard. I would imagine that on a long-lived stream this would only happen more and more often, causing more and more (unpredictable) start-up latency.

It also takes multiple shard syncs before it randomly chooses a different, unclosed shard, which further exacerbates the long startup time, so I'm still a bit confused about how or why it works the way it does. In this example, it takes almost 5 minutes to start listening on two shards, but in most other runs of this test it kept trying to take shardId-000000000000 for much longer, which is why I opened this issue originally. Most of the time it keeps retrying that closed shard for so long that I ended up giving up and killing the worker, since it was doing nothing. Also, even after finally acquiring shardId-000000000001 and shardId-000000000002, it keeps trying to take shardId-000000000000 at every shard sync.

It's also worth mentioning that I tried this with vmware-go-kcl and had the same result.

[screenshot of worker logs showing the same behavior with vmware-go-kcl]

arl commented 1 year ago

@calebstewart

Hello. We had the exact same problem with resharding in the previous version, vmware-go-kcl, and as far as I can tell it hasn't been fixed.

To work around it, our Shutdown implementation checkpoints with nil when the shutdown reason is TERMINATE, which records SHARD_END for the closed shard in the checkpoint table.

That way, resharding is handled automatically for us.
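
For reference, a minimal sketch of that workaround, using the interfaces package alias from the repro program at the top of this issue (the error handling here is illustrative only):

func (p *RecordProcessor) Shutdown(input *interfaces.ShutdownInput) {
  // When the shard has been closed by a re-shard, the shutdown reason is TERMINATE.
  // Checkpointing with nil records SHARD_END for this shard in the checkpoint
  // table, so the KCL can move on to the child shards.
  if input.ShutdownReason == interfaces.TERMINATE {
    if err := input.Checkpointer.Checkpoint(nil); err != nil {
      fmt.Printf("[!] failed to checkpoint at shard end: %v\n", err)
    }
  }
}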

calebstewart commented 1 year ago

@spentakota any input on this problem? @arl's input is helpful but this seems like a real problem, and not something that the library user should need to concern themselves with. Resharding can potentially never succeed because consumers get stuck re-leasing closed shards forever. If this is intended behavior, it should be mentioned in the documentation, and an intended solution should be outlined (potentially what @arl suggested or some other recommended solution).

vmwjc commented 1 year ago

@calebstewart @arl - A closed shard might still have records to process. Records do not just go away as soon as you split the shard. New records get written to the post-split/post-merge shards, but the stream consumer may not have processed all the records on the old shard(s) yet. Records will stay available for consumers based on the stream's configured retention period.

This is where checkpointing comes into play. It's up to your code to call the ProcessRecordsInput.Checkpointer.Checkpoint method. In the case of the DynamoDB checkpointer, it updates the entry in that table for the shard in question so that a future worker knows where to pick up. This also ensures that records are processed in order even when they span between parent shards (i.e. pre-split/pre-merge shards) and their child shards. The KCL will wait to process records from the child shard until the parent is checkpointed as closed using the sentinel value "SHARD_END" in the DynamoDB table.

komealy commented 1 year ago

@vmwjc I have this at the bottom of my ProcessRecords implementation:

    if len(input.Records) > 0 { // guard against empty batches before indexing the last record
        err := input.Checkpointer.Checkpoint(input.Records[len(input.Records)-1].SequenceNumber)
        if err != nil {
            logrus.WithError(err).Error("trouble with event loop checkpointing")
        }
    }

The issue I have is that when the Kinesis shards are scaled, I still get this behavior of the worker looping over the parent shards that no longer have any records to read. What needs to happen to ensure that SHARD_END is recorded in the checkpoint table? When I log what that SequenceNumber would be, it is never nil; it is always the sequence number of the last record passed in...

The ProcessRecords never actually gets called for closed, completed shards, so my checkpoint call in there would never checkpoint the completed shard anyway.

I agree with @calebstewart that this should be handled within the library, probably within the Shutdown function itself. It would need to handle the scenario where a shard is closed but still has data to process, and not checkpoint with nil in that scenario.

It does look like @arl has the right solution for making sure nil is passed to the Checkpoint function. I don't mind including that, but it does feel like the library should know when a shard is complete and checkpoint with nil anyway, even if I never call Checkpoint myself.

Here is the current state of my DynamoDB checkpoint table after scaling the Kinesis shards, without the explicit call to Checkpoint with nil during shutdown:

[screenshot of the DynamoDB checkpoint table]

I also confirmed this has the same behavior in both the vmware-go-kcl and vmware-go-kcl-v2 versions of this library. If I get time, I'll open a PR with what I think may be a good suggestion.

calebstewart commented 1 year ago

As much as it makes sense to me conceptually, the upstream KCL documentation (which this library attempts to stick closely to) indicates that library users should:

  1. Manually checkpoint after processing records on some interval. The Java docs say "A processor could, for example, call checkpoint on every third call to processRecords" while the Python KCL docs show the record processor checkpointing on an interval of 1 minute.
  2. The shutdown method should handle the TERMINATE reason by checkpointing.

The above comes from the AWS docs here and here
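
For illustration (a sketch only, not the library's official example, and assuming the same imports as the repro program at the top of this issue), a record processor that follows both points might look roughly like this:

// CheckpointingProcessor is a hypothetical processor used for this sketch.
type CheckpointingProcessor struct {
  batches int // counts ProcessRecords calls so we can checkpoint on every third one
}

func (p *CheckpointingProcessor) Initialize(input *interfaces.InitializationInput) {}

func (p *CheckpointingProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {
  // ... handle input.Records ...

  // Point 1: checkpoint periodically (here, every third call, as in the Java docs).
  p.batches++
  if p.batches%3 == 0 && len(input.Records) > 0 {
    last := input.Records[len(input.Records)-1]
    if err := input.Checkpointer.Checkpoint(last.SequenceNumber); err != nil {
      fmt.Printf("[!] checkpoint failed: %v\n", err)
    }
  }
}

func (p *CheckpointingProcessor) Shutdown(input *interfaces.ShutdownInput) {
  // Point 2: on TERMINATE (shard closed), checkpoint with nil so SHARD_END is
  // recorded and the KCL can progress to the child shards.
  if input.ShutdownReason == interfaces.TERMINATE {
    if err := input.Checkpointer.Checkpoint(nil); err != nil {
      fmt.Printf("[!] failed to checkpoint at shard end: %v\n", err)
    }
  }
}

A time-based interval, as in the Python example, works just as well as the call counter.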

Given that this library attempts to maintain rough API compatibility with the upstream KCL, I think the current behavior makes sense. The documentation on such an important component feels a little lacking, but since this library defers to the AWS KCL documentation, that's not technically a problem here.

That being said, an example project that uses this library would be immensely helpful. This would be akin to the examples found in the AWS docs here. Maybe converting the Python example to Golang would be helpful for new users. If I find some time, I'll make a PR with a basic example that mimics the official Python KCL consumer examples linked above.

edit: mixed up the upstream KCL version links

komealy commented 1 year ago

Ah, they even have the test case showing this: https://github.com/vmware/vmware-go-kcl-v2/blob/main/test/record_processor_test.go#L84-L96

That's good enough for me as an example. It also follows the same premise that @arl showed in their implementation. I have this in mine as well, and this issue is no longer a problem for me. The Dynamo table gets the SHARD_END entries appropriately now too.