flux-framework / flux-core

core services for the Flux resource management framework
GNU Lesser General Public License v3.0
167 stars 50 forks source link

Synchronizing producers and consumers via the KVS #206

Closed garlick closed 7 years ago

garlick commented 9 years ago

See use case described in PR #205

In this particular case, JSC wants to take action when new jobs are created, and as the jobs move through a well defined set of states. Wreck is updating the state in the KVS, and JSC is consuming it.

There is a key lwj.next-id which contains an integer that is incremented each time a job is started.

For each job there is a key lwj.N.state where N is the job id from lwj.next-id. This key is created right after lwj.next-id is incremented, then is updated each time the job state changes.

JSC installs a kvs watch on lwj.next-id, and upon callback, installs a new watch on each new lwj.N.state.

A race arises because wreck may increment lwj.next-id, and the job may begin moving through states before JSC has a chance to install the watch on lwj.N.state. JSC will miss the earlier states.

garlick commented 9 years ago

In the current KVS design, the only way to watch such that all state changes trigger a callback in the above scenario would be to watch the lwj directory. This works because every change to a value changes its directory entry (a name to SHA1 mapping) and thus its parent directory object, so a change to lwj.next-id causes a change to lwj as does a change to lwj.N and lwj.N.state. The downside is that state is not the only key stored under lwj.N, and every change will trigger a callback which will potentially wake up the JSC consumer when it has nothing to do.

Internally, the watch is implemented as follows. Every KVS commit generates a kvs.setroot event. A kvs module instance watches for this event and looks through its "watchers", testing whether the watched key's SHA1 has changed since the last watch response, which is cached. So in the kvs module, each watcher generates a small amount of activity on each commit, including walking the namespace up to the directory containing the watched key and faulting in any of those objects that have changed.

One suggestion for the above problem was to allow a key glob or regex to be registered in a watch along with a directory. For example JSC could `kvs_watch (h, "lwj.*.state") or something like that. This could serve to reduce the number of unnecessary JSC callbacks but would be tricky to implement and adds complexity to the KVS without IMHO really addressing the problem head on.

grondo commented 9 years ago

Yeah, I guess the alternative is to design services such that they won't need the transactional kvs_watch(). This ties into the KVS RFC proposed by @trws, where we can document practices that should be avoided.

I am to blame for the current bugs in wrexec because the design was made with no thought to 3rd party transactional users of lwj.*.state. I'm sorry for that, but there is no reason we can't make the proposed changes in wrexec and certainly in its replacement.

:disappointed:

dongahn commented 9 years ago

I personally think this came out in an opportune time. Before building too many services, we want to understand the good patterns we want to use.

I am dumping my brain into a set of slides to get this conversation going.

lipari commented 9 years ago

Considering just the original problem and not the larger issue of KVS practices, what if we modified wreck to use the JSC? Then we would have a sole maintainer of the lwj. hierarchy w/r/t writing the state changes. Wouldn't this eliminate the problem of the JSC getting out of phase as to the job state?

grondo commented 9 years ago

Would that work if there are multiple consumers?

lipari commented 9 years ago

Yeah, I see what you mean. I don't suppose we'd entertain elevating the JSC to be a comms module...

garlick commented 9 years ago

If it's OK with you guys I would like to have #234 focused on the specific problem of synchronizing JSC and wreck, and this issue on the general use of the KVS for synchronization: discussing best practices leading to RFC input, and possible enhancements to cover more uses, if that can be done within "eventually consistent" design boundaries.

trws commented 9 years ago

Moving over from #234.

We need to establish how to use asynchronous scratchpads like this safely, it's a tricky pattern to get right. Also, it would be good to work out what mechanism to use for what. Honestly I think this might be better served by both; having the state logged with timestaps as the transitions happen, as has already been proposed; and using the event mechanism to broadcast state transition events. I think we've been looking at the KVS watch construct as a way to implement a general event/slots mechanism, and I'm not sure it's best suited to that.

@garlick's note on #234 about providing a kvsdir with the root sha handle is a very good idea from a read atomicity standpoint, and could help a lot with keeping locking down. If it's something that isn't too hard to add, it would give many of the consistency benefits of a system like spanner as well. The only trick there is that it would eventually have to deal with key versions potentially having been garbage collected, but that issue is a pretty long way off, and if the reference is old enough for that a failure is probably acceptable.

The spanner/bigtable thing also reminds me, a simple GC scheme we might look into would be collecting versions of a path more than "x" versions out of the tree. As in, if 'lwj.next-id' has been replaced 5 times, drop the first. Effectively a short-tail history.

dongahn commented 9 years ago

From JSC experiences, I see at least three issues that one will have to overcome in developing robust producer-consumer patterns using KVS primitives.

  1. General races around installing a chain of watches. This led to two related symptoms: event-sequence non-determinism and longer black-out period. The transactional watch will be ideal to get over this, but I agree we should weigh its benefit against its engineering cost.
  2. State drift. When JSC user wants to react to an event notification which includes access the KVS state, it will suffer from a drift if the producer has updated the state in between. @garlick proposal can address this. This will probably have to be combined with some explicit synchronization below to some extent, though. Most use cases won't survive if the drift is too large. (e.g., worst case, the program isn't even running any longer.)
  3. For this reason, I also see general need for some explicit synchronization between rexec and JSC. I understand producers and consumers don't want to proceed in a lock-step fashion. But there are cases where the hand-holding needs to happen and many producer-consumer patterns would need this. Otherwise, we can only safely support producer writes and consumer read-only. For such sync, I am not convinced if KVS is the right mechanism. So I want to first list all of the synchronization mechanisms we can use between producers and consumers as part of flux messaging primitives and see what maps to what. And then we need to consider several more complex patterns: a single-producer, multiple-consumer pattern and a single-producer no-consumer pattern, and see if a solution and best practice we identify works for most or all of the cases.
trws commented 9 years ago

I think we're at a point where it would be a really good idea to all just stand together with a whiteboard for a while and hash some of this out. It seems like we have a good idea of what the use-cases are, and what the mechanisms are, but I at least don't know them well enough to know the tradeoffs and design decisions in the guts of all of them. As to lock-step between JSC and rexec, I don't think we will need to go that far, but we do need a mechanism that delivers reliable event/slot/notify/whatever you want to call it behavior.

If the exec mechanism needs to support interposing code between states, then maybe we need a way to register hooks with it through the KVS or some other mechanism. Making it lock-step execution with the JSC through any kind of messaging or KVS interface seems unnecessarily heavy-weight to me.

On 1 Jul 2015, at 16:38, Dong H. Ahn wrote:

From JSC experiences, I see at least three issues that one will have to overcome in developing robust producer-consumer patterns using KVS primitives.

  1. General races around installing a chain of watches. This led to two related symptoms: event-sequence non-determinism and longer black-out period. The transactional watch will be ideal to get over this, but I agree we should weigh its benefit against its engineering cost.
  2. State drift. When JSC user wants to react to an event notification which includes access the KVS state, it will suffer from a drift if the producer has updated the state in between. @garlick proposal can address this. This will probably have to be combined with some explicit synchronization below to some extent, though. Most use cases won't survive if the drift is too large. (e.g., worst case, the program isn't even running any longer.)
  3. For this reason, I also see general need for some explicit synchronization between rexec and JSC. I understand producers and consumers don't want to proceed in a lock-step fashion. But there are cases where the hand-holding needs to happen and many producer-consumer patterns would need this. Otherwise, we can only safely support producer writes and consumer read-only. For such sync, I am not convinced if KVS is the right mechanism. So I want to first list all of the synchronization mechanisms we can use between producers and consumers as part of flux messaging primitives and see what maps to what. And then we need to consider several more complex patterns: a single-producer, multiple-consumer pattern and a single-producer no-consumer pattern, and see if a solution and best practice we identify works for most or all of the cases.

Reply to this email directly or view it on GitHub: https://github.com/flux-framework/flux-core/issues/206#issuecomment-117852528

dongahn commented 9 years ago

@trws Yes, that's why I'm working on some slides :-)

Just to make sure, I am not proposing lock-step for all events, but instead have some programmable sync points between JSC and rexec (or in general to support various producer/consumer patterns) where they need to be sync'ed up. I'm not convinced if such sync is best done within KVS, though. (I'm open but I just need more convincing :-)

I generally like the idea of registering a code so that the code can be evaluated on some events avoiding sync. I'm not sure if this can cover most of the bases. Like you said, we need some concreate use cases and see what mechanism sticks subject to performance costs. I will make sure to send the slides for us to chew on.

garlick commented 9 years ago

Since we haven't mentioned it yet I will throw out kvs_get_version() and kvs_wait_version().

This is a simple mechanism for sharing data across the session based on two design points of the KVS:

Say A wants to share data with B. A can put some data, commit it, read the version number with kvs_get_version(), and transmit the version number to B. Upon receipt of the version number, B can wait for its cache to be updated to >= the specified version with kvs_wait_version(), then get the data. This works equally well for 1:1 or 1:N producers and consumers.

I think this mechanism could be enhanced with the "snapshot" idea as well. Perhaps a commit should return the new root kvsdir_t. We could have a way to serialize its SHA1 and send it in a message, receive the SHA1 and turn it back into a kvsdir_t, then walk that version of the root like a snapshot with kvsdir_get_*() as described before.

Both of these ideas (returning a snapshot ref in the kvs_watch() callback and this one) fit the KVS "eventually consistent" design and should be fairly easy to implement within the current KVS code.

dongahn commented 9 years ago

@garlick Thanks. I will think about where we can use this scheme. I remember I actually used this to establish a happened-before relation beyond kvs fense in my KAP test: causal consistency. The sync was done by producer not putting data until it gets the version message. -- i have to look at the code to remember exactly though .

Just so that you know, in my slides, I've identified three producer-consumer patterns where I had some issues: watch-chaining, notify-read and notify-control patterns. Then, in the next slides, how some of the proposed ideas help address these issues with some pros and cons.

I think it would be good to think about various techniques at least from the perspective of how they map to these issues.

dongahn commented 9 years ago

From @SteVwonder in response to the slides I put together: Is there a way to get the "historical values" of a given key in the KVS? My assumption is that this operation would be prohibitively expensive, but if it was cheap, it could solve the “blackout” problem. I think it would also be a “cleaner” solution than appending new states to the existing state value.

Also, this looks like the exact race condition I ran into last summer with the simulator. So once we have a good solution for this, I can refactor the simulator, and we can remove the SIMULATOR_RACE_WORKAROUND ifdef from flux-core and flux-sched.

-Stephen

dongahn commented 9 years ago

From @trws:

One alternative that hasn't been mentioned is having wreck write its states to lwj.state. instead of lwj..state. That would solve this whole issue at a stroke, but only for this specific issue.

-Tom

dongahn commented 9 years ago

Thanks Stephen and Tom.

Yes, restructuring the lwj layout this way and avoiding watch-chaining pattern once and for all will solve the race issue with this pattern. I can settle w/ this solution for this specific issue. But I think it would be good to continue to discuss what we really want to do for all three patterns — we may end up telling KVS users not to use those patterns, but this will be at the expense of reducing its utility…

In any case,

for the notify-read pattern, one that I like the most is the state-snapshot watch proposal.

For the watch-chaining and notify-control patterns, one that I like the most is the programmable sync points.

garlick commented 9 years ago

@SteVwonder: there isn't currently any log of historical values for the KVS root or other keys. We do assign a monotonically increasing version to each root so that we can be sure to always run the cache forward while taking updates both from published kvs.setroot events, and from kvs.commit responses, the latter being necessary to ensure the a client that does a commit always sees a version of the cache equal or newer than the one that contains the commit. So there is an order imposed on root SHA1's, but no log is kept.

It is worth noting that keeping a log of successive root SHA1's would be sufficient to walk through successive values of any key, because of the hash tree design. However it wouldn't be very efficient on a busy KVS. To iterate on an arbitrary key, one would have to iterate through historical root values, walking to the key in question, aborting the walk when encountering an SHA1 unchanged from the previous walk. If unlucky, a lot of data could be faulted in unnecessarily.

garlick commented 9 years ago

Please correct me if I missed anything, but I think what we concluded in our discussion this morning was:

I propose that we close this issue and that I move forward with the following KVS work:

Related topics (food for thought as we ponder distributed synchronization):

dongahn commented 9 years ago

Just want to make sure to document the results of today's discussion here. Please feel free to add/subtract if I missed some key points. We talked about some of the problems with the three patterns hit by JSC, which uses only KVS to implement a producer-consumer pattern with rexec (wrexec for now more specifically): watch-chaining, notify-read and notify-control patterns.

(BTW, please let me know if anyone knows if and how one can post a slides deck into this PR.)

Points:

1) One thing we agreed to work on is state-snapshot watch support mentioned in Issue #234. It seems very clear that this capability will solve the state drift issue with the notify-read pattern and also has other utilities.

2) There was general objection of using the watch-chaining pattern as @tws called it an anti-pattern. We will probably want to document this (perhaps as part of a RFC?) so that others won't suffer from its side effects.

3) We will then still have to solve the need for reliable job state notification mechanism without having to rely on the watch-chaining. One that seems most appealing to me was the following. Rexec uses the flux-event network to notify the event and jsc subscribes to this event. The event would cover both the new job creation as well as other regular state changes. If the event also piggybacks a handle to the KVS snapshot (similar to 1 above), this seems best. (@garlick, Is this supported?) This can eliminate the side-effects of the watch-chaining pattern (by not using the watch for eventing at all...) and also the state drift issue with the notify-read pattern.

4) State appending idea. rexec will append the state data (instead of update it). This way, jsc will consume the events in order without worrying about the black-out period. We will probably still use a watch and stuff though. (Is this still on the table?)

5) For notify-control pattern, we talked about the idea of fine-grained programmable sync points support within rexec. That is, users of jsc can add some behavior (e.g., stop) on a programmable set of events for a particular job. We at least agreed that using these sync points for all of the jobs will incur unneccessarily high overheads and we want to do this in a fine-grained manner if we do this.

If this is combined with 3) above, the user will probably have to send a "state control mask" (the set of states by which the user want rexec to be synchronized) as part of a new job creation request. I was further talking to @SteVwonder, and it seems it is kind of premature to decide how sync points ideas can be materialized. When we have more concrete use cases with runtime tools and dynamic scheduling implementation, we may know more about what's needed.

This can be discussed further later, but IMHO we will want to at least ensure earlier solutions we may work on -- e.g., 3) will not confine us down the road if we need to support this fine-grained programmable sync points within rexec-jsc users.

dongahn commented 9 years ago

@garlick: Sorry our messages got crossed.

Your proposal looks good. In addition, perhaps we can create jsc - rexec synchronization (#3 and #5) above as a placeholder to further discuss the idea of "eventing outside KVS" and "programmable sync point" support ?

Lamport clock can help debugging distributed events and piggybacking a scalar clock (32 or 64bit integer) typically doesn't add much overheads. Having said that, I don't yet have specific use cases as to how I can make use of a partial global order other than debugging and logging.

Test-and-set and compare-and-swap are well-known atomic primitives and can be used to build common shared memory synchronization mechanisms like lock/unlock. I am not sure if we want to go down on that path, though. More use cases of KVS will tell in the future, I think.

garlick commented 9 years ago

It might be good to open a new issue to cover your point 5, starting with specific use cases that need to be handled (you alluded to tool synchronization other than the ptrace-stop one?).

To answer the question in your point 3, KVS snapshots are not yet exposed in the API. That is part of the proposed work.

For fun you might try hooking into modules/wreck/wrexecd.c::update_job_state(), adding some code to construct a job state change event message and publish it using flux_event_encode() and flux_send(). For a topic use something like "lwj.state.lwjid", and encode the state name in the JSON payload. This looks like it would be very straightforward to do. Then in JSC, subscribe to "lwj.state." and replace your kvs watch callback with an event handler. Later if it is useful we could add in the KVS snapshot reference but this may be enough to solve the problem at hand.

The other way to solve it now would be to do as @grondo suggested earlier and turn the "state-time" keys into a log of state changes. The downside of this pointed out by @trws is then you still manage watches on each job's log, and that style leads to confusion and unnecessary overhead, although the race can now be dealt with by iterating through the log entries that occurred during "blackout".

dongahn commented 9 years ago

@garlick:

Thanks. Good that snapshot can be covered for both watch as well as other cases!

Prototyping like this on wrexec seems reasonable, which later can be factored into the newer rexec service.

State log ideas are simple and actionable now, but I would like to explore this if the above leads us nowhere.

SteVwonder commented 9 years ago

I agree with trying the events first before the state logs. If you aren't careful with the state logs, there could be a race condition where you register a watch on a key, a modification is made to the key, and then you look at the current state log of the key to check for any changes during the blackout. This will result in you handling the most recent state change since it is in the log and getting a watch callback for the same state change. You can code around this, but it doesn't seems as "clean" to me as using the events.

@garlick, the "named pipe" that you mentioned might be useful in this case. At the risk of going too far off track, could you or someone else elaborate a little bit on this idea and how it would be different from existing pub/sub or point-to-point message passing already supported by the broker?

garlick commented 9 years ago

I think actually as long as you use the value (which is a snapshot of the log in its entirety) passed in to the watch callback, as opposed to reading it again from within the watch callback, the next callback would only occur only after a new state transition has been appended. Of course you'd have to keep track of your position in the log between calls.

On named pipes, this would be an independent broker service with the following properties:

So not quite like the event stream since events are broadcast and late joiners miss out. It's different than point to point RPC because producers and consumers would be interacting via a third party (the broker service for named pipes) and only indirectly, asynchronously, and unidirectionally with each other.

trws commented 9 years ago

Several messages have popped in here since I last read, so some of this is in response to older text.

@garlick, I think having some atomic primitives is a good idea, but that we would be better off only supporting high-bandwidth primitives like atomic addition, subtraction, exchange and the like. Exchange can be used to build a lock, but it is not as dangerous as the more general compare and swap would be to scalability. To strike a bit of a balance, I think we could do some very interesting things with just the basic atomic primitives and possibly conditional variants thereof. It should be possible, for example, to build some very efficient lock-free collective data structures with an atomic operation of the form " if( target < x) target += y". No hardware supports this, but I pretty well proved last year you can make a blazingly fast non-blocking, though technically not lock-free, queue with it.

As to the named pipe service, would the semantic be that every consumer would get every message, or that each item placed in by a consumer would be consumed by exactly one consumer? I mean, would it be a named subscription event service or a distributed multi-producer/multi-consumer queue? The latter would be a nice addition in support of things like load-balancing extensions I think

@dongahn: Thinking more about the notify-control pattern, my issue in the discussion yesterday was that the only solutions proposed all resulted in rexec blocking. I agree that we need a way for tools and other support infrastructure to impose waits and synchronization, but we can do that without actually blocking the rexec service at any point. This hinges on the ability to add actions to be run as part of the job on each state transition in rexec. Instead of rexec blocking on the actions, it would asynchronously trigger the action on the job, then continue work. Now in the context of the job synchronization with JSC or with anything else could occur with impunity without impacting the scalability of the flux service as a whole, or the execution of other jobs. Does that sound like it would be able to accomplish what you want? To be clear, you would have arbitrary synchronization points available with respect to the job, they just wouldn't be able to block the execution service itself.

dongahn commented 9 years ago

@trws and @garlick: I like the idea of striking a balance. Perhaps, we can learn some good patterns from CUDA model where they have such scalable atomics between thread blocks in combination with fine-grained sync between threads within a block. -- Of course I am preaching to the choir here, but it seems there should be some good studies in CUDA to model after.

On the second comment, like I said before, I generally like the idea that a consumer can inject some code into the producer (though @SteVwonder expressed concerns over security). To me, the concept seems similar to the idea of probe points (e.g., systemtap.)

I think we will need to understand our use cases at a more detail level to see if "asynchronous" actions can capture all of the synchronization needs of JSC users. But a question I have now is, what's the harm to support both sync and async actions at fine granularity (being able to program an action for a particular job and a particular event)?

Yes, users can abuse sync actions but only on the jobs that they want to have more control. If there are some actions that cannot be formulated as an action triggered within rexec's address space, the user can simply install a sync action which then gets cleared up by the consumer's matching action.

In fact, one of the suggestions from Redhat when we had trouble scaling debugging on dynamically-linked executable (because of too many stops) was to use systemtap to sync a debugger more precisely with the dynamic loader.

If security isn't an issue, such a system with sync and async support will be a superset of the programmable sync points idea and will be much more expressible than the simple programmable wait.

Again, this is one of these things, I really want to pin down the initial use cases with runtime tools and dynamic scheduling to decide the proper scope. (#249)

Overall, your inputs are essential @trws!

dongahn commented 9 years ago

Re-reading the comment, I realized sync vs. async is a bit blur in this context because what JSC users will want is to be sync'ed up with the job not with rexec. But then, if rexec isn't sync'ed up to some extent, this service also can act on the job while the action is being operated on the job...

trws commented 9 years ago

The idea would be that the rexec service would not be allowed to advance the job to the next state until the actions on each transition complete, but that it would not itself be required to synchronize with the JSC.
In reality this is already true, it's just that some of the transitions don't have user or service-visible indirection points (as far as I know).

As far as rexec would be concerned, it could just react to completion events on each stage as it always does, sitting in the reactor dealing with other requests the entire time. The job itself, or wrappers around it or hooks in other modules or what have you, could then contain actions to be run on any given transition, and those could synchronize however they see fit without impacting the main service. It's a bit of a brain bender, but unless I've missed how these are being tied together or a design consideration that would make it unduly complicated, I think it would work just fine.

On 7 Jul 2015, at 13:44, Dong H. Ahn wrote:

Re-reading the comment, I realized sync vs. async is a bit blur in this context because what JSC users will want is to be sync'ed up with the job not with rexec. But then, if rexec isn't sync'ed up to some extent, this service also can act on the job while the action is being operated on the job...


Reply to this email directly or view it on GitHub: https://github.com/flux-framework/flux-core/issues/206#issuecomment-119331667

dongahn commented 9 years ago

I now seem to see your points completely. Yes, not advancing the state of the job is what I will need for sync, but even than only minimally. Your proposal should work and an action in this case would be "async" to the service and "sync" to the job. I like it.

garlick commented 9 years ago

@trws: re pipes: I meant a multi-consumer queue, modeled after mkfifo(3). I created issue #250 for this idea, and #251 for atomic KVS operations.

Remember we have issue #249 open for programmable sync points.

garlick commented 7 years ago

This discussion fizzled over a year ago, and while I think there are some interesting possibilities for adding atomic ops to the KVS similar to etcd transactions or redis eval, I think this discussion didn't lead to actionable work so let's close it.