amazon-archives / kinesis-storm-spout

Kinesis spout for Storm

Checkpointing and LATEST position. #9

Closed fjania closed 8 years ago

fjania commented 9 years ago

I'm seeing some behavior I'm a bit confused by. In both of the cases below I've set the initial position in the shard to LATEST, i.e. my spout config uses:

withInitialPositionInStream(          
    com.amazonaws.services.kinesis.stormspout.InitialPositionInStream.LATEST
);
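
For context, here's roughly what the surrounding config looks like (written from memory, so the KinesisSpoutConfig constructor arguments may not be exact):

// Rough sketch from memory; the constructor arguments here may not match the exact API.
import com.amazonaws.services.kinesis.stormspout.InitialPositionInStream;
import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig;

KinesisSpoutConfig config =
    new KinesisSpoutConfig("my-stream", "localhost:2181")             // stream name, Zookeeper endpoint (assumed)
        .withZookeeperPrefix("my_topology_kinesis_spout")             // where shard checkpoints are stored
        .withInitialPositionInStream(InitialPositionInStream.LATEST); // I expect this to mean: start at the tip of the stream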

When I run my topology locally using the Zookeeper instance created by LocalCluster, I get the latest records.

When I run my topology locally using an instance of Zookeeper I've set up separately, I get records that are 10 hours old.

I'd expect that if I choose the LATEST position in the shard, I shouldn't be getting such old data. Is there something I'm missing?

buzzstop03 commented 9 years ago

Sorry for the late response. Are you still running into this?

fjania commented 9 years ago

Last I tried, yes, but I also may be misunderstanding.

The source of com.amazonaws.services.kinesis.stormspout.InitialPositionInStream says: "This is used during bootstrap (if a checkpoint doesn't exist for a shard)."

So if I'm understanding that comment right, then if a checkpoint is stored you'll start at the checkpoint, regardless of your initial position?

My confusion is this: how do you avoid using a checkpoint, then? For example, you start testing a topology and only want to work on the LATEST data, so you do some work, test it, kill it, and go home for the night. You come back in the morning and try to test it again, but rather than getting LATEST you get data from 14 hours ago.

I guess I've assumed something like:

- LATEST always means the head of the stream
- TRIM_HORIZON always means the tail of the stream

But in the absence of another option I'm not sure what they should intuitively do. I was thinking an option like CHECKPOINT_OR_LATEST could look for a checkpoint and, in its absence, fall back to LATEST.
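
Something like this is what I have in mind (purely hypothetical; none of these names exist in the spout today):

class StartingPositions {
    // Purely hypothetical sketch of the semantics I'm imagining, not anything in the spout.
    enum RequestedPosition { LATEST, TRIM_HORIZON, CHECKPOINT_OR_LATEST }

    static String resolveStart(String checkpointedSeqNum, RequestedPosition requested) {
        if (requested == RequestedPosition.CHECKPOINT_OR_LATEST) {
            // Resume from the checkpoint if one exists, otherwise start at the tip of the stream.
            return checkpointedSeqNum != null ? checkpointedSeqNum : "LATEST";
        }
        // LATEST / TRIM_HORIZON would always win, even when a checkpoint exists.
        return requested.name();
    }
}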

I can see how the defaults as they're set up might be desirable, but I've also run into cases where so much data gets backed up that my topology as deployed can't handle the load, and I don't find that out until it's too late.

npetryk commented 9 years ago

Hi @fjania, I've also been running into this during topology development.

I believe the reason it was implemented this way is that this spout tries to ensure it processes all data, even through topology resets (and rebalances). I agree, though, that there should be a more intuitive, explicit option to throw out all checkpointed data when the topology starts.

My only work-around has been to manually delete the spout state in my Zookeeper clusters. I think this is acceptable because in a production environment you'd want to be very careful about when you essentially lose track of all your processing -- which is what you're asking the spout to do as its normal behavior. You could easily automate this task in a shell script, as I have.
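
The shell script is essentially just a recursive delete of the spout's prefix path in Zookeeper. Here's a rough Java equivalent; the connect string and path are placeholders, and the real path depends on the zookeeperPrefix your spout was configured with:

// Rough Java equivalent of the manual cleanup; connect string and path are placeholders.
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;

public class WipeSpoutState {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });
        try {
            // Recursively delete everything the spout checkpointed under its prefix.
            ZKUtil.deleteRecursive(zk, "/my_topology_kinesis_spout");
        } finally {
            zk.close();
        }
    }
}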

The relevant lines in the source where your LATEST / TRIM_HORIZON setting gets ignored are here:

ZookeeperStateManager#321

Here the ZookeeperStateManager, which is the class responsible for tracking shard state locally and keeping it in sync with the shard state in Zookeeper, uses your starting-point option ONLY IF it finds that a checkpoint for that shard has not already been set, as you suspected.

I think this is because there is currently no facility to tell whether the topology is running. If there were, you could tell when it was appropriate to reset your checkpoints (i.e. the topology isn't running, so its state can safely be reset), but again, this is pretty dangerous and should probably only be done explicitly.

fjania commented 9 years ago

@matrix10657 Good points. I agree that relying on the checkpoint as the topology rebalances is probably a good thing. I have noticed, rarely, that a running topology will sometimes seem to "jump" to an older checkpoint, but tracking that down has proved too big a task given how rarely it happens for me.

On startup, though, I've resorted to a hack that prefixes the Zookeeper path like this:

.withZookeeperPrefix(
    topologyName
    + "_kinesis_spout_"
    + configEnvironment
    + "_"
    + System.currentTimeMillis())

So I can start fresh.
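
One trade-off with this, as far as I can tell: every run writes its checkpoints under a brand-new prefix, so state from previous runs just accumulates in Zookeeper and a restarted run can never resume from the old checkpoints -- fine for development, but not something I'd do in production.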