opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0
9.43k stars 1.73k forks source link

[RFC] Real-time Insights into Top N Queries by Latency and Resource Usage #11186

Open ansjcy opened 9 months ago

ansjcy commented 9 months ago

Problem Statement

Currently, OpenSearch lacks a direct means of providing insights into “top queries” that have a significant impact on latency and resource usage (including CPU consumption, and memory usage etc). At present, users can utilize OpenSearch's slow log to obtain certain degree of insights into the execution time of slow queries. However, the slow log offers limited context for the queries, lacking ways to track back to the original requests and the corresponding full query body, and doesn't encompass any resource usage dimensions for the queries. Additionally, the current slow query log doesn't provide insights into resource usage and latency for individual phases of a query execution, preventing the answering of critical questions like, "Which phase of the slow search query consumes the most time?" Moreover, slow query logs lack an aggregated view of the "Top N slow queries by a specific resource usage or latency" over a specified time frame. These limitations make it challenging for OpenSearch users to identify, diagnose, and correlate slow queries, and to further optimize those queries to prevent negative impacts on overall system health.

Objective

The objective of this RFC is to propose and define the architecture, components, and key features of the “Top N queries” framework, aiming to provide real-time insights into the most impactful queries based on resource usage and latency. By aggregating crucial metrics such as query latencies, and with planned expansions to include CPU consumption and memory usage in future iterations, this feature will empower users to identify and optimize resource-intensive queries, thereby enhancing the debugging and tracing experience.

The framework's scope encompasses the following key aspects:

With the above framework components, a typical workflow for the top N query feature would be:

  1. Per-request metrics (latency and resource usages) are collected and sent to the in-memory data store on the coordinator node, The data store aggregates these metrics, maintaining a record of "queries with top resource usages." Granularity levels, such as per minute, per hour, and per day, can be configured based on user preferences.
  2. The "queries with top resource usages" data is publish to different sinks at a schedule based on user’s configuration.
  3. Customer can query the "queries with top resource usages" data via an API for further analysis.

Detailed Solution

Data model to store query information

The data model designed for storing query information should contain essential data for those query requests, such as search query type, total shards, and indices involved in the search request. Moreover, the data model should be structured to accommodate customized fields, facilitating adding other customized properties in the future to enhance extensibility. Additionally, the data model for the query data should feature a comprehensive breakdown of latency and resource usage across individual phases of the query execution. The data models to store query information are outlined below.

// based class to store request information for a request
abstract class RequestData {
    private SearchType searchType;

    private int totalShards;

    private String[] indices;

    // property map to store future customized attributes for the
    // requests. This is for extensibility consideration.
    // for example, we can add user-account which initialized the request in the future
    private Map<String, String> propertyMap;

    // Getters and Setters.
}

// child class to store latency information for a request
class RequestDataLatency extends RequestData {
    Map<String, Long> phaseLatencyMap;

    Long totalLatency;

    // Getters and Setters.
}

// child class to store cpu resource usage information for a request
class RequestDataCPU extends RequestData {
    Map<String, float> phaseResourceUsageMap;

    float totalResourceUsage;

    // Getters and Setters.
}

In-memory data store on Coordinator node

The proposed data store, SearchQueryAnalyzer, is a generic, extendible, priority queue based Object for tracking the "Top N queries" based on specific resource usage within a defined time window. As previously mentioned, this data store should provide support for exporting data to various sinks and also offers a dedicated method for querying data via an API.

abstract class SearchQueryAnalyzer {
    // Number of top queries to keep
    private int topNSize;

    // window size in seconds
    private int windowSize;

    // getter and setters
}

public class SearchQueryLatencyAnalyzer extends SearchQueryAnalyzer {
    PriorityQueue<RequestDataLatency> topRequests;

    // Async function to ingest latency data for a request
    public void ingestDataForQuery(SearchRequest request, Map<String, String> customizedAttributes, Long latency){};

    public List<RequestDataLatency> getTopLatencyQueries(){};

    public void export(Sink sinkName){};
}
public class SearchQueryCPUAnalyzer extends SearchQueryAnalyzer {
    PriorityQueue<RequestDataCPU> topRequests;

    // Async function to ingest cpu usage data for a request
    public void ingestDataForQuery(SearchRequest request, Map<String, String> customizedAttributes, float cpuUsage){};

    public List<RequestDataCPU> getTopNCpuUsageQueries(){};

    public void export(Sink sinkName){};
}

It's crucial to note that the topN and windowSize value for each analyzer store should be customizable through a configuration file. To enhance flexibility, an API will also be provided, allowing users to dynamically adjust these configuration values without restarting the OpenSearch process. Further details regarding the configuration API will be elaborated in the "APIs for Configurations" section below.

Metrics Collection

Different metrics collection workflow will be implemented tailored to different type of metrics. The initial focus will be on the metrics collection workflow for latency metrics (considering latency as the primary resource usage dimension in the first iteration), However, future iterations can incorporate additional dimensions, such as CPU usage, based on metrics available from the resource tracking framework

To capture per-request phase latency information, a listener-based approach will be employed. Based on the newly introduced framework to track search query latency, a new component, SearchRequestResourceListener, will be defined. This relevant listener functions will be executed during each phase of search queries, recording latency and storing all relevant information into the SearchQueryLatencyAnalyzer. The high-level class design is presented below.

public class SearchRequestResourceListener implements SearchRequestOperationsListener {
    private long startTime;

    // Query phase
    void onQueryPhaseStart(SearchPhaseContext context) {
        // record the timestamp when query phase starts
    }
    void onQueryPhaseEnd(SearchPhaseContext phase, long tookTime) {
        // calculate and store the latency for this phase
    }
    void onQueryPhaseFailure(SearchPhaseContext context) {}

    void onRequestStart(SearchPhaseContext context) {
       // Set the start time 
    }
    void onRequestEnd(SearchPhaseContext context) {
       // Report latency
       SearchQueryLatencyAnalyzer.ingest();
    }
}

Data export - pull model

To facilitate user access to the "top queries with resource usage" information, we will implement dedicated API endpoints in OpenSearch. The structure of these API endpoints is outlined below:

Top Queries - general API

Endpoint:

GET /insights/top_queries

Parameters:

Example Usage:

GET /insights/top_queries?type=latency

Response:

Upon querying the GET /insights/top_queries?type=latency API, the response will be a structured set of data providing insights into the top queries based on latency within the specified parameters:

{
  "timestamp": "2023-11-09T12:30:45Z",
  "top_queries": [
    {
      "query_id": "12345",
      "latency": 600,
      "timestamp": "2023-11-09T12:29:15Z",
      "search_type": "QUERY_THEN_FETCH",
      "indices": [
        "index1",
        "index2"
      ],
      "shards": 5,
      "phases_details": {
        "query": 100,
        "fetch": 200,
        "expand": 300,
      },
      // Additional customized attributes details specific to the query
      "attributes": {
        "user_id": "value"
      }
    },
    // Additional top queries results based on latency
  ]
}

Top Queries by CPU resource usage (future iterations):

Endpoint example:

GET /insights/top_queries?type=cpu

Parameters:

Response:

The response will contain details about the top queries based on CPU usage within the specified time window and limit.

{
  "timestamp": "2023-11-09T12:30:45Z",
  "top_queries": [
    {
      "query_id": "12345",
      "cpu_usage": 0.6,
      "timestamp": "2023-11-09T12:29:15Z",
      "search_type": "QUERY_THEN_FETCH",
      "indices": [
        "index1",
        "index2"
      ],
      "shards": 5,
      "phases_details": {
        "query": 0.1,
        "fetch": 0.2,
        "expand": 0.3,
      },
      // Additional customized attributes details specific to the query
      "attributes": {
        "user_id": "value"
      }
    },
    // Additional top queries results based on latency
  ]
}

The below sequence diagram illustrates the workflows and detailed interactions among various components within the OpenSearch backend when the “Top N queries” feature is enabled. It captures the process of calculating and storing latency when OpenSearch processes a search query, and also how the results will be returned when queried.

Data export - push model

The Java Scheduler can be used to automate the export of Top N queries at scheduled intervals, pushing the data to various sinks. In the initial iteration, we will establish support for a fundamental sink, enabling the writing of top N queries to a log file on disk.

Subsequent iterations will introduce more advanced sinks, enhancing the export capabilities. Examples of advanced sinks include OPTL and SQL/time series databases. This iterative approach allows this feature to evolve and accommodate a broader spectrum of user needs, expanding the export functionality to encompass diverse and advanced storage and analysis solutions in the future.

APIs for Configurations

As previously mentioned, The topN and windowSize value and the data export interval for each SearchQueryAnalyzer store should be highly configurable. This can be done through a configuration file that is read when the OpenSearch process starts. Additionally, an API will be provided to dynamically configure these values without requiring a restart of the OpenSearch process. The proposed API endpoint to configure the windowSize, topN value and export_interval is described as follows:

PUT /_cluster/settings{
    "persistent":{
        "search.top_n_queries.latency.enabled" : "true",
        "search.top_n_queries.latency.window_size" : 60,
        "search.top_n_queries.latency.top_n_size" : 5,
       "search.top_n_queries.latency.export_interval" : 30,
    }
}
ansjcy commented 9 months ago

Any feedback or suggestions would be appreciated! cc @getsaurabh02 @rishabhmaurya @msfroh @deshsidd

msfroh commented 9 months ago

Does this need to hook into the whole query execution path?

Once you have a SearchResponse, you have the took time. At that point, if you have the SearchRequest and the SearchResponse, couldn't you just make a call to some sink to persist the query?

Instead of weaving the logic into the search flow, couldn't this all be implemented in a SearchResponseProcessor?

I just created a (somewhat) related item on query logging using search pipelines: https://github.com/opensearch-project/OpenSearch/issues/11188

khushbr commented 9 months ago

@ansjcy Thank you for putting together the RFC.

I have a question, Is the "top N queries" meant as a reactive Monitoring tool or employed into the real-time query resiliency feature like query cancellation and admission control ? Also, did we explore options to off-load the insight aggregation and metric storage to an independent monitoring and insight layer since the above listed approach will burden the Coordinator node further.

ansjcy commented 9 months ago

Instead of weaving the logic into the search flow, couldn't this all be implemented in a SearchResponseProcessor?

Thanks @msfroh for the comments. That is a good point! With search pipeline we don't need to write our own liseners, instead we can simply get the per-phase latency from the phase_took map from the request level latency tracking response , define the "write to priority queues and sinks" logic as processors - no extra configuration APIs are needed since configurations are already well supported in the search pipelines.

The only issue is currently the search pipelines doesn't have a good support for "cluster level" processors. In injest workflow, there's a final pipeline that always runs after the request or the default pipeline. We can build something similar in search pipelines as well.

Also, with the cluster level search pipeline processors supported, the coordinator slow log can be refactored to such processors so that the current logic would be more modularized and configurable.

thoughts? @getsaurabh02 @deshsidd

ansjcy commented 9 months ago

Thanks for your comments! @khushbr

Is the "top N queries" meant as a reactive Monitoring tool or employed into the real-time query resiliency feature like query cancellation and admission control ?

Currently, the primary objective for the first iteration is to "provide a reacive monitoring tool". But yeah given the real-time nature of the data, any other features can leverage it and make decisions like query cancellation based on it. Additionally, other downstream components can utilize the data by writing their customized export-to-sink logic.

Also, did we explore options to off-load the insight aggregation and metric storage to an independent monitoring and insight layer since the above listed approach will burden the Coordinator node further.

Great question! We ought to limit the storage to only retain the essential data for top N queries (with N bounded to a reasonable value) within a short time frame, and extensive benchmark tests should be performed to decide the optimal threshold to set. Also in theory if we use search pipeline (which is outside of core logic), an async thread would be handling the priority queue operations and the export to sinks workflow. In this way, we can minimize the impact on the search request itself.

khushbr commented 9 months ago

Currently, the primary objective for the first iteration is to "provide a reacive monitoring tool". But yeah given the real-time nature of the data, any other features can leverage it and make decisions like query cancellation based on it.

These 2 problem areas have different constraints - SLA, Cost, Availability. Thus, one solution isn't going to be best fit for both of them. We should carefully evaluate the trade-offs and keep the problem statement scoped.

Also in theory if we use search pipeline (which is outside of core logic), an async thread would be handling the priority queue operations and the export to sinks workflow. In this way, we can minimize the impact on the search request itself.

I like the idea of Async operation to buffer and publish the metrics in a separate thread. Ideally, only metric instrumentation should be part of the core-process and everything else - post-processing, publish to off-cluster sink, data analysis and correlation- should be moved outside the core (critical) process.

ansjcy commented 9 months ago

The aim of this project is to establish real-time insights into top N queries that have significant resource consumption or exhibit high latency. Also as part of this project we want to lay the groundwork for future query insight features, ensuring future-proof designs. To make sure we are all on the same page, I'll elaborate on the existing state of work in core search path, the introduced logic, and the proposed alternative ways to implement the metrics collection and processing logic in query insights.

Presently, in the core search path, we are implementing custom listeners tailored for distinct use cases (for query insights and non-query insights features), such as the SearchTimeProvider listener to capture per-phase request latency. However, the current design ties each listener to a specific feature, even when some metrics collected are similar. Moreover, we are adding synchronous logic in the core search request workflow to capture insights related to the request, for example, the searchQueryCategorizer to capture request shape.

image

These approaches, as the features in query insights expand, present several drawbacks:

To address these challenges, I propose we should keep the following tenets in query insights (arguably when implementing features in core search workflow):

Any feedback would be appreciated @msfroh @getsaurabh02 @deshsidd @sgup432 @dzane17

arjunkumargiri commented 9 months ago

Thanks for the detailed design @ansjcy . Top N queries fetch API's will be used infrequently by users as the use case would be to identify top queries for a given time period. To support a point in time retrieval functionality, having to process all requests seems to be a waste of CPU resources. Building in memory priority queue per use case(latency, CPU, memory) does not provide flexibility to add new functionalities such as query consumption by a specific query type. I agree with @msfroh that we should consider to off load query instrumentation data to a different store and rather than pre-processing the data query insights plugin could read required information from the store. Users should have flexibility to configure data retention through cluster settings as suggested for sink store.

ansjcy commented 9 months ago

Thanks a lot for all your feedback! Currently, we are already processing all requests with listeners to obtain per-request latency in both the "coordinator slow log" feature and the "request level latency" feature. Based on benchmark tests, it appears that instrumenting per-request latency won't significantly impact resources usages. We will be reusing the existing per-request latency tracking logic to avoid adding extra complexity as well.

From a user's point of view, this feature will enable them with a "top N view" of their requests. This view could manifest as an OpenSearch index, the response from an API, or charts on future query insight dashboards. As a result, the aggregated data serves internal purposes, requiring only the storage of essential "point in time" information. By employing asynchronous logic to avoid blocking the core search workflow (mentioned here in the comment), the impact on resource usage is anticipated to be minimal.

Also, offloading the query instrumentation logic to external sinks would require us to retain all necessary data for every request. Currently, we lack a state-of-the-art on-node store capable of holding all request information for aggregation and calculating the top N when user requested. Moreover, we refrain from introducing dependencies on a specific sink in this feature. Additionally, implementing the workflow mentioned in this comment will be beneficial for future cluster insights features cause they can all built on top of this workflow by introducing their own metrics/spans/collection workflow.

image

A potential enhancement in the future is, we can leverage the OPTL collector when it becomes available and migrate certain aggregation logic from the query insight plugin to OPTL collectors outside of OpenSearch process. With this approach, we can send traces/spans to OPTL collectors, where the collector takes responsibility for necessary calculations, aggregations and export. This strategy could further reduce the impact on the OpenSearch process.

image
ansjcy commented 7 months ago

We will add several new endpoints as part of this feature:

Any feedback would be appreciated!

rishabhmaurya commented 7 months ago

@ansjcy Can you provide sample request and response object for these APIs? I wanted to see how what the query object would look like? The one you have mentioned in the description looks little outdated -

"top_queries": [
    {
      "query_id": "12345",
      "latency": 600,
      "timestamp": "2023-11-09T12:29:15Z",
      "search_type": "QUERY_THEN_FETCH",
      "indices": [
        "index1",
        "index2"
      ],
      "shards": 5,
      "phases_details": {
        "query": 100,
        "fetch": 200,
        "expand": 300,
      },
      // Additional customized attributes details specific to the query
      "attributes": {
        "user_id": "value"
      }
    },

The one you are using here - https://github.com/opensearch-project/OpenSearch/pull/11904 has following format -

{
      "timestamp" : 1705012838928,
      "searchType" : "QUERY_THEN_FETCH",
      "source" : "{\"size\":20}",
      "totalShards" : 1,
      "indices" : [
        "my-index-0"
      ],
      "propertyMap" : { },
      "phaseLatencyMap" : {
        "expand" : 0,
        "query" : 827,
        "fetch" : 0
      }
    },

This information doesn't look useful to me. I would like to see the query itself which took long, if not full query object then at least the shape of the query with information such as - different clauses of the query, aggregation, sort. Type of query (different than searchType).

ansjcy commented 7 months ago

Hi @rishabhmaurya , in the latest implementation, one example of such query record would be

{
      "timestamp" : 1706597987369,
      "phase_latency_map" : {
        "expand" : 0,
        "query" : 16,
        "fetch" : 1
      },
      "search_type" : "query_then_fetch",
      "source" : "{\"size\":20,\"query\":{\"bool\":{\"must\":[{\"match_phrase\":{\"message\":{\"query\":\"document 2\",\"slop\":0,\"zero_terms_query\":\"NONE\",\"boost\":1.0}}},{\"match\":{\"user.id\":{\"query\":\"cyji\",\"operator\":\"OR\",\"prefix_length\":0,\"max_expansions\":50,\"fuzzy_transpositions\":true,\"lenient\":false,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}}}",
      "total_shards" : 1,
      "indices" : [
        "my-index-0"
      ],
      "latency" : 23
    },

for search query

curl -X GET "localhost:9200/my-index-0/_search?size=20&pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "message": "document 2"
          }
        },
        {
          "match": {
            "user.id": "cyji"
          }
        }
      ]
    }
  }
}'

The source field would contain information about the query itself, like "different clauses of the query, aggregation".

smacrakis commented 7 months ago
  1. If I want to record the previous hour's queries every hour ("on a predefined schedule"), how do I configure the place they are recorded?
  2. If I record on a predefined schedule, how do I reset the priority queue so that I don't see the same queries in the next time period (and avoid race conditions)?
  3. How can the caller correlate their queries with the ones reported here? I'm guessing that query_id is internal (not supplied by the caller) -- is that correct? The timestamp can't be used either, because it is the time that the query phase starts, which is also internal (there may be queuing etc.) -- maybe this should be the time that the query arrives at the API? Does the caller need to preserve the source of every query it sends? Keep in mind, too, that the identical query may perform differently at different times (different state of the index, software locks, etc.).
smacrakis commented 7 months ago

What does "The source field would contain information about the query itself, like 'different clauses of the query, aggregation'." mean? Doesn't it contain the literal query itself, not "information about" it? Is it scrubbed in any way (removing text fields which might be PII etc.)? -- not saying that it should, just asking.

smacrakis commented 7 months ago

More fundamentally, the notion of top-N isn't necessarily what I want. Maybe I want all queries that take more than 1 s, whether there are 2 of them in a particular hour or 200. Or all "interactive" queries that take more than 300ms, but I don't care if "reporting" queries take 10 s. I also typically want to group by "query shape" (as Froh calls it).

smacrakis commented 7 months ago

Re #12084, that is client-side logging of user behavior, notably the actions that the user takes on the search results -- did they click on result 3? did they add result 5 to their shopping basket? did they click on 6 and immediately hit the back button? Did they LIKE something? etc.

Of course there are insights that can be derived by combining server-side measurements (like processing time in different search phases) and client-side measurements, but the two kinds of data are quite different.

ansjcy commented 7 months ago

Thank you for the feedback!

If I want to record the previous hour's queries every hour

I might not explain the concept of "window" well in the previous discussions. A "window" actually has fixed start and end. For example, if you set window size to one hour, it means the framework will collect top queries in every actual hour, like (1am - 2am, 2am - 3am ... etc). Currently we are keeping the current and the last 1 window in memory. In the next releases we can set a limit on the memory usage of query insights plugin and keep even more historical windows in memory.

If I record on a predefined schedule, how do I reset the priority queue so that I don't see the same queries in the next time period (and avoid race conditions)?

The Priority queue will be reset and data will be rotated to the historical windows when a new window starts.

How can the caller correlate their queries with the ones reported here? I'm guessing that query_id is internal (not supplied by the caller)

That's a great question and this is actually one of the top ask we heard from the community. We plan to add support to provide user identifier in the top n queries response. In that way it would be much easier to correlate a "top request" with the actual user who sent it.

Maybe I want all queries that take more than 1 s, whether there are 2 of them in a particular hour or 200.

We can add customizable configuration to support this as well to filter out those records with low latency :)

Re https://github.com/opensearch-project/OpenSearch/issues/12084, that is client-side logging of user behavior ..

Thank you for the response! I just updated a comment in #12084 as well.