opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[BUG] Streaming bulk request hangs #16035

Closed thomas-long-f3 closed 1 month ago

thomas-long-f3 commented 1 month ago

Describe the bug

I'm testing the experimental Streaming Bulk feature in 2.17.0, writing a stream indexer in Go.

The approach I'm taking is to asynchronously open a connection to the _bulk/stream endpoint, write a chunk of bytes containing the usual bulk request header & body, then wait until a bulk response is streamed back.

Everything works as expected with simple requests (i.e. small documents, index only). However, when testing the indexer against a real service I hit a situation where a request is sent and the response is never received.

I've managed to reproduce this with a simple program (see below). What seems to trigger the issue is a combination of scripted requests and larger requests.

Related component

Indexing

To Reproduce

I'm running OpenSearch 2.17.0 locally in Docker, built from the following Dockerfile (OpenSearch.Dockerfile):

FROM opensearchproject/opensearch:2.17.0

RUN ./bin/opensearch-plugin install transport-reactor-netty4

with this docker-compose service definition:

  opensearch:
    ports:
      - "9200:9200"
    build:
      context: opensearch
      dockerfile: OpenSearch.Dockerfile
    environment:
      - "discovery.type=single-node"
      - "bootstrap.memory_lock=true"
      - "http.type=reactor-netty4"
      - "DISABLE_INSTALL_DEMO_CONFIG=true" # Prevents execution of bundled demo script which installs demo certificates and security configurations to OpenSearch
      - "DISABLE_SECURITY_PLUGIN=true" # Disables security plugin
      - "DISABLE_PERFORMANCE_ANALYZER_AGENT_CLI=true" # Disables Performance Analyzer Plugin
    ulimits:
      memlock:
        soft: -1 # Set memlock to unlimited (no soft or hard limit)
        hard: -1
      nofile:
        soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
        hard: 65536
    user: "1000:1000"

When running the following Go program:

(Apologies, I can't replicate this with a simple curl request, as I can't control how the chunks are split per document.)

package main

import (
    "context"
    "encoding/json"
    "io"
    "log"
    "net/http"
    "os/signal"
    "syscall"
)

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    tr := http.DefaultTransport.(*http.Transport).Clone()
    tr.DisableCompression = true

    reqReader, reqWriter := io.Pipe()
    respReader, respWriter := io.Pipe()

    streamReady := make(chan struct{})

    go func() {
        req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost:9200/_bulk/stream", reqReader)
        if err != nil {
            panic(err)
        }

        req.ContentLength = -1
        req.Header.Set("Content-Type", "application/json")

        log.Println("Sending stream request...")

        resp, err := tr.RoundTrip(req)
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        log.Println("Ready to stream...")
        streamReady <- struct{}{}

        _, err = io.Copy(respWriter, resp.Body)
        if err != nil {
            panic(err)
        }
    }()

    <-streamReady

    for _, body := range [][]byte{
        []byte(`{ "index": { "_index": "test", "_id": "123" }
{"script": {"source": "ctx._source.counter += params.count", "lang": "painless", "params": {"count": 1}}, "upsert": {"counter": 1}}
`),
        []byte(`{ "index": { "_index": "test" } }
{ "hello": "world", "big": "aaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" }
`),
    } {
        n, err := reqWriter.Write(body)
        if err != nil {
            panic(err)
        }
        log.Printf("Written doc (%d)\n", n)

        var out json.RawMessage
        if err := json.NewDecoder(respReader).Decode(&out); err != nil {
            panic(err)
        }
        log.Printf("Received: %s", out)
    }

    <-ctx.Done()
}

This prints...

2024/09/23 08:49:35 Sending stream request...
2024/09/23 08:49:35 Ready to stream...
2024/09/23 08:49:35 Written doc (178)
2024/09/23 08:49:35 Received: {"took":118,"errors":false,"items":[{"index":{"_index":"test","_id":"123","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}
2024/09/23 08:49:35 Written doc (1878)

And then it hangs; the second response is never received.

For comparison, a simple request (see below) sent in an infinite loop will happily run continuously without issue.

{ "index": { "_index": "test" } }
{ "hello": "world" }

Expected behavior

Each request returns a response.

Additional Details

No response

thomas-long-f3 commented 1 month ago

Just ran this again with the root logger set to DEBUG, and I can see the error below:

[2024-09-24T15:16:57,146][DEBUG][r.suppressed             ] [7659f0ded3ba] path: /_bulk/stream, params: {}
java.lang.IllegalArgumentException: The bulk request must be terminated by a newline [\n]
    at org.opensearch.action.bulk.BulkRequestParser.findNextMarker(BulkRequestParser.java:93) ~[opensearch-2.17.0.jar:2.17.0]
    at org.opensearch.action.bulk.BulkRequestParser.parse(BulkRequestParser.java:299) ~[opensearch-2.17.0.jar:2.17.0]
    at org.opensearch.action.bulk.BulkRequest.add(BulkRequest.java:296) ~[opensearch-2.17.0.jar:2.17.0]
    at org.opensearch.rest.action.document.RestBulkStreamingAction.lambda$prepareRequest$1(RestBulkStreamingAction.java:145) ~[opensearch-2.17.0.jar:2.17.0]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxZip$ZipCoordinator.drain(FluxZip.java:943) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxZip$ZipInner.onNext(FluxZip.java:1121) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onNext(FluxBuffer.java:160) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:878) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:803) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161) [reactor-core-3.5.20.jar:3.5.20]
    at org.opensearch.http.reactor.netty4.ReactorNetty4StreamingHttpChannel.receiveChunk(ReactorNetty4StreamingHttpChannel.java:99) [transport-reactor-netty4-client-2.17.0.jar:2.17.0]
    at org.opensearch.http.reactor.netty4.ReactorNetty4StreamingRequestConsumer.accept(ReactorNetty4StreamingRequestConsumer.java:37) [transport-reactor-netty4-client-2.17.0.jar:2.17.0]
    at org.opensearch.http.reactor.netty4.ReactorNetty4StreamingRequestConsumer.accept(ReactorNetty4StreamingRequestConsumer.java:23) [transport-reactor-netty4-client-2.17.0.jar:2.17.0]
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) [reactor-core-3.5.20.jar:3.5.20]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:379) [reactor-netty-core-1.1.22.jar:1.1.22]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:425) [reactor-netty-core-1.1.22.jar:1.1.22]
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:787) [reactor-netty-http-1.1.22.jar:1.1.22]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) [reactor-netty-core-1.1.22.jar:1.1.22]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) [netty-handler-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:311) [reactor-netty-http-1.1.22.jar:1.1.22]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

I would expect any errors to be returned in the response, but I also wouldn't expect this request to error at all. I wonder if there is some request/response buffering happening that is splitting the larger documents.
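To illustrate the buffering hypothesis, here is a minimal sketch (the chunk size of 1024 is made up for demonstration; it is not OpenSearch's or Netty's actual limit): if the transport slices one logical write at a fixed maximum chunk size, every fragment except the last arrives without the trailing newline the bulk parser requires.

```go
package main

import "fmt"

// splitAtMax mimics a transport that slices one logical write into
// fixed-size chunks; the cut points ignore document boundaries.
func splitAtMax(body []byte, maxChunk int) [][]byte {
	var chunks [][]byte
	for len(body) > maxChunk {
		chunks = append(chunks, body[:maxChunk])
		body = body[maxChunk:]
	}
	return append(chunks, body)
}

func main() {
	// A bulk action line plus an oversized document body (stand-in data).
	body := []byte("{ \"index\": { \"_index\": \"test\" } }\n")
	for i := 0; i < 1500; i++ {
		body = append(body, 'a')
	}
	body = append(body, '\n')

	for i, c := range splitAtMax(body, 1024) {
		fmt.Printf("chunk %d: %d bytes, newline-terminated: %v\n",
			i, len(c), c[len(c)-1] == '\n')
	}
}
```

Under these assumptions this prints two chunks: the first 1024-byte fragment ends mid-document with no newline, and only the final 511-byte fragment is newline-terminated, which would trigger exactly the IllegalArgumentException above if each fragment were parsed in isolation.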

reta commented 1 month ago

Thanks @thomas-long-f3, I suspect we may be hitting Netty's maximum chunk size limits; large documents do indeed have constraints. Working on it, thank you.

reta commented 1 month ago

The server now responds with an error (instead of hanging):

2024/10/01 16:32:33 Sending stream request...
2024/10/01 16:32:33 Ready to stream...
2024/10/01 16:32:33 Written doc (178)
2024/10/01 16:32:33 Received: {"took":39,"errors":false,"items":[{"index":{"_index":"test","_id":"123","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_primary_term":2,"status":200}}]}
2024/10/01 16:32:33 Written doc (1878)
2024/10/01 16:32:33 Received: {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"The bulk request must be terminated by a newline [\\n]"}],"type":"illegal_argument_exception","reason":"The bulk request must be terminated by a newline [\\n]"},"status":400}
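For client authors hitting this on 2.17.0, a defensive check before each write makes the failure mode explicit on the client side rather than relying on the server's error. This is a hypothetical helper for illustration, not part of the fix:

```go
package main

import (
	"errors"
	"fmt"
)

// checkTerminated verifies that a bulk chunk ends with the newline the
// server-side parser requires, before the chunk is written to the stream.
func checkTerminated(chunk []byte) error {
	if len(chunk) == 0 || chunk[len(chunk)-1] != '\n' {
		return errors.New("bulk chunk must be terminated by a newline")
	}
	return nil
}

func main() {
	fmt.Println(checkTerminated([]byte("{ \"hello\": \"world\" }\n"))) // <nil>
	fmt.Println(checkTerminated([]byte("{ \"hello\": \"world\" }")))   // non-nil error
}
```

Note this guard would not have caught the bug above (the reporter's chunks were newline-terminated; the splitting happened server-side), but it rules out one class of client mistakes when debugging.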

thomas-long-f3 commented 1 month ago

Thanks @reta for fixing this! I still would have expected the second request to succeed as it's terminated by a newline. Can you see any reason why this is failing?

reta commented 1 month ago

> Thanks @reta for fixing this! I still would have expected the second request to succeed as it's terminated by a newline. Can you see any reason why this is failing?

Thanks @thomas-long-f3, yes, it is a slightly different issue; I am working on it as we speak, but we should certainly be able to handle large documents.