Some notes from our discussion on this issue:
- The `STATUS_REQUEST` message is sent by the forwarder when the task queue for the endpoint is empty.
- A task can move from `running` back to one of the waiting states (this can happen legitimately when the EP dies).
- There are some scaling issues that we need to think about. We want to avoid sending too much information all the time through these HBs, and we should avoid hacking in fixes for scaling issues by placing the burden on the wrong component. These are the things we considered here:
Here's some additional rationale for the current design:
One-way heartbeats do not inform both parties about the availability of the other: if there are no ACKs to the heartbeat, the interchange might not be able to distinguish "no new communications" from the loss of availability of its counterpart. Secondly, we generally assume higher availability for upper layers, and therefore we initiate heartbeats from the upper layer to the layer below. In our case we can definitely assume the cloud-hosted forwarder is more available than cluster endpoints, which often go down for maintenance every other week.
This brings us back to our current design, which has the forwarder send a status request when it has no more tasks to push, and the interchange ack with a status report. The status report acts as a heartbeat. One risk here is that if tasks arrive continuously at the heartbeat interval, we would never send a `status_request`. We could modify this to always send a `status_request` once the heartbeat period has elapsed.
Ok, so we definitely need the forwarder and interchange to ensure each other's liveness.
Both sides should hear from each other at least once every `HEARTBEAT_INTERVAL`. Every `HEARTBEAT_INTERVAL` we will send a "soft" HB ("soft" = not guaranteed to happen every interval, only sent if we haven't sent anything recently). Importantly, this is a slight change from the previous soft-heartbeat implementation because the two communications are completely decoupled: there is no pairing of request and response for which we might need to track sequence numbers. ZMQ guarantees message ordering, so we don't need to worry about pairing up which status report goes with which status request. We just need each side of the communication to ensure that the other is alive.
To track task states at the granularity we desire, we not only need to send information from the Interchange to the Forwarder, but also from the Manager to the Interchange. With basically the same reasoning as above, we will send regular task status deltas every `HEARTBEAT_PERIOD` from the manager to the interchange. This tells the interchange that a task is in the `running` state, i.e. it has been passed to an actual worker.
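For illustration, a manager-side reporting loop might look roughly like the following; the function and queue names, and the `HEARTBEAT_PERIOD` value, are assumptions rather than the real Manager code:

```python
import queue
import time

HEARTBEAT_PERIOD = 30.0  # seconds; illustrative value

def status_report_loop(newly_running: queue.Queue, outgoing: queue.Queue):
    """Every HEARTBEAT_PERIOD, report which tasks started running since last time."""
    while True:
        time.sleep(HEARTBEAT_PERIOD)
        delta = {}
        while not newly_running.empty():
            delta[newly_running.get()] = "RUNNING"  # task was handed to a worker
        # An empty delta still goes out, doubling as a liveness signal.
        outgoing.put({"type": "MANAGER_STATUS_REPORT", "task_statuses": delta})
```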
Much of the new messaging-related code can be found in `funcx.executors.high_throughput.messages`. Messages sent over ZMQ are converted to bytes and are broken down into up to three pieces: a message type, an optional header, and an optional payload. New message classes implementing the `Message` interface must implement the `pack` and `unpack` methods, which allow for easy conversion to and from `bytes`.

`pack` is called on objects that implement the `Message` interface, so if you have, e.g., an `EPStatusReport` object, you can call `report.pack()` to convert it to a byte array. `unpack` is called from the `Message` class on a byte array: it reads the first byte to determine the type of message, then passes the remainder of the bytes to the appropriate class's `unpack` method.
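To make the pattern concrete, here is a toy sketch of the type-byte dispatch and a pack/unpack round trip. The field layout, the `MessageType` values, and the JSON payload are assumptions for illustration; the real classes in `funcx.executors.high_throughput.messages` differ in detail:

```python
import json
from enum import Enum

class MessageType(Enum):
    EP_STATUS_REPORT = 0

class Message:
    @classmethod
    def unpack(cls, data: bytes):
        # The first byte selects the concrete class; the rest goes to its unpack().
        mtype = MessageType(data[0])
        if mtype is MessageType.EP_STATUS_REPORT:
            return EPStatusReport.unpack(data[1:])
        raise ValueError(f"unknown message type: {mtype}")

class EPStatusReport(Message):
    def __init__(self, task_statuses):
        self.task_statuses = task_statuses

    def pack(self) -> bytes:
        # type byte + JSON payload (no header in this toy version)
        return bytes([MessageType.EP_STATUS_REPORT.value]) + json.dumps(self.task_statuses).encode()

    @classmethod
    def unpack(cls, payload: bytes):
        return cls(json.loads(payload.decode()))

# Round trip: pack on the sending side, Message.unpack on the receiving side.
wire = EPStatusReport({"task-1234": "RUNNING"}).pack()
report = Message.unpack(wire)
assert report.task_statuses == {"task-1234": "RUNNING"}
```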
The `TaskStatusCode` enum defined in the messages module does not enumerate all of the task statuses that we might want, and the values for the statuses are integers. This is because on the endpoint we don't need to know about the `received` or `waiting-for-endpoint` statuses, and we also want an efficient encoding.
The `TaskStatusCode`s are converted to the `TaskState` enum type via `status_code_convert` in the `tasks` module on the web side. `TaskState` includes the other states, as well as more human-interpretable values.
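A sketch of the two enums and the conversion; the member names and integer values here are illustrative, not copied from the funcX source:

```python
from enum import Enum, IntEnum

class TaskStatusCode(IntEnum):
    # Compact integer codes used on the endpoint side.
    WAITING_FOR_NODES = 1
    WAITING_FOR_LAUNCH = 2
    RUNNING = 3

class TaskState(str, Enum):
    # Human-readable states used on the web side; includes states the
    # endpoint never reports.
    RECEIVED = "received"
    WAITING_FOR_ENDPOINT = "waiting-for-endpoint"
    WAITING_FOR_NODES = "waiting-for-nodes"
    WAITING_FOR_LAUNCH = "waiting-for-launch"
    RUNNING = "running"

def status_code_convert(code: TaskStatusCode) -> TaskState:
    return {
        TaskStatusCode.WAITING_FOR_NODES: TaskState.WAITING_FOR_NODES,
        TaskStatusCode.WAITING_FOR_LAUNCH: TaskState.WAITING_FOR_LAUNCH,
        TaskStatusCode.RUNNING: TaskState.RUNNING,
    }[code]
```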
Forwarder changes:

- The `ResultIncoming` queue now also receives task status updates, and its `get` method first attempts to unpickle the message (how messages used to be sent) before attempting the new `Message` unpacking (see the sketch after this change list).
- The task status updates, which will always be sent out at the heartbeat period, are now the mechanism for checking interchange liveness.
- `executor.heartbeat()` sends the new `Heartbeat` message to the interchange on the `TasksOutgoing` queue. The heartbeat is async and does not get an ack from the interchange.
- `executor.wait_for_endpoint()` now uses the `HeartbeatReq` message for a synchronous heartbeat.
- The forwarder calls `heartbeat()` every heartbeat period.

Interchange changes:

- Incoming messages are first tried with `Message.unpack`; if that fails, they are processed the same as before. The only `Message` it can receive right now is a `Heartbeat`.
- Newly received tasks are marked `WAITING_FOR_NODES`.
- In the `_command_server` thread, the only command that was actually used was the synchronous `HeartbeatReq`. A bunch of dead code was removed, with a TODO placed to remind us to maybe implement these in the future.
- A new `_status_report_thread` creates, every heartbeat period, an `EPStatusReport` message with the task status deltas as well as endpoint-wide stats collected from `Interchange.get_status_report`. This is placed on the `status_report_queue`.
- The results-outgoing thread reads from the `status_report_queue` and places any status report there on the outgoing result ZMQ channel (this is for thread safety, so that multiple threads aren't using the same ZMQ socket).
- Tasks that have been dispatched to a manager are marked `WAITING_FOR_LAUNCH`.
- When a batch of messages arrives from a manager, we check whether the first message is a `ManagerStatusReport`; if it is, we use it to update the status delta dict. We update the last time a message was received from that manager. All other messages in the batch are processed in the old way.
- On receipt of a `ManagerStatusReport`, we immediately send back a heartbeat to the manager so that the manager knows of the Interchange's liveness. This addresses the case where the manager saw nothing on the `task_incoming` channel when the Interchange hadn't communicated in a while.
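A rough sketch of that batch-handling logic on the interchange side; the function and variable names are hypothetical, and `ManagerStatusReport` is stood in by a small dataclass:

```python
import time
from dataclasses import dataclass, field

@dataclass
class ManagerStatusReport:          # stand-in for the real Message subclass
    task_statuses: dict = field(default_factory=dict)

def process_manager_batch(manager_id, batch, status_deltas, last_seen,
                          send_heartbeat_to, handle_result):
    """Process one batch of messages received from a manager."""
    if batch and isinstance(batch[0], ManagerStatusReport):
        report, batch = batch[0], batch[1:]
        status_deltas.update(report.task_statuses)  # fold the manager's delta into ours
        send_heartbeat_to(manager_id)               # immediately confirm interchange liveness
    last_seen[manager_id] = time.time()             # any message counts as manager liveness
    for msg in batch:
        handle_result(msg)                          # remaining messages handled the old way
```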
Manager changes:

- A new `_status_report_thread`, every heartbeat period, places task status deltas (which should basically just say that the task is now actually `running`) on the pending result queue.
- The `push_results` thread now ensures that if there is a manager status report to be sent to the interchange, it is always the first message in its batch of messages.
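For the unpickle-then-unpack fallback mentioned in the first bullet of the change list, a minimal sketch; it reuses the toy `Message` class from the earlier sketch, and the error handling is intentionally crude:

```python
import pickle

def decode_incoming(raw: bytes):
    """Old results arrive pickled; anything else is treated as a new-style Message."""
    try:
        return pickle.loads(raw)        # legacy path: pickled result payload
    except Exception:
        return Message.unpack(raw)      # new path: packed Message, e.g. an EPStatusReport
```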
We would like to have finer-grained information about task lifecycles, as mentioned here. We would like to have the following states that a task can be in:
In order to distinguish between "waiting for nodes", "waiting for launch", and "running", we need to be sending more information through the interchange and back to the forwarder. To this end, I propose that we should be sending regular, explicit heartbeat messages every `HEARTBEAT_INTERVAL` (e.g. 30 seconds). These heartbeat messages should carry endpoint status information, such as available capacity, as well as per-task status. Namely, they should carry only the changes in task status since the last heartbeat. These criteria help to ensure that we are not forced to send too much data over the network: tasks that finish within a single `HEARTBEAT_INTERVAL` likely never have their status updated this way, and if a task stays in the `running` state for multiple heartbeats, then we aren't re-sending it.
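A small sketch of the "send only changes" idea; the names here are hypothetical:

```python
def status_delta(prev: dict, current: dict) -> dict:
    """Return only the task states that changed since the last heartbeat."""
    return {
        task_id: state
        for task_id, state in current.items()
        if prev.get(task_id) != state   # unchanged tasks (e.g. long-running) are omitted
    }

prev = {"t1": "running", "t2": "waiting-for-launch"}
current = {"t1": "running", "t2": "running", "t3": "waiting-for-nodes"}
assert status_delta(prev, current) == {"t2": "running", "t3": "waiting-for-nodes"}
```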