opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[RFC] Data Access Object Interface for Metadata Store #13336

Open dbwiddis opened 2 months ago

dbwiddis commented 2 months ago

Is your feature request related to a problem? Please describe

OpenSearch plugins require persistent storage for some configuration and metadata related to their operation, which is logically distinct from end-user data storage. Currently, plugins use either cluster state or system indices to manage this information.

Unlike typical data designed for optimized search over a large scale, plugin metadata tends to be more narrowly defined and include more key-value storage or a limited number of documents primarily referenced by their document ID. Plugins should be able to define other persistent storage alternatives than the cluster state or system indices. Options include remote clusters, non-relational databases (MongoDB, NoSQL, DynamoDB, Apache Cassandra, HBase, and many others), Blob Storage (see Remote Cluster State RFC https://github.com/opensearch-project/OpenSearch/issues/9143).

However, the plugin code should ideally be the same regardless of where the data lives, with multiple alternate implementations of a common interface providing the logical separation of code from data.

Defining a standard Data Access Object (DAO) interface pattern can provide a path for plugins to migrate existing code with a cluster-based implementation, while providing future flexibility for using those plugins with other storage locations.

Note: the scope of this proposal is for key-value or id-based lookups. It specifically excludes data that requires efficient searching.

Describe the solution you'd like

  1. Define a Data Access Object interface in OpenSearch (or other common location that plugins would take as a dependency) defining basic CRUD operations independent of data store or client implementations.
    • Optionally, add an abstract base implementation for the NodeClient implementation to simplify plugin implementations
  2. Plugins would migrate existing code using NodeClient to instead call an implementation of these interfaces. As new storage options are considered, a new implementation could be written without changing the plugin code.

Here is an example interface proposal:

/**
 * The Data Access Object interface.
 * <p>
 * This interface should be called by plugin code for CRUD operations on indices/tables.
 * <p>
 * The appropriate implementation will be used depending on cluster/serverless configuration.
 *
 * @param <T> A Java class implementing ToXContent Object, with a 1:1 correspondence with an index/table mapping.
 */
public interface Dao<T extends ToXContentObject> {

    /**
     * Create an item (document) in an index (table).
     *
     * @param t The object representing the item (document) to create.
     *
     * @return The unique id string representing the item (document).
     */
    String createItem(T t);

    /**
     * Get an item (document) from an index (table).
     *
     * @param clazz The item (document) class to return.
     * @param id The item (document) id.
     *
     * @return A Java object representing the retrieved item (document).
     */
    Optional<T> getItem(Class<T> clazz, String id);

    /**
     * Delete an item (document) from an index (table).
     *
     * @param clazz The item (document) class to delete.
     * @param id The item (document) id.
     *
     * @return An Optional Boolean.
     * If {@link Boolean#TRUE}, the item was deleted.
     * If {@link Boolean#FALSE}, the item was not found.
     * The Optional is not present on error.
     */
    Optional<Boolean> deleteItem(Class<T> clazz, String id);

    /**
     * Update an item (document) in an index (table).
     *
     * @param clazz The item (document) class to update.
     * @param id The item (document) id.
     * @param updatedContent A key-value map containing the fields to update
     *
     * @return An Optional Boolean.
     * If {@link Boolean#TRUE}, the item was updated.
     * If {@link Boolean#FALSE}, the item was not found.
     * The Optional is not present on error.
     */
    Optional<Boolean> updateItem(Class<T> clazz, String id, Map<String, Object> updatedContent);
}
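
To make the contract above concrete, here is a minimal, self-contained sketch of an in-memory implementation. It is only an illustration under stated assumptions: `SimpleDao<T>` mirrors the proposed methods but drops the `ToXContentObject` bound so it compiles without OpenSearch on the classpath, and `InMemoryDao`, `Note`, and the caller-supplied `merger` function are hypothetical names that do not appear in the proposal.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Simplified stand-in for the proposed Dao<T>. The ToXContentObject bound is
// dropped (assumption) so the sketch has no OpenSearch dependencies.
interface SimpleDao<T> {
    String createItem(T t);
    Optional<T> getItem(Class<T> clazz, String id);
    Optional<Boolean> deleteItem(Class<T> clazz, String id);
    Optional<Boolean> updateItem(Class<T> clazz, String id, Map<String, Object> updatedContent);
}

// Hypothetical in-memory implementation, e.g. for unit tests of plugin code.
class InMemoryDao<T> implements SimpleDao<T> {
    private final Map<String, T> items = new ConcurrentHashMap<>();
    // Applies a map of changed fields to an existing item and returns the new item.
    private final BiFunction<T, Map<String, Object>, T> merger;

    InMemoryDao(BiFunction<T, Map<String, Object>, T> merger) {
        this.merger = merger;
    }

    @Override
    public String createItem(T t) {
        String id = UUID.randomUUID().toString();
        items.put(id, t);
        return id;
    }

    @Override
    public Optional<T> getItem(Class<T> clazz, String id) {
        return Optional.ofNullable(items.get(id));
    }

    @Override
    public Optional<Boolean> deleteItem(Class<T> clazz, String id) {
        // TRUE if an item was removed, FALSE if the id was unknown.
        return Optional.of(items.remove(id) != null);
    }

    @Override
    public Optional<Boolean> updateItem(Class<T> clazz, String id, Map<String, Object> updatedContent) {
        // computeIfPresent returns null when the id is absent, which maps to FALSE.
        return Optional.of(items.computeIfPresent(id, (k, old) -> merger.apply(old, updatedContent)) != null);
    }
}

// Hypothetical item type with a single field.
class Note {
    final String text;

    Note(String text) {
        this.text = text;
    }
}
```

Plugin code written against `SimpleDao<Note>` would not need to change if this class were swapped for a cluster-backed or remote implementation, which is the point of the pattern. This in-memory version never returns an empty `Optional` because it has no error path.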

Existing NodeClient implementations in plugins could be migrated to this XContent-based implementation (partially implemented example for Flow Framework):

public class FlowFrameworkXContentDao<T extends ToXContentObject> implements Dao<T> {
    private static final Logger logger = LogManager.getLogger(FlowFrameworkXContentDao.class);

    private final Client client;
    private final FlowFrameworkIndicesHandler indicesHandler;

    public FlowFrameworkXContentDao(Client client, FlowFrameworkIndicesHandler indicesHandler) {
        this.client = client;
        this.indicesHandler = indicesHandler;
    }

    @Override
    public String createItem(T t) {
        final PlainActionFuture<String> future = new PlainActionFuture<>();
        if (t instanceof Template) {
            indicesHandler.putTemplateToGlobalContext((Template) t, ActionListener.wrap(indexTemplateResponse -> {
                indicesHandler.putInitialStateToWorkflowState(
                    indexTemplateResponse.getId(),
                    getUserContext(client),
                    ActionListener.wrap(response -> {
                        future.onResponse(indexTemplateResponse.getId());
                    }, future::onFailure)
                );
            }, future::onFailure));
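        } else if (t instanceof WorkflowState) {
            // State is not created directly; it is initialized when a Template is created
            throw new UnsupportedOperationException("WorkflowState is not created directly");
        } else {
            // need to create type for WorkflowConfig; fail fast so the future below never blocks forever
            throw new UnsupportedOperationException("Unsupported type: " + t.getClass().getName());
        }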
        return future.actionGet();
    }

    @SuppressWarnings("unchecked")
    @Override
    public Optional<T> getItem(Class<T> clazz, String id) {
        String index = getIndexNameForType(clazz);
        GetRequest getRequest = new GetRequest(index).id(id);
        try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
            logger.debug("Calling Get from index:{}, id:{}", index, id);
            GetResponse r = client.get(getRequest).actionGet();
            if (r != null && r.isExists()) {
                if (clazz == Template.class) {
                    return Optional.of((T) Template.parse(r.getSourceAsString()));
                } else if (clazz == WorkflowState.class) {
                    return Optional.of((T) WorkflowState.parse(r.getSourceAsString()));
                }
            }
        } catch (Exception e) {
            // handle
        }
        return Optional.empty();
    }

    @Override
    public Optional<Boolean> deleteItem(Class<T> clazz, String id) {
        String index = getIndexNameForType(clazz);
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
            logger.debug("Calling Delete on index:{}, id:{}", index, id);
            DeleteResponse r = client.delete(deleteRequest).actionGet();
            if (r != null) {
                return Optional.of(Result.DELETED.equals(r.getResult()));
            }
        } catch (Exception e) {
            // handle
        }
        return Optional.empty();
    }

    @Override
    public Optional<Boolean> updateItem(Class<T> clazz, String id, Map<String, Object> updatedContent) {
        String index = getIndexNameForType(clazz);
        UpdateRequest updateRequest = new UpdateRequest(index, id);
        try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
            updateRequest.doc(updatedContent);
            updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            updateRequest.retryOnConflict(5);
            logger.debug("Calling Update on index:{}, id:{}", index, id);
            UpdateResponse r = client.update(updateRequest).actionGet();
            if (r != null) {
                return Optional.of(Result.UPDATED.equals(r.getResult()));
            }
        } catch (Exception e) {
            // handle
        }
        return Optional.empty();
    }

    private String getIndexNameForType(Class<T> clazz) {
        if (clazz == Template.class) {
            return GLOBAL_CONTEXT_INDEX;
        } else if (clazz == WorkflowState.class) {
            return WORKFLOW_STATE_INDEX;
        } else {
            // need to create type for WorkflowConfig
            throw new IllegalArgumentException("No index is mapped to type: " + clazz.getName());
        }
    }
}

A remote cluster using the OpenSearch Java Client could use this style interface (partial implementation shown):

public class FlowFrameworkJavaClientDao<T extends ToXContentObject> implements Dao<T> {
    private OpenSearchClient openSearchClient;

    public FlowFrameworkJavaClientDao(OpenSearchClient openSearchClient) {
        this.openSearchClient = openSearchClient;
    }

    @Override
    public String createItem(T t) {
        if (t instanceof Template) {
            IndexRequest<Template> indexRequest = new IndexRequest.Builder<Template>().index(GLOBAL_CONTEXT_INDEX)
                .document((Template) t).build();
            // The Java client performs network I/O, which requires a privileged block inside a plugin
            final IndexResponse indexResponse = AccessController.doPrivileged((PrivilegedAction<IndexResponse>) () -> {
                try {
                    return openSearchClient.index(indexRequest);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            return indexResponse.id();
        } else if (t instanceof WorkflowState) {
            // state is not created directly
            throw new UnsupportedOperationException("WorkflowState is not created directly");
        } else {
            // need to create type for WorkflowConfig
            throw new UnsupportedOperationException("Unsupported type: " + t.getClass().getName());
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public Optional<T> getItem(Class<T> clazz, String id) {
        String index = getIndexNameForType(clazz);
        GetResponse<T> getResponse = AccessController.doPrivileged((PrivilegedAction<GetResponse<T>>) () -> {
            try {
                return openSearchClient.get(getRequest -> getRequest.index(index).id(id), clazz);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        return getResponse.source() == null ? Optional.empty() : Optional.of(getResponse.source());
    }

    @Override
    public Optional<Boolean> deleteItem(Class<T> clazz, String id) {
        String index = getIndexNameForType(clazz);
        DeleteRequest deleteRequest = new DeleteRequest.Builder().index(index).id(id).build();
        DeleteResponse deleteResponse = AccessController.doPrivileged((PrivilegedAction<DeleteResponse>) () -> {
            try {
                return openSearchClient.delete(deleteRequest);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        return deleteResponse == null ? Optional.empty() : Optional.of(Result.Deleted.equals(deleteResponse.result()));
    }

    @Override
    public Optional<Boolean> updateItem(Class<T> clazz, String id, Map<String, Object> updatedContent) {
        String index = getIndexNameForType(clazz);
        Optional<T> t = getItem(clazz, id);
        if (!t.isPresent()) {
            // item not found
            return Optional.of(false);
        }
        // apply updatedContent to the retrieved object here
        UpdateRequest<T, T> updateRequest = new UpdateRequest.Builder<T, T>().index(index).id(id).doc(t.get()).build();
        UpdateResponse<T> updateResponse = AccessController.doPrivileged((PrivilegedAction<UpdateResponse<T>>) () -> {
            try {
                return openSearchClient.update(updateRequest, clazz);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        return updateResponse == null ? Optional.empty() : Optional.of(Result.Updated.equals(updateResponse.result()));
    }
}

CRUD implementations for NoSQL data stores would use the appropriate request/response models for those data stores, which largely align with the interface.

Related component

Plugins

Describe alternatives you've considered

Let each plugin define its own interface

This will likely be an intermediate solution further explored while this RFC is open for comments. It may be a reasonable solution as some plugins might require more arguments than this limited proposal. However, this would cause some duplication of effort and result in more copy/paste implementations.

Define the interface and base implementations either in core/common or a new repository to be used as a dependency

Given the small number of classes, this seems like a lot of overhead compared to including in an existing common dependency.

Wrapper Client

I explored using a wrapper client implementing the NodeClient for CRUD operations in this Proof-of-Concept PR https://github.com/opensearch-project/flow-framework/pull/631 . This allowed a minimum of changes in the plugin code, easing migration. However, it also required translation from the NodeClient Request objects to the specific implementation Request Objects (easy) and from the implementation Response Objects to generate NodeClient Response objects (required creating non-relevant data related to shards).

The proposed interface minimizes this extra object creation.

Inline Code

Each code point interacting with the index could have multiple conditional implementations. This complexity mixes code and data and makes migration difficult.

Additional context

This proposal is intentionally narrowly scoped to key-value or id-based CRUD operations and excludes search operations.

The only code proposed to be added to OpenSearch itself is the interface definition and possibly an abstract base class with some common NodeClient implementations. It could reside in any package commonly loaded by plugins.

saratvemulapalli commented 2 months ago

@fddattal is also working along similar lines to abstract out how plugins interface with core for state.

xinlamzn commented 2 months ago

While plugins may not need full-text search capability, some search features are nice to have and could simplify existing plugin migration. Could we define the minimal search capability that could be easily implemented across different backend storages?

arjunkumargiri commented 2 months ago

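Thanks for creating this RFC. A few follow-up questions:

  • What is the proposal if plugin needs support for search functionality? Does each plugin need to extend the base implementation to add support for custom search functionality?
  • In addition to NodeClient based default implementation, will any other default implementation be supported?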

dbwiddis commented 2 months ago

While plugins may not need full-text search capability, some search features are nice to have and could simplify existing plugin migration. Could we define the minimal search capability that could be easily implemented across different backend storages?

We would probably want to include a listing feature ("search all" equivalent).

If we had keyword-based fields we could probably include a limited number of them as filtering of the list.
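
As a rough illustration of what such a listing feature might look like, the sketch below supports "list all" plus exact-match filtering on keyword fields. Everything in it is an assumption made for illustration: `ListableStore`, its method names, and the choice to model a document as a flat map of keyword fields are not part of the RFC, and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical store that supports "list all" plus exact-match keyword filters.
class ListableStore {
    // id -> keyword fields; a TreeMap keeps the listing order deterministic (sorted by id).
    private final Map<String, Map<String, String>> docs = new TreeMap<>();

    void put(String id, Map<String, String> keywordFields) {
        docs.put(id, Map.copyOf(keywordFields));
    }

    // Returns the ids of all documents whose fields equal every filter entry.
    // An empty filter map lists everything (the "search all" equivalent).
    List<String> list(Map<String, String> filters) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> doc : docs.entrySet()) {
            if (matches(doc.getValue(), filters)) {
                ids.add(doc.getKey());
            }
        }
        return ids;
    }

    private static boolean matches(Map<String, String> fields, Map<String, String> filters) {
        for (Map.Entry<String, String> filter : filters.entrySet()) {
            if (!filter.getValue().equals(fields.get(filter.getKey()))) {
                return false;
            }
        }
        return true;
    }
}
```

The full scan in `list` is deliberate here: it reflects the performance caveat that a key-value backend may have to read every item to answer a filtered listing unless it maintains a secondary index.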

  • What is the proposal if plugin needs support for search functionality? Does each plugin need to extend the base implementation to add support for custom search functionality?

This somewhat relates to @xinlamzn 's comment above. We need some sort of basic search; the problem is setting performance expectations. NoSQL DBs are optimized for key-value lookup, not for searching across all the data.

  • In addition to NodeClient based default implementation, will any other default implementation be supported?

Possibly! I'd think things that are included already in OpenSearch would be prime candidates. A Java Client implementation for a remote cluster would probably have wide applicability. I'm not sure if Remote Storage is the right candidate for it but it could be explored. Beyond those, I'd expect it'd be driven by community requests.

peternied commented 2 months ago

[Triage - attendees 1 2 3 4 5 6 7] @dbwiddis Thanks for creating this RFC, seems like a great improvement for encapsulation

dbwiddis commented 2 months ago

Looking at #13274, we've got the same goals and would benefit from the same implementation. Currently leaning toward the interface in this comment: https://github.com/opensearch-project/OpenSearch/issues/13274#issuecomment-2076253291

Not all plugins will need all interfaces implemented, so we will need to consider default implementations or an abstract base class with no-op implementations.
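
One possible shape for the default-implementation option is sketched below. It is an assumption-laden illustration, not the interface from #13274: `MetadataClient`, `ReadOnlyClient`, and the method signatures are hypothetical, and the defaults throw `UnsupportedOperationException` rather than silently doing nothing, which is a deliberate deviation from a literal no-op so that unsupported calls fail visibly.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical client interface where every operation has a default body,
// so an implementation only overrides what its backend supports.
interface MetadataClient {
    default String put(String id, String document) {
        throw new UnsupportedOperationException("put is not supported by this client");
    }

    default Optional<String> get(String id) {
        throw new UnsupportedOperationException("get is not supported by this client");
    }

    default boolean delete(String id) {
        throw new UnsupportedOperationException("delete is not supported by this client");
    }
}

// Hypothetical implementation that only supports reads from a fixed snapshot.
class ReadOnlyClient implements MetadataClient {
    private final Map<String, String> snapshot;

    ReadOnlyClient(Map<String, String> snapshot) {
        this.snapshot = Map.copyOf(snapshot);
    }

    @Override
    public Optional<String> get(String id) {
        return Optional.ofNullable(snapshot.get(id));
    }
}
```

An abstract base class would work the same way; default methods simply avoid forcing implementations into a single inheritance chain.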

dbwiddis commented 1 month ago

Implementation using Custom (which extends ToXContent) for a cluster-based put (index) request:

public class XContentClient implements Client {

    private final org.opensearch.client.Client client;

    public XContentClient(org.opensearch.client.Client client) {
        this.client = client;
    }

    @Override
    public CompletionStage<PutCustomResponse> putCustom(PutCustomRequest request) {
        CompletableFuture<PutCustomResponse> future = new CompletableFuture<>();
        try (XContentBuilder sourceBuilder = XContentFactory.jsonBuilder()) {
            client.index(
                new IndexRequest(request.index()).setRefreshPolicy(IMMEDIATE)
                    .source(request.custom().toXContent(sourceBuilder, ToXContent.EMPTY_PARAMS)),
                ActionListener.wrap(
                    r -> future.complete(
                        new PutCustomResponse.Builder().id(r.getId()).created(Result.CREATED.equals(r.getResult())).build()
                    ),
                    future::completeExceptionally
                )
            );
        } catch (IOException ioe) {
            // Parsing error
            future.completeExceptionally(ioe);
        }
        return future;
    }
}

andrross commented 1 month ago

@dbwiddis What is the basic idea for plugins to migrate from their existing solution to using this? Will data movement be required (i.e. read all data from cluster state or system index being used today and rewrite it into this interface), or can this be a new facade on top of the existing data?

Regarding existing plugins migrating to this and the questions around search, have you done an inventory of the current plugins that exist within the OpenSearch project and their usage of metadata storage? Basically, I would tend to agree with your statement "plugin metadata tends to be more narrowly defined and include more key-value storage or a limited number of documents primarily referenced by their document ID" but it would be great if we can back that assertion up with some real numbers.

dbwiddis commented 1 month ago

@dbwiddis What is the basic idea for plugins to migrate from their existing solution to using this?

Funny you should ask, as I was working on this draft PR, which I hope shows a possible migration path. Open to feedback!

https://github.com/opensearch-project/ml-commons/pull/2430

Will data movement be required (i.e. read all data from cluster state or system index being used today and rewrite it into this interface), or can this be a new facade on top of the existing data?

The above PR maintains the data in the existing system index, no movement required. However, my next trick will be to use the OpenSearch Java Client to demonstrate the ability to read/write that same data to a remote cluster, and my previous POC would allow that same data to be read/written to a NoSQL store. Options abound.

TLDR: for existing cluster implementation it's a facade.

Regarding existing plugins migrating to this and the questions around search, have you done an inventory of the current plugins that exist within the OpenSearch project and their usage of metadata storage?

Broadly, yes, but I have not taken a detailed look; for now I'm focusing on Flow Framework and ML Commons.

Basically, I would tend to agree with your statement "plugin metadata tends to be more narrowly defined and include more key-value storage or a limited number of documents primarily referenced by their document ID" but it would be great if we can back that assertion up with some real numbers.

I'll definitely add that to my long to-do list!

arjunkumargiri commented 1 month ago

Implementation using Custom (which extends ToXContent) for a cluster-based put (index)

Custom is specific to the cluster metadata store; can we create a new entity for plugin metadata? Maybe Data or Document.

msfroh commented 1 month ago

Plugins should be able to define other persistent storage alternatives than the cluster state or system indices. Options include remote clusters, non-relational databases (MongoDB, NoSQL, DynamoDB, Apache Cassandra, HBase, and many others), Blob Storage (see Remote Cluster State RFC https://github.com/opensearch-project/OpenSearch/issues/9143).

Just to clarify, should the plugins manage the persistent storage alternatives? Or should plugins just talk with the DAO interface and not care what the persistent storage is? (I hope the latter.)

Or are you saying that the DAO implementations could be plugins? (I hope so.)

dbwiddis commented 1 month ago

Just to clarify, should the plugins manage the persistent storage alternatives? Or should plugins just talk with the DAO interface and not care what the persistent storage is? (I hope the latter.)

"The plugins" here is overly broad, as plugins will still have a choice in the storage alternative; however, that will be more of a configuration-level "management".
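
A configuration-level choice of this kind could be sketched roughly as follows. The names are all hypothetical: `MetadataBackend`, `BackendSelector`, the setting key `plugins.example.metadata_store.type`, and the default value `system_index` are assumptions made for illustration and do not correspond to any existing OpenSearch setting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical marker for a storage backend; a real one would expose CRUD methods.
interface MetadataBackend {
    String describe();
}

// Picks one backend at startup from configuration, so the rest of the
// plugin code never branches on the storage type.
class BackendSelector {
    // Hypothetical setting key and default value (assumptions).
    static final String SETTING_KEY = "plugins.example.metadata_store.type";
    static final String DEFAULT_TYPE = "system_index";

    private final Map<String, Supplier<MetadataBackend>> factories = new HashMap<>();

    BackendSelector register(String type, Supplier<MetadataBackend> factory) {
        factories.put(type, factory);
        return this;
    }

    MetadataBackend select(Map<String, String> settings) {
        String type = settings.getOrDefault(SETTING_KEY, DEFAULT_TYPE);
        Supplier<MetadataBackend> factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown metadata store type: " + type);
        }
        return factory.get();
    }
}
```

With this shape the storage decision is made once, from settings, and an unknown value fails at startup instead of at the first read or write.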

The vision here is:

Or are you saying that the DAO implementations could be plugins? (I hope so.)

Maybe. Probably. I'm not sure yet. That's why this RFC. But this is conceptually similar to the Repository interface which eventually finds its way down to an azure-repository plugin.

At this point, the DAO implementations are a single class, so creating a whole plugin around them feels like overkill. They probably do at least need to be in separate Maven artifacts.

andrross commented 1 month ago

@msfroh @dbwiddis I think the telemetry effort might be a good parallel here: There is a TelemetryPlugin interface that allows injecting an implementation for transmitting/storing telemetry data to whatever other system you want. The server defines interfaces for emitting metrics (e.g. Tracer, MetricsRegistry) and wires up the implementation provided via TelemetryPlugin. Core pieces of the server now emit metrics through those interfaces. And finally, a TelemetryAwarePlugin interface was introduced to expose Tracer and MetricsRegistry to plugins so that they can emit metrics themselves.

So following that pattern, a new plugin would allow injecting an implementation for storing metadata into the core. The default could be cluster state and/or system indexes, but a plugin could just as easily provide an implementation for an external store. The core would define some new interface for reading/writing this metadata (I think this would look something like your Dao class) and wire up the appropriate implementation. Core features (such as search pipelines) would use this interface for reading and writing its metadata. And finally this new interface would be exposed to plugins for their own metadata storage needs.
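
The wiring described above might look roughly like the following sketch. All names are hypothetical (`MetadataStore`, `MetadataStorePlugin`, `InMemoryMetadataStore`, `MetadataStoreWiring`), the in-memory class merely stands in for a cluster-state or system-index default, and the rule that at most one plugin may supply a store is an assumption rather than something stated in this thread.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical interface the core would define for reading and writing metadata.
interface MetadataStore {
    void put(String key, String value);
    Optional<String> get(String key);
}

// Hypothetical extension point through which a plugin injects an implementation.
interface MetadataStorePlugin {
    MetadataStore createMetadataStore();
}

// Stand-in for the default store (cluster state and/or system indices in practice).
class InMemoryMetadataStore implements MetadataStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(data.get(key));
    }
}

// Core-side wiring: use the plugin-provided store if there is one, else the default.
class MetadataStoreWiring {
    static MetadataStore resolve(List<MetadataStorePlugin> plugins, MetadataStore defaultStore) {
        if (plugins.isEmpty()) {
            return defaultStore;
        }
        if (plugins.size() > 1) {
            // Assumption: a single store per node keeps the consistency story simple.
            throw new IllegalStateException("Only one metadata store plugin may be installed");
        }
        return plugins.get(0).createMetadataStore();
    }
}
```

Core features and plugins would then receive the resolved `MetadataStore` and never learn which implementation is behind it.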

One final point, @dbwiddis is coming at this from the perspective of plugin extensibility, but I believe this work also aligns with breaking apart the monolithic cluster state which is an impediment to scaling large clusters.

Bukhtawar commented 1 month ago

There are more use cases, such as Remote Store using a metadata store, and we really need to think about the store capabilities, for instance optimistic concurrency control, conditional updates, and transactions, which are missing in the Repository interface. We should list the common access patterns we envision across varied use cases and see how they work with pluggable stores across cloud providers.

msfroh commented 1 month ago

@andrross -- Yes! That's exactly the approach that I was thinking of -- pluggable metadata storage, where the producers/consumers don't care how it works or where the metadata is stored, just that it honors some interface.

@Bukhtawar -- I don't think @dbwiddis is suggesting that the Repository interface specifically would be good for metadata (since it's probably not, being eventually consistent). It's just an example of a general "I want to store file-like stuff somewhere" interface. In this case, the hypothetical new MetadataStorage interface would probably need to promise some guarantees around transactionality and concurrency control, and it would be up to implementations to honor those guarantees in order to be valid. (So, e.g. S3 on its own probably isn't what you want -- DynamoDB or DynamoDB + S3 would be better implementations.)

andrross commented 1 month ago

@msfroh Thanks! I believe @Bukhtawar is saying that the current Remote Store implementation uses the Repository interface for storing metadata, and that it is a bad fit for that use case. So Remote Store is yet another feature that would be interested in using the new metadata store feature. But just like there are questions around what search features the metadata store should support, we'd need to figure out what transactional and concurrency control features it should support as well.
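
To illustrate one of the concurrency-control features under discussion, here is a minimal sketch of a conditional (optimistic) update keyed on a version number. It is a hypothetical in-memory model, not a proposed API: `Versioned`, `VersionedStore`, and `updateIfVersion` are names invented for this example, and a real backend would map the check onto its own primitive (for example a conditional write).

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical value holder pairing a payload with its version.
// It intentionally keeps identity-based equals() so it can be used in a compare-and-set.
final class Versioned {
    final String value;
    final long version;

    Versioned(String value, long version) {
        this.value = value;
        this.version = version;
    }
}

// Hypothetical store offering create-if-absent and update-if-version-matches.
class VersionedStore {
    private final ConcurrentHashMap<String, Versioned> data = new ConcurrentHashMap<>();

    // Creates the key at version 1 only if it does not exist yet.
    boolean create(String key, String value) {
        return data.putIfAbsent(key, new Versioned(value, 1L)) == null;
    }

    Optional<Versioned> get(String key) {
        return Optional.ofNullable(data.get(key));
    }

    // Succeeds only if the stored version still equals expectedVersion.
    boolean updateIfVersion(String key, String newValue, long expectedVersion) {
        Versioned current = data.get(key);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        // replace(key, old, new) is atomic; because Versioned uses identity equality,
        // it fails if another writer swapped the entry after the read above.
        return data.replace(key, current, new Versioned(newValue, expectedVersion + 1));
    }
}
```

A caller that loses the race gets `false`, re-reads the current version, and decides whether to retry, which is the access pattern a pluggable store would need to guarantee.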

lukas-vlcek commented 1 month ago

Very interesting idea indeed.

But if there will be any (meta)data stored in external systems then I think we also need to think about identity management that needs to be part of the communication with external stores. Should this topic be part of this proposal as well?

The simplest example would be a basic read/write permissions configuration for an external store (assuming the external system requires user authn/authz). Will that be part of the OpenSearch configuration? Or part of the specific configuration of a particular metadata store implementation? Or will this be left to external systems to handle (for example, users would need to set up a proxy server in front of the external store)?

andrross commented 1 month ago

@lukas-vlcek I think identity management would work similarly to how the repository plugins work in OpenSearch. Each implementation for any given remote metadata storage system would be responsible for defining how its specific credentials are provided, and the operator would be responsible for giving credentials with the necessary permissions for OpenSearch to interact with this system.