apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: SpannerIO.readChangeStream() stops forwarding change records and starts continuously throwing a large number of "Operation ongoing" errors #22779

Open ggprod opened 2 years ago

ggprod commented 2 years ago

What happened?

I am using a new Dataflow template that reads Spanner change streams via SpannerIO.readChangeStream(). The streaming template worked correctly for 2 days, but then it stopped forwarding change records and started continuously throwing errors like the one below:

Operation ongoing in step SpannerIO-ReadChangeStream-Read-change-stream-partition-ParMultiDo-ReadChangeStreamPartition-/ProcessElementAndRestrictionWithSizing-ptransform-60 for at least 332h25m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
  at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:136)
  at com.lmax.disruptor.MultiProducerSequencer.next(MultiProducerSequencer.java:105)
  at com.lmax.disruptor.RingBuffer.next(RingBuffer.java:263)
  at io.opencensus.impl.internal.DisruptorEventQueue$1.enqueue(DisruptorEventQueue.java:134)
  at io.opencensus.impl.internal.DisruptorEventQueue.enqueue(DisruptorEventQueue.java:162)
  at io.opencensus.implcore.stats.StatsManager.record(StatsManager.java:70)
  at io.opencensus.implcore.stats.MeasureMapImpl.record(MeasureMapImpl.java:82)
  at io.grpc.census.CensusStatsModule$CallAttemptsTracerFactory.<init>(CensusStatsModule.java:447)
  at io.grpc.census.CensusStatsModule$StatsClientInterceptor.interceptCall(CensusStatsModule.java:791)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at com.google.api.gax.grpc.GrpcChannelUUIDInterceptor.interceptCall(GrpcChannelUUIDInterceptor.java:52)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at com.google.api.gax.grpc.GrpcHeaderInterceptor.interceptCall(GrpcHeaderInterceptor.java:80)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor.interceptCall(GrpcMetadataHandlerInterceptor.java:54)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor.interceptCall(SpannerErrorInterceptor.java:64)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at com.google.cloud.spanner.spi.v1.LoggingInterceptor.interceptCall(LoggingInterceptor.java:68)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at com.google.cloud.spanner.spi.v1.HeaderInterceptor.interceptCall(HeaderInterceptor.java:72)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:923)
  at io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63)
  at io.grpc.stub.MetadataUtils$HeaderAttachingClientInterceptor.interceptCall(MetadataUtils.java:81)
  at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
  at com.google.api.gax.grpc.GrpcClientCalls.newCall(GrpcClientCalls.java:99)
  at com.google.api.gax.grpc.GrpcDirectServerStreamingCallable.call(GrpcDirectServerStreamingCallable.java:65)
  at com.google.api.gax.grpc.GrpcServerStreamingRequestParamCallable.call(GrpcServerStreamingRequestParamCallable.java:61)
  at com.google.api.gax.grpc.GrpcExceptionServerStreamingCallable.call(GrpcExceptionServerStreamingCallable.java:59)
  at com.google.api.gax.rpc.WatchdogServerStreamingCallable.call(WatchdogServerStreamingCallable.java:69)
  at com.google.api.gax.rpc.ServerStreamingCallable$1.call(ServerStreamingCallable.java:237)
  at com.google.api.gax.rpc.ServerStreamingAttemptCallable.call(ServerStreamingAttemptCallable.java:234)
  at com.google.api.gax.rpc.ServerStreamingAttemptCallable.start(ServerStreamingAttemptCallable.java:194)
  at com.google.api.gax.rpc.RetryingServerStreamingCallable.call(RetryingServerStreamingCallable.java:87)
  at com.google.api.gax.tracing.TracedServerStreamingCallable.call(TracedServerStreamingCallable.java:76)
  at com.google.api.gax.rpc.ServerStreamingCallable$1.call(ServerStreamingCallable.java:237)
  at com.google.cloud.spanner.spi.v1.GapicSpannerRpc.executeQuery(GapicSpannerRpc.java:1506)
  at com.google.cloud.spanner.AbstractReadContext$1.startStream(AbstractReadContext.java:667)
  at com.google.cloud.spanner.AbstractResultSet$ResumableStreamIterator.computeNext(AbstractResultSet.java:1105)
  at com.google.cloud.spanner.AbstractResultSet$ResumableStreamIterator.computeNext(AbstractResultSet.java:986)
  at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
  at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
  at com.google.cloud.spanner.AbstractResultSet$GrpcValueIterator.ensureReady(AbstractResultSet.java:269)
  at com.google.cloud.spanner.AbstractResultSet$GrpcValueIterator.getMetadata(AbstractResultSet.java:245)
  at com.google.cloud.spanner.AbstractResultSet$GrpcResultSet.next(AbstractResultSet.java:119)
  at com.google.cloud.spanner.ForwardingResultSet.next(ForwardingResultSet.java:54)
  at com.google.cloud.spanner.SessionPool$AutoClosingReadContext$1.internalNext(SessionPool.java:272)
  at com.google.cloud.spanner.SessionPool$AutoClosingReadContext$1.next(SessionPool.java:252)
  at org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.getPartition(PartitionMetadataDao.java:110)
  at org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction.run(QueryChangeStreamAction.java:166)
  at org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn.processElement(ReadChangeStreamPartitionDoFn.java:234)
  at org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
  at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1063)
  at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:142)
  at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:661)
  at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:656)
  at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
  at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
  at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:179)
  at org.apache.beam.fn.harness.BeamFnDataReadRunner$Factory$$Lambda$242/882597230.accept(Unknown Source)
  at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
  at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:515)
  at org.apache.beam.fn.harness.FnHarness$$Lambda$108/1459016715.apply(Unknown Source)
  at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
  at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
  at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver$$Lambda$117/539047905.run(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
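
The "Operation ongoing" message above states how long the step has been stuck, so one way to detect the problem automatically is to parse that duration out of the worker logs. The following is a minimal sketch, not part of Beam or Dataflow: the class name, the method names, and the assumption that the duration is always formatted like `332h25m00s` (with the hours part optional) are inferred from the single log line in this report.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the "stuck for" duration from a log line.
final class StuckStepDetector {
    // Assumed format: "Operation ongoing in step <name> for at least [<H>h]<M>m<S>s ..."
    private static final Pattern STUCK = Pattern.compile(
        "Operation ongoing in step \\S+ for at least (?:(\\d+)h)?(\\d+)m(\\d+)s");

    // Returns the reported duration, or empty if the line is not an "Operation ongoing" message.
    static Optional<Duration> stuckFor(String logLine) {
        Matcher m = STUCK.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        long hours = m.group(1) == null ? 0L : Long.parseLong(m.group(1));
        return Optional.of(Duration.ofHours(hours)
            .plusMinutes(Long.parseLong(m.group(2)))
            .plusSeconds(Long.parseLong(m.group(3))));
    }

    // True if the line reports a step stuck for at least the given threshold.
    static boolean shouldRestart(String logLine, Duration threshold) {
        return stuckFor(logLine).map(d -> d.compareTo(threshold) >= 0).orElse(false);
    }
}
```

A monitoring job could call `shouldRestart` with a threshold well above the pipeline's normal bundle time, so that ordinary slow bundles do not trigger a restart.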

Issue Priority

Priority: 1

Issue Component

Component: io-java-gcp

ggprod commented 2 years ago

I noticed in the logs that, for the 2 days the template was running correctly, it was executing DetectNewPartitionsDoFn, which runs DetectNewPartitionsAction and logs very frequently (on average once or more every second). Then the logging suddenly stops, which perhaps suggests a thread deadlock.

ggprod commented 2 years ago

I also noticed that this abrupt stop in the logging happens about 1 minute after the worker logs another error, related to OpenCensus (though I am not sure whether the two are related or it is coincidental):

java.lang.NullPointerException
    at io.opencensus.implcore.stats.MeasureToViewMap.record(MeasureToViewMap.java:153)
    at io.opencensus.implcore.stats.StatsManager$StatsEvent.process(StatsManager.java:101)
    at io.opencensus.impl.internal.DisruptorEventQueue$DisruptorEventHandler.onEvent(DisruptorEventQueue.java:229)
    at io.opencensus.impl.internal.DisruptorEventQueue$DisruptorEventHandler.onEvent(DisruptorEventQueue.java:222)
    at com.lmax.disruptor.BatchEventProcessor.processEvents(BatchEventProcessor.java:168)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:125)
    at java.lang.Thread.run(Thread.java:750)
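
One hypothesis consistent with both traces, though not a confirmed root cause, is that the exception ends the thread that drains the OpenCensus event queue, after which producers block once the bounded buffer fills up (the first trace is parked in `MultiProducerSequencer.next`). The sketch below reproduces that general pattern with a plain `ArrayBlockingQueue` rather than the Disruptor; the class name, the `failAt` parameter, and the 200 ms timeout are illustrative assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a producer stalling after its only consumer thread dies.
final class StalledConsumerDemo {
    // Returns true if all events were enqueued, false if the producer timed out
    // because nothing was draining the full queue any more.
    static boolean produce(int events, int capacity, int failAt) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int event = queue.take();
                    if (event == failAt) {
                        // Stand-in for an unexpected exception in the event handler.
                        throw new IllegalStateException("handler failed on event " + event);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // normal shutdown path
            }
        });
        consumer.setDaemon(true);
        consumer.setUncaughtExceptionHandler((thread, error) -> { /* keep demo output quiet */ });
        consumer.start();
        try {
            for (int i = 0; i < events; i++) {
                // A plain put() would block forever here once the consumer is gone;
                // offer() with a timeout lets the producer notice the stall instead.
                if (!queue.offer(i, 200, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } finally {
            consumer.interrupt();
        }
    }
}
```

With a healthy consumer (`failAt = -1`) every event is accepted; with `failAt = 10` and a capacity of 4, the producer fills the queue shortly after the consumer dies and then times out.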

ggprod commented 2 years ago

I believe I set the priority incorrectly, though. This could be a P2 or P3, since the template does run correctly for some time and can be restarted when the problem is detected (with a change stream start time in the past, so no change data is lost).
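
The restart workaround described above depends on choosing a start time early enough to cover the gap. A minimal sketch of that calculation follows; the class name, the safety margin, and the retention parameter are assumptions for illustration, and the real retention period should be taken from the change stream's own configuration. The resulting instant would be converted to a Spanner `Timestamp` and passed to `withInclusiveStartAt` on `SpannerIO.readChangeStream()`.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for picking the start time of a restarted pipeline.
final class RestartStartTime {
    // Returns lastGoodOutput minus a safety margin, or throws if that instant is
    // older than the retention window, in which case some changes may be unrecoverable.
    static Instant restartFrom(
            Instant lastGoodOutput, Duration safetyMargin, Instant now, Duration retention) {
        Instant candidate = lastGoodOutput.minus(safetyMargin);
        Instant oldestAvailable = now.minus(retention);
        if (candidate.isBefore(oldestAvailable)) {
            throw new IllegalStateException(
                "Start time " + candidate + " is older than the retention window, which begins at "
                    + oldestAvailable);
        }
        return candidate;
    }
}
```

Because the restarted pipeline re-reads records from before the stall, downstream sinks should tolerate replayed change records.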

johnjcasey commented 2 years ago

Because this works for several days before failing, it will be hard to investigate. Can you raise a GCP support ticket with this information?

ggprod commented 2 years ago

@johnjcasey yes, I agree.. sure, will do

johnjcasey commented 2 years ago

Changing the issue priority now that this is a GCP support ticket. @ggprod please update this issue with any resolution from that ticket.

anip-patel-exa commented 10 months ago

@ggprod any root cause for this? I am also seeing the same issue.

ggprod commented 10 months ago

@anip-patel-exa I had logged a support issue (https://issuetracker.google.com/issues/244327728) but forgot to follow up, and it was closed. You could add your occurrence there, and perhaps it would get reopened.