JuliaLang / Distributed.jl

Create and control multiple Julia processes remotely for distributed computing. Ships as a Julia stdlib.
https://docs.julialang.org/en/v1/stdlib/Distributed/
MIT License

Heartbeats between master and worker processes. #18

Open amitmurthy opened 11 years ago

amitmurthy commented 11 years ago

Heartbeats between master and workers have been mentioned before. Seeing https://github.com/JuliaLang/julia-ipython/issues/8 triggered it again.

Suggesting a design:

The whole heartbeat mechanism can be switched on via a command-line argument to the julia executable (default is off).

It seems workable enough and we don't need the admin thread to listen on a different port. Do let me know what you guys think.

ViralBShah commented 11 years ago

@JeffBezanson This seems like a post-0.2 thing. Thoughts?

Let's keep this PR around.

Keno commented 11 years ago

This is definitely not in scope for 0.2

amitmurthy commented 11 years ago

This is not yet a PR. Will convert it into a PR if folks are OK with the general design.

malmaud commented 8 years ago

Hey @amitmurthy, I'm thinking of trying to make a PR for heartbeat functionality based on the new message-passing idiom. Would that be stepping on your toes? Do you have any additional thoughts about heartbeat support since this issue was created?

amitmurthy commented 8 years ago

Would that be stepping on your toes?

Perish the thought. No issues whatsoever.

I would initially focus just on the distributed "progress meter".

Failed workers are currently removed from the cluster when the remote socket connection is closed. This works pretty well in most cases, the exception being when different transports are used (which is not very widespread).

Restarting workers is not straightforward, since we would have to recreate any worker global state.

Reconnecting in the event of broken connections (due to serialization/deserialization errors) can and should be implemented. This would require

Note: I am currently traveling and will not have reliable internet access over the next few days, so my responses may be delayed.

malmaud commented 8 years ago

OK, that makes sense.

But wouldn't it be simpler to just not close the connection in the first place in the event of a serialization error? If a worker can't deserialize a message from pid1 (i.e., deserialize(r_stream) throws), why not send a message to that effect back to pid1 instead of exiting?

I guess I don't really understand why exceptions raised while evaluating the thunk the worker has been told to evaluate should be treated so differently from exceptions raised while deserializing pid1's message.

(I understand you're traveling and am not expecting an immediate response.)

malmaud commented 8 years ago

Oh, is it because the stream is then left in a corrupted state, so serializing/deserializing messages over it is no longer possible?

In that case, one unifying solution might be a second stream to each worker: an out-of-band "status" stream. It could be used both for heartbeats and for signaling that the main message stream is no longer usable because of a serialization error and has to be reset on both sides.

amitmurthy commented 8 years ago
malmaud commented 8 years ago

Ya, I see that the out-of-band stream would have to be on a separate thread. IJulia has example multithreaded heartbeat code (https://github.com/JuliaLang/IJulia.jl/blob/3a1ad9eac8c259c79a6a1c09342eac262b16f34b/src/heartbeat.jl). It might require a new C function somewhere in src. At least in my testing, a Julia function that ccalls sleep and then writes to a socket in a loop works fine, although there may be something unsafe about it that hasn't shown up yet.
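For illustration, here is a minimal Python sketch of the pattern just described: a dedicated thread that sleeps and then writes a heartbeat to a socket in a loop, plus a check on the other end that treats silence past a timeout (or a closed connection) as a dead peer. The names start_heartbeat and peer_alive, the one-byte heartbeat message, and the timing values are hypothetical; this is not Julia's Distributed API, and a Python thread stands in for the separate OS thread discussed here.

```python
import socket
import threading

HEARTBEAT = b"\x01"  # hypothetical one-byte heartbeat message


def start_heartbeat(sock, interval, stop):
    """Start a daemon thread that sends HEARTBEAT every `interval` seconds until `stop` is set."""

    def loop():
        # Event.wait acts as an interruptible sleep: it returns True once stop is set.
        while not stop.wait(interval):
            try:
                sock.sendall(HEARTBEAT)
            except OSError:
                return  # connection is gone; the monitoring side will notice

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


def peer_alive(sock, timeout):
    """Return True if any data arrives within `timeout` seconds.

    False means either silence (timeout) or an orderly close (recv returns b"").
    """
    sock.settimeout(timeout)
    try:
        data = sock.recv(1024)
    except socket.timeout:
        return False
    return len(data) > 0
```

A socket.socketpair() can stand in for a master/worker connection when trying this out. Keeping the sender on its own thread is the point of the design: a worker busy in a long computation can still emit heartbeats, so prolonged silence suggests a dead or wedged process rather than merely a slow one.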

I see the complexity of dealing with a deserialization error. If we're throwing out possible schemes, we can take inspiration from HTTP multipart encoding:

1) To put a message on the wire, first write a random boundary token.
2) Then write the serialized message.
3) Then write the boundary token again.

The receiver first reads the token, then deserializes the message. If it encounters an error during deserialization, it just reads the bytes from the stream until it finds the boundary token so it can start processing the next message.
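A minimal Python sketch of this boundary-token scheme, assuming pickle as a stand-in for Julia's serializer, an io.BytesIO as the connection, and a 10-byte token from os.urandom (the size malmaud suggests further down). A receiver that lacks a type is simulated by an Unpickler that refuses every global lookup; the function and class names are hypothetical.

```python
import io
import os
import pickle

TOKEN_LEN = 10  # "a random 10 bytes", as suggested later in the thread


class NoGlobalsUnpickler(pickle.Unpickler):
    """Simulates a receiver missing a type: every global lookup fails."""

    def find_class(self, module, name):
        raise pickle.UnpicklingError(f"unknown type {module}.{name}")


def write_msg(stream, token, obj):
    """Frame layout: token, serialized message, token."""
    stream.write(token)
    pickle.dump(obj, stream)
    stream.write(token)


def skip_past(stream, token):
    """Discard bytes up to and including the next occurrence of `token`."""
    window = b""
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("stream ended before a boundary token")
        window = (window + byte)[-len(token):]
        if window == token:
            return


def read_msg(stream, token):
    """Return ("ok", obj) or ("error", exc); the stream stays usable either way."""
    if stream.read(len(token)) != token:
        raise OSError("expected an opening boundary token")
    try:
        obj = NoGlobalsUnpickler(stream).load()
    except Exception as exc:
        skip_past(stream, token)  # resynchronize on the closing token
        return "error", exc
    if stream.read(len(token)) != token:
        raise OSError("expected a closing boundary token")
    return "ok", obj
```

Resynchronization assumes that a failed deserializer has not already consumed the closing token; a deserializer that over-reads on corrupt input would break that assumption.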

ssfrr commented 8 years ago

If you're interested in a way to frame messages in a stream, I've always been a fan of COBS (https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing), though my applications have usually been wire protocols on embedded systems where the messages are pretty short (~100 bytes or less). It's nice because the frame delimiter is guaranteed not to appear in the data, so if you get into a confused state you know that the next delimiter you see marks the start of a new frame.

In the past I've used protocols where the framing byte just gets escaped when it appears in the stream, but that can cause big overheads if the frame byte shows up a lot in the stream.

Receiving COBS can be done on-the-fly without keeping a buffer. Encoding could have a stream interface but would require a buffer of 254 bytes.

I'm slammed for the next couple days but after that if you're interested I could write a COBS stream wrapper type that acts like a stream with an additional finish method that would delimit the frames. Sounds like fun.
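To make the COBS idea concrete, here is a Python sketch of a stream wrapper with a finish method along these lines. It holds at most 254 pending nonzero bytes: a zero byte, or a run reaching 254 bytes, flushes the run behind a code byte, and finish() writes the final run plus the 0x00 delimiter. CobsWriter and cobs_frame are hypothetical names; the byte layout follows the standard COBS encoding (the Wikipedia article gives worked examples).

```python
import io


class CobsWriter:
    """Stream wrapper that COBS-encodes everything written; finish() ends the frame."""

    def __init__(self, sink):
        self.sink = sink          # underlying binary stream
        self.run = bytearray()    # pending nonzero bytes (never more than 254)
        self.after_full = False   # True right after a 254-byte run was flushed

    def _flush(self, code):
        self.sink.write(bytes([code]) + bytes(self.run))
        self.run.clear()

    def write(self, data):
        for b in data:
            if b == 0:
                # Code byte = run length + 1; the zero itself is implied, not sent.
                self._flush(len(self.run) + 1)
                self.after_full = False
            else:
                self.run.append(b)
                self.after_full = False
                if len(self.run) == 254:
                    # 0xFF: 254 data bytes follow and no zero is implied.
                    self._flush(0xFF)
                    self.after_full = True
        return len(data)

    def finish(self):
        """Write the final run (unless a full run just ended the data), then 0x00."""
        if not self.after_full:
            self._flush(len(self.run) + 1)
        self.sink.write(b"\x00")
        self.after_full = False


def cobs_frame(payload):
    """Encode one complete frame, delimiter included."""
    buf = io.BytesIO()
    writer = CobsWriter(buf)
    writer.write(payload)
    writer.finish()
    return buf.getvalue()
```

Because write() can be called repeatedly before finish(), a serializer can write directly into the wrapper without knowing the message length up front; only the current run is buffered.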

malmaud commented 8 years ago

That looks cool, but why not just use a random 10 bytes as the frame delimiter? That's never going to match a message by chance.

malmaud commented 8 years ago

Proposed implementation here for reference: https://github.com/JuliaLang/julia/blob/jmm/boundary_message/base/multi.jl#L228

malmaud commented 8 years ago

It also doesn't seem like it would work here because it needs to know the number of frame bytes in the message before encoding the message, but we don't have the length of the message since it's being written directly to the socket.

ssfrr commented 8 years ago

I'll preface this by saying that I'm not entirely convinced either that this is a better way to go, and it's possible that my ignorance of the larger context is getting in the way. Here are a couple of things that concern me with the random delimiter scheme though:

  1. Though the chance of a collision is almost infinitesimally small, it's nonzero, and I imagine lots and lots of these messages getting sent. I'm not sure what the target failure probability is here, but this is probably just me being pedantic.
  2. If the process writing the frame gets borked in a way that prevents it from writing the closing delimiter, the stream never recovers. This might not be a failure mode we need to worry about, though.
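To put a number on point 1: assuming the 10-byte token is drawn uniformly at random and independently of the payload, each of the n − 9 windows of 10 consecutive bytes in an n-byte payload (n ≥ 10) equals the token with probability 256^(−10) = 2^(−80), so a union bound gives, per message:

```latex
\Pr[\text{token occurs in payload}] \le (n - 9)\cdot 2^{-80} < n \cdot 2^{-80},
\qquad n = 2^{30}\ (\text{1 GiB}) \;\Rightarrow\; \Pr < 2^{-50} \approx 8.9 \times 10^{-16}.
```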

Re: needing to know the number of frame bytes: that's why you'd need the 254-byte buffer on the writer side. You only need to look ahead up to 254 bytes for a 0x00, because if the run is longer than that you just put 0xff as the frame byte and send the next 254 bytes as-is.

Anyway, mostly I just wanted to throw it out there as an alternative, not to advocate changing course, so if these points aren't convincing I think the random delimiter is also a good way to go.

ssfrr commented 8 years ago

Rereading my comment, "code byte" is probably a better term than "frame byte". The frame delimiter is a literal 0x00, and a code byte can appear many times within a frame, each time encoding how many nonzero bytes follow it (its value is that count plus one).
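On the receiving side, these code-byte semantics allow decoding one byte at a time with no lookahead, and any 0x00 ends the current frame, which is what lets a confused reader resynchronize. A minimal Python sketch, with the hypothetical names CobsReader and decode_stream: a frame whose last code byte promised more data than arrived before the delimiter is reported as corrupt and dropped, and decoding resumes cleanly with the next frame. For simplicity it collects each decoded frame in a bytearray; a real stream wrapper could hand bytes to the consumer as they are decoded.

```python
class CobsReader:
    """Decode a COBS byte stream one byte at a time, with no lookahead."""

    def __init__(self):
        self._reset()

    def _reset(self):
        self.frame = bytearray()   # decoded bytes of the current frame
        self.remaining = 0         # data bytes still owed by the last code byte
        self.zero_pending = False  # a zero is implied if another code byte follows
        self.started = False       # seen at least one code byte in this frame

    def feed(self, b):
        """Consume one byte; return ("ok" or "corrupt", frame) at a delimiter, else None."""
        if b == 0:
            # 0x00 never occurs inside encoded data, so it always ends a frame.
            ok = self.started and self.remaining == 0
            result = ("ok" if ok else "corrupt", bytes(self.frame))
            self._reset()
            return result
        if self.remaining == 0:
            # This is a code byte: emit the zero implied by the previous block.
            if self.zero_pending:
                self.frame.append(0)
            self.zero_pending = b != 0xFF
            self.remaining = b - 1
            self.started = True
        else:
            self.frame.append(b)
            self.remaining -= 1
        return None


def decode_stream(data):
    """Feed a whole byte string through one reader and collect the frame results."""
    reader = CobsReader()
    results = []
    for b in data:
        result = reader.feed(b)
        if result is not None:
            results.append(result)
    return results
```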

malmaud commented 8 years ago

Oh ya, I see. Do you mind if I implement it before you? Now I'm excited about it.

ssfrr commented 8 years ago

I wouldn't be able to get to it until the beginning of next week, so go for it!

StefanKarpinski commented 8 years ago

That's a clever way to do this.

malmaud commented 8 years ago

Alright, https://github.com/JuliaLang/julia/pull/13795 can now recover gracefully from deserialize failures by keeping a log of ACKed messages and using frame delimiters to reset the message stream to a working state.

amitmurthy commented 8 years ago

Why do we need both? Won't just using frame delimiters be enough? As for the (extremely unlikely) possibility of a collision, note that it only matters in the failure case, i.e., upon a deserialization error.

The chances of errors during serialization have been reduced since https://github.com/JuliaLang/julia/commit/77b2527fd91878d69d69908b60c29d977d2f8e92

malmaud commented 8 years ago

Well, you want a way for the worker to signal to the client that a deserialization error happened, and to associate that signal with a specific message. The worker doesn't know the response_oid of the client's Call msg because that was part of the garbled message. So messages need to have a unique ID and senders need to remember the association between message IDs and messages at least until a successful Ack comes back.

Plus I'm hoping it might lay some groundwork for other applications that take advantage of a message log, such as debugging of the communication between workers and maybe eventually replaying messages to get workers into a known state.
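A minimal Python sketch of the sender-side log described here, under stated assumptions: message IDs come from a counter, the transport is a hypothetical transmit(msg_id, msg) callback, and the ID travels somewhere the receiver can still read when the body fails to deserialize, so it can reply with an ACK or NACK for that ID. Outbox, on_ack, on_nack and unacked are illustrative names, not the API of the PR.

```python
import itertools


class Outbox:
    """Sender-side message log: keep each message until the peer acknowledges its ID."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.pending = {}  # msg_id -> message, in send order

    def send(self, msg, transmit):
        """Assign an ID, log the message, then hand both to `transmit`."""
        msg_id = next(self._ids)
        self.pending[msg_id] = msg
        transmit(msg_id, msg)
        return msg_id

    def on_ack(self, msg_id):
        """The peer deserialized msg_id successfully; drop it from the log."""
        return self.pending.pop(msg_id)

    def on_nack(self, msg_id, error):
        """The peer failed to deserialize msg_id; return the original message
        so the caller waiting on it can be handed the error."""
        return self.pending.pop(msg_id), error

    def unacked(self):
        """Messages still in flight, oldest first (useful for debugging or replay)."""
        return list(self.pending.items())
```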

amitmurthy commented 8 years ago

It may be better to split the message into a "header" and "body" and deserialize in two steps. First we deserialize the header (which should never fail - if it does fail, treat it as a fatal error) and then the function/args. Errors while deserializing function and args are treated the same as errors while executing f(args), i.e., send back an appropriate RemoteException.

malmaud commented 8 years ago

Makes sense. If the header is going to be message-agnostic, the message types would need a bit of refactoring so that response_oid is part of the header instead of the message bodies; it simply won't have a meaning for message types that don't currently have a response_oid.

Otherwise each message type could have its own corresponding header type, but that seems a bit cumbersome.

amitmurthy commented 8 years ago

Yes. The header needs to contain just the response_oid which identifies the RemoteRef that stores the result and notify_oid that identifies the ref that the caller task (on the sender side) is waiting on. They may be nothing in some cases where a matching response is not expected.
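A minimal Python sketch of the two-step scheme, with a header carrying only response_oid and notify_oid, either of which may be None (Python's counterpart of nothing). Assumptions: pickle stands in for Julia's serializer; the header is a plain tuple of integers or None, so it always deserializes; and the body is length-prefixed so that a failed decode cannot desynchronize the stream (a stand-in for the frame delimiters discussed earlier). An unpickler that refuses every global simulates a body whose type the receiver lacks; all names are hypothetical.

```python
import io
import pickle
import struct
from typing import NamedTuple, Optional


class MsgHeader(NamedTuple):
    response_oid: Optional[int]  # ref that stores the result (None: no reply expected)
    notify_oid: Optional[int]    # ref the calling task waits on (None: nobody waits)


class NoGlobalsUnpickler(pickle.Unpickler):
    """Simulates a receiver missing a type: every global lookup fails."""

    def find_class(self, module, name):
        raise pickle.UnpicklingError(f"unknown type {module}.{name}")


def send_two_part(stream, header, body):
    # Step 1: header as a plain tuple of ints/None, which always deserializes.
    pickle.dump(tuple(header), stream)
    # Step 2: body, length-prefixed so a failed decode can be skipped cleanly.
    payload = pickle.dumps(body)
    stream.write(struct.pack("<Q", len(payload)))
    stream.write(payload)


def recv_two_part(stream):
    """Return (header, ("ok", body)) or (header, ("error", exc))."""
    try:
        header = MsgHeader(*pickle.load(stream))
    except Exception as exc:
        # A broken header means the connection itself can't be trusted.
        raise RuntimeError("corrupt header: treat as fatal") from exc
    (size,) = struct.unpack("<Q", stream.read(8))
    payload = stream.read(size)
    try:
        return header, ("ok", NoGlobalsUnpickler(io.BytesIO(payload)).load())
    except Exception as exc:
        # Handled like an exception thrown by f(args): reported via response_oid.
        return header, ("error", exc)
```

Because the header is decoded before the body, the receiver always knows where to route an error, which addresses the earlier concern that a worker cannot recover response_oid from a garbled message.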

malmaud commented 8 years ago

Alright, https://github.com/JuliaLang/julia/pull/13795 implements the header mechanism.