opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[RFC] "Extension" node role to support extensible workload #1119

Open rishabhmaurya opened 2 years ago

rishabhmaurya commented 2 years ago

Is your feature request related to a problem? Please describe. Currently, OpenSearch nodes are used for multiple jobs in a cluster: indexing, searching, managing cluster state, running ingest pipelines, and more. Users running OpenSearch at scale partition these roles into different node types: data nodes handle indexing, searching, and custom plugin logic; master nodes handle cluster state management; and ingest nodes preprocess documents before the data nodes index them. This model works well in many situations. However, when custom plugins consume significant resources (e.g. ML models and other data analysis plugins), data nodes can see resource contention as memory and CPU pressure increases. In addition to the resource contention risks, custom plugins run inside the same JVM as the data node. As a result, they have full access to the data on the data node and the same permission level as the OpenSearch executable. Some users may want more codified sandboxing when running custom code.

As a potential solution to this problem my team is exploring the concept of dedicated extension nodes that are built specifically to run custom plugin logic and provide both resource and security isolation from data nodes. Additionally, decoupling these workloads will provide implicit benefits such as independent scaling, fault isolation and independent deployment. We are looking for feedback and to discuss this potential solution.

Describe the solution you'd like

This document proposes a new node role in OpenSearch: the “extension node”. The proposal applies to plugins or other use cases whose dependency on cluster state is minimal and limited to pulling the latest value of some user config or settings. Extension nodes will have the following properties:

  1. They are not directly part of the OpenSearch cluster, and thus the full cluster state will not be synced to these nodes.
  2. The jobs running on these nodes can subscribe to a set of fields in the cluster state. The master will asynchronously sync the latest values of these fields to all extension nodes.
  3. They will have access to all TransportActions available in the cluster (both the default ones and those added by other plugins).
  4. These nodes will not be connected to each other.
  5. Each node will be bootstrapped with the featureTags of the extension plugins that can run on it. So if multiple extension nodes are connected to the cluster, each of them can serve different extension plugins. This heterogenous model provides flexibility in scaling the extension node fleet depending on the workloads and their resource consumption.
  6. They cannot act as a coordinator node of the OpenSearch cluster and will not directly host the RestController and API handlers. If these jobs introduce new REST APIs, the APIs will be registered on regular cluster nodes and requests will be forwarded to the corresponding TransportAction hosted on an extension node.
  7. These nodes are not supposed to handle authentication. They can handle authorization and can be integrated with the security plugin as long as the user info is passed to them when calling any transport action.
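Property 2 can be illustrated with a minimal sketch of projecting a full cluster state onto the fields an extension has subscribed to. This is only an illustration in Python, not OpenSearch code: the function name `project_state`, the dotted-path convention, and the setting names are all assumptions made for the example.

```python
from typing import Any, Dict, Iterable


def project_state(cluster_state: Dict[str, Any], subscribed: Iterable[str]) -> Dict[str, Any]:
    """Return only the subscribed dotted-path fields of a nested state dict.

    Hypothetical helper: paths that do not exist in the state are skipped.
    """
    projected: Dict[str, Any] = {}
    for path in subscribed:
        node: Any = cluster_state
        found = True
        for key in path.split("."):
            if isinstance(node, dict) and key in node:
                node = node[key]
            else:
                found = False
                break
        if found:
            projected[path] = node
    return projected


# Hypothetical cluster state; only the subscribed setting reaches the extension node.
cluster_state = {
    "settings": {"alerting": {"enabled": True, "max_monitors": 1000}},
    "routing_table": {"idx": ["shard0", "shard1"]},
}
extension_state = project_state(
    cluster_state, ["settings.alerting.enabled", "settings.missing"]
)
```

The routing table never leaves the cluster in this sketch, which is the point of the subscription model: the extension node only sees what it asked for.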

Plugins breakdown in OpenSearch

Plugins in OpenSearch can be categorized, on the basis of their role in the system, into the following categories:

  1. Core plugins: Plugins that enrich the capabilities of core OpenSearch services. Usually, these plugins are required to set up the OpenSearch cluster in a desired way and change the behavior of core OpenSearch services. E.g. Analyzers, Network, Auth, Mapper, Search, Repository plugins.
  2. Peripheral plugins: Plugins that use these core OpenSearch services to provide additional capabilities. These plugins are never required to set up clusters and do not impact the behavior of any core OpenSearch services. E.g. Alerting, Index Management, AnomalyDetection etc.
  3. Hybrid plugins: Some plugin packages may contain both categories of plugins. These can be broken down into Core and Peripheral plugin components.

There is a subtle difference between 1 and 2. The former is an integral part of the system and is a dependency for cluster setup or for the desired behavior of core services. The latter behave more like positive parasites in the body: they use the internal services of the system and provide enhancements, but are not a necessity to the system.

Architecture

Tenets

  1. OpenSearch cluster availability shouldn’t depend on the availability of extension nodes.
  2. State transfer between the OpenSearch cluster and extension nodes should be lightweight: only what is needed to execute a transport action, or an event payload sent to extension nodes. The full cluster state should not be synced onto these nodes. If an extension plugin depends on some index metadata or cluster setting, that data should be small and should not change frequently.
  3. All features offered by the new framework should be optional in OpenSearch, i.e. the same extension plugin should run without extension nodes attached to the cluster.
  4. Migration of existing plugins should not require any business logic change.
  5. These nodes are not directly exposed to the end user and should not act as a coordinator node of the OpenSearch cluster.

New Components

ExtensionClient (on extension node)

ExtensionState

Extension plugins can configure this class and subscribe to the required part of the cluster state, which the ClusterApplierService (running on the master node) syncs asynchronously whenever the subscribed cluster state changes.

Note: The state definition is not final; depending on the use cases, we can add more fields to it. ExtensionState doesn’t need to be persisted separately on cluster nodes, as it can be generated from the cluster state.

ExtensionTransportService (on both extension and cluster nodes)

This service will have the following responsibilities when running:

On cluster node -

On extension node -

ExtensionStateSynchronizer Service (on master node)

Whenever the ExtensionState of a given featureTag changes, ExtensionStateSynchronizer will publish the diff in the ExtensionState to all the nodes registered in the ExtensionTransportService for that featureTag. Once published, it will also update the cluster state with the latest committed extensionState version.
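The publish step can be sketched roughly as follows. This is a simplified Python model rather than the proposed Java service: the `StateDiff` shape, the per-tag version counter, and the `register`/`on_state_change`/`committed_version` method names are assumptions for illustration, and real publishing would be asynchronous rather than a direct call.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class StateDiff:
    feature_tag: str
    version: int
    changed: Dict[str, Any]  # keys added or updated
    removed: List[str]       # keys that disappeared


class ExtensionStateSynchronizer:
    """Toy model: diff the ExtensionState per featureTag and publish only the diff."""

    def __init__(self) -> None:
        self._last: Dict[str, Dict[str, Any]] = {}   # tag -> last published state
        self._versions: Dict[str, int] = {}          # tag -> last committed version
        self._nodes: Dict[str, List[Callable[[StateDiff], None]]] = {}

    def register(self, tag: str, send: Callable[[StateDiff], None]) -> None:
        # `send` stands in for a connection to one extension node.
        self._nodes.setdefault(tag, []).append(send)

    def committed_version(self, tag: str) -> int:
        return self._versions.get(tag, 0)

    def on_state_change(self, tag: str, new_state: Dict[str, Any]) -> Optional[StateDiff]:
        old = self._last.get(tag, {})
        changed = {k: v for k, v in new_state.items() if k not in old or old[k] != v}
        removed = sorted(k for k in old if k not in new_state)
        if not changed and not removed:
            return None  # nothing to publish
        diff = StateDiff(tag, self.committed_version(tag) + 1, changed, removed)
        for send in self._nodes.get(tag, []):
            send(diff)
        # Record the new version only after publishing, mirroring the text above.
        self._last[tag] = dict(new_state)
        self._versions[tag] = diff.version
        return diff
```

Sending only `changed` and `removed` keeps the payload proportional to the size of the change, which fits the lightweight state transfer tenet.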

ExtensionEvent Service (on extension node)

It processes all diffs received in the ExtensionState and calls all consumers listening to changes in metadata or cluster settings (existing ones or new ones introduced by the extension plugin).
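A minimal sketch of the receiving side, again in Python and under assumptions: the diff is modeled as a version number plus `changed` and `removed` keys, consumers are plain callbacks keyed by setting name, and stale or duplicate versions are simply ignored. None of these details are specified by the proposal.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List


class ExtensionEventService:
    """Toy model: apply ExtensionState diffs and notify per-key consumers."""

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}
        self.version = 0
        self._consumers: Dict[str, List[Callable[[str, Any], None]]] = defaultdict(list)

    def add_consumer(self, key: str, callback: Callable[[str, Any], None]) -> None:
        self._consumers[key].append(callback)

    def apply_diff(self, version: int, changed: Dict[str, Any],
                   removed: Iterable[str] = ()) -> bool:
        if version <= self.version:
            return False  # stale or duplicate diff: ignore it
        for key in removed:
            self.state.pop(key, None)
            for callback in self._consumers.get(key, ()):
                callback(key, None)  # None signals removal in this sketch
        for key, value in changed.items():
            self.state[key] = value
            for callback in self._consumers.get(key, ()):
                callback(key, value)
        self.version = version
        return True
```

A plugin would register a consumer for each setting it cares about, much as it registers settings update consumers today; keys without a consumer are still stored but trigger no callback.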

Bootstrapping process

[Figure: Bootstrap. Green denotes new services; yellow denotes existing services.]

ExtensionNode

The extension node will bootstrap only the minimal set of services required to support extension plugins. It will not start Index, Search, Cluster, or any other services that depend on them. It will also bootstrap the new services described above and will come up with ExtensionClient as the NodeClient for the node. It will load only the plugins associated with the featureTags configured in the .yml file.
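The featureTag filter described above amounts to a set intersection. A small Python sketch, where the plugin names, tag names, and the `plugins_to_load` helper are all hypothetical:

```python
from typing import Dict, Iterable, List


def plugins_to_load(installed: Dict[str, List[str]], node_feature_tags: Iterable[str]) -> List[str]:
    """Return the installed plugins that share at least one featureTag with this node."""
    tags = set(node_feature_tags)
    return sorted(name for name, plugin_tags in installed.items() if tags & set(plugin_tags))


# Hypothetical mapping of installed plugin -> featureTags it serves.
installed = {
    "alerting": ["alerting"],
    "anomaly-detection": ["ad"],
    "reports": ["reporting", "alerting"],
}
loaded = plugins_to_load(installed, ["alerting"])
```

A node configured with no featureTags loads no extension plugins in this sketch.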

PluginService:
PluginService will install all plugins that are installed on the cluster, but it will load and use only the essential components of these plugins that are needed by the services running on the extension node, such as ScriptPlugin, CircuitBreaker Plugin etc. The ExtensionClient will also need XContentRegistry, NamedWritables etc. to call transport actions added by other plugins. Since no RestHandlers will be hosted on the extension nodes, the ActionModule will only register the TransportActions from extension plugins using PluginService.

ExtensionTransportService will be bootstrapped first in order to send a JoinRequest to the cluster. Once connected, ExtensionClient will be usable and can proxy all TransportActions to the cluster nodes. The node will also start receiving updates to the ExtensionState for all of its featureTags. Once the extensionState is available, the extension plugin will become fully functional.

Cluster Node

All RestHandlers from extension plugins will be registered in the ActionModule of cluster nodes just like other RestHandlers. Cluster nodes should also be aware of all new TransportAction Request and Response classes. TransportAction execute() will simply forward the request to one of the extension nodes using ExtensionTransportService. ExtensionTransportService will keep adding connections to extension nodes dynamically as it receives updates in the cluster state about new extension nodes.
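The forwarding step can be sketched as a small routing table keyed by featureTag. The proposal only says the request goes to "one of" the extension nodes; the round-robin selection below is an assumption, as are the `ExtensionRouter` class, its method names, and the node identifiers.

```python
from typing import Dict, Iterable, List


class ExtensionRouter:
    """Toy model: track extension nodes per featureTag and pick one per request."""

    def __init__(self) -> None:
        self._nodes: Dict[str, List[str]] = {}  # tag -> node ids, in join order
        self._cursor: Dict[str, int] = {}       # tag -> next round-robin index

    def add_node(self, node_id: str, feature_tags: Iterable[str]) -> None:
        # Called when the cluster state reports a new extension node.
        for tag in feature_tags:
            nodes = self._nodes.setdefault(tag, [])
            if node_id not in nodes:
                nodes.append(node_id)

    def remove_node(self, node_id: str) -> None:
        for nodes in self._nodes.values():
            if node_id in nodes:
                nodes.remove(node_id)

    def pick(self, tag: str) -> str:
        nodes = self._nodes.get(tag)
        if not nodes:
            raise LookupError(f"no extension node serves featureTag {tag!r}")
        index = self._cursor.get(tag, 0) % len(nodes)
        self._cursor[tag] = index + 1
        return nodes[index]


router = ExtensionRouter()
router.add_node("ext-1", ["alerting"])
router.add_node("ext-2", ["alerting", "ad"])
targets = [router.pick("alerting") for _ in range(3)]
```

When `pick` raises, a cluster node could fall back to running the action locally, which would be consistent with tenet 3, though the proposal does not spell out that fallback.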

Master Node

The master node will come up with 2 new services: ExtensionStateSynchronizer and ExtensionTransportService. It will receive JoinRequests from extension nodes, maintain the state of those connections in the cluster state, and publish it to other nodes in the cluster. ExtensionStateSynchronizer will be bootstrapped and will keep checking for diffs in the extensionState of each featureTag. It will publish each diff to the extension nodes asynchronously and update the extensionState version in the cluster state.

Discovery

[Figure: Discovery]

Request flow

The diagram shows the request flow for new APIs added by an extension plugin, and how the ExtensionStateSynchronizer listens for and synchronizes the ExtensionState.

[Figure: RequestFlow. Red arrows denote the flow of extension state updates and events; black arrows denote the REST API request flow.]

Limitations

  1. Extension nodes cannot support the core plugins described above. So plugins like analyzers, mapper, search, and repository cannot run on these nodes (see the Future enhancements section).
  2. They cannot support any workloads which depend on or require full cluster state transfer.
  3. Migrating a workload from regular cluster nodes to these nodes should be avoided if it results in a significant increase in state/data transfer between nodes. E.g. listening to indexing events whose payload contains the document is not a good use case for extension nodes. In contrast, transferring shard query results, which already happens today between the coordinator and the other cluster nodes handling shard requests, is a good fit for extension nodes.

Existing plugins which can be migrated to this model

Alerting: https://github.com/opensearch-project/alerting
Alerting doesn’t depend heavily on the cluster state and only uses it to fetch the metadata associated with the system index it creates. It uses NodeClient to perform search and indexing, and it adds new cluster settings and APIs to the cluster. This makes the alerting plugin a perfect use case to migrate onto this model.

TODO - evaluate feasibility of:
Anomaly detection: https://github.com/opensearch-project/anomaly-detection
Index Management: https://github.com/opensearch-project/index-management
SQL: https://github.com/opensearch-project/sql
Reporting: https://github.com/opensearch-project/dashboards-reports

Open questions in implementation

  1. Publishing extensionState to extension nodes will not be a blocking call, unlike publishing cluster state to cluster nodes. So if extensionState is synchronized at the time of an API call, it can add to the latency of extension plugin APIs. Alternatively, if the state is synchronized asynchronously by a background service, it can lead to availability issues when requests are rejected because the extensionState on the destination extension node is not the latest.

The problems above may not be too prominent, as we do not expect plugin-specific system index metadata to change frequently.
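The trade-off in this open question can be made concrete with a small sketch. It assumes, hypothetically, that each forwarded request carries the extensionState version the cluster node expects, and that the extension node can either reject a request when it is behind or block on a catch-up sync first. The names `handle_request`, `StaleStateError`, and the `policy` values are invented for the example.

```python
from typing import Callable, Optional


class StaleStateError(Exception):
    """Raised when the local extensionState is older than the request requires."""


def handle_request(required_version: int, local_version: int, policy: str = "reject",
                   sync: Optional[Callable[[], int]] = None) -> int:
    """Return the state version the request is served with, or raise if too stale.

    policy="reject": fail fast (availability cost).
    policy="wait":   call `sync` to catch up first (latency cost).
    """
    if local_version >= required_version:
        return local_version
    if policy == "wait" and sync is not None:
        local_version = sync()  # blocking catch-up; returns the new local version
        if local_version >= required_version:
            return local_version
    raise StaleStateError(
        f"local version {local_version} is behind required version {required_version}"
    )
```

With `policy="reject"` the caller sees an error until the background sync catches up; with `policy="wait"` the caller pays for the sync on the request path.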

Future enhancements

Network calls could be expensive for some use cases. Take Analyzers, where a TokenStream is created for each document to be indexed or for each search query: moving them to a separate node adds a network call, with a payload as big as the document, to the critical path of indexing/search, which could be too expensive. Thus core plugins like Analyzer plugins are not good use cases for this model as proposed.

However, these processes can run on the same node as the OpenSearch process, since the separation is only logical and communication happens via the TransportChannel of OpenSearch. So, instead of depending on the TransportChannel for communication, can it be replaced with an optimized IPC mechanism? The communication channel can be abstracted in OpenSearch depending on whether the other node is local or requires a remote connection. If remote, the regular TransportChannel would be used; otherwise an IPC mechanism such as shared memory can be used to reduce latency for use cases such as Analyzers. This way even core plugins can be moved onto this model with less impact on latency. It may require some code changes in the way analyzer registries are created and analyzer factories are initialized and directly used; instead, a new Action has to be defined for anything migrated onto this model.
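The channel abstraction could look roughly like the sketch below. It is a Python stand-in only: both channel classes just call a handler function, so the actual network round trip and the shared-memory handoff are placeholders, and the class names, the `kind` attribute, and the host comparison in `open_channel` are assumptions.

```python
from abc import ABC, abstractmethod
from typing import Callable


class Channel(ABC):
    """Common interface so callers do not care how the payload travels."""

    kind = "abstract"

    def __init__(self, handler: Callable[[bytes], bytes]) -> None:
        self._handler = handler  # stands in for the process on the other side

    @abstractmethod
    def send(self, payload: bytes) -> bytes:
        ...


class RemoteTransportChannel(Channel):
    kind = "transport"

    def send(self, payload: bytes) -> bytes:
        # Placeholder for serialization plus a network round trip.
        return self._handler(payload)


class LocalIpcChannel(Channel):
    kind = "ipc"

    def send(self, payload: bytes) -> bytes:
        # Placeholder for a same-host handoff such as shared memory.
        return self._handler(payload)


def open_channel(local_host: str, target_host: str,
                 handler: Callable[[bytes], bytes]) -> Channel:
    """Pick IPC when the target runs on the same host, else the regular transport."""
    if local_host == target_host:
        return LocalIpcChannel(handler)
    return RemoteTransportChannel(handler)
```

The caller only sees `send`, so an analyzer action could be written once and get the cheaper path whenever the extension process is co-located.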

Appendix

  1. How are these nodes different from the coordinating only node in Elasticsearch? Coordinating only nodes are part of the cluster, and the master syncs the full cluster state onto them. They also run most of the services which run on other nodes of the cluster. “Adding too many coordinating only nodes to a cluster can increase the burden on the entire cluster because the elected master node must await acknowledgement of cluster state updates from every node! The benefit of coordinating only nodes should not be overstated — data nodes can happily serve the same purpose.”
  2. Why do we need these nodes instead of just using TransportClient to run such extensible workloads? Extension nodes are capable of much more than being a client and executing APIs. They can add listeners for parts of the cluster state and consume these events. Also, extension plugins can add new APIs to the cluster.

spbjss commented 2 years ago

One open question: How did you define the solution? This doc only confirmed that the Alerting plugin can be migrated to the extension model. Probably it’s better to define the solution after checking the behavior/architecture of other peripheral plugins, and make sure the solution could support most of the peripheral plugins.

spbjss commented 2 years ago

Several more questions:

  1. How much effort does it need to migrate an existing plugin to this extension feature?
  2. Would this extension feature manage the workload balance across the extension nodes?
  3. More specific numbers on the limitations would be very helpful. (e.g. "If the extension plugin is dependent on some index metadata/cluster setting, its size should be minimal and should not change frequently." minimal = ?, frequently = ?)

rishabhmaurya commented 2 years ago

One open question: How did you define the solution? This doc only confirmed that the Alerting plugin can be migrated to the extension model. Probably it’s better to define the solution after checking the behavior/architecture of other peripheral plugins, and make sure the solution could support most of the peripheral plugins.

You're right! We are open to discussing what changes this model needs in order to support most of the use cases. Let's list all the constraints here that are too strict and prevent the migration of any peripheral plugin, and discuss ways to support them.

ps48 commented 2 years ago

I have a few questions particularly related to plugins like Notebooks:

  1. Notebooks uses its own index for storing documents; it's not just using some metadata. Hence, it will need full access (create, read, write) to the subscribed index. How is this handled by the extension node?
  2. I understand the transport APIs are taken care of by the transport action handler in the extension node. But is there any effect on the REST APIs provided by dashboard plugins?
  3. The Notebooks plugin uses the OpenSearch Dashboards Status API to check if the reporting plugin is available. Will this API endpoint still be accessible?
  4. For a plugin like SQL/PPL, which needs access to almost all indices, how is the subscribed indices part handled?

ylwu-amzn commented 2 years ago

1. They are not directly part of the OpenSearch cluster and thus the full cluster state will not be synced on these nodes.
4. These nodes will not be connected to each other.

For the AD plugin, we need to distribute load between multiple nodes. Currently we depend on cluster change events to build a hash ring and distribute load. After moving AD to extension nodes, AD needs to get similar change events for all extension nodes, such as a new extension node joining or a node leaving the cluster. These nodes should be able to connect to each other, so we can choose one extension node as the coordinating node and distribute models/tasks to different extension nodes with the AD feature tag.

Several plugins (Alerting/AD/IndexManagement) have periodic jobs built on top of the JobScheduler plugin, and the JobScheduler plugin relies on job index events to schedule/de-schedule jobs. So I guess JobScheduler will be a core plugin? Currently JobScheduler starts a job on data nodes which hold a shard of the job index, which means a job can only start on a regular cluster data node rather than on extension nodes. Do we have a plan to migrate all such jobs to extension nodes?

5. Each node will be bootstrapped with the featureTags of the extension plugins which can run on it. So if there are multiple extension nodes connected to the cluster, each of them can serve different extension plugins. This heterogenous model provides flexibility in scaling the extension node fleet depending on the workloads and their resource consumption.

This model can shift memory/CPU load away from regular cluster nodes, but it seems it can’t shift the load of searching/writing indices, right?

rishabhmaurya commented 2 years ago

For the AD plugin, we need to distribute load between multiple nodes. Currently we depend on cluster change events to build a hash ring and distribute load. After moving AD to extension nodes, AD needs to get similar change events for all extension nodes, such as a new extension node joining or a node leaving the cluster. These nodes should be able to connect to each other, so we can choose one extension node as the coordinating node and distribute models/tasks to different extension nodes with the AD feature tag.

I have been thinking about this use case, and thanks for bringing it up. Extension nodes not talking to each other shouldn't be a hard constraint; they should be allowed to establish connections to other extension nodes as required. Also, the global extension state, which is maintained by the OpenSearch cluster leader, contains the list of active extension nodes reachable from the leader. So the ExtensionEventService running on each extension node will be notified of any membership changes among extension nodes. ExtensionTransportService can keep long-lived connections with the desired extension nodes and will maintain a list of all active and faulty ones, just like TransportService in the OpenSearch cluster.

Several plugins (Alerting/AD/IndexManagement) have periodic jobs built on top of the JobScheduler plugin, and the JobScheduler plugin relies on job index events to schedule/de-schedule jobs. So I guess JobScheduler will be a core plugin? Currently JobScheduler starts a job on data nodes which hold a shard of the job index, which means a job can only start on a regular cluster data node rather than on extension nodes. Do we have a plan to migrate all such jobs to extension nodes?

Yes, there will be a different version of JobScheduler running on the candidate/leader nodes of the OpenSearch cluster, and it will schedule jobs on extension nodes. It will ensure jobs are distributed uniformly and will be backward compatible in case no extension nodes are attached to the cluster.

This model can shift memory/CPU load away from regular cluster nodes, but it seems it can’t shift the load of searching/writing indices, right?

You're right. We could still use them for post-processing of queries, like running aggregations. However, this conflicts with the fact that they will not maintain the full cluster state, so index metadata/mappings and shard-to-node routing information will not be present, which would make it challenging to support such use cases. This is something we need to brainstorm more. I'll add more details on possible options that do not increase the magnitude of data transfer, and I believe this could be a future enhancement.

rishabhmaurya commented 2 years ago

Notebooks uses its own index for storing documents; it's not just using some metadata. Hence, it will need full access (create, read, write) to the subscribed index. How is this handled by the extension node?

They will have full access to the NodeClient, which they can use to create/read/write indices. So there is no change in behavior, although implicitly all requests would be proxied to one of the cluster nodes to handle them.

I understand the transport APIs are taken care of by the transport action handler in the extension node. But is there any effect on the REST APIs provided by dashboard plugins?

No, REST APIs should work as they are. The only difference is that they will not be hosted on extension nodes but on regular cluster nodes, and the corresponding transport action would be called on an extension node.

The Notebooks plugin uses the OpenSearch Dashboards Status API to check if the reporting plugin is available. Will this API endpoint still be accessible?

Do they call a localhost address, or do they use a public endpoint? How are these requests signed?

For a plugin like SQL/PPL, which needs access to almost all indices, how is the subscribed indices part handled?

What do you mean by "access" here? Is it for search purposes? If yes, it's the same question as the first one, and it's handled by design.

chloe-zh commented 2 years ago

Shout out for this proposal and design! But I have a couple of questions:

  1. Does the plugin need to be aware of which role is in use at different stages of the plugin's business logic? For example, the SQL plugin usually fetches indices with NodeClient at the very first stage of the physical plan, and then works on in-memory post-processing right after the index scan. Would it be the SQL plugin's responsibility to switch to the extension role at this point?
  2. Would it have any impact on the existing circuit breakers and cost estimators in the plugins? Thanks!

adityaj1107 commented 2 years ago

I have some high-level questions about the design proposed for the architecture of extension nodes:

arurus-aws commented 2 years ago

Kudos for initiating this RFC. A few questions:

1) If we want to run core plugins (for example, analyzers) on extension nodes, will it work? If yes, by what percentage will latency be impacted? If not, where do you see the problem, and what would it take to support it?

2) The primary reason we want to use extension nodes to deploy plugins is security and sandboxing/isolation. The dependency on this two-way communication can impact security/availability.

3) Why can't these extension nodes just be stateless compute nodes that support standard REST APIs? That is, the coordinating node acts as the client and extension nodes act as servers, assuming coordinating nodes know which extension nodes (URLs) they should reach, with authentication performed using mTLS.

4) Is this design tightly coupled with cluster state? In a way, extension nodes are part of this cluster.

5) In the Future enhancements section, using shared-memory IPC for analyzer plugins may not work if OpenSearch is running in a container.

Thanks.