opensearch-project / search-processor

Search Request Processor: pipeline for transformation of queries and results inline with a search request.
Apache License 2.0

[RFC] Search pipelines #80

Closed: msfroh closed this issue 1 year ago

msfroh commented 1 year ago

Search pipelines

This RFC is intended to replace https://github.com/opensearch-project/search-processor/issues/12.

Overview

We are proposing a set of new APIs to manage composable processors to transform search requests and search responses in OpenSearch. Expected transformers include (but are not limited to) the kinds of processors described under "Proposed integrations" below, such as external rerankers, query rewriters, and result filters.

The new APIs will aim to mirror the ingest APIs, which are responsible for transforming documents before they are indexed, to ensure that all documents going into the index are processed in a consistent way. The ingest API makes use of pipelines of processors. We will do the same, but for search.

Argument over alternatives

Everyone should just implement logic in their calling application

The most obvious counterargument to this proposal is “this logic belongs in the search application that calls OpenSearch”. That is a valid approach and this proposal does not prevent any developer from transforming search requests and responses in their application.

We believe that providing an API within OpenSearch will make it easier for developers to build and share components that perform common transformations, reducing duplicated effort in the calling search applications.

Put this logic in a library that people can use from their calling applications

In theory, we could provide a common “toolbox” of request and response processors as a library that application developers could use. That would mean building libraries for specific languages/runtimes. By including search processors in OpenSearch itself, any calling application (regardless of implementation) can benefit. In particular, it is possible to modify query processing behavior without modifying the application (by specifying a default search pipeline for the target index(es)).

Write search plugins

Search plugins can significantly impact how search requests are processed, both on the coordinator node and on individual shards. Each processor we can think of could be implemented as a search plugin that runs on the coordinator node. The challenges with that approach are a) writing a whole search plugin complete with parameter parsing is pretty complicated, b) the order in which search plugins run is not immediately obvious to a user, and c) without some overarching framework providing guidelines, every search plugin may have its own style of taking parameters (especially with regards to default behavior).

Similarities with ingest pipelines

A built-in orchestrator can call out to processors defined in plugins

Ingest pipelines have a core orchestrator responsible for calling out to each ingest processor in the pipeline, but the processors themselves may be defined in separate ingest plugins. These plugins can implement specific transformations without needing to consider the broader pipeline execution. Similarly, search pipelines will run from the OpenSearch core, but may call out to named search processors registered via plugins.

Processed on entry (or exit)

Just as ingest pipelines operate before documents get routed to shards, the search pipelines operate “on top of” the index when processing a search request. That is, a SearchRequest gets transformed on the coordinator node before being sent to individual shards, and the SearchResponse gets transformed on the coordinator node after being aggregated from the shard responses.

Processing that happens on each shard is out of scope for this proposal. The SearchPlugin API remains the appropriate extension point for per-shard processing.

Pipelines are named entities stored in the cluster

To use an ingest pipeline, you will generally PUT to create or update the pipeline definition using a REST API. The body of that request defines the pipeline with a description and a list of ingest processors. We will provide a similar API to define named search pipelines built from search processors.

Can be referenced per-request or per-index

When using the index document API or the bulk API, you can include a request parameter like ?pipeline=my-pipeline to indicate that the given request should be processed by a specific pipeline. Similarly, we will add a pipeline parameter to the search API and the multi-search API.

Generally, we want to apply the same pipeline to every document being added to an index. To simplify that, the index API has a setting, index.default_pipeline, which designates a pipeline to use if none is specified in an index document or bulk request. Similarly, we will add a setting, index.default_search_pipeline, to apply a pipeline by default for all search or multi-search requests against the given index.

Differences from ingest pipelines

Processing different things in different places

While an ingest processor only ever operates on a document, potentially modifying it, a search processor may operate on a search request, a search response, or both. We also assume that processing a search response requires information from the search request.

To support these different cases, we will provide different interfaces for search request processors, search response processors, and request + response (“bracket”) processors. The search pipeline definition will have separate sections for request and response processors. (Bracket processors must be specified in the request processor list, but may be referenced by ID in the response processor list to explicitly order them relative to response processors.)

The name “bracket processor” is chosen to indicate that they process things on the way in and on the way out, and must be balanced like brackets or parentheses. That is, given two bracket processors B1 and B2, we require that if B1 processes a search request before B2, then B1 processes the search response after B2.

Pipelines can be specified inline “for real”

The ingest API includes a “_simulate” endpoint that you can use to preview the behavior of a named pipeline or a pipeline definition included in the request body (before creating a named pipeline). This makes sense, since we wouldn’t want to pollute the index with documents processed by a half-baked, untested pipeline.

Since search requests are read-only, we don’t need a separate API to test an ad hoc search pipeline definition. Instead, we will allow anonymous search pipelines to be defined inline as part of any search or multi-search request. In practice, we don’t expect this approach to be common in production scenarios, but it’s useful for ad hoc testing when creating / modifying a search pipeline.

API definition

Java search processor interfaces

package org.opensearch.search.pipeline;

// Copied from org.opensearch.ingest.Processor:
// https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/ingest/Processor.java

interface Processor {
  /**
   * Gets the type of a processor
   */
  String getType();

  /**
   * Gets the tag of a processor.
   */
  String getTag();

  /**
   * Gets the description of a processor.
   */
  String getDescription();
}

/**
 * Processor that (potentially) modifies SearchRequests.
 */
interface RequestProcessor extends Processor {
  SearchRequest execute(SearchRequest request);
}

/**
 * Processor that (potentially) modifies SearchResponses. Behavior may be
 * influenced by parameters from the SearchRequest.
 */
interface ResponseProcessor extends Processor {
  SearchResponse execute(SearchRequest request, SearchResponse response);
}

/**
 * Processor that may modify the request, response, both, or neither.
 */
interface BracketProcessor extends RequestProcessor, ResponseProcessor {

  /**
   * May be specified in the request pipeline and referenced in the response
   * pipeline to determine the order of response processing. 
   */
  String getId();
}
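
To make the interfaces concrete, here is a minimal sketch of a hypothetical RequestProcessor implementation, loosely echoing the result_oversampling idea used in the pipeline example below. The class name, tag, and oversampling behavior are illustrative only, not part of this proposal:

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;

/**
 * Hypothetical example: a request processor that multiplies the requested
 * "size" so that downstream response processors have extra hits to rerank.
 * Illustrative only; real processors would be registered via plugins.
 */
public class OversampleRequestProcessor implements RequestProcessor {

  private final int factor;

  public OversampleRequestProcessor(int factor) {
    this.factor = factor;
  }

  @Override
  public String getType() {
    return "oversample";
  }

  @Override
  public String getTag() {
    return "example-tag";
  }

  @Override
  public String getDescription() {
    return "Multiplies the requested size by " + factor;
  }

  @Override
  public SearchRequest execute(SearchRequest request) {
    if (request.source() != null && request.source().size() > 0) {
      // Request factor * size hits so a later response processor can
      // rerank and trim back down to the original size.
      request.source().size(request.source().size() * factor);
    }
    return request;
  }
}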

REST APIs

Search pipeline CRUD

// Create/update a search pipeline.
PUT /_search_processing/pipeline/my_pipeline
{
  "description": "A pipeline to apply custom synonyms, result post-filtering, an ML ranking model",
  "request_processors" : [
    {
      "external_synonyms" : {
        "service_url" : "https://my-synonym-service/"
      }
    },
    {
      "ml_ranker_bracket" : {
        "result_oversampling" : 2, // Request 2 * size results.
        "model_id" : "doc-features-20230109",
        "id" : "ml_ranker_identifier"
      }
    }
  ],
  "response_processors" : [
    {
      "result_blocker" : {
        "service_url" : "https://result-blocklist-service/"
      },
      "ml_ranker_bracket" : {
        // Placed here to indicate that it should run after result_blocker.
        // If not part of response_processors, it will run before result_blocker.
        "id" : "ml_ranker_identifier" 
      }
    }
  ]
}

// Return identifiers for all search pipelines.
GET /_search_processing/pipeline

// Return a single search pipeline definition.
GET /_search_processing/pipeline/my_pipeline

// Delete a search pipeline.
DELETE /_search_processing/pipeline/my_pipeline

Search API changes

// Apply a search pipeline to a search request.
POST /my-index/_search?pipeline=my_pipeline
{
  "query" : {
    "match" : {
      "text_field" : "some search text"
    }
  }
}

// Specify an ad hoc search pipeline as part of a search request.
POST /my-index/_search
{
  "query" : {
    "match" : {
      "text_field" : "some search text"
    }
  },
  "pipeline" : {
    "request_processors" : [
      {
        "external_synonyms" : {
          "service_url" : "https://my-synonym-service/"
        }
      },
      {
        "ml_ranker_bracket" : {
          "result_oversampling" : 2, // Request 2 * size results
          "model_id" : "doc-features-20230109",
          "id" : "ml_ranker_identifier"
        }
      }
    ],
    "response_processors" : [
      {
        "result_blocker" : {
          "service_url" : "https://result-blocklist-service/"
        },
        "ml_ranker_bracket" : {
          // Placed here to indicate that it should run after result_blocker.
          // If not part of response_processors, it will run before result_blocker.
          "id" : "ml_ranker_identifier" 
        }
      }
    ]
  }
}

Index settings

// Set default search pipeline for an existing index.
PUT /my-index/_settings
{
  "index" : {
    "default_search_pipeline" : "my_pipeline"
  }
}

// Remove default search pipeline for an index.
PUT /my-index/_settings
{
  "index" : {
    "default_search_pipeline" : "_none"
  }
}

// Create a new index with a default search pipeline.
PUT my-index
{
  "mappings" : {
    // ...index mappings...
  },
  "settings" : {
    "index" : {
      "default_search_pipeline" : "my_pipeline",
      // ... other settings ...
    }
  }
}

Proposed integrations

Kendra ranking

Our first implementation (already in the search-processor repository) provides connectivity to the Amazon Kendra Intelligent Ranking service. This will need to be reworked to match the BracketProcessor interface, because it modifies the SearchRequest as well as the SearchResponse. The processor modifies the SearchRequest to a) request the top 25 search hits (if start is less than 25), and b) request document source (to ensure that the body and title fields for reranking are available). The top 25 results in the SearchResponse are preprocessed (to extract text passages) and sent to the Amazon Kendra Intelligent Ranking service, which returns a (potentially) reordered list of document IDs that is used to rerank the top 25 results. The originally requested range of results (by start and size) is then returned.
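
As a rough sketch (not the actual plugin code), the request-side half of such a bracket processor might widen the request as shown below; the names here are hypothetical, and the response-side rerank/trim step is elided:

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.search.builder.SearchSourceBuilder;

/**
 * Rough sketch of a Kendra-style bracket processor. Only the request-side
 * widening is shown; the response side would preprocess the top hits, call
 * the ranking service, reorder them, and slice back to the original range.
 */
public class RerankingBracketProcessorSketch implements BracketProcessor {

  private static final int RERANK_DEPTH = 25;

  @Override public String getType() { return "reranker_sketch"; }
  @Override public String getTag() { return "example"; }
  @Override public String getDescription() { return "Request-side sketch of a reranking bracket processor"; }
  @Override public String getId() { return "reranker_sketch_1"; }

  @Override
  public SearchRequest execute(SearchRequest request) {
    SearchSourceBuilder source = request.source();
    if (source == null) {
      return request;
    }
    int from = Math.max(source.from(), 0);
    int size = source.size() >= 0 ? source.size() : 10;
    if (from < RERANK_DEPTH) {
      // Fetch from offset 0 so the top RERANK_DEPTH hits are available,
      // keeping enough hits to still serve the original [from, from + size) window.
      source.from(0);
      source.size(Math.max(RERANK_DEPTH, from + size));
      // Ensure _source is fetched so the body and title fields are available.
      source.fetchSource(true);
    }
    return request;
  }

  @Override
  public SearchResponse execute(SearchRequest request, SearchResponse response) {
    // Elided: extract passages from the top hits, call the ranking service,
    // rerank, and trim back to the originally requested range.
    return response;
  }
}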

Metarank

To provide search results that learn from user interaction, we could implement a ResponseProcessor that calls out to Metarank.

Note that we would need to make sure that the SearchRequest API has the ability (via the ext property?) to carry additional metadata about the request, like user and session identifiers.

Querqy

Search pipelines could be a convenient interface to integrate with Querqy.

Individual Querqy rewriters could be wrapped in adapters that implement the RequestProcessor interface and added to a search pipeline.
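
For illustration, an adapter along these lines could wrap a rewriter as a RequestProcessor. The rewriter is modeled here as a plain UnaryOperator<QueryBuilder>, a deliberately hypothetical stand-in rather than Querqy's actual rewriter API:

package org.opensearch.search.pipeline;

import java.util.function.UnaryOperator;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilder;

/**
 * Hypothetical adapter that exposes a query rewriter as a RequestProcessor.
 * UnaryOperator<QueryBuilder> is a placeholder; a real integration would
 * delegate to Querqy's own rewriter API.
 */
public class RewriterRequestProcessor implements RequestProcessor {

  private final UnaryOperator<QueryBuilder> rewriter;

  public RewriterRequestProcessor(UnaryOperator<QueryBuilder> rewriter) {
    this.rewriter = rewriter;
  }

  @Override public String getType() { return "query_rewriter"; }
  @Override public String getTag() { return "example"; }
  @Override public String getDescription() { return "Applies a query rewriter to the incoming query"; }

  @Override
  public SearchRequest execute(SearchRequest request) {
    if (request.source() != null && request.source().query() != null) {
      // Swap in the rewritten query (e.g., synonym expansion or boosting rules).
      request.source().query(rewriter.apply(request.source().query()));
    }
    return request;
  }
}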

Script processor

Ingest pipelines support processing documents with scripts. We could provide a similar capability to allow users to modify their search request or response with a Painless or Mustache script.

Block expensive query types

About 10 years ago, I worked on a search hosting service (based on Apache Solr) where we added a SearchComponent to our SearchHandler that would reject potentially expensive queries (e.g. leading wildcards, regex) by default. We would lift the restrictions on request, and only after discussing the risks (and usually we could explain why another option would be better). A similar RequestProcessor, installed as part of a default search pipeline for an index, could protect an OpenSearch cluster from users accidentally sending expensive queries.
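
A minimal sketch of such a guard processor, assuming we simply reject top-level wildcard and regexp queries (the processor name and the blocked set are illustrative):

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.RegexpQueryBuilder;
import org.opensearch.index.query.WildcardQueryBuilder;

/**
 * Hypothetical guard processor that rejects query types an administrator has
 * deemed too expensive (here, top-level wildcard and regexp queries). A real
 * implementation would also walk nested bool/compound queries.
 */
public class ExpensiveQueryBlocker implements RequestProcessor {

  @Override public String getType() { return "expensive_query_blocker"; }
  @Override public String getTag() { return "example"; }
  @Override public String getDescription() { return "Rejects wildcard and regexp queries"; }

  @Override
  public SearchRequest execute(SearchRequest request) {
    QueryBuilder query = request.source() == null ? null : request.source().query();
    if (query instanceof WildcardQueryBuilder || query instanceof RegexpQueryBuilder) {
      // Fail the request before it ever reaches the shards.
      throw new IllegalArgumentException(
          "Query type [" + query.getWriteableName() + "] is blocked by the search pipeline");
    }
    return request;
  }
}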

Proposed roadmap

Initial release (“soon”)

Based on feedback to this RFC, we intend to refactor the search-processor plugin to be similar to the APIs described above (with the assumption that there will be some changes required when imagination collides with reality). We should (hopefully?) be able to do this in time for the 2.6 release.

At this point, the REST APIs would still be considered “experimental” and we may break backwards compatibility (though we would like to avoid that if possible). The Java APIs may still be subject to change.

We would include additional processors in this repository.

Move to core

After getting some feedback from users of the plugin, we will move the pipeline execution logic into OpenSearch core, with individual processor implementations either in a “common” module (similar to ingest-common) or in separate plugins. Ideally, the OpenSearch SDK for Java will make it possible to implement search processors as extensions.

Search configurations

We’re thinking about using search pipelines as an initial model of “search configurations”, where the pipeline definition captures enough information about how a search request is processed from end-to-end to provide a reproducible configuration.

We can make it easier for application builders to run experiments, both offline and online, by running queries through one pipeline or another. For A/B testing, you could define a default search pipeline that randomly selects one of several search pipelines to process each request, and then link user behavior to the pipeline used.

More complicated search processing

Just as ingest pipelines support conditional execution of processors and nested pipelines, we could add similar capabilities to search processors to effectively turn the pipeline into a directed acyclic graph. If that becomes the norm, we would likely want a visualization tool to view and edit a search pipeline (since nested JSON would be hard for a human to understand).

In the “middle” of the graph, there’s a component to call into OpenSearch to turn a SearchRequest into a SearchResponse. What if we want to use something other than OpenSearch, though? For example, we could precompute result sets for known one-word queries offline and do a lookup to return those results online.

Task list

navneet1v commented 1 year ago

Hi @msfroh, the idea of the search pipeline is awesome. After reading the proposal I see that we are proposing 2 types of transformers: one which will run before and one after. I was thinking of one more, which can run after the query phase and before the fetch phase. I want to know your thinking on the same. Did we think about it?

We have a use case where we want to normalize the scores returned from the query phase at the coordinator level, before the fetch phase starts. If this can be done it will be very useful. I want to understand if we can include it in this proposal, and I'd also like your thoughts on it. Happy to contribute on this. :)

navneet1v commented 1 year ago

Hi @msfroh Regarding the proposed integrations defined above: we have another use case (https://github.com/opensearch-project/neural-search/issues/70) which we were planning to resolve via #12, and which can also be implemented using this proposal (https://github.com/opensearch-project/search-processor/issues/12#issuecomment-1363226784). Can you please share the timeline for adding these interfaces to OpenSearch core?

msfroh commented 1 year ago

After reading the proposal I see that we are proposing 2 types of transformers: one which will run before and one after. I was thinking of one more, which can run after the query phase and before the fetch phase. I want to know your thinking on the same. Did we think about it?

Thanks, @navneet1v! I hadn't considered adding processors that run between the query and fetch phase, but I think it makes a lot of sense. I'm going to look through the existing search code to better understand what's involved and what such a processor would need to look like.

msfroh commented 1 year ago

Incidentally, one area where I'm doubting what I wrote above is in the "Initial release" vs "Move to core" steps.

Given that this facility arguably belongs in OpenSearch core, maybe I should just develop it there in the first place, rather than building it in a plugin just to move it over later? That's likely to just be wasted effort.

msfroh commented 1 year ago

Can you please share the timeline for adding these interfaces to OpenSearch core?

I'm thinking of trying to aim for the 2.7 release, which I think is in the first half of 2023. Of course, that's just my best guess -- we'll see how hard it is once we start implementing.

macohen commented 1 year ago

Can you please share the timeline for adding these interfaces to OpenSearch core?

I'm thinking of trying to aim for the 2.7 release, which I think is in the first half of 2023. Of course, that's just my best guess -- we'll see how hard it is once we start implementing.

Would we look at a 2.6 release for the initial release and 2.7 for features after that?

wbeckler commented 1 year ago

When you say "plugins" are you thinking "extensions" a la the new Extensions framework?

msfroh commented 1 year ago

@wbeckler -- if you're referring to plugins as mentioned in the section "A built-in orchestrator can call out to processors defined in plugins", I absolutely think the end goal would involve processors implemented in extensions. The motivators for extensions (especially API stability) would be very relevant for processor developers and users.

Also, being "high-level" transformers that run once per search request (versus, say, something that gets run on every matching document), I think IPC-based extensions would be fine.

Depending on timelines, if extensions aren't ready for primetime by the time we've built this, we always have the fallback of using something like IngestPlugin.

macohen commented 1 year ago

Would naming be more obvious if, instead of BracketProcessor, it was SearchRequestResponseProcessor? Just wondering if there's a way to make the naming more self-documenting.

msfroh commented 1 year ago

Oh, I wanted to clarify how bracket processors can be referenced in response chains and how things work if they're not referenced.

Suppose request processors are all named with I (for input), response processors are named with O (for output), and bracket processors are named with B. Then the following:

req=[I1, B1, I2, I3, B2]
resp=[O1, O2]

is implicitly equivalent to:

req=[I1, B1, I2, I3, B2]
resp=[B2, B1, O1, O2]

Basically, the system will eagerly "close the brackets" (in LIFO order) before handing off to the response processors.

If you want to move the output processors before the bracket processors you could explicitly say something like:

resp=[O1, B2, O2, B1]

(You could also put both O1 and O2 between B2 and B1, or put them both before B2 if you want.)

Note that it would be an error to put B1 before B2 in the response chain, since your brackets would be mismatched — it would be like saying [(]). That's where the name BracketProcessor comes from.

I'm open to other names that convey the "balancing" part, like how brackets or parentheses work.

reta commented 1 year ago

@msfroh This is a really great proposal (which I somehow missed). How would the search pipelines deal with the security plugin, more specifically document-level security [1]? Any concerns that security checks could be bypassed by modifying requests and/or responses? Thank you.

[1] https://opensearch.org/docs/1.2/security-plugin/access-control/document-level-security/

lukas-vlcek commented 1 year ago

@msfroh Nice proposal! The following are my comments on your proposal (I did not check the code details yet).

Why not use null instead of "_none" in the API proposal below?

// Remove default search pipeline for an index.
PUT /my-index/_settings
{
  "index" : {
    "default_search_pipeline" : "_none"
  }
}

I think null is currently used in a "similar" situation when doing updates to /_settings or similar APIs (I would need to search the documentation for examples; I don't remember them off the top of my head). But why introduce a new reserved word when we can use null?

Error handling

Generally, I would like to ask how you plan to handle errors and exceptions. It seems that you brought up several use cases where fetching an external file or data will be part of the request/response handling. This can lead to many unpredictable situations, so good troubleshooting support will be necessary.

Monitoring

On a similar note, do you plan to introduce new metrics for pipelines? (If I am not mistaken, ingest pipelines provide at least processor-level metrics.) This type of information will be critical for any admin: for questions like "what if the external re-ranking model provider that my pipeline depends on suffers from service delays?", it will be very important to monitor time spent in processors, the number of successful and failed executions, etc.

msfroh commented 1 year ago

@lukas-vlcek Thanks a lot for the suggestions! I'll reply to each below.

For the error handling especially, I'd really like feedback to make sure we do it properly.

Why not use null instead of "_none" in the API proposal below?

I "borrowed" that idea from ingest pipelines (which already introduces the reserved word _none), without giving it too much thought.

Thinking about it now, I think _none is less useful in the index settings, but more useful as a query parameter. That is, if an index has a default_search_pipeline specified and you want to bypass it (e.g., because you have an admin process that doesn't want any interference), you could specify ?search_pipeline=_none in the search request (where a null URL param would be hard to distinguish from a missing URL param).

I haven't implemented the index defaults yet. I suspect that setting the value to null should work as well. But if we're going to accept _none for the URL parameter (to bypass an existing default), then I think we can also accept it in the index settings.

Note: I need to update the proposal above to rename the query parameter from pipeline to search_pipeline. Once I started writing the code, I learned from the integration tests that the update_by_query API uses the same request for searching and indexing, so any URL parameters get passed to both the search request and the indexing request. Since indexing already claimed pipeline as a parameter, I need to pick something else.

Error handling

Right! This is arguably a more complicated case than for ingest pipelines, since you're talking about availability of your searches (versus missing documents on ingest failure, where you may be serving stale data, but you're still available).

I've been leaning toward the idea that individual processors will have (potentially configurable) behavior when they encounter a failure. For an external reranker, for example, I think I would generally prefer to "fail open", and return the unprocessed results ranked by BM25 score. Unfortunately, we don't have a good way to communicate to the search requester that reranking failed. (For an existing example of this, here is the error handling for a call out to AWS Kendra Intelligent Ranking service. It writes to the server's application log, but the search client doesn't know anything failed.)

On the other hand, there are also cases where we might prefer to "fail closed". Imagine you're running a post-processor that redacts sensitive information from search results. It would be terrible to say "Oh, the redaction service failed, so here are all of the results with a bunch of sensitive info."

For the "fail open" case, I think one approach will be to add a new field to SearchResponse to return "warnings". A warning could communicate that the search response is (partially) unprocessed and explain why -- the network call to the other service failed because of timeout, invalid hostname, etc.; the other service rejected the call because of bad credentials, exceeded quota, request too big, etc. I've created an issue for that here.

I think we can also borrow the on_failure parameter from ingest pipelines, where you can define your own failure handling logic. Continuing the example of a hypothetical redaction service, maybe the fallback logic could exclude fields outside of a small "safe" list.

Incidentally, I think the "fail open" decision should be an option in the pipeline configuration itself, just as every ingest processor has a global ignore_failure parameter (which wraps the ingest processor in another CompoundProcessor that swallows the failure). I would prefer that we add the failure message to the proposed "warnings" property in the search response, though.
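
To make the fail-open idea concrete, here is a minimal sketch of a hypothetical wrapper in the spirit of ignore_failure. It simply logs and returns the unprocessed response; the proposed "warnings" field doesn't exist yet, so nothing is reported back to the client in this sketch:

package org.opensearch.search.pipeline;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;

/**
 * Hypothetical "fail open" wrapper: if the wrapped response processor throws,
 * return the unprocessed response instead of failing the whole search. A real
 * implementation would also surface the failure via the proposed "warnings"
 * field on SearchResponse once that exists.
 */
public class FailOpenResponseProcessor implements ResponseProcessor {

  private static final Logger logger = LogManager.getLogger(FailOpenResponseProcessor.class);

  private final ResponseProcessor delegate;

  public FailOpenResponseProcessor(ResponseProcessor delegate) {
    this.delegate = delegate;
  }

  @Override public String getType() { return delegate.getType(); }
  @Override public String getTag() { return delegate.getTag(); }
  @Override public String getDescription() { return delegate.getDescription(); }

  @Override
  public SearchResponse execute(SearchRequest request, SearchResponse response) {
    try {
      return delegate.execute(request, response);
    } catch (Exception e) {
      // Fail open: log the failure and return the unprocessed response.
      logger.warn("Response processor [" + delegate.getType() + "] failed; returning unprocessed results", e);
      return response;
    }
  }
}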

Monitoring

Yes, we do need to include metrics. @macohen created https://github.com/opensearch-project/OpenSearch/issues/6723 to call out that we should do at least as well as ingest pipelines.

navneet1v commented 1 year ago

Added an RFC to extend the search pipeline with a new type of processor, a search phase processor.

RFC: https://github.com/opensearch-project/neural-search/issues/152

macohen commented 1 year ago

Closed now that the base Search Pipeline code has been merged in https://github.com/opensearch-project/OpenSearch/pull/6587