spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Refreshing SideInput for Streaming Pipelines #3521

Open syodage opened 3 years ago

syodage commented 3 years ago

When using side inputs in streaming pipelines, most use cases require the side input to be refreshed (recalculated) over time. Scio doesn't have a convenient way to do this. Apache Beam documents refreshable side input patterns, for both global and non-global windowing, that are meant to address this.

However, the exact example code snippets work with neither DirectRunner nor DataflowRunner. With DirectRunner, the side input doesn't output any data to the main pipeline code, and with DataflowRunner it throws the error in [4].

This issue was raised on the Apache Beam user mailing list [1][2][3] a few years ago, and the discussion concluded by suggesting a Guava LoadingCache that periodically updates a local cache. That, of course, is not the Beam way of doing it.
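To make the cache-based workaround concrete, here is a minimal sketch of the idea: hold one looked-up value and reload it once it is older than a fixed interval. This is not Guava's LoadingCache and not a Beam or Scio API; it is a standard-library stand-in, and the names `RefreshingValue`, `loader`, `refreshMillis` and `clock` are hypothetical, chosen only for illustration.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: caches one value and reloads it once it is older than refreshMillis. */
final class RefreshingValue<T> {
    private final Supplier<T> loader;   // assumption: e.g. a lookup against an external store
    private final long refreshMillis;   // maximum age before the value is reloaded
    private final LongSupplier clock;   // injectable time source, so refresh can be tested
    private T value;
    private long loadedAt;
    private boolean loaded = false;

    RefreshingValue(Supplier<T> loader, long refreshMillis, LongSupplier clock) {
        this.loader = loader;
        this.refreshMillis = refreshMillis;
        this.clock = clock;
    }

    /** Returns the cached value, reloading it first if it is missing or stale. */
    synchronized T get() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= refreshMillis) {
            value = loader.get();
            loadedAt = now;
            loaded = true;
        }
        return value;
    }
}

final class RefreshingValueDemo {
    public static void main(String[] args) {
        long[] now = {0L};   // fake clock, advanced by hand
        int[] loads = {0};   // counts how often the loader ran
        RefreshingValue<String> lookup =
            new RefreshingValue<>(() -> "version-" + (++loads[0]), 1_000L, () -> now[0]);

        System.out.println(lookup.get()); // first call triggers a load
        now[0] = 500L;
        System.out.println(lookup.get()); // still fresh, served from the cache
        now[0] = 1_000L;
        System.out.println(lookup.get()); // stale, triggers a reload
    }
}
```

In a real pipeline such a holder would presumably live inside the processing function on each worker and use `System::currentTimeMillis` as the clock. That is also why it is not the Beam way: every worker refreshes independently, outside the pipeline's windowing and triggering.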

Related Issues: https://github.com/spotify/scio/issues/3521 , https://github.com/spotify/scio/issues/3201 , https://github.com/spotify/scio/issues/1190 , https://github.com/spotify/scio/issues/2525

[1] https://lists.apache.org/thread.html/%3CB1660EAB-AEC8-4635-8386-8353685DB19A@gameduell.de
[2] https://lists.apache.org/thread.html/a5d804685a5810594a7860709fbcd6d3a22ead6e871fc3073a65ef1e@%3Cuser.beam.apache.org%3E
[3] https://lists.apache.org/thread.html/681de1ae372951988a00b9affa7480f3117d3cae6dae9ee2c69baba4@%3Cuser.beam.apache.org%3E

[4]

2020-12-07 23:34:17.483 EST Error message from worker: java.lang.RuntimeException: Exception while fetching side input:
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:217)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:303) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:665) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:728) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:137)
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:53)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:312) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:236) 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871) 
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:196) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:303) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:665) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:728) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:137) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:53) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:312) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:236) 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
org.apache.beam.sdk.values.PCollectionViews$SingletonViewFn.apply(PCollectionViews.java:451) 
org.apache.beam.sdk.values.PCollectionViews$SingletonViewFn.apply(PCollectionViews.java:379) 
org.apache.beam.runners.dataflow.worker.StateFetcher.lambda$fetchSideInput$2(StateFetcher.java:177) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4876) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871) 
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:196) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:303) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:665) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:728) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:137) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:53) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:312) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:236) 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)

nullobject commented 3 years ago

@syodage Did you figure out a workaround for this? I am facing the exact same problem.

lurecas commented 3 years ago

Hey! Bumping this issue, since I've stumbled upon the same thing.

Thanks for your thorough report, @syodage.

brunsgaard commented 2 years ago

@syodage Any update on this issue?