apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: When using beam.io.kinesis.ReadDataFromKinesis, a Java error is raised in DataflowRunner #27165

Open vishwesh0409 opened 1 year ago

vishwesh0409 commented 1 year ago

What happened?

"Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()" because "this.shardReadersPool" is null
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:887)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:824)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1788)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:142)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2506)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1034)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:799)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.io.kinesis.ShardReadersPool.getLatestRecordTimestamp()" because "this.shardReadersPool" is null
org.apache.beam.sdk.io.kinesis.KinesisReader.getSplitBacklogBytes(KinesisReader.java:172)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:999)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2415)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:540)

Issue Priority

Priority: 3 (minor)

benvit92 commented 1 year ago

Hello @vishwesh0409, what was the fix that was applied for this? I tried to look for it but could not find it.

vishwesh0409 commented 1 year ago

I'll reopen this. I thought it was fixed by #26953, but after updating Beam to 2.49.0 I'm still getting the same error.

benvit92 commented 1 year ago

Yep, I also tested this against the current master branch and get the same error when running a test job locally, so it's definitely an issue IMO.

benvit92 commented 1 year ago

Also @vishwesh0409, https://github.com/apache/beam/pull/26953 was applied to the v2 (aws2) Kinesis reader, and I think the current implementation may still rely on v1 (I'm talking about the Java package that the Python code calls).

benvit92 commented 1 year ago

OK, I'm going to structure my thoughts a bit better here :)

So there are two KinesisReader.java files around, one v1 and one v2, and as far as I can tell KinesisTransformRegistrar.java, which maps the Python interface to the Java source package, relies on the v1 one.

https://github.com/apache/beam/pull/26953 fixed the issue in the v2 KinesisReader.java but not in v1. As a test, I cloned master and applied the same fix to the v1 files, and after that the job runs without error.
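In the stack trace, the SDF wrapper calls `getProgress()` during `splitRestriction`, which calls `KinesisReader.getSplitBacklogBytes()` while `shardReadersPool` is still null, presumably because the reader has not been started yet. The following is a minimal, self-contained sketch of the kind of null guard such a fix could add. `NullGuardSketch`, `Reader`, and `ShardPool` are hypothetical stand-ins, not Beam classes, and returning an "unknown backlog" value is an assumption about the fix's shape, not a copy of #26953.

```java
/**
 * Hypothetical, simplified stand-in for a Kinesis reader; not Beam's KinesisReader.
 * Shows why a size/backlog query issued before start() hits a null field, and how
 * a null guard avoids the NullPointerException.
 */
public class NullGuardSketch {

    // Same value as Beam's UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN (-1).
    static final long BACKLOG_UNKNOWN = -1L;

    // Hypothetical stand-in for ShardReadersPool.
    static final class ShardPool {
        private final long backlogBytes;

        ShardPool(long backlogBytes) {
            this.backlogBytes = backlogBytes;
        }

        long backlogBytes() {
            return backlogBytes;
        }
    }

    static final class Reader {
        // Stays null until start() is called, like the field named in the stack trace.
        private ShardPool shardReadersPool;

        void start(long initialBacklogBytes) {
            shardReadersPool = new ShardPool(initialBacklogBytes);
        }

        // Unguarded: throws NullPointerException when called before start().
        long getSplitBacklogBytesUnguarded() {
            return shardReadersPool.backlogBytes();
        }

        // Guarded (assumed fix shape): report "unknown" instead of dereferencing null.
        long getSplitBacklogBytes() {
            if (shardReadersPool == null) {
                return BACKLOG_UNKNOWN;
            }
            return shardReadersPool.backlogBytes();
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        // A backlog query before start(), as during splitRestriction in the trace.
        System.out.println(reader.getSplitBacklogBytes()); // prints -1
        try {
            reader.getSplitBacklogBytesUnguarded();
        } catch (NullPointerException e) {
            System.out.println("unguarded call threw NullPointerException");
        }
        reader.start(1024L);
        System.out.println(reader.getSplitBacklogBytes()); // prints 1024
    }
}
```

Returning an "unknown" sentinel lets the runner fall back to its own size estimate instead of failing the bundle; whether the real fix returns this value or something else should be checked against the PR itself.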

I do encounter another issue, though: when running on my laptop with the DirectRunner, no data is read even though the source stream contains data and I am using InitialPositionInStream.TRIM_HORIZON. The job runs, but nothing is read.

I also tried to make KinesisTransformRegistrar.java point to v2, since the job emits deprecation warnings and v1 seems to be deprecated, but I don't know the project well enough to make it work yet (plus my Java is a bit rusty 😓).

vishwesh0409 commented 1 year ago

Looks like the KinesisReader modules are not properly fixed yet. I observed the same deprecation warnings when I tried to run my code on Google Cloud. It should be a simple enough fix, though.

benvit92 commented 1 year ago

Basically, a new KinesisTransformRegistrar.java needs to be developed on top of the aws2 Kinesis package instead of the current one, exposing a new v2 transform URN to Python. Hopefully that would let the Python pipeline work as expected with the latest modules.

lostluck commented 1 year ago

2.50 release manager here. This issue is currently tagged for the 2.50.0 release, which cuts in a week on August 9th.

Please complete work and get it into the main branch in that time, or move this issue to the 2.51 Milestone: https://github.com/apache/beam/milestone/15

Abacn commented 1 year ago

This was because the issue was reopened and the milestone was not removed.

benvit92 commented 1 year ago

@Abacn @lostluck But if an issue is open and neither the person who opened it nor anyone in the discussion knows how to fix it, will the issue be forgotten, or will someone in the community pick it up? Just trying to understand whether there will be any resolution to this or whether it will just stick around.

lostluck commented 1 year ago

As an open source project, the primary means of fixes is via community and user involvement. As a rule, no one can expect someone else to fix an issue just because it's filed.

The story changes somewhat if it is judged to be a P1 or P0, a regression from the previous release version, or if it's affecting a large Dataflow customer, at which point it's likely that someone from the Dataflow team will drive it to resolution. That does require the customer to have a GCP support ticket in place as well.

P3s are typically not release blockers, so it's more likely this remains in the backlog until someone motivated to fix it comes along. Beam welcomes contributions.

mmxgn commented 11 months ago

This issue seems to also affect FlinkRunner.

benvit92 commented 10 months ago

@mxm It should affect all runners, since the issue is at the source-code level. I tried to explain the core issue in more detail here: https://github.com/apache/beam/issues/27165#issuecomment-1644183789

nahplay commented 9 months ago

Same issue on my side.