opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[BUG] CompoundProcessor limits ingest pipeline length #6338

Open MaximeWewer opened 1 year ago

MaximeWewer commented 1 year ago

Describe the bug

I used an ingest pipeline to rename a large number of fields and found that running an ingest pipeline with many processors can fail with a stack overflow error.

I found a similar issue on the Elasticsearch GitHub: https://github.com/elastic/elasticsearch/issues/84274

Expected behavior

Could this be fixed in the same way as the PR below? https://github.com/elastic/elasticsearch/pull/84250


saratvemulapalli commented 1 year ago

@MaximeWewer thanks for reaching out. We cannot look at code that is not compatible with ALv2. It would help to see the stack trace of the problem so we can learn more about it.

MaximeWewer commented 1 year ago

Hi @saratvemulapalli,

Yes, of course, I understand the license compliance concern. Here is an example of the stack trace I get when the ingest pipeline stops working:

    [2023-02-20T10:22:03,191][ERROR][o.o.b.OpenSearchUncaughtExceptionHandler] [opensearch.node01] fatal error in thread [opensearch[opensearch.node01][write][T#1]], exiting
    java.lang.StackOverflowError: null
        at org.opensearch.ingest.IngestDocument.createTemplateModel(IngestDocument.java:713) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.IngestDocument.renderTemplate(IngestDocument.java:709) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.common.RenameProcessor.execute(RenameProcessor.java:82) ~[?:?]
        at org.opensearch.ingest.Processor.execute(Processor.java:65) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.Processor.execute(Processor.java:70) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.Processor.execute(Processor.java:70) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.Processor.execute(Processor.java:70) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179) ~[opensearch-1.2.4.jar:1.2.4]
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152) ~[opensearch-1.2.4.jar:1.2.4]
    fatal error in thread [opensearch[opensearch.node01][write][T#1]], exiting
    java.lang.StackOverflowError
        at org.opensearch.ingest.IngestDocument.createTemplateModel(IngestDocument.java:713)
        at org.opensearch.ingest.IngestDocument.renderTemplate(IngestDocument.java:709)
        at org.opensearch.ingest.common.RenameProcessor.execute(RenameProcessor.java:82)
        at org.opensearch.ingest.Processor.execute(Processor.java:65)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.Processor.execute(Processor.java:70)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:147)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:161)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:152)
        at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$1(CompoundProcessor.java:179)
        at org.opensearch.ingest.Processor.execute(Processor.java:70)

dblock commented 1 year ago

Looks like a bug. Looking for someone to write a unit test that reproduces this problem and a fix, without looking at any non-APLv2 code, please.

msfroh commented 1 year ago

From the description in this issue, I was able to reproduce with the following unit test in CompoundProcessorTests:

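    public void testManyProcessors() {
        // Reuse one no-op processor 10,000 times to build a very long pipeline.
        TestProcessor testProcessor = new TestProcessor(i -> {});
        Processor[] processors = new Processor[10_000];
        Arrays.fill(processors, testProcessor);
        CompoundProcessor compoundProcessor = new CompoundProcessor(processors);
        // Fails with StackOverflowError: each processor's callback invokes the next processor.
        compoundProcessor.execute(ingestDocument, (result, e) -> {});
    }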

The problem is that CompoundProcessor implements chaining via callbacks. Each invocation adds a nested call to trigger the next processor.

I think we might be able to reimplement it as a for-loop. I need to wrap my head around it some more, but I was already planning to refactor CompoundProcessor as one of my TODOs on https://github.com/opensearch-project/OpenSearch/pull/6587
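To make the callback chaining described above concrete, here is a minimal standalone sketch. It does not use the OpenSearch classes: `Step`, `runChained`, and `overflows` are hypothetical names chosen for illustration, and the "document" is just an `int[]` counter. Each step's handler triggers the next step before the current call returns, so the stack grows with the pipeline length.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

class CallbackChainDemo {
    /** Hypothetical stand-in for an ingest processor; not the OpenSearch interface. */
    interface Step {
        void execute(int[] doc, BiConsumer<int[], Exception> handler);
    }

    /** Callback chaining: the handler of step N invokes step N+1, so frames accumulate. */
    static void runChained(List<Step> steps, int index, int[] doc, BiConsumer<int[], Exception> done) {
        if (index == steps.size()) {
            done.accept(doc, null);
            return;
        }
        steps.get(index).execute(doc, (result, e) -> {
            if (e != null) {
                done.accept(null, e);
            } else {
                runChained(steps, index + 1, result, done); // nested call, never unwinds early
            }
        });
    }

    /** Returns true if chaining the given number of no-op-like steps overflows the stack. */
    static boolean overflows(int stepCount) {
        Step increment = (doc, handler) -> {
            doc[0]++;
            handler.accept(doc, null); // completes synchronously, on the caller's stack
        };
        List<Step> steps = Collections.nCopies(stepCount, increment);
        try {
            runChained(steps, 0, new int[1], (result, e) -> {});
            return false;
        } catch (StackOverflowError err) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("10 steps overflow: " + overflows(10));
        System.out.println("1,000,000 steps overflow: " + overflows(1_000_000));
    }
}
```

The exact pipeline length at which the overflow appears depends on the thread stack size and on how many frames each processor adds, so the numbers here are only illustrative.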

msfroh commented 1 year ago

I poked at the CompoundProcessor code for a few minutes. It's not a simple matter of turning it into a for-loop, since some processors may do things that we want to do asynchronously, like network calls to get some extra data to enrich documents.

I'll give it some more thought.

It would be so much easier if Java had tail-call optimization. :)

msfroh commented 3 months ago

We should probably address this by adding a method to Processor (and SearchRequestProcessor and SearchResponseProcessor) like:

default boolean isSynchronous() {
  return true;
}

For search pipelines, we have "handy" async subinterfaces that flip the sync-vs-async abstractness, so we can override isSynchronous there.

For ingest processors, we could similarly add a subinterface that flips things. Right now, we don't have any async ingest processors AFAIK, but e.g. the ML inference ingest processor (and probably the GeoIP ingest processor) should be async, to avoid holding the transport_worker thread.

Then, for both cases, we can process a whole chain of synchronous processors in a while loop, only using callbacks for async processors. Since the async processors should be running on a task executor, the callback should execute on a different thread (with a fresh stack) anyway.
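A rough sketch of that idea, under stated assumptions: `Step`, `run`, and `demo` are hypothetical names, the "document" is an `AtomicInteger` counter, and a null result is assumed to end the run. This is not the OpenSearch `Processor` API, only an illustration of looping over synchronous steps and falling back to a callback for asynchronous ones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class HybridPipelineSketch {
    /** Hypothetical processor shape; isSynchronous() mirrors the default method proposed above. */
    interface Step {
        void execute(AtomicInteger doc, BiConsumer<AtomicInteger, Exception> handler);

        default boolean isSynchronous() {
            return true;
        }
    }

    /** Runs steps from index start: a loop for synchronous steps, a callback only for async ones. */
    static void run(List<Step> steps, int start, AtomicInteger doc, BiConsumer<AtomicInteger, Exception> done) {
        AtomicInteger current = doc;
        for (int i = start; i < steps.size(); i++) {
            Step step = steps.get(i);
            if (!step.isSynchronous()) {
                final int next = i + 1;
                // Async step: resume the loop from its callback, ideally on another thread's fresh stack.
                step.execute(current, (result, e) -> {
                    if (e != null || result == null) {
                        done.accept(null, e);
                    } else {
                        run(steps, next, result, done);
                    }
                });
                return;
            }
            // Synchronous step: assumed to invoke its handler before execute() returns.
            AtomicInteger[] resultHolder = new AtomicInteger[1];
            Exception[] errorHolder = new Exception[1];
            step.execute(current, (r, e) -> {
                resultHolder[0] = r;
                errorHolder[0] = e;
            });
            if (errorHolder[0] != null || resultHolder[0] == null) {
                done.accept(null, errorHolder[0]); // failure, or (assumed) a null result ends the run
                return;
            }
            current = resultHolder[0];
        }
        done.accept(current, null);
    }

    /** Runs syncCount sync steps, one async step, then syncCount more; returns the number of steps executed. */
    static int demo(int syncCount) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Step sync = (doc, handler) -> {
                doc.incrementAndGet();
                handler.accept(doc, null);
            };
            Step async = new Step() {
                @Override
                public void execute(AtomicInteger doc, BiConsumer<AtomicInteger, Exception> handler) {
                    pool.execute(() -> {
                        doc.incrementAndGet();
                        handler.accept(doc, null);
                    });
                }

                @Override
                public boolean isSynchronous() {
                    return false;
                }
            };
            List<Step> steps = new ArrayList<>(Collections.nCopies(syncCount, sync));
            steps.add(async);
            steps.addAll(Collections.nCopies(syncCount, sync));
            CompletableFuture<Integer> finished = new CompletableFuture<>();
            run(steps, 0, new AtomicInteger(), (doc, e) -> {
                if (e != null) {
                    finished.completeExceptionally(e);
                } else {
                    finished.complete(doc.get());
                }
            });
            return finished.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("steps executed: " + demo(100_000));
    }
}
```

In this sketch the stack depth depends only on the number of async steps whose callbacks happen to run on the calling thread, not on the total pipeline length.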

dhwanilpatel commented 1 month ago

CompoundProcessor only ever calls the execute method that takes a BiConsumer callback. Could we use that same method for every processor type (sync and async), and have CompoundProcessor wait on a CountDownLatch for the BiConsumer's response before executing the next processor? That way we could also turn the recursive calls into an iterative loop.

A simple snippet to explain the idea:

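void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) throws InterruptedException {
    for (int i = 0; i < processors.size(); i++) {
        // One latch per processor: released by that processor's callback.
        CountDownLatch latch = new CountDownLatch(1);
        ...
        processors.get(i).execute(ingestDocument, (result, e) -> {
            ...
            latch.countDown();
        });
        // Block until the callback has fired (CountDownLatch.await(), not Object.wait()).
        latch.await();
    }
}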

@msfroh Thoughts? Let me know if I have missed something over here.

cc: @shwetathareja / @ankitkala

msfroh commented 1 month ago

> @msfroh Thoughts? Let me know if I have missed something over here.

The difficulty with that approach is that waiting on the latch blocks the current thread, which is exactly what the async processors are trying to avoid.

If you have an async ingest processor that makes a long network call (e.g., calling out to a remote ML inference service to compute embeddings), it could exhaust the available indexing threads. Normally the remote call would not hold a thread; it would only need to execute the callback on a threadpool once the call completes.
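The difference can be sketched with a single-threaded pool standing in for the indexing threads. Everything here is hypothetical: `workerFreeWhilePending` and `indexingPool` are illustrative names, and the "remote call" is a `CompletableFuture` completed by hand rather than a real network request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingVsCallbackSketch {
    /**
     * Starts one simulated remote call from a single-threaded pool and reports whether
     * the pool thread is free again while that call is still pending.
     */
    static boolean workerFreeWhilePending(boolean useLatch) {
        ExecutorService indexingPool = Executors.newSingleThreadExecutor();
        CompletableFuture<String> remoteCall = new CompletableFuture<>(); // completed by hand below
        try {
            Future<?> workerTask = indexingPool.submit(() -> {
                if (useLatch) {
                    CountDownLatch latch = new CountDownLatch(1);
                    remoteCall.thenRun(latch::countDown);
                    try {
                        latch.await(); // holds the pool thread until the remote call finishes
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    // Register a continuation and return; no thread waits for the remote call.
                    remoteCall.thenRun(() -> { });
                }
            });
            boolean free;
            try {
                workerTask.get(500, TimeUnit.MILLISECONDS);
                free = true; // the task returned although the remote call has not completed
            } catch (TimeoutException e) {
                free = false; // the task is still parked on the latch
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            remoteCall.complete("embedding"); // the simulated remote service finally answers
            return free;
        } finally {
            indexingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback style frees the thread: " + workerFreeWhilePending(false));
        System.out.println("latch style frees the thread: " + workerFreeWhilePending(true));
    }
}
```

With the latch, every in-flight remote call pins one pool thread, so a pool of N threads can serve at most N concurrent slow calls before other indexing work has to queue.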