GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow
855 stars, 323 forks

Read timed out for PubSub pull #473

Closed torbjornvatn closed 7 years ago

torbjornvatn commented 8 years ago

I'm trying to use the trick described here to communicate with PubSub from Dataflow running with the DirectPipelineRunner. After a while, however, I end up with the exception posted below. Could PubsubJsonClient.pull be the culprit, given that the Pub/Sub documentation states:

SocketTimeoutException when pulling messages: In most cases, this is caused by the default timeout (20 seconds) in the Java client library. Cloud Pub/Sub requires a longer timeout if you're using hanging pull calls (i.e., req.setReturnImmediately(false)), so it is recommended that you explicitly set a longer timeout. Refer to the RetryHttpInitializerWrapper class for setting a longer timeout.

Any hints on how to get this working locally?

Caused by: java.lang.RuntimeException: Unexpected exception while reading from Pubsub:
    at com.google.cloud.dataflow.sdk.io.PubsubIO$Read$Bound$PubsubReader.processElement(PubsubIO.java:790)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.dataflow.sdk.util.PubsubJsonClient.pull(PubsubJsonClient.java:171)
    at com.google.cloud.dataflow.sdk.io.PubsubIO$Read$Bound$PubsubReader.processElement(PubsubIO.java:778)
    at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49)
    at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:139)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:103)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:260)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:181)
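For reference, the fix the quoted docs suggest is to raise the read timeout above the client library's 20-second default. A minimal, self-contained sketch using the stdlib HttpURLConnection for illustration (the Dataflow SDK's Pub/Sub client sets the equivalent knobs through google-api-client's HttpRequest.setConnectTimeout/setReadTimeout, as in the RetryHttpInitializerWrapper the docs mention; the URL and timeout values here are illustrative):

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class TimeoutConfig {
    // Raise timeouts above the 20s default so hanging pulls
    // (req.setReturnImmediately(false)) don't hit "Read timed out".
    public static HttpURLConnection configure(HttpURLConnection conn) {
        conn.setConnectTimeout(60_000); // 60s to establish the connection
        conn.setReadTimeout(90_000);    // 90s, headroom for a hanging pull
        return conn;
    }

    public static void main(String[] args) throws Exception {
        // openConnection() performs no network I/O until connect()/getInputStream(),
        // so this is safe to run offline.
        HttpURLConnection conn =
            (HttpURLConnection) new URL("https://pubsub.googleapis.com/").openConnection();
        configure(conn);
        System.out.println(conn.getReadTimeout());
    }
}
```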
francesperry commented 8 years ago

I've just updated that post with information on the new InProcessPipelineRunner, which has support for unbounded PCollections. I'd try that approach!

torbjornvatn commented 7 years ago

Thanks

KarthikaRamani commented 7 years ago

Hi torbjornvatn, were you able to solve this issue? I am facing the same issue now and getting the same exception. Any hint from you would be helpful.

torbjornvatn commented 7 years ago

Sorry, I don't actually remember if I got it working this way, but I believe my colleagues have gotten it to work with the DirectPipelineRunner in the new Beam SDK.

dhalperi commented 7 years ago

Yes -- for Dataflow 1.x we recommend using the InProcessPipelineRunner, and for 2.x based on Apache Beam we recommend using the DirectRunner. If you have issues with these please let us know!
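For anyone landing here, selecting those runners looks roughly like this (a sketch, not verified against either SDK version; it assumes the standard PipelineOptionsFactory setup from each SDK):

```java
// Dataflow SDK 1.x: use the InProcessPipelineRunner, which supports
// unbounded PCollections such as PubsubIO reads.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(InProcessPipelineRunner.class);
Pipeline p = Pipeline.create(options);

// Apache Beam / Dataflow 2.x: the DirectRunner is the default local runner,
// or select it explicitly with --runner=DirectRunner on the command line.
```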

talonx commented 6 years ago

I am seeing these issues with both the DirectRunner as well as the FlinkRunner. Should I open a new ticket?

lukecwik commented 6 years ago

Please open an issue on https://issues.apache.org/jira/projects/BEAM/issues detailing the issue that you see.