GoogleCloudPlatform / DataflowTemplates

Cloud Dataflow Google-provided templates for solving in-Cloud data tasks
https://cloud.google.com/dataflow/docs/guides/templates/provided-templates
Apache License 2.0
1.14k stars 960 forks source link

Expose Elasticsearch socket timeout option #1710

Closed seanstory closed 2 months ago

seanstory commented 3 months ago

👋 Elastic dev here.

We had a support issue come in from a customer using the BigQuery Template to ingest large amounts of data into Elasticsearch. They were using an Ingest Pipeline on the Elasticsearch side to apply an ML model, and getting errors like:

java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE]
full stack trace ``` errorGroups: [1] insertId: "2421626710615581440:164328:0:14638" jsonPayload: { exception: "java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE] at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:834) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:259) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246) at com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1592) at com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1541) at com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143) at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650) at com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer$TransformTextViaJavascript$1.processElement(JavascriptTextTransformer.java:371) at com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer$TransformTextViaJavascript$1$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143) at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650) at com.google.cloud.teleport.v2.transforms.BigQueryConverters$TableRowToJsonFn.processElement(BigQueryConverters.java:152) at com.google.cloud.teleport.v2.transforms.BigQueryConverters$TableRowToJsonFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143) at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650) at org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn.processElement(PassThroughThenCleanup.java:84) at org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup$IdentityFn$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143) at org.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2195) at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead$3.processElement(BigQueryIO.java:1380) at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead$3$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:816) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143) at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650) at org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151) at org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143) at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWindowedValue(FnApiDoFnRunner.java:2680) at org.apache.beam.sdk.transforms.Reshuffle$RestoreMetadata$1.processElement(Reshuffle.java:187) at org.apache.beam.sdk.transforms.Reshuffle$RestoreMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792) at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143) at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2650) at org.apache.beam.sdk.transforms.Reshuffle$1.processElement(Reshuffle.java:123) at org.apache.beam.sdk.transforms.Reshuffle$1$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:803) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348) at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275) at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213) at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158) at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537) at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150) at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE] at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261) at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502) at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211) at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ... 1 more ```

After some digging, we found that this 30 second default timeout comes from the Elasticsearch RestClient, and was exposed in the Dataflow ElasticsearchIO class, but was not exposed through ElasticsearchWriteOptions to the WriteToElasticsearch transform. This PR aims to correct that and expose this timeout, as we expect this use case of ingesting resource-intensive (and therefore slow) batches of data to continue to rise in popularity, in the modern era of vector search.

seanstory commented 3 months ago

@shreyakhajanchi @fbiville @liferoad (tagging y'all since you'd helped review https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/1393) can y'all give this a review, and/or let me know if any changes are needed?

seanstory commented 2 months ago

Thanks @liferoad ! Can you merge? I don't have permission to.

liferoad commented 2 months ago

Thanks @liferoad ! Can you merge? I don't have permission to.

Yes, waiting for the internal tests to be done. Thanks.