amazon-archives / kinesis-storm-spout

Kinesis spout for Storm

Support kinesis stream resharding #11

Open · lexicalunit opened this issue 9 years ago

lexicalunit commented 9 years ago

The README mentioned future work to support resharding (close, merge, split) events. I'm curious whether any work has been done towards this goal, or if such work is currently planned. I may also be in a position to work on developing such a feature in the near future, so if there are currently no plans to develop it, any insights would be welcome.

lexicalunit commented 9 years ago

Looking through the code a little further, it seems there is functionality to re-sync the ZK state when a reshard event occurs. There's also code for detecting that shard merges and splits have occurred and setting the shard info appropriately. Is this functionality already providing support for resharding events and the README simply hasn't been updated, or is it just the foundation work for supporting this?

lexicalunit commented 9 years ago

OK, now I've done some testing. I am running a topology with the Kinesis Storm spout in local mode. Before resharding, my events come off the Kinesis stream and go through the topology just fine. After resharding, none of my events seem to make it to the spout. So despite the existence of the code I described in my last comment, it does appear that resharding is not supported.

jangie commented 9 years ago

Based on my read of the comments on initialize in https://github.com/awslabs/kinesis-storm-spout/blob/master/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java, and based on some quick testing, my understanding is that once the shard list is stored in ZK, that's it (currently). I resharded, then restarted the topology, and had no new data come in. I suspect that if I stop the topology and kill the ZooKeeper data associated with the shard list, it will pick up the shard list again from DescribeStream once the topology is restarted. I might be completely misreading this, though.

Edit: just as a confirmation, killing the data within ZooKeeper (under /{spoutId}/{topologyName}/{streamName}) allowed it to realize that there were new/different shards and start running data through again.
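
For illustration, here is a minimal sketch of doing that deletion programmatically with Apache Curator (the path layout is the one above; the ZK connect string, the IDs, and the use of Curator itself are just assumptions for the example, not anything the spout requires):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ClearSpoutState {
    public static void main(String[] args) throws Exception {
        // Placeholders: substitute your own ZK ensemble and spout/topology/stream IDs.
        String zkConnect = "localhost:2181";
        String path = "/mySpoutId/myTopology/myStream";

        CuratorFramework client = CuratorFrameworkFactory.newClient(
                zkConnect, new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            if (client.checkExists().forPath(path) != null) {
                // Remove the persisted shard list so the spout re-reads the shards
                // via DescribeStream the next time the topology starts.
                client.delete().deletingChildrenIfNeeded().forPath(path);
            }
        } finally {
            client.close();
        }
    }
}
```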

lexicalunit commented 9 years ago

I've actually implemented automatic handling of reshard events, and I'm cleaning it up and getting it ready for a PR soon. Essentially, it's pretty easy to detect that a reshard event has occurred, and when one does, we need to rebuild the shard list getter, bootstrap again, etc.

atroschinetz-rmn commented 9 years ago

PR created: https://github.com/awslabs/kinesis-storm-spout/pull/12

dosani commented 9 years ago

Hi @lexicalunit, @jangie, @atroschinetz-rmn,

For now, I have added a manual way to pick up and redistribute shards to tasks after resharding. This involves invoking "storm rebalance" after resharding a Kinesis stream. The benefit is that it doesn't require killing the topology. The downside is that it is still a manual operation :) We are looking into automating this and one possible approach is to redesign the Spout to use the Kinesis Client Library underneath, which provides automated refreshing and load balancing of shards across workers.

The approach here is similar to what @jangie suggested, i.e. deleting the shard list before reactivating, but it automates the deletion to happen during deactivation (so it doesn't have to be done manually). The only thing required is invoking "storm rebalance" after resharding a stream.

Here's the commit: https://github.com/awslabs/kinesis-storm-spout/commit/ceebb3583a6d3a0280b372e3aea00d1c2248d52c

Would appreciate any feedback.

Thanks, -Adnan.

jangie commented 9 years ago

Hi @dosani, just for opinion's sake: the KCL, from my limited experience working with it so far, would add the further requirement of AWS resources in the form of DynamoDB. It would be nice if this were avoided, though I definitely understand the nicety of being able to reuse code. I don't quite grok what PR #12 does, but if it does allow for automatic (vs. manual) resharding, can that PR be examined?

lexicalunit commented 9 years ago

Re-opening this issue for discussion until a PR/commit is merged to fix it. I can also give a brief explanation of #12.

When getNextShardIterator() returns null on a closed shard, that indicates that a reshard event has occurred. See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-get-data.html#kinesis-using-sdk-java-get-data-reshard for details.
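
For reference, the detection condition looks roughly like this against the AWS SDK for Java; this is a simplified sketch, not the spout's actual code, and the client setup and starting iterator are assumed:

```java
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

public class ReshardDetection {
    // Reads a shard until getNextShardIterator() comes back null, which means the
    // shard is closed and fully drained -- the signal that a reshard (split or
    // merge) has replaced it with child shards.
    static void drainShard(AmazonKinesis kinesis, String shardIterator) throws InterruptedException {
        while (shardIterator != null) {
            GetRecordsResult result = kinesis.getRecords(
                    new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000));
            // ... hand result.getRecords() to the topology ...
            shardIterator = result.getNextShardIterator();
            Thread.sleep(1000); // back off so we stay under the per-shard read limits
        }
        // shardIterator == null here: treat it as "reshard happened, refresh the shard list".
    }

    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
        // A real caller would obtain the starting iterator from GetShardIterator
        // for its assigned shard and then call drainShard(kinesis, iterator).
    }
}
```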

In the code, we detect this condition and earmark the shard as having been affected by a reshard event. Later, when the spout next attempts to access that shard, we handle the reshard event for that shard by reinitializing it. The initialization process rebuilds the shardList, which triggers a watch event and kicks off the whole bootstrapStateFromZookeeper() code path.

That code path has been modified so that, rather than blowing away the old state, it updates the state with whatever newly assigned shards the spout has picked up. This way we don't lose in-flight records, i.e. records that have been emitted but not acked, meaning their state has not yet been committed to ZooKeeper.

That's about the long and the short of it. The result is that we can deploy the Kinesis Autoscaling WAR found at https://github.com/awslabs/amazon-kinesis-scaling-utils so our streams automatically scale up and down based on demand, and the Kinesis spout automatically recovers from these reshard events without any manual intervention.

I would love for more people to test out #12 if able. We have done our internal testing and everything seems ok. But the more testing the better. Unfortunately, the codebase for kinesis-storm-spout is very spaghetti-like with no clear separation of concerns for objects, resulting in semantic concepts like "shard" sharing state across multiple in-code objects. This makes unit testing our changes nearly impossible.

lexicalunit commented 9 years ago

Future work would be to refactor how the spout distributes shards. For example, it could probably be changed to not assign closed shards that have no more events on them at all. This would avoid the possibility of a worker being assigned nothing but closed, empty shards simply because the number of shards is much greater than the number of workers. Honestly, a better strategy would be to round-robin things so that assigning shards isn't even an issue...

npetryk commented 9 years ago

+1 for doing a round-robin over the shard list.

Aside from initializing ZK, the only reason the bootstrapping process exists (from what I can tell) is to allocate shards to spout tasks, which severely complicates the process of resharding / rebalancing as @lexicalunit has described.

If any given spout task could just pick the 'next' shard to read from, rather than having a set shard id, one could fluidly add/remove shards/spout tasks without needing to do a bunch of coordination to get the shard assignments & make sure that no data gets repeated or left behind.
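
Purely as an illustration of that idea (hypothetical names, nothing that exists in the spout today), a task could pull the "next" shard from a rotating cursor instead of owning a fixed assignment:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin shard picker: tasks call nextShard() each time they
// want to poll, instead of being statically assigned a fixed subset of shards.
public class RoundRobinShardPicker {
    private final List<String> shardIds; // assumed non-empty, refreshed from DescribeStream elsewhere
    private final AtomicInteger cursor = new AtomicInteger(0);

    public RoundRobinShardPicker(List<String> shardIds) {
        this.shardIds = shardIds;
    }

    public String nextShard() {
        // In a real topology the cursor and per-shard checkpoints would still need
        // to be coordinated across tasks (e.g. in ZooKeeper); see the duplicate-read
        // concern discussed further down in this thread.
        int i = Math.floorMod(cursor.getAndIncrement(), shardIds.size());
        return shardIds.get(i);
    }
}
```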

lexicalunit commented 9 years ago

Yep. A thousand times yep. Would be so much easier.

dosani commented 9 years ago

@lexicalunit, thanks for providing the explanation for your pull request. I haven't had a chance to look at the pull request in detail, but I was wondering if it handles an edge case where, after 24 hours, closed shards are removed from the stream. The entries in the shard list after the removed shard would shift left. As different tasks get notified of the new list at different times, there would be a short period where multiple tasks are processing the same shard. (In the case where shards are appended to the list, it's OK if different tasks get notified at different times because it doesn't result in duplicate assignment.)

@lexicalunit, @matrix10657, regarding round robin assignment, if all tasks are reading from all shards one by one, then there would be a lot of duplicate reads and processing, even if the checkpoint state is written and refreshed very frequently by all the tasks for every shard. This is because there is a latency to read a batch of records and also for those records to be acked by the rest of the topology (and the checkpoint wouldn't be updated before that). In that time, other tasks could fetch the same batch. This can overwhelm the topology and also overload Zookeeper.

@jangie, you are correct to observe that using KCL would involve DynamoDB usage, though normally it should only be a small fraction of the Kinesis stream cost. Also, since it won't use Zookeeper any more, the load on Zookeeper would be reduced. Alternatively, a lease based shard assignment could be implemented on top of Zookeeper (similar to KCL), but we have no concrete plans to do that at the moment.

Thanks!

dosani commented 9 years ago

Btw, since Storm is not fully elastic (yet), in order to get good performance, one has to rebalance anyway (after the load or capacity changes).

lexicalunit commented 9 years ago

@dosani, w.r.t. the edge case you mentioned: the handle-reshard logic wouldn't be entered just because closed shards are removed after 24 hours. That logic is only entered once, following an attempt to read data off a closed shard and not finding any more data on it (the get shard iterator call returns null).

In the case where shards are removed after 24 hours, what happens is that a task will still be assigned to the removed shard. The next time it attempts to read data off that shard, the read will fail because the shard no longer exists. In that case it would be good to remove the shard from the assigned shard list... I don't think the code does that already; I didn't add that feature as part of #12.
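
A hypothetical sketch of what that cleanup might look like (not part of #12; the assigned-shards structure and method name are made up, and I'm assuming the failure surfaces as a ResourceNotFoundException from GetShardIterator):

```java
import java.util.Set;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;

// Hypothetical cleanup: if a shard we are still assigned to has aged out of the
// stream, drop it from our local assignment instead of retrying it forever.
public class ExpiredShardCleanup {
    static void pollOrDrop(AmazonKinesis kinesis, String streamName,
                           String shardId, Set<String> assignedShards) {
        try {
            String iterator = kinesis.getShardIterator(new GetShardIteratorRequest()
                            .withStreamName(streamName)
                            .withShardId(shardId)
                            .withShardIteratorType("TRIM_HORIZON"))
                    .getShardIterator();
            // ... read records with 'iterator' as usual ...
        } catch (ResourceNotFoundException e) {
            // The shard no longer exists (e.g. closed and expired), so stop tracking it.
            assignedShards.remove(shardId);
        }
    }
}
```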

dosani commented 9 years ago

Sorry, I meant if another reshard event happens after a closed shard is removed. That would trigger a refresh of the shard list.

jangie commented 8 years ago

I just want to make sure: Is this issue considered essentially closed with 1.1.1?