elastic / elasticsearch-net

This strongly-typed client library enables working with Elasticsearch. It is the official client maintained and supported by Elastic.
https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/index.html
Apache License 2.0

[API Design] Point In Time Helper(s) #5149

Open stevejgordon opened 3 years ago

stevejgordon commented 3 years ago

This proposal introduces several helpers to improve the experience of consuming Point in Time APIs for common scenarios. A Point in Time (PIT) internally works exactly like a scroll, except that the point in time used for scrolls is extended to more than one query. The lifecycle of the PIT is decorrelated from the query.

Point in Time Context

Consumption scenario: As a developer, I want to open a point in time, perform several search requests against that same PIT snapshot and close it after use to free up resources.

The primary helper should therefore simplify the process of opening, using and closing a PIT.

Create a disposable "context" through which search requests can be sent. During use, the context updates the point in time ID after each search operation*. Once the context is disposed of, the PIT is closed.

Proposed API(s)

namespace Nest
{
    public interface IElasticClient
    {
+        PointInTimeContext CreatePointInTimeContext(Time keepAlive = null, Indices indices = null);
    }
}

The point in time context will expose the same search methods as the main client, and these will apply the appropriate PIT ID. On first use, the context can send the PIT open request to establish a PIT for subsequent searches to use.

public class PointInTimeContext : IDisposable
{
    public async Task<ISearchResponse<TDocument>> SearchAsync<TDocument>(ISearchRequest request, CancellationToken ct = default) where TDocument : class
    {
    }

    public IAsyncEnumerable<SearchAllDocsResult<T>> SearchAllDocsAsync<T>(ISearchRequest<T> request, Time keepAlive = null, CancellationToken ct = default)
    {
    }

    // ... Expose other Search and SearchAsync methods ...

    public void Dispose()
    {
    }
}
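
The lifecycle described above (open the PIT lazily on first use, reuse it afterwards, close it on dispose) could be sketched roughly as follows. This is a minimal illustration and not the NEST implementation: `LazyPitContext` and the `open`/`close` delegates are hypothetical stand-ins for the real open and close point in time calls, and the sketch does not guard against `Dispose` racing with an in-flight search.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch only. The delegates stand in for the real
// open/close point in time requests; they are not NEST APIs.
public sealed class LazyPitContext : IDisposable
{
    private readonly Func<CancellationToken, Task<string>> _open;
    private readonly Action<string> _close;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private string _pitId;

    public LazyPitContext(Func<CancellationToken, Task<string>> open, Action<string> close)
    {
        _open = open;
        _close = close;
    }

    // Opens the PIT on first use; later calls reuse the stored ID.
    public async Task<string> GetPitIdAsync(CancellationToken ct = default)
    {
        var id = Volatile.Read(ref _pitId);
        if (id != null) return id;

        await _gate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // Re-check inside the gate so only one caller sends the open request.
            if (_pitId == null)
                _pitId = await _open(ct).ConfigureAwait(false);
            return _pitId;
        }
        finally
        {
            _gate.Release();
        }
    }

    // Closes the PIT only if one was actually opened.
    public void Dispose()
    {
        var id = Interlocked.Exchange(ref _pitId, null);
        if (id != null) _close(id);
        _gate.Dispose();
    }
}
```

Each search method on the context would call `GetPitIdAsync` before sending its request, so a context that is created but never used sends no open or close request at all.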

Usage example

using (var pit = Client.CreatePointInTimeContext("1m"))
{
    var response = await pit.SearchAsync<Project>(new SearchRequest<Project>
    {
        Query = new MatchAllQuery()
    });

    Console.WriteLine(response.Hits.Count);

    response = await pit.SearchAsync<Project>(new SearchRequest<Project>
    {
        Query = new MatchAllQuery()
    });

    Console.WriteLine(response.Hits.Count);

    response = await pit.SearchAsync<Project>(new SearchRequest<Project>
    {
        Query = new MatchAllQuery()
    });

    Console.WriteLine(response.Hits.Count);
}

* Considerations: Thread-safety of the mutable PIT ID within the context matters if multiple searches are made in parallel. It is valid to continue using the original PIT ID, returned when the PIT was opened, for all requests, even if an updated ID is provided in one of the responses. All IDs should remain valid.

This might mean that it is safe to allow a race on the ID updates in the PIT context when multiple requests are in flight. Alternatively, we could opt not to update the ID after each search. The cost of not updating the ID is that performance may be slightly reduced, since the updated PIT ID handles cases where data may have moved between requests.
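
If a benign race is acceptable, one possible shape for the mutable ID is a "last writer wins" holder. The sketch below assumes, as stated above, that every ID ever returned for the PIT stays valid; `PitIdHolder` is a hypothetical name and not part of NEST.

```csharp
using System.Threading;

// Hypothetical holder for the PIT ID stored inside a context.
public sealed class PitIdHolder
{
    private string _current;

    public PitIdHolder(string initialId) => _current = initialId;

    // Readers always observe a complete ID, never a torn value.
    public string Current => Volatile.Read(ref _current);

    // Parallel searches may call this in any order; the last writer wins.
    // That is only safe under the assumption that older IDs remain valid.
    public void Update(string idFromResponse)
    {
        if (!string.IsNullOrEmpty(idFromResponse))
            Interlocked.Exchange(ref _current, idFromResponse);
    }
}
```

No lock is needed because a reference assignment is atomic in .NET; the `Volatile`/`Interlocked` calls only make the visibility across threads explicit.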

Point in Time Paginated Searches

The documentation now prefers a PIT with search_after for deep pagination, rather than scroll. We should ensure we provide sufficient helpers to replicate the usage of scrolling, but implemented using PIT. The advantages are that you can perform multiple iterations over the same PIT and can also restart a search if, for example, an exception is thrown. We should consider how each case would be best consumed.

For the below, we assume that the overloads which do not accept an existing PointInTime will open one, likely against the specific inferred index for the type <T>.

Point in Time Search All

This is incomplete and a work in progress.

namespace Nest
{
    public interface IElasticClient
    {
+        IAsyncEnumerable<SearchAllDocsResult<T>> PointInTimeSearchAllAsync<T>(ISearchRequest<T> request, Time keepAlive = null, CancellationToken ct = default);
+        IAsyncEnumerable<SearchAllDocsResult<T>> PointInTimeSearchAllAsync<T>(ISearchRequest<T> request, Time keepAlive, PointInTime pit, CancellationToken ct = default);
    }
}

Usage Example

await foreach (var searchResponse in Client.PointInTimeSearchAllAsync(new SearchRequest<Project>
{
    Size = 2,
    Query = new MatchAllQuery(),
    Sort = new List<ISort> { new FieldSort { Field = Infer.Field<Project>(p => p.NumberOfCommits) }, new FieldSort { Field = Infer.Field("_id") } }
}))
{
    foreach(var doc in searchResponse.Documents)
    {
        Console.WriteLine(doc.Name);
    }
}

Point in Time Search All Docs

Support asynchronous iteration over documents, using PIT and search_after under the hood. This gives consumers a simple way to access all documents matching the query conditions.

This may be unnecessary since we already expose the enumerable Documents on a SearchResponse.

One potential optimisation to consider is whether we can dispatch the next search_after request before the consumer has enumerated all Documents, rather than waiting. This would be more of an implementation detail though.
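
As a rough illustration of that optimisation, the async iterator below starts the request for the next page before it yields the documents of the current one. It is a sketch under stated assumptions, not a proposed implementation: `PrefetchingPager` and the `fetchPage` delegate are hypothetical, with `fetchPage` standing in for a single search_after request that returns a page of documents plus the sort values of its last hit.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class PrefetchingPager
{
    // fetchPage is a hypothetical stand-in for one search_after request.
    // It receives the sort values of the previous page's last hit
    // (null for the first page) and returns the next page.
    public static async IAsyncEnumerable<TDoc> EnumerateAsync<TDoc>(
        Func<object[], CancellationToken, Task<(IReadOnlyList<TDoc> Docs, object[] LastSort)>> fetchPage,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var pending = fetchPage(null, ct);
        while (true)
        {
            var (docs, lastSort) = await pending.ConfigureAwait(false);
            if (docs.Count == 0) yield break;

            // Dispatch the next request now, so it runs while the
            // consumer is still processing the current page.
            pending = fetchPage(lastSort, ct);

            foreach (var doc in docs)
                yield return doc;
        }
    }
}
```

The trade-off is that one request is always in flight: a consumer that stops enumerating early leaves a page that was fetched but never read, so a real implementation would need to observe or cancel that pending task.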

Proposed API(s)

This is incomplete and a work in progress.

namespace Nest
{
    public interface IElasticClient
    {
+        IAsyncEnumerable<SearchAllDocsResult<T>> PointInTimeSearchAllDocsAsync<T>(ISearchRequest<T> request, Time keepAlive = null, CancellationToken ct = default);
+        IAsyncEnumerable<SearchAllDocsResult<T>> PointInTimeSearchAllDocsAsync<T>(ISearchRequest<T> request, PointInTime pit, Time keepAlive = null, CancellationToken ct = default);
    }
}

Usage Example

await foreach (var doc in Client.PointInTimeSearchAllDocsAsync(new SearchRequest<Project>
{
    Size = 2,
    Query = new MatchAllQuery(),
    Sort = new List<ISort> { new FieldSort { Field = Infer.Field<Project>(p => p.NumberOfCommits) }, new FieldSort { Field = Infer.Field("_id") } }
}))
{
    Console.WriteLine(doc.Document.NumberOfCommits);
}

We should consider the scenario where consumers need to perform multiple 'search all' (scroll style) requests over the same PIT. We can achieve this by accepting a PIT as an optional argument, which we then use in place of opening a new PIT inside the implementation. We could also provide methods on the PointInTimeContext which provide a shorthand way of searching all docs through an existing PIT.

We should consider the scenario where a consumer may wish to provide a 'starts after' argument to control the "search_after" starting point. The easiest choice here is for the consumer to provide the search_after values on the search request they pass in. When present, we prefer using those for the first search.

In cases such as this MatchAll example, where docs will have the same score, and users do not require an explicit sort, we can and should add a tiebreaker sort of _doc (or review the new automatic tiebreaker functionality) so that the search performs more efficiently.
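
To make the roles of the starting point and the tiebreaker concrete, the following in-memory simulation pages through documents sorted by (NumberOfCommits, Id). It does not call Elasticsearch; `Doc`, `SearchAfterSimulation`, `Page` and `All` are hypothetical names used only for illustration. The `Id` component plays the tiebreaker role: without it, documents sharing the same NumberOfCommits as the last hit of a page could be skipped or repeated.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record Doc(string Id, int NumberOfCommits);

public static class SearchAfterSimulation
{
    // Simulates one search_after request: returns up to `size` docs that sort
    // strictly after the given (Commits, Id) values, or the first page when null.
    public static List<Doc> Page(IEnumerable<Doc> index, (int Commits, string Id)? searchAfter, int size)
    {
        return index
            .OrderBy(d => d.NumberOfCommits)
            .ThenBy(d => d.Id, StringComparer.Ordinal)
            .Where(d => !searchAfter.HasValue || IsAfter(d, searchAfter.Value))
            .Take(size)
            .ToList();
    }

    private static bool IsAfter(Doc d, (int Commits, string Id) after) =>
        d.NumberOfCommits > after.Commits ||
        (d.NumberOfCommits == after.Commits && string.CompareOrdinal(d.Id, after.Id) > 0);

    // Pages until an empty page is returned. A caller-supplied startAfter
    // is used for the first request, as described above.
    public static List<Doc> All(IReadOnlyList<Doc> index, (int Commits, string Id)? startAfter, int size)
    {
        var results = new List<Doc>();
        var cursor = startAfter;
        while (true)
        {
            var page = Page(index, cursor, size);
            if (page.Count == 0) break;
            results.AddRange(page);
            var last = page[page.Count - 1];
            cursor = (last.NumberOfCommits, last.Id);
        }
        return results;
    }
}
```

Passing `null` as `startAfter` walks the whole set, while passing the sort values of a previously seen document resumes immediately after it.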

Point in Time Observe All

Similar to the ScrollAll case, we want to search through many documents efficiently.

We likely want an IObservable implementation to match ScrollAll, but also an IAsyncEnumerable to pull results.

Design not yet added

stijnherreman commented 3 months ago

I'm particularly interested in PointInTimeSearchAllAsync. What would a potential implementation look like? Would it hide most of the code in the example below, or does it need to be more flexible than that?

var openPointInTimeResponse = await client.OpenPointInTimeAsync<Foo>(request => request
    .Indices("foo_*")
    .KeepAlive("1m")
    );
ICollection<FieldValue> searchAfter = null;
while (true)
{
    var searchResponse = await client.SearchAsync<Foo>(search => search
        .Pit(pit => pit.Id(openPointInTimeResponse.Id))
        .Query(query => query
            /* query */
        )
        .TrackTotalHits(new(false))
        .Sort(sort => sort.Doc(scoreSort => { }))
        .SearchAfter(searchAfter)
    );
    if (searchResponse.Hits.Count == 0)
    {
        break;
    }

    /* collect or map documents */

    searchAfter = searchResponse.HitsMetadata.Hits.Last().Sort.ToList();
}
var closePointInTimeResponse = await client.ClosePointInTimeAsync(request => request.Id(openPointInTimeResponse.Id));