opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[Design Proposal] Offline Background Tasks #13554

Open linuxpi opened 2 months ago

linuxpi commented 2 months ago

Background

Offline/Background Nodes (RFC), introduced to offload background work, provide the segregation required for core operations (ingestion/search) to run and scale predictably on the dedicated online fleet (data nodes), while the provisioned Offline Nodes crunch through all the background work.

Offline Nodes behave very differently from the existing Nodes in the cluster (Data/ClusterManager). They don't host any shards, are not eligible to be Cluster Manager, and cannot participate in its voting. They exist only to perform background work.

Challenges

Introduction of Offline Nodes will require us to tackle multiple problems. Here are two High Level Problems we need to solve:

Apart from that, we need to consider solutions for some low level problems which arise with Offline Nodes in place.

  1. Multiple Writers for a single Shard - Today, we work with the assumption that only a single writer exists for a single Shard’s Remote Segment Store. With Offline Nodes, multiple writers can exist for the same Remote Segment Store.

We will share a dedicated proposal for both these low level problems.

To keep this discussion manageable, we restrict ourselves to the “Merge” Background Task to begin with. Similar approaches can be followed later for other Background Tasks.

Task Management

Requirements
Proposal

To ensure the above requirements are satisfied in a system like OpenSearch, any background “work” submission should go through a Persistent Task Store which acts as a Queue. This allows available workers to perform work at their own pace without getting overwhelmed, and allows the system to smoothly handle any spikes in the amount of work to be completed.

The Task Store/Queue can be implemented in multiple ways, and different implementations might appeal to different sets of users, each carrying its own set of complexities in terms of validations, security etc. A few examples of a Queue implementation might be:

It is of paramount importance that users can build their preferred Queue implementation, and that the design is extensible enough for them to integrate it seamlessly and cater to their use case. Keeping this in mind, we provide Abstractions over Task Management which help users leverage Offline Nodes without compromising on their internal intricacies.

[Figure: class_abstraction_uml]

The above Abstractions are provided as part of a new library in :libs (offline-tasks). Using these abstractions, each user is free to plug in their own implementation of Task Management.

A sample local implementation POC using PersistentTasksCustomMetadata is provided here. Similarly, a more scalable implementation could be put in place by using a dedicated system index and building queuing logic around it.

Task Modelling

Requirements
Proposal

To run operations on Offline Nodes, we need to extract each operation and model it as a “Task”. This allows these operations to execute independently, without the unnecessary dependencies/components they are entangled with today.

To achieve that, each operation (e.g. Merge, Snapshot, RemoteGC) would be extracted as an independent Task Unit in a separate Module/Plugin, preferably in a dedicated Repository, to allow independent development, maintenance and release. Each Offline Node installs the configured Task Modules/Plugins to run the submitted Tasks.

There is an ongoing effort, proposed in #5910, to break apart the OpenSearch :server monolith by refactoring it and moving components out to :libs, :modules and :plugins. Since the vision of our proposal is to achieve segregation between core and background operations, we can align with that refactoring effort and strive to move all Merge Components out to a separate Module or Plugin.

Here is an example, using Segment Merges, of how the above abstraction would work in an OpenSearch Cluster with and without dedicated Offline Nodes.

NOTE: The actual interfaces of Merge may look different once https://github.com/opensearch-project/OpenSearch/issues/12726 completes, but the interaction and extension points for Task Management remain intact.

[Figure: class_merge_uml]

We can leverage an existing construct, MergeScheduler, which handles all Merges triggered by the Index Engine. Once the Engine asks MergeScheduler to schedule a merge, the MergeScheduler injected by our Merge Module decides whether to perform this merge locally on the data node or to send it to Offline Nodes via TaskClient.

The decision made in MergeScheduler.runLocally() can be tuned dynamically via a few cluster-level dynamic settings that we register as part of the Merge Module.
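As a rough illustration of how such a decision could be driven by dynamic settings, here is a minimal sketch. The class names `MergeOffloadSettings` and `MergeRoutingPolicy`, the two settings, and the size-based threshold are all assumptions made for this example; the proposal does not specify which settings the Merge Module registers or what `runLocally()` takes as input.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder for cluster-level dynamic settings; names and defaults are illustrative. */
final class MergeOffloadSettings {
    // Whether offloading merges to Offline Nodes is enabled at all.
    final AtomicBoolean offloadEnabled = new AtomicBoolean(false);
    // Merges estimated below this size stay local, since the extra downloads would dominate.
    final AtomicLong minOffloadBytes = new AtomicLong(1L << 30); // 1 GiB, arbitrary default
}

/** Hypothetical stand-in for the decision the proposal places in MergeScheduler.runLocally(). */
final class MergeRoutingPolicy {
    private final MergeOffloadSettings settings;

    MergeRoutingPolicy(MergeOffloadSettings settings) {
        this.settings = settings;
    }

    /** Returns true when the merge should run on the data node itself. */
    boolean runLocally(long estimatedMergeBytes, boolean offlineNodesAvailable) {
        if (!settings.offloadEnabled.get() || !offlineNodesAvailable) {
            return true; // status quo: offloading disabled or no Offline Nodes provisioned
        }
        return estimatedMergeBytes < settings.minOffloadBytes.get();
    }
}
```

Because the settings are read on every call, a change to either value takes effect for the next scheduled merge without restarting the node.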

The MergeModule would be installed on both the Data Node and the Offline Node. On the Offline Node it registers the MergeTaskWorker with the BackgroundTaskService, which coordinates the execution of the various Tasks to be run on the current Node.

The Merge Task sent by the Data Node via TaskClient is eventually assigned to one of the Offline Nodes for execution and handed to the BackgroundTaskService on that node.

Architecture

The lifecycle of a Task, from its creation to its completion, can be divided into 4 main stages:

  1. Creation/Submission
  2. Distribution
  3. Execution
  4. Completion

[Figure: merge_flow_uml]

BackgroundTaskService runs on each Offline Node and periodically checks for available Tasks. TaskClient allows Nodes to interact with the TaskStore/Queue to perform various Task Management operations.

Task Submission - For Segment Merges, MergeScheduler on the Data Node decides to offload the Merge operation for a particular Shard to Offline Nodes. The same TaskClient is available on the Data Node to enqueue a new Task to our TaskQueue/Store, for Offline Nodes to pick up and execute asynchronously. TaskClient hides all the implementation details of the TaskQueue/Store.

Once the Task is submitted successfully, the BackgroundTaskService on an Offline Node receives this new Task, which is now eligible to be executed. Since multiple nodes could have received this Task during polling, we need to ensure that no two Nodes start executing it in parallel and duplicate work. To prevent this, each Node which received the Task will try to ClaimTask, and our Queue implementation ensures that no two Nodes are able to claim the same Task. Each Queue implementation would handle this differently, e.g. the single-threaded Cluster Manager with PersistentTasksCustomMetadata, or conditional writes in Apache Cassandra.
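The claim step described above boils down to an atomic "first writer wins" operation. The sketch below shows that idea with an in-memory map; `InMemoryClaimStore` and its method signatures are hypothetical and are not the proposed TaskClient API. A real Queue implementation would get the same guarantee from its backing store, for example a conditional write.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for a TaskStore/Queue claim operation; illustrative only. */
final class InMemoryClaimStore {
    // taskId -> nodeId of the node that currently owns the task
    private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

    /** Atomically claims the task; returns true only for the single node that wins. */
    boolean claimTask(String taskId, String nodeId) {
        // putIfAbsent is atomic and returns null only for the first successful caller.
        return owners.putIfAbsent(taskId, nodeId) == null;
    }

    /** Returns the owning node, if the task has been claimed. */
    Optional<String> ownerOf(String taskId) {
        return Optional.ofNullable(owners.get(taskId));
    }
}
```

Nodes that lose the claim simply move on to the next available Task, so no coordination between Offline Nodes is needed beyond the store itself.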

TaskWorker - After the Merge Task is successfully picked up by a Node, BackgroundTaskService finds the registered TaskWorker for the particular TaskType. For a Merge Task, MergeTaskExecutor is invoked to perform the merge operation.
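The lookup of a worker by task type can be pictured as a small registry. In the sketch below, `SketchTaskType`, `SketchTaskWorker` and `WorkerRegistry` are hypothetical names chosen for illustration, and the string payload and result are placeholders; the actual TaskWorker and TaskType abstractions live in the proposed offline-tasks library and may look different.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical task types; the proposal names Merge, Snapshot and RemoteGC as examples. */
enum SketchTaskType { MERGE, SNAPSHOT, REMOTE_GC }

/** Hypothetical worker contract: executes one task payload and returns a result summary. */
interface SketchTaskWorker {
    String execute(String payload);
}

/** Illustrative registry, playing the role of the lookup done by BackgroundTaskService. */
final class WorkerRegistry {
    private final Map<SketchTaskType, SketchTaskWorker> workers = new ConcurrentHashMap<>();

    /** Registers a worker for a type; a second registration for the same type is rejected. */
    void register(SketchTaskType type, SketchTaskWorker worker) {
        if (workers.putIfAbsent(type, worker) != null) {
            throw new IllegalStateException("Worker already registered for " + type);
        }
    }

    /** Finds the worker registered for the type and runs the task with it. */
    String dispatch(SketchTaskType type, String payload) {
        SketchTaskWorker worker = workers.get(type);
        if (worker == null) {
            throw new IllegalArgumentException("No worker registered for " + type);
        }
        return worker.execute(payload);
    }
}
```

With this shape, a module installed on an Offline Node only has to call `register` once at startup, and a node without a given module installed fails fast instead of silently dropping the Task.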

Heartbeats - During the execution of a Task, a periodic heartbeat is published to the TaskStore/Queue. This confirms that the Node is still actively performing the Task and has not abandoned it due to a failure. If, due to any failure, the Node is unable to publish a heartbeat for X minutes, the Task is considered abandoned and becomes eligible to be picked up by another Offline Node.
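The abandonment rule above is a timeout on the last recorded heartbeat. A minimal sketch follows, assuming a hypothetical `HeartbeatTracker` class and a caller-supplied clock value; the timeout is a constructor parameter because the proposal leaves X unspecified.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative per-task heartbeat record; not part of the proposed abstractions. */
final class HeartbeatTracker {
    private final Duration timeout;
    private volatile Instant lastHeartbeat;

    HeartbeatTracker(Duration timeout, Instant startedAt) {
        this.timeout = timeout;
        this.lastHeartbeat = startedAt;
    }

    /** Called by the executing node each time it publishes a heartbeat. */
    void heartbeat(Instant now) {
        lastHeartbeat = now;
    }

    /** A task is abandoned once more than the timeout has elapsed since the last heartbeat. */
    boolean isAbandoned(Instant now) {
        return Duration.between(lastHeartbeat, now).compareTo(timeout) > 0;
    }
}
```

Passing the current time in explicitly keeps the check deterministic in tests; in a real store the comparison would be made against the store's own clock to avoid skew between nodes.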

Task Completion - Once the Task completes, the worker updates the Task status in the TaskStore/Queue and publishes a Task Completed event so that listeners on the Data Node learn of the completion. Because all of this communication is asynchronous, a transient failure may cause the completion event to be lost, and the submitting Data Node might never learn that the Task completed. This can already happen today with any ClusterManager event listeners.

Each flow needs to handle such failures either by checking the Task state via TaskClient, or by handling redundancy in the TaskWorker implementation so that the same Task is not executed again.
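Both options can be sketched with a small state ledger: the submitter polls the recorded state when an event may have been lost, and the worker skips Tasks that are already completed. `SketchTaskState` and `CompletionLedger` are hypothetical names for this example and do not reflect the real TaskClient interface.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, reduced set of task states for this example. */
enum SketchTaskState { SUBMITTED, COMPLETED }

/** Illustrative state ledger standing in for the TaskStore/Queue. */
final class CompletionLedger {
    private final ConcurrentMap<String, SketchTaskState> states = new ConcurrentHashMap<>();

    void submit(String taskId) {
        states.putIfAbsent(taskId, SketchTaskState.SUBMITTED);
    }

    /**
     * Runs the work only if the task is still pending; returns true if it ran.
     * Exclusivity between nodes is assumed to come from the claim step, not from here.
     */
    boolean runIfPending(String taskId, Runnable work) {
        if (states.get(taskId) != SketchTaskState.SUBMITTED) {
            return false; // unknown task, or already completed by an earlier attempt
        }
        work.run();
        states.put(taskId, SketchTaskState.COMPLETED);
        return true;
    }

    /** What a Data Node could poll if it suspects a completion event was lost. */
    boolean isCompleted(String taskId) {
        return states.get(taskId) == SketchTaskState.COMPLETED;
    }
}
```

Polling is cheap enough to run on a long interval, since it only matters in the rare case where the completion event never arrives.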

Other Considerations

Additional Cost

There is an added cost which depends directly on the number of Offline Nodes provisioned (offset somewhat by the resources freed up on data nodes). Apart from that, Offline Nodes introduce 2 additional downloads. Consider Segment Merges:

  1. The Offline Node has to download the Segments to be merged. Today no download is needed, since the segments are already present locally on the data node.
  2. Once the merged Segments are uploaded to the Remote Store, the data node hosting the corresponding Shard has to download them.

Not all users will want to spin up separate nodes for background operations, so however we choose to implement/execute this, we will ensure that the status quo is maintained for clusters without Offline Nodes.

Resource consumption and throttling

Each Offline Node would monitor its resource consumption to stay informed about the current load on the node, and would pick up a Task only if there are enough resources to execute it. If not, it waits for a currently executing Task to complete and free up some resources.
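One simple way to picture this admission check is a budget that Tasks reserve from before starting and return to when they finish. The sketch below assumes a single hypothetical resource dimension measured in bytes and a hypothetical `ResourceGate` class; the proposal does not say which resources are tracked or how a Task's requirement is estimated.

```java
/** Illustrative admission gate for an Offline Node; tracks one resource budget in bytes. */
final class ResourceGate {
    private final long capacityBytes;
    private long reservedBytes;

    ResourceGate(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    /** Reserves the requested amount if it fits in the remaining budget; returns false otherwise. */
    synchronized boolean tryAcquire(long requiredBytes) {
        if (requiredBytes < 0 || requiredBytes > capacityBytes - reservedBytes) {
            return false; // not enough headroom: the node should not pick up this task now
        }
        reservedBytes += requiredBytes;
        return true;
    }

    /** Returns a reservation to the budget once the task completes or fails. */
    synchronized void release(long bytes) {
        reservedBytes = Math.max(0, reservedBytes - bytes);
    }

    synchronized long availableBytes() {
        return capacityBytes - reservedBytes;
    }
}
```

A node would call `tryAcquire` before claiming a Task and `release` in a finally block after execution, so a failed Task does not leave its reservation behind.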

Related component

Storage

linuxpi commented 1 month ago

Meta Issue - https://github.com/opensearch-project/OpenSearch/issues/12725