elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch
Other
1.12k stars 24.83k forks source link

Reranker retriever query fails if window size > top N in inference endpoint #111202

Closed demjened closed 3 months ago

demjened commented 3 months ago

Elasticsearch Version

8.15

Installed Plugins

No response

Java Version

bundled

OS Version

N/A

Problem Description

The text_similarity_reranker retriever query fails if rank_window_size is greater than top_n in the rerank inference endpoint's task settings.

When creating the inference endpoint we have the option to specify top_n to return only N documents. By default this is omitted and the issue doesn't occur. However if e.g. top_n = 10 is specified in the endpoint and the reranker query defines a rank_window_size greater than 10, the reranker process fails due to an array index of bounds error.

Steps to Reproduce

  1. Create a deployment (serverless or 8.15+).

  2. Index some documents, e.g. short passages with a text field.

  3. Create a rerank inference endpoint with top_n set in the task settings:

    PUT _inference/rerank/cohere-rerank-inference-top-10
    {
    "service": "cohere",
    "service_settings": {
    "model_id": "rerank-english-v3.0",
    "api_key":  <COHERE_API_KEY>
    },
    "task_settings": {
    "top_n": 10
    }
    }
  4. Run a rerank retriever query with a window size larger than the top N value from above:

    POST rerank/_search
    {
    "retriever": {
    "text_similarity_reranker": {
      "retriever": {
        "standard": {
          "query": {
            "match": {
              "text": "Most famous landmark in Paris"
            }
          }
        }
      },
      "rank_window_size": 20,
      "field": "text",
      "inference_id": "cohere-rerank-inference-top-10",
      "inference_text": "Most famous landmark in Paris"
    }
    },
    "size": 20
    }

Expected: the query succeeds and returns the top 10 documents.

Observed: the query fails with an error similar to this:

{
  "error": {
    "root_cause": [],
    "type": "search_phase_execution_exception",
    "reason": "Computing updated ranks for results failed",
    "phase": "rank-feature",
    "grouped": true,
    "failed_shards": [],
    "caused_by": {
      "type": "array_index_out_of_bounds_exception",
      "reason": "Index 16 out of bounds for length 10"
    }
  },
  "status": 500
}

Logs (if relevant)

No response

demjened commented 3 months ago

I haven't tested this, but ~the likely cause of the bug is here - the code expects sortedDocs to be the same size as topResults. We need to handle if it's not the same and add cover these cases with tests.~

Update: the culprit is this line. The inference response contains document indices relative to the rank_window_size inputs. However if top_n is specified, it only returns the top N of those. For example with rank_window_size==20 and top_n==10 this is a valid response:

[RankedDoc{index='1', relevanceScore='0.9961155', text='null', hashcode=-1335808012},
RankedDoc{index='15', relevanceScore='0.9865199', text='null', hashcode=-1340785186},
RankedDoc{index='3', relevanceScore='0.049773447', text='null', hashcode=1815090117}, 
... (7 more RankedDoc-s)]

The above line creates a scores array with a length of 10, but processing the 2nd item (index='15') triggers the out-of-bounds exception.

Atharv914 commented 3 months ago

Sorry, I'm fairly new here but is this a possible fix? Let me know if it isn't:

` /* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one or more contributor license agreements. Licensed under the Elastic License 2.0 and the Server Side Public License, v 1; you may not use this file except in compliance with, at your election, the Elastic License 2.0 or the Server Side Public License, v 1. */

package org.elasticsearch.search.rank.context;

import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.search.rank.feature.RankFeatureDoc; import org.elasticsearch.search.rank.feature.RankFeatureResult; import org.elasticsearch.search.rank.feature.RankFeatureShardResult;

import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List;

import static org.elasticsearch.search.SearchService.DEFAULT_FROM; import static org.elasticsearch.search.SearchService.DEFAULT_SIZE;

// code: RankFeaturePhaseRankCoordinatorContext is a base class that runs on the coordinating node and is responsible for retrieving // code: rank_window_size total results from all shards, rank them, and then produce a final paginated response of [from, from+size] results.

public abstract class RankFeaturePhaseRankCoordinatorContext {

protected final int size; // Number of results to return
protected final int from; // Pagination offset
protected final int rankWindowSize; // Total number of results to rank

// Constructor to initialize the size, from, and rankWindowSize parameters
public RankFeaturePhaseRankCoordinatorContext(int size, int from, int rankWindowSize) {
    this.size = size < 0 ? DEFAULT_SIZE : size;
    this.from = from < 0 ? DEFAULT_FROM : from;
    this.rankWindowSize = rankWindowSize;
}

// Abstract method to compute scores for the feature documents
protected abstract void computeScores(RankFeatureDoc[] featureDocs, ActionListener<float[]> scoreListener);

// Preprocesses the provided documents by sorting them in descending order of scores
protected RankFeatureDoc[] preprocess(RankFeatureDoc[] originalDocs) {
    return Arrays.stream(originalDocs)
        .sorted(Comparator.comparing((RankFeatureDoc doc) -> doc.score).reversed())
        .toArray(RankFeatureDoc[]::new);
}

/**
 *This method is responsible for ranking the global results based on the provided rank feature results from each shard.
 *We first start by extracting ordered feature data through a {@code List<RankFeatureDoc>}
 *from the provided rankSearchResults, and then compute the updated score for each of the documents.
 *Once all the scores have been computed, we sort the results, perform any pagination needed, and then call the `onFinish` consumer
 *with the final array of {@link ScoreDoc} results.
 *@param rankSearchResults a list of rank feature results from each shard
 *@param rankListener      a rankListener to handle the global ranking result
 */
public void computeRankScoresForGlobalResults(
    List<RankFeatureResult> rankSearchResults,
    ActionListener<RankFeatureDoc[]> rankListener
) {
    // Extract feature data from each shard rank-feature phase result
    RankFeatureDoc[] featureDocs = extractFeatureDocs(rankSearchResults);

    // Generate the final `topResults` results, and pass them to fetch phase through the `rankListener`
    if (featureDocs.length == 0) {
        rankListener.onResponse(new RankFeatureDoc[0]);
    } else {
        computeScores(featureDocs, rankListener.delegateFailureAndWrap((listener, scores) -> {
            for (int i = 0; i < featureDocs.length; i++) {
                featureDocs[i].score = scores[i];
            }
            listener.onResponse(featureDocs);
        }));
    }
}

/**
 *Ranks the provided {@link RankFeatureDoc} array and paginates the results based on the `from` and `size` parameters. Filters out
 *documents that have a relevance score less than min_score.
 *@param rankFeatureDocs documents to process
 */
public RankFeatureDoc[] rankAndPaginate(RankFeatureDoc[] rankFeatureDocs) {
    // Sort documents by their scores
    RankFeatureDoc[] sortedDocs = preprocess(rankFeatureDocs);
    // Ensure rankWindowSize does not exceed size
    int effectiveSize = Math.min(rankWindowSize, size);
    // Initialize an array to hold the top results based on pagination
    RankFeatureDoc[] topResults = new RankFeatureDoc[Math.max(0, Math.min(effectiveSize, sortedDocs.length - from))];
    // Fill the topResults array with sorted documents from the specified pagination offset
    for (int rank = 0; rank < topResults.length; ++rank) {
        topResults[rank] = sortedDocs[from + rank];
        topResults[rank].rank = from + rank + 1;
    }
    return topResults;
}

// Extracts feature documents from the list of rank feature results from each shard
private RankFeatureDoc[] extractFeatureDocs(List<RankFeatureResult> rankSearchResults) {
    List<RankFeatureDoc> docFeatures = new ArrayList<>();
    for (RankFeatureResult rankFeatureResult : rankSearchResults) {
        RankFeatureShardResult shardResult = rankFeatureResult.shardResult();
        for (RankFeatureDoc rankFeatureDoc : shardResult.rankFeatureDocs) {
            if (rankFeatureDoc.featureData != null) {
                docFeatures.add(rankFeatureDoc);
            }
        }
    }
    return docFeatures.toArray(new RankFeatureDoc[0]);
}

} ` This Java class in Elasticsearch coordinates the ranking of search results from multiple shards. It processes and sorts the documents based on their scores, ensures that rankWindowSize does not exceed size, and handles pagination. Now onto my second code:

`import org.elasticsearch.action.ActionListener; import org.elasticsearch.search.rank.feature.RankFeatureDoc; import org.junit.Test; import static org.junit.Assert.*;

public class RankFeaturePhaseRankCoordinatorContextTest {

@Test
public void testRankWindowSizeDoesNotExceedTopN() {
    // Setup
    int size = 10;
    int from = 0;
    int rankWindowSize = 20;
    RankFeaturePhaseRankCoordinatorContext context = new RankFeaturePhaseRankCoordinatorContext(size, from, rankWindowSize) {
        @Override
        protected void computeScores(RankFeatureDoc[] featureDocs, ActionListener<float[]> scoreListener) {
            // Mock implementation
        }
    };

    // Execute
    RankFeatureDoc[] docs = new RankFeatureDoc[30]; // Mock data
    RankFeatureDoc[] result = context.rankAndPaginate(docs);

    // Assert
    assertTrue(result.length <= size);
}

@Test
public void testEqualRankWindowSizeAndTopN() {
    // Setup
    int size = 10;
    int from = 0;
    int rankWindowSize = 10;
    RankFeaturePhaseRankCoordinatorContext context = new RankFeaturePhaseRankCoordinatorContext(size, from, rankWindowSize) {
        @Override
        protected void computeScores(RankFeatureDoc[] featureDocs, ActionListener<float[]> scoreListener) {
            // Mock implementation
        }
    };

    // Execute
    RankFeatureDoc[] docs = new RankFeatureDoc[30]; // Mock data
    RankFeatureDoc[] result = context.rankAndPaginate(docs);

    // Assert
    assertEquals(size, result.length);
}

} `

This Java test class includes unit tests to ensure that the RankFeaturePhaseRankCoordinatorContext class correctly handles cases where rankWindowSize exceeds top_n, and that the ranking and pagination logic works as expected. Please let me know if this is wrong, I'm still a beginner and learning and would appreciate any feedback, thanks!

demjened commented 3 months ago

Hey @Atharv914 - thanks for the suggestion. I did some more digging and found that the bug is in TextSimilarityRankFeaturePhaseRankCoordinatorContext#extractScoresFromResponse (see updated comment).

We could fix this by backfilling the scores array with Float.NEGATIVE_INFINITY and filter those entries out in a postprocessing step, but that would make it unclear to the user why they get only 10 results when they asked for a window size of 20. So IMO it's better to do a check and throw an exception.

elasticsearchmachine commented 3 months ago

Pinging @elastic/es-search-relevance (Team:Search Relevance)