Support for criteria based DWPT selection inside DocumentWriter #13387

Open RS146BIJAY opened 1 month ago

RS146BIJAY commented 1 month ago

Description

Issue

Today, Lucene internally creates multiple DocumentsWriterPerThread (DWPT) instances per index to facilitate concurrent indexing across different ingestion threads. Documents indexed by the same DWPT are grouped into the same segment after a flush. Because DWPT assignment is based only on indexing concurrency, it is not possible to predict or control the distribution of documents within the segments. For instance, while indexing time-series logs, a single DWPT may index logs with both 5xx and 2xx status codes, leading to segments that contain a heterogeneous mix of documents.

Typically, in scenarios like log analytics, users are most interested in a certain subset of the data, such as error (4xx) and/or fault (5xx) request logs. Randomly assigning DWPTs to documents can disperse these relevant documents across multiple segments. Furthermore, if these documents are sparse, they will be thinly spread out even within segments, requiring search queries to iterate over many less relevant documents. While the optimisation where collectors use the BKD tree to skip non-competitive documents significantly improves query performance, the actual number of documents iterated still depends on how the data is arranged in the segment and how the underlying BKD tree gets constructed.

Storing relevant log documents separately from relatively less relevant ones, such as 2xx logs, can prevent their scattering across multiple segments. This model can markedly enhance query performance by streamlining searches to involve fewer segments and omitting documents that are less relevant. Moreover, clustering related data allows for the pre-computation of aggregations for frequently executed queries (e.g., count, minimum, maximum) and storing them as separate metadata. Corresponding queries can then be served from the metadata itself, optimizing both latency and compute.

Proposal

In this proposal, we suggest adding support for a criteria-based DWPT selection mechanism within the DocumentsWriter. Users can define the criterion through a grouping function supplied as a new IndexWriterConfig setting. The grouping criterion can be based on the anticipated query pattern of the workload, so that frequently queried data is stored together. During indexing, this function would be evaluated for each document, ensuring that documents with differing outcomes are indexed using separate DWPTs. For instance, in the context of http request logs, the grouping function could be tailored to assign DWPTs according to the status code in the log entry.
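
As a rough illustration, the configuration could look like the sketch below. The setter shown in the comment and the shape of the grouping callback are hypothetical, invented only to make the proposal concrete; they do not exist in Lucene today.

```java
// Sketch only: the grouping function and the IndexWriterConfig setter shown
// in the comment are hypothetical, illustrating the proposed API shape.
import java.util.function.Function;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriterConfig;

public class GroupingConfigSketch {
  static IndexWriterConfig configure() {
    // Map each document to a group key derived from its status code field.
    Function<Document, String> statusGroup = doc -> {
      String status = doc.get("status_code"); // e.g. "200", "404", "503"
      if (status != null && status.startsWith("5")) {
        return "5xx";
      }
      if (status != null && status.startsWith("4")) {
        return "4xx";
      }
      return "other";
    };

    IndexWriterConfig config = new IndexWriterConfig();
    // Hypothetical setter: documents with different grouping outcomes would
    // be routed to different DWPT pools inside DocumentsWriter, e.g.:
    // config.setDWPTGroupingFunction(statusGroup);
    return config;
  }
}
```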

Associated OpenSearch RFC

https://github.com/opensearch-project/OpenSearch/issues/13183

Improvements with new DWPT distribution strategy

We worked on a POC in Lucene and tried integrating it with OpenSearch. We validated DWPT distribution based on different criteria, such as status code and timestamp, against different types of workloads. We observed 50-60% improvements in the performance of range, aggregation, and sort queries with the proposed DWPT selection approach.

Implementation Details

The user-defined grouping function will be passed to the DocumentsWriter as a new IndexWriterConfig setting. During indexing of a document, the DocumentsWriter will evaluate this grouping function and pass the outcome to the DocumentsWriterFlushControl and DocumentsWriterPerThreadPool when requesting a DWPT for indexing the document. The DocumentsWriterPerThreadPool will now maintain a distinct pool of DWPTs for each possible outcome. The specific pool selected for indexing a document will depend on the document's outcome for the grouping function. Should the relevant pool be empty, a new DWPT will be created and added to this pool. Connecting with the http request log example above, having distinct pools for 2xx and 5xx status code logs would ensure that 2xx logs are indexed using a separate set of DWPTs from the 5xx logs. Once a DWPT is designated for flushing, it is checked out of the thread pool and won't be reused for indexing.
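
A minimal, self-contained sketch of the per-group pool selection described above (class and method names are hypothetical; Lucene's DocumentsWriterPerThreadPool has no notion of groups today):

```java
// Hypothetical sketch of per-group DWPT pools; not actual Lucene code.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class GroupedDwptPoolSketch<DWPT> {
  // One free list of DWPTs per grouping-function outcome.
  private final Map<String, Deque<DWPT>> pools = new ConcurrentHashMap<>();
  private final Supplier<DWPT> dwptFactory;

  GroupedDwptPoolSketch(Supplier<DWPT> dwptFactory) {
    this.dwptFactory = dwptFactory;
  }

  /** Checks out a DWPT for the given outcome, creating one if the pool is empty. */
  DWPT obtain(String groupKey) {
    Deque<DWPT> pool = pools.computeIfAbsent(groupKey, k -> new ArrayDeque<>());
    synchronized (pool) {
      DWPT dwpt = pool.poll();
      return dwpt != null ? dwpt : dwptFactory.get();
    }
  }

  /** Returns a DWPT to its group's pool, unless it has been checked out for flushing. */
  void release(String groupKey, DWPT dwpt, boolean pendingFlush) {
    if (pendingFlush) {
      return; // flushed DWPTs are not reused, matching the description above
    }
    Deque<DWPT> pool = pools.computeIfAbsent(groupKey, k -> new ArrayDeque<>());
    synchronized (pool) {
      pool.push(dwpt);
    }
  }
}
```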

Further, in order to ensure that the grouping criteria invariant is maintained even during segment merges, we propose a new merge policy that acts as a decorator over the existing TieredMergePolicy. During a segment merge, this policy would categorize segments according to their grouping function outcomes and only merge segments within the same category, thus maintaining the grouping criteria's integrity throughout the merge process.
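
One possible shape for such a decorator, assuming the grouping outcome is recorded as a segment attribute (the attribute key below is made up for illustration, and forced merges would need the same treatment):

```java
// Sketch of the proposed merge-policy decorator over TieredMergePolicy.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.TieredMergePolicy;

public class GroupAwareMergePolicy extends FilterMergePolicy {

  public GroupAwareMergePolicy() {
    super(new TieredMergePolicy());
  }

  @Override
  public MergeSpecification findMerges(
      MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
      throws IOException {
    // Partition segments by their grouping outcome so that only segments
    // from the same group are considered for merging together.
    Map<String, SegmentInfos> perGroup = new HashMap<>();
    for (SegmentCommitInfo sci : segmentInfos) {
      // Hypothetical: the grouping outcome is stored as a segment attribute.
      String group = sci.info.getAttribute("dwpt.group");
      if (group == null) {
        group = "default";
      }
      perGroup
          .computeIfAbsent(
              group, g -> new SegmentInfos(segmentInfos.getIndexCreatedVersionMajor()))
          .add(sci);
    }

    MergeSpecification result = null;
    for (SegmentInfos groupInfos : perGroup.values()) {
      MergeSpecification spec = super.findMerges(mergeTrigger, groupInfos, mergeContext);
      if (spec != null) {
        if (result == null) {
          result = new MergeSpecification();
        }
        result.merges.addAll(spec.merges);
      }
    }
    return result;
  }
}
```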

Guardrails

To manage the system's resources effectively, guardrails will be implemented to limit the number of groups that can be generated from the grouping function. Users will need to provide a predefined list of acceptable outcomes for the grouping function, along with the function itself. Documents whose grouping function outcome is not within this list will be indexed using a default pool of DWPTs. This limits the number of DWPTs created during indexing, preventing the formation of numerous small segments that could lead to frequent segment merges. Additionally, a cap on the DWPT count keeps JVM utilization and garbage collection in check.
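
A small sketch of this guardrail, with hypothetical names, collapsing any unexpected outcome into a default group:

```java
// Hypothetical guardrail: only predeclared outcomes get their own DWPT pool;
// anything else falls back to a shared default pool.
import java.util.Set;
import java.util.function.Function;

final class GroupKeyGuardrail {
  private static final String DEFAULT_GROUP = "default";

  private final Set<String> allowedOutcomes;
  private final Function<String, String> groupingFunction; // e.g. status code -> "5xx"

  GroupKeyGuardrail(Set<String> allowedOutcomes, Function<String, String> groupingFunction) {
    this.allowedOutcomes = Set.copyOf(allowedOutcomes);
    this.groupingFunction = groupingFunction;
  }

  /** Bounds the number of DWPT pools by collapsing unexpected outcomes into the default pool. */
  String resolveGroup(String fieldValue) {
    String outcome = groupingFunction.apply(fieldValue);
    return allowedOutcomes.contains(outcome) ? outcome : DEFAULT_GROUP;
  }
}
```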

mikemccand commented 1 month ago

I like this idea! I hope we can find a simple enough API exposed through IWC to enable the optional grouping.

This also has nice mechanical sympathy / symmetry with the distributed search engine analog. A distributed search engine like OpenSearch indexes and searches into N shards across multiple servers, and this is nearly precisely the same logical problem that Lucene tackles on a single multi-core server when indexing and searching into N segments, especially as Lucene's intra-query concurrency becomes the norm/default and improves (e.g. allowing intra-segment per query concurrency as well). We should cross-fertilize more based on this analogy: the two problems are nearly the same. A shard, a segment, same thing heh (nearly).

So this proposal brings OpenSearch's custom document routing feature down into Lucene's segments.

jpountz commented 1 month ago

This is an interesting idea!

You do not mention it explicitly in the issue description, but presumably this only makes sense if an index sort is configured, otherwise merges may break the clustering that you are trying to create in the first place?

The DocumentsWriterPerThreadPool will now maintain a distinct pool of DWPTs for each possible outcome.

I'm a bit uncomfortable with this approach. It is heavy enough that it might not perform much better than maintaining a separate IndexWriter per group? I wonder if we could do something within a single DWPT pool, e.g. could we use rendez-vous hashing to optimistically try to reuse the same DWPT for the same group as often as possible, but only on a best-effort basis, not trading concurrency or creating more DWPTs than indexing concurrency requires?
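
For reference, a minimal sketch of rendezvous (highest-random-weight) selection over a fixed set of DWPT slots, purely illustrative of the idea above:

```java
// Illustrative rendezvous hashing: for each slot, score (groupKey, slot) and
// pick the highest-scoring slot, so the same group keeps landing on the same
// slot as long as the slot count does not change.
final class RendezvousDwptSelector {
  static int selectSlot(String groupKey, int numSlots) {
    int best = 0;
    long bestScore = Long.MIN_VALUE;
    for (int slot = 0; slot < numSlots; slot++) {
      long score = mix(groupKey.hashCode() * 31L + slot);
      if (score > bestScore) {
        bestScore = score;
        best = slot;
      }
    }
    return best;
  }

  private static long mix(long z) {
    // SplitMix64 finalizer as a simple 64-bit mixing function.
    z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
    z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
    return z ^ (z >>> 31);
  }
}
```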

RS146BIJAY commented 1 month ago

Thanks Mike and Adrien for the feedback.

You do not mention it explicitly in the issue description, but presumably this only makes sense if an index sort is configured, otherwise merges may break the clustering that you are trying to create in the first place?

Not exactly. As mentioned, in order to ensure that the grouping criteria invariant is maintained even during segment merges, we are introducing a new merge policy that acts as a decorator over the existing TieredMergePolicy. During a segment merge, this policy would categorize segments according to their grouping function outcomes and only merge segments within the same category, thus maintaining the grouping criteria's integrity throughout the merge process.

I wonder if we could do something within a single DWPT pool, e.g. could we use rendez-vous hashing to optimistically try to reuse the same DWPT for the same group as often as possible, but only on a best-effort basis, not trading concurrency or creating more DWPTs than indexing concurrency requires?

I believe that even if we use a single DWPT pool with rendezvous hashing to distribute DWPTs, we would end up creating the same number of DWPTs as with different DWPT pools for different groups. Consider an example where we are grouping logs based on status code for an index and 8 concurrent indexing threads are indexing 2xx status code logs. This will create 8 DWPTs. Now, if 4 threads start indexing 4xx status code logs concurrently, this will require 4 extra DWPTs if we want to maintain status-code-based grouping. Instead of creating new DWPTs, we could try reusing 4 of the existing DWPTs created for 2xx status code logs on a best-effort basis. But this would again mix 4xx status code logs with 2xx status code logs, defeating the purpose of status-code-based grouping. Also, to keep the number of DWPTs in check, we will be adding guardrails on the number of groups that can be generated from the grouping function. Let me know if my understanding is correct.

jpountz commented 1 month ago

Thanks for explaining.

The concern I have, given that we're planning to only flush/merge segments from the same group together, is that this would essentially perform the same as maintaining one IndexWriter per group, which is quite heavy and can already be done easily on top of Lucene?

To get similar benefits from clustering but without incurring the overhead of extra segments, I feel like we should rather improve our support for clustering at the doc ID level, i.e. index sorting. And maybe ideas like this criteria-based selection of DWPTs could help speed up the creation of sorted indexes?
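
For context, clustering at the doc ID level is configured through the existing index sorting API, e.g. (the field name is illustrative, and the sort field must be indexed with doc values):

```java
// Existing Lucene API: cluster documents within each segment by status code
// via index sorting.
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;

class IndexSortConfigExample {
  static IndexWriterConfig sortedByStatus() {
    IndexWriterConfig config = new IndexWriterConfig();
    // "status_code" must be indexed with doc values, e.g. NumericDocValuesField.
    config.setIndexSort(new Sort(new SortField("status_code", SortField.Type.INT)));
    return config;
  }
}
```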

RS146BIJAY commented 1 month ago

Thanks for the suggestion. The above suggestion of clustering within a segment does improve document skipping (especially when combined with the BKD optimisation to skip non-competitive documents). But it still limits us from building further optimisations that separate DWPT pools for different groups would enable.

In particular, we won't be able to build such optimizations on top of the segment topology if documents from different groups are stored together. Let me know if this makes sense.

jpountz commented 1 month ago

I agree that better organizing data across segments yields significant benefits; I'm only advocating for doing this by maintaining a separate IndexWriter for each group instead of doing it inside of DocumentsWriter.

RS146BIJAY commented 4 weeks ago

I agree that better organizing data across segments yields significant benefits; I'm only advocating for doing this by maintaining a separate IndexWriter for each group instead of doing it inside of DocumentsWriter.

Sorry, I missed answering this part in my earlier response. We did explore the approach of creating an IndexWriter/Lucene index (or OpenSearch shard) per group. However, implementing this approach would lead to significant overhead on the client side (such as OpenSearch) both in terms of code changes and operational overhead like metadata management. On the other hand, maintaining separate DWPT pools for different groups would require minimal changes inside Lucene. The overhead is lower here as the shard is still maintained as a single physical Lucene index. Let me know if this makes sense.

RS146BIJAY commented 3 weeks ago

Attaching a preliminary PR for the POC related to the above issue to share my understanding. Please note that this is not the final PR.

jpountz commented 3 weeks ago

However, implementing this approach would lead to significant overhead on the client side (such as OpenSearch) both in terms of code changes and operational overhead like metadata management.

Can you give more details? The main difference that comes to mind is that using multiple IndexWriters requires multiple Directory instances as well, and OpenSearch may have a strong assumption that there is a 1:1 mapping between shards and folders on disk. But this could be worked around with a filter Directory that flags each index file with a prefix identifying the group it belongs to?
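
A rough sketch of such a prefixing filter Directory (only a few representative methods are shown; a complete implementation would also need to cover temp outputs, renames, locking, etc.):

```java
// Rough sketch of a prefixing FilterDirectory for multi-tenant use of a
// single underlying Directory; intentionally incomplete.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;

public class PrefixedDirectory extends FilterDirectory {
  private final String prefix; // identifies the group, e.g. "5xx_"

  public PrefixedDirectory(Directory in, String group) {
    super(in);
    this.prefix = group + "_";
  }

  @Override
  public String[] listAll() throws IOException {
    // Only expose files belonging to this group, with the prefix stripped.
    List<String> names = new ArrayList<>();
    for (String name : in.listAll()) {
      if (name.startsWith(prefix)) {
        names.add(name.substring(prefix.length()));
      }
    }
    return names.toArray(new String[0]);
  }

  @Override
  public IndexOutput createOutput(String name, IOContext context) throws IOException {
    return in.createOutput(prefix + name, context);
  }

  @Override
  public IndexInput openInput(String name, IOContext context) throws IOException {
    return in.openInput(prefix + name, context);
  }

  @Override
  public void deleteFile(String name) throws IOException {
    in.deleteFile(prefix + name);
  }

  @Override
  public long fileLength(String name) throws IOException {
    return in.fileLength(prefix + name);
  }
}
```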

mikemccand commented 3 weeks ago

I like @jpountz's idea of just using separate IndexWriters for this use-case, instead of adding custom routing logic to the separate DWPTs inside a single IndexWriter and then also needing a custom MergePolicy that ensures that only like segments are merged together. A separate IndexWriter per group would cleanly achieve both of these?

The idea of using a single underlying multi-tenant Directory with multiple FilterDirectory wrappers (one per IndexWriter) is interesting -- do we have such a class already (that would distinguish the tenants via filename prefix or so)? That's a nice idea all by itself (separate from this use case) -- maybe open a spinoff to explore that?

You would also need a clean-ish way to manage a single total allowed RAM bytes across the N IndexWriters? I think IndexWriter's flushing policy or RAM accounting was already generalized to allow for this use case, but I don't remember the details.

Searching across the N separate shards as if they were a single index is also possible via MultiReader, though I'm not sure how well intra-query concurrency works -- maybe it works just fine because the search-time leaves/slices are all union'd across the N shards?

jpountz commented 3 weeks ago

do we have such a class already (that would distinguish the tenants via filename prefix or so)? That's a nice idea all by itself (separate from this use case) -- maybe open a spinoff to explore that?

I don't think we do. +1 to exploring this separately. I like that we then wouldn't need to tune the merge policy because it would naturally only see segments that belong to its group.

You would also need a clean-ish way to manage a single total allowed RAM bytes across the N IndexWriters? I think IndexWriter's flushing policy or RAM accounting was already generalized to allow for this use case, but I don't remember the details.

Right, IndexWriter#flushNextBuffer() and IndexWriter#ramBytesUsed() allow building this sort of thing on top of Lucene. It would be nice if Lucene provided more ready-to-use utilities around this.
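
As a minimal sketch of what such a utility could look like, built only on the two existing methods mentioned above (the budgeting policy itself is made up for illustration):

```java
// Sketch: when total buffered RAM across all group writers exceeds a budget,
// flush the largest writer's next pending buffer. Uses only the existing
// IndexWriter#ramBytesUsed() and IndexWriter#flushNextBuffer() APIs.
import java.io.IOException;
import java.util.List;

import org.apache.lucene.index.IndexWriter;

final class SharedRamBudget {
  private final List<IndexWriter> writers;
  private final long maxRamBytes;

  SharedRamBudget(List<IndexWriter> writers, long maxRamBytes) {
    this.writers = writers;
    this.maxRamBytes = maxRamBytes;
  }

  /** Call after indexing a batch of documents on any of the writers. */
  void maybeFlush() throws IOException {
    long total = 0;
    IndexWriter biggest = null;
    long biggestRam = -1;
    for (IndexWriter w : writers) {
      long ram = w.ramBytesUsed();
      total += ram;
      if (ram > biggestRam) {
        biggestRam = ram;
        biggest = w;
      }
    }
    if (total > maxRamBytes && biggest != null) {
      // Writes one of that writer's pending DWPT buffers to disk without
      // committing or opening a new reader.
      biggest.flushNextBuffer();
    }
  }
}
```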

Searching across the N separate shards as if they were a single index is also possible via MultiReader, though I'm not sure how well intra-query concurrency works -- maybe it works just fine because the search-time leaves/slices are all union'd across the N shards?

Indeed, I'd expect it to work just fine.
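
For completeness, a sketch of searching the per-group indexes as one logical index using the existing MultiReader and IndexSearcher APIs:

```java
// Existing Lucene APIs: search the per-group indexes as a single logical index.
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;

final class GroupedSearchExample {
  static IndexSearcher openSearcher(Directory[] groupDirs, ExecutorService executor)
      throws IOException {
    DirectoryReader[] readers = new DirectoryReader[groupDirs.length];
    for (int i = 0; i < groupDirs.length; i++) {
      readers[i] = DirectoryReader.open(groupDirs[i]);
    }
    // MultiReader unions the leaves of all group readers, so the searcher's
    // slices (and hence intra-query concurrency) span every group.
    MultiReader multiReader = new MultiReader(readers);
    return new IndexSearcher(multiReader, executor);
  }
}
```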