bacalhau-project / bacalhau

Compute over Data framework for public, transparent, and optionally verifiable computation
https://docs.bacalhau.org
Apache License 2.0

Handling Ungraceful Terminations of Log Streams on NATS #4003

Open udsamani opened 1 month ago

udsamani commented 1 month ago

How does Bacalhau implement log streaming using NATS?

When a user runs a command such as bacalhau logs 963c5972-9ffc-4984-bd8c-6123cadb5fc1, Bacalhau streams the job's logs over NATS in the following steps.

First, the requester node publishes a request for the job's logs on the target compute node's subject, which in this case is node.compute.node-3.ExecutionLogs/v1.

The message has three parts: a subject (node.compute.{nodeId}.ExecutionLogs/V1), a reply subject (_SINBOX.GwLlKanYanG8WcN0tZOOhh.KK7F9jas), and a data payload:

{
    "subject": "node.compute.{nodeId}.ExecutionLogs/V1",
    "reply": "_SINBOX.GwLlKanYanG8WcN0tZOOhh.KK7F9jas",
    "data": {
        "SourcePeerId": "node-0",
        "TargetPeerId": "node-3",
        "ExecutionId": "e-1d4c360f-549b-42d1-9034-31bc390cb68",
        "Tail": "false",
        "Follow": "false"
    }
}

SourcePeerId identifies the node that initiated the request, and TargetPeerId identifies the node the request is addressed to. ExecutionId is the unique ID of the execution whose logs are being requested. Tail and Follow are flags that control how the logs are fetched.
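As an illustration of the message layout above, here is a minimal Python sketch that assembles the subject / reply / data triple. The class and function names (LogRequestData, request_subject, build_request) are hypothetical and not taken from the Bacalhau codebase. Tail and Follow are modelled as booleans for convenience, which is an assumption, and the subject suffix uses the lowercase v1 spelling from the example subject above.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class LogRequestData:
    # Field names mirror the payload shown above; the class itself is hypothetical.
    SourcePeerId: str
    TargetPeerId: str
    ExecutionId: str
    Tail: bool = False    # assumption: modelled as a boolean in this sketch
    Follow: bool = False  # assumption: modelled as a boolean in this sketch


def request_subject(node_id: str) -> str:
    # Subject pattern as written in the issue text.
    return f"node.compute.{node_id}.ExecutionLogs/v1"


def build_request(data: LogRequestData, reply_inbox: str) -> dict:
    """Assemble the subject / reply / data triple for one log request."""
    return {
        "subject": request_subject(data.TargetPeerId),
        "reply": reply_inbox,
        "data": asdict(data),
    }


msg = build_request(
    LogRequestData("node-0", "node-3", "e-1d4c360f-549b-42d1-9034-31bc390cb68"),
    reply_inbox="_SINBOX.GwLlKanYanG8WcN0tZOOhh.KK7F9jas",
)
print(json.dumps(msg, indent=2))
```

The subject is derived from TargetPeerId, so the request is routed to one specific compute node, while the reply inbox is unique to this one stream.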

On receiving this message, the compute node retrieves the logs for the requested execution. How it does so depends on the type of execution engine it uses.

Once it has the logs, the compute node publishes them as messages on the reply subject, _SINBOX.GwLlKanYanG8WcN0tZOOhh.KK7F9jas, which streams them back to the requester node.
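The request and reply flow described above can be sketched with a toy in-memory bus. This is not the NATS client API and not Bacalhau code: InMemoryBus, FAKE_LOGS, make_compute_handler, and the inbox name are all hypothetical stand-ins chosen to show only the shape of the exchange.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class InMemoryBus:
    """Toy stand-in for a NATS connection: subjects map to handler lists."""

    def __init__(self) -> None:
        self._subs: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, subject: str, handler: Handler) -> None:
        self._subs[subject].append(handler)

    def publish(self, subject: str, msg: dict) -> None:
        # Deliver synchronously to every current subscriber of the subject.
        for handler in list(self._subs.get(subject, [])):
            handler(msg)


# Hypothetical log store keyed by execution ID.
FAKE_LOGS = {"e-123": ["starting job", "step 1 done", "job finished"]}


def make_compute_handler(bus: InMemoryBus) -> Handler:
    def handle(msg: dict) -> None:
        # Look up the logs, then publish each line on the reply subject.
        for line in FAKE_LOGS.get(msg["data"]["ExecutionId"], []):
            bus.publish(msg["reply"], {"line": line})

    return handle


bus = InMemoryBus()
bus.subscribe("node.compute.node-3.ExecutionLogs/v1", make_compute_handler(bus))

received: List[str] = []
inbox = "_SINBOX.example"  # hypothetical inbox name
bus.subscribe(inbox, lambda m: received.append(m["line"]))  # requester side

bus.publish(
    "node.compute.node-3.ExecutionLogs/v1",
    {"reply": inbox, "data": {"ExecutionId": "e-123"}},
)
print(received)
```

The requester subscribes to its inbox before publishing the request, so no log line is missed between the request and the first reply.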

What is missing from the current implementation?

There are three primary concerns with the current implementation of log streaming using NATS in Bacalhau:

  1. What happens if the requester no longer needs the logs?

    The current implementation does not account for the requester losing interest in the logs partway through a stream. When that happens, the compute node keeps publishing messages on the reply subject even though nobody is consuming them, which wastes compute and network resources.

  2. What if the requester node goes down?

    If the requester node goes down, the compute node has no way of knowing and keeps publishing logs to the inbox subject unnecessarily.

  3. What if the compute node goes down, i.e., there's no publisher on the reply subject?

    The current implementation does not account for this either. If the compute node goes down, the requester node is left with an open stream, waiting for logs that will never arrive. This is a poor user experience: the user keeps waiting with no indication that the compute node is gone.
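The first two concerns come down to the publisher getting no feedback about its audience. The toy sketch below illustrates this, assuming plain publish without acknowledgement as in the flow described earlier. FireAndForgetBus and the inbox name are hypothetical and not part of NATS or Bacalhau.

```python
from typing import Callable, Dict, List


class FireAndForgetBus:
    """Toy pub/sub: publish never reports whether anyone was listening."""

    def __init__(self) -> None:
        self._subs: Dict[str, List[Callable[[str], None]]] = {}

    def subscribe(self, subject: str, handler: Callable[[str], None]) -> None:
        self._subs.setdefault(subject, []).append(handler)

    def unsubscribe_all(self, subject: str) -> None:
        self._subs.pop(subject, None)

    def publish(self, subject: str, line: str) -> None:
        # Returns nothing, so the caller cannot tell if the line was delivered.
        for handler in self._subs.get(subject, []):
            handler(line)


bus = FireAndForgetBus()
inbox = "_SINBOX.example"  # hypothetical inbox name
delivered: List[str] = []
published = 0

bus.subscribe(inbox, delivered.append)
for i in range(5):
    if i == 2:
        bus.unsubscribe_all(inbox)  # the requester goes away mid-stream
    bus.publish(inbox, f"log line {i}")
    published += 1

print(published, len(delivered))  # prints "5 2"
```

The compute side performs all five publishes even though only the first two reach anyone, which is the wasted work described in concerns 1 and 2.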

Heartbeat Signals

To solve the problems above, we propose two-way heartbeat signals.

The requester and the compute node would each send heartbeats to a common subject. If no heartbeat is received from the other side within a given time window:

  1. The requester node closes all streams being served by that compute node.

  2. The compute node stops sending messages on all inbox subjects related to that requester node.
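The timeout rule above could be tracked on either side with something like the following sketch. HeartbeatTracker and its methods are hypothetical names, the 10-second window is an arbitrary example value, and the clock is injectable only so the example runs deterministically.

```python
import time
from typing import Callable, Dict, List, Set


class HeartbeatTracker:
    """Tracks the last heartbeat per peer and which streams belong to it."""

    def __init__(self, timeout_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_s
        self._clock = clock
        self._last_seen: Dict[str, float] = {}
        self._streams: Dict[str, Set[str]] = {}

    def register_stream(self, peer_id: str, stream_id: str) -> None:
        self._streams.setdefault(peer_id, set()).add(stream_id)
        # Start the timer when the first stream for this peer is opened.
        self._last_seen.setdefault(peer_id, self._clock())

    def on_heartbeat(self, peer_id: str) -> None:
        self._last_seen[peer_id] = self._clock()

    def expire(self) -> List[str]:
        """Drop peers whose heartbeat is overdue; return their stream IDs."""
        now = self._clock()
        closed: List[str] = []
        for peer_id, seen in list(self._last_seen.items()):
            if now - seen > self._timeout_s:
                closed.extend(sorted(self._streams.pop(peer_id, set())))
                del self._last_seen[peer_id]
        return closed


# Usage with a fake clock (a one-element list acts as mutable "current time").
now = [0.0]
tracker = HeartbeatTracker(timeout_s=10.0, clock=lambda: now[0])
tracker.register_stream("node-3", "stream-a")
tracker.register_stream("node-3", "stream-b")
tracker.register_stream("node-4", "stream-c")

now[0] = 8.0
tracker.on_heartbeat("node-4")  # node-3 stays silent
now[0] = 12.0
expired = tracker.expire()
print(expired)  # ['stream-a', 'stream-b']
```

The same structure works in both directions: the requester keys it by compute node ID and closes the returned streams, while the compute node keys it by requester ID and stops publishing to the returned inboxes.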

We do not cover the case where the requester node no longer needs the logs. That can be handled later as an optimization of this approach.

### Tasks
- [ ] https://github.com/bacalhau-project/bacalhau/issues/4025
- [ ] https://github.com/bacalhau-project/bacalhau/issues/4026
- [ ] https://github.com/bacalhau-project/bacalhau/issues/4028
- [ ] https://github.com/bacalhau-project/bacalhau/issues/4037
- [ ] https://github.com/bacalhau-project/bacalhau/issues/4067

wdbaruni commented 1 month ago

Nice work documenting the issue and the current flow. A few comments about the heartbeat flow:

  1. A network can have thousands of compute nodes managed by a single requester node or a few of them. We shouldn't have the requester node heartbeat to all compute nodes about its active log streams.
  2. An alternative would be to have only the compute nodes heartbeat to the requester, using a request/response call with a timeout of single-digit seconds that is shorter than the heartbeat window. The heartbeat request would contain the IDs of the logstreams the compute node is serving, and the requester's response would contain the logstream IDs it is no longer interested in.
  3. Keep in mind that we already have a heartbeat mechanism to track the liveness of the compute nodes. This is a different heartbeat and should only kick in if the compute node has open logstreams.
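A rough sketch of the compute-initiated alternative in point 2, with the transport abstracted away: the requester is modelled as a plain function that either answers or returns None to stand in for a timed-out request. All names here (Requester, requester_reply, unreachable_requester, heartbeat_once) are hypothetical, and stopping every stream after a single missed response is an assumption made for brevity.

```python
from typing import Callable, List, Optional, Set

# A "requester" is modelled as a function: given the stream IDs a compute
# node is serving, return the IDs it no longer wants, or None on timeout.
Requester = Callable[[List[str]], Optional[List[str]]]


def requester_reply(active_interest: Set[str]) -> Requester:
    def reply(serving: List[str]) -> Optional[List[str]]:
        # Answer with the streams the requester is no longer interested in.
        return [s for s in serving if s not in active_interest]

    return reply


def unreachable_requester(serving: List[str]) -> Optional[List[str]]:
    return None  # stands in for a request that timed out


def heartbeat_once(serving: Set[str], send_request: Requester) -> Set[str]:
    """Return the streams the compute node should keep serving."""
    if not serving:
        return serving  # no open logstreams: skip the heartbeat entirely
    unwanted = send_request(sorted(serving))
    if unwanted is None:
        # Assumption: one missed response stops every stream for this requester.
        return set()
    return serving - set(unwanted)


serving = {"ls-1", "ls-2", "ls-3"}
kept = heartbeat_once(serving, requester_reply({"ls-1", "ls-3"}))
print(sorted(kept))  # ['ls-1', 'ls-3']
print(sorted(heartbeat_once(serving, unreachable_requester)))  # []
```

Because the response carries the unwanted stream IDs, this one exchange covers both the requester going down and the requester no longer needing a stream, and the requester never has to contact compute nodes on its own initiative.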