basho / riak_pipe

Riak Pipelines
Apache License 2.0
162 stars 60 forks source link

limited support for ring resizing #76

Closed jrwest closed 11 years ago

jrwest commented 11 years ago

Provides ability for clusters using riak_pipe to use ring resizing. Ultimately, this is a hack that includes just enough support for clusters to successfully transition between the old and new ring without riak_pipe getting in the way. It does this by lying to riak_core's handoff subsystem during resize. The lie it tells is that even when it has outstanding work/queue items it will report that it is empty. This will allow the pipe vnode to continue processing work in the old ring since vnodes are not deleted and shutdown until the new ring is installed. This does, however, come with the drawback that work that is in-flight when the new ring is installed has undefined behaviour. The work may complete or may be terminated by the cleanup processes that follows the installation of the new ring.

Better support for pipe has several challenges. Some can be easily addressed through extra support in riak_core but the fundamental challenge is that riak_pipe hands off worker archives queues. In a larger or smaller ring parts of those archrives queues would be owned by new partitions which would force those archives queues to be "re-opened" and disseminated to multiple targets during handoff.

beerriot commented 11 years ago

It has been too long since we discussed this for me to remember the details, so maybe we can/should recreate them here?

The problem with archives is interesting, but I think we can do better than completely opting out, since the only real fitting that currently uses them is riak_kv_w_reduce. For that fitting, there is no need to "re-open and disseminate". Instead, we could just do one of the following, based on the chashfun for the fitting:

  1. If chashfun is a constant (normal reduce), send the archive to the new owner of that hash. All senders will make the same decision to send to that new owner. This resize-forwarding strategy should work for any fitting that uses a constant chashfun.
  2. If chashfun is follow (pre-reduce), send the archive to any new owner within the hash space for this vnode. Strictly speaking, it would even be okay to send them all to any vnode, since they will all be sent to the same vnode for final reduce later anyway.

It looks like the second tricky thing is that fittings that don't use archives actually end up sending the atom undefined as an archive, which the receiving worker ignores. Sending those "anywhere" shouldn't cause problems, per se, but the code deciding where to send them would look crazy arbitrary, and would probably be better off deciding simply to not send them anywhere.

If the above are correct, then I think the trickiest bit of pipe resizing is not archive distribution, but may instead be the logic workers use to determine which non-pipe vnodes to talk to. The riak_kv_mrc_map, riak_kv_pipe_index, and riak_kv_pipe_listkeys fittings all talk to KV vnodes. For map, the decision may be easy, since the bucket/key pair can be rehashed, but for index and listkeys, the coverage plan may no longer be valid. Coverage requests that do not use pipe will have the same issue - how are they handled?

What other tricky things are there?

jrwest commented 11 years ago

The method you outline for dealing w/ constant/follow hash functions should work. My concern is more around regular hashing functions and worker queues, however. As I understand it, pipe takes the existing worker queue and hands it off to the new owner. It does this by "packaging" them all up and sending them as a single unit. This handed off queue of messages is what I was referring to as the "archive" (although I am now realizing that may be the wrong terminology). What to do w/ this queue (I'll stop calling it archive in case I'm wrong), is what I see as the main problem. In a larger ring, the queue would now need to be split up across multiple vnodes. In the shrinking case, the queues from many vnodes would have to be concatenated on a single vnode (the number of queues dependent on the shrinking factor). This I think can be sanely managed for constant/follow extending what you have above. This problem is further complicated, however, by two things: 1. ring resizing is the only operation that can be aborted (it is not assumed that it will always progress to the new ring). 2. the amount of time a vnode is considered to be in the "forwarding" state is much longer than typical ownership transfer, from the time it completes the transfer until the time all other ring resizing operations have completed. The result of this is that we need to persist writes in both the old and new ring. If we only write in the new ring (as is the case w/ typical ownership transfer in KV) and then abort, we lose that write. If we time this correctly we could lose all replicas. For pipe, I'm not so sure how this would need to play out and any input would be much appreciated -- my concern is about duplicated or lost work. For now, we get around this by never handing off, so we don't need to worry (except in the edge case, but somewhat likely, condition that a mapreduce spans the time the new ring is actually installed).

One other point you bring up is that the kv pipe modules have some assumptions about the ring. I am wondering if there are some changes there that may be necessary that I have missed. For now, assuming this hack, I'm suppose the changes may be as simple as: when these modules see the ring is resizing, just use the old ring.

beerriot commented 11 years ago

Ah, right, yes, I do remember discussing the queue breakup. Logically, it shouldn't be a big problem to figure out where to redirect inputs in either the growing or shrinking case. But, getting monitors to follow them, so that blocking sends recover correctly from node death and such, does look quite hairy.

Duplicated work is also a problem, since Pipe's current contract is at-most-once processing. Lost work wouldn't be as big of a deal, as long as it was logged (so not "lost", but "aborted") … except that MR shuts down as soon as any sort of error (like lost work) happens.

So, you've convinced me that punting for now probably is the right option. MR queries don't generally survive other cluster membership changes either, so this is not an entirely new issue.

jtuple commented 11 years ago

Bryan and I have both looked over this code and there has been a lot of discussion in this ticket + inside internal channels. As per the Mumble discussion this afternoon (Jordan, Joe B, Jon, Scott, Evan), I'm going to go ahead and +1 this so we can merge and cut 1.4pre1. However, we have several known issues that we are still investigating. One of which must be fixed in a future pull-request. Or, alternatively, initiating a ring resizing action will need to be disabled in a future commit before 1.4 final.

First off, the good parts. With this branch, nothing has changed when not resizing. Likewise, initiating pipe requests during a resize does not break the cluster -- as opposed to several failures if this branch doesn't merge because riak_core expects these changes. A more detailed review of ring resizing in general is in the primary pull-request for this feature: basho/riak_core#301

(Note: This following is a summary of known issues and plans based on the internal discussion this afternoon.)

However, there is a known feature blocking bug. If a cluster is performing map/reduce operations continuously, nodes stop triggering resize handoffs. The entire resize operation stalls. If map/reduce work is stopped, resize requests start running again within a few minutes. Normal handoff/etc are not affected. The cause of this bug is currently unknown. If this bug cannot be fixed easily, then dynamic resizing will end up being disabled for 1.4.

The second issue is a known design decision in ring resizing. As discussed both in the "The Rest Of The Story" section of basho/riak_core#301 as well as my final comment, the initial version of ring resizing does not delete data from a vnode that a vnode no longer owns after a resize. A consequence of this design is that list_keys and 2i operations can show duplicate keys. Eg. a request is sent to a vnode that holds an old copy of a key that has since been handed off during a resize. Normal Riak operations wouldn't ever ask this index for said key, because that key doesn't rightfully exist at that vnode given the current ring. However, list_key and 2i operations return data based on what's actually stored in the backend, which includes these old keys. This is a known issue that we've decided to ship with for 1.4, given that ring resizing is a dark/support-only feature in this release. The plan is to provide a support tool that can be run against a cluster to identify and delete these orphaned objects. We are also considering adding the option to enable additional code that would filter these keys as part of the operation. The concern is the performance impact of the additional filtering code. But, adding as something that can optionally be enabled is a reasonable idea.