apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Uber jar does not register coders, causing `UnknownCoderWrapper` error #30994

Status: Open. Abacn opened this issue 7 months ago.

Abacn commented 7 months ago

What happened?

Minimal reproduction:

A minimal pipeline:

PipelineOptions options = PipelineOptionsFactory.fromArgs(argv).create();
Pipeline p = Pipeline.create(options);
p.apply(Create.of(1, 2, 3, 4, 5));
p.run().waitUntilFinish();

Gradle build file:

buildscript {
    repositories {
        mavenCentral()
        gradlePluginPortal()
    }
    dependencies {
        classpath "gradle.plugin.com.github.johnrengelman:shadow:7.1.2"
    }
}

plugins {
    id 'com.github.johnrengelman.shadow' version '7.1.2'
    id 'application'
}

apply plugin : "java"
ext {
    javaMainClass = "com.github.abacn.MinimumPipeline"
}

application {
    mainClassName = javaMainClass
}

def beam_version = "2.55.0"

repositories {
    mavenCentral()
    // To publish to the local Maven repository, run something like this in the Beam repo:
    // ./gradlew :sdks:java:io:google-cloud-platform:PublishToMavenLocal  -Ppublishing
    mavenLocal()
}

dependencies {
    implementation(group: 'org.apache.beam', name: 'beam-sdks-java-core', version: "$beam_version")
    implementation(group: 'org.apache.beam', name: 'beam-runners-direct-java', version: "$beam_version")
    implementation(group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: "$beam_version")
}

shadowJar {
    zip64 true
}

group = 'com.github.abacn'
version = '1.0'
description = 'beam-demo'
java.sourceCompatibility = JavaVersion.VERSION_1_8

Build the uber jar with the command `./gradlew :beamtest:shadowJar`, then submit the job to Dataflow with:

./gradlew :beamtest:run --args='--project=<...> --region=us-central1 \
--tempLocation=gs://<...>/tmp --runner=DataflowRunner \
--filesToStage=/<...>/build/libs/beamtest-1.0-all.jar'

The job fails with the following error:

"org.apache.beam.sdk.coders.CoderException: `UnknownCoderWrapper` was used to perform an actual decoding in the Java SDK. Potentially a Java transform is being followed by a cross-language transform thatuses a coder that is not available in the Java SDK. Please make sure that Python transforms at the multi-language boundary use Beam portable coders.
    at org.apache.beam.sdk.util.construction.UnknownCoderWrapper.decode(UnknownCoderWrapper.java:55)
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
    at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

The same pipeline succeeded with Beam 2.54.0 under Dataflow Runner v2.

Issue Priority

Priority: 1 (data loss / total loss of function)


chamikaramj commented 7 months ago

I don't think this is necessarily a bug, but rather a limitation caused by the uber jar not registering coders properly. I ran a job with additional logging, and it seems we simply cannot find a translator for the URN "beam:coder:length_prefix:v1", so UnknownCoderWrapper ends up being used as the fallback.
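The fallback behavior described above can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Beam code: `SimpleCoder`, `PlaceholderCoder`, and `CoderLookup` are invented names. What they share with the report is the idea that a URN with no registered translator resolves to a placeholder that only fails when it is actually used to decode, which is consistent with the error above surfacing inside the SDK harness rather than at submission.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of URN-based coder lookup. These are not
// Beam classes; they only mimic "unknown URN -> placeholder coder".
interface SimpleCoder {
  String decode(byte[] bytes) throws Exception;
}

// Stand-in for a fallback like UnknownCoderWrapper: creating it succeeds,
// and the failure only happens when something actually tries to decode.
final class PlaceholderCoder implements SimpleCoder {
  private final String urn;

  PlaceholderCoder(String urn) {
    this.urn = urn;
  }

  @Override
  public String decode(byte[] bytes) throws Exception {
    throw new Exception("Placeholder for unknown coder URN " + urn + " was used to decode");
  }
}

final class CoderLookup {
  private final Map<String, SimpleCoder> translators = new HashMap<>();

  void register(String urn, SimpleCoder coder) {
    translators.put(urn, coder);
  }

  // Returns the registered coder, or a placeholder if nothing registered the URN.
  SimpleCoder lookup(String urn) {
    SimpleCoder coder = translators.get(urn);
    return coder != null ? coder : new PlaceholderCoder(urn);
  }
}
```

In this model, registering "beam:coder:length_prefix:v1" before the lookup removes the placeholder, loosely analogous to the registration that `ModelCoderRegistrar` is expected to perform.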

We register LengthPrefixCoder here: https://github.com/apache/beam/blob/6bca71070e96b56b781600e8833a72cea329b1a1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java#L48

So it seems this registration is not being performed for the uber jar.
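One common way registrations get lost in an uber jar (assumed here, not confirmed for this issue) is that several input jars each ship a `META-INF/services/<interface>` file for the same service interface, and a packager that does not merge them keeps only one copy. `java.util.ServiceLoader` then sees only the providers listed in the surviving file. The sketch below models each jar as a hypothetical map from resource path to file contents and compares overwriting with merging. In real builds, the Shadow plugin's `mergeServiceFiles()` and the Maven Shade plugin's `ServicesResourceTransformer` provide the merging behavior; the `shadowJar` block in the build file above does not call `mergeServiceFiles()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of uber-jar packaging: each jar is a map of
// resource path -> contents. ServiceLoader reads provider class names,
// one per line, from META-INF/services/<fully qualified interface name>.
final class UberJarModel {
  static final String SERVICES_PREFIX = "META-INF/services/";

  // Naive packaging: when two jars contain the same path, the later one wins.
  static Map<String, String> overwrite(List<Map<String, String>> jars) {
    Map<String, String> out = new LinkedHashMap<>();
    for (Map<String, String> jar : jars) {
      out.putAll(jar);
    }
    return out;
  }

  // Service-aware packaging: concatenate provider lists for services files.
  static Map<String, String> mergeServices(List<Map<String, String>> jars) {
    Map<String, String> out = new LinkedHashMap<>();
    for (Map<String, String> jar : jars) {
      for (Map.Entry<String, String> e : jar.entrySet()) {
        String path = e.getKey();
        if (path.startsWith(SERVICES_PREFIX) && out.containsKey(path)) {
          out.put(path, out.get(path) + "\n" + e.getValue());
        } else {
          out.put(path, e.getValue());
        }
      }
    }
    return out;
  }

  // Provider class names ServiceLoader would see for the given interface.
  static List<String> providers(Map<String, String> jar, String serviceInterface) {
    List<String> result = new ArrayList<>();
    String contents = jar.get(SERVICES_PREFIX + serviceInterface);
    if (contents == null) {
      return result;
    }
    for (String line : contents.split("\n")) {
      int hash = line.indexOf('#'); // '#' starts a comment in services files
      String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
      if (!name.isEmpty()) {
        result.add(name);
      }
    }
    return result;
  }
}
```

With two jars that both register a provider for the same interface, `overwrite` leaves only the second provider visible, while `mergeServices` keeps both.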

I'm not getting the error when I stage the beam-sdks-java-core jar (which performs the above registration) along with the uber jar.

--filesToStage=./build/libs/unknown_coder_error-1.0-all.jar,beam-sdks-java-core-2.56.0-SNAPSHOT.jar

I'm not sure if this is necessarily a release blocker.

robertwb commented 7 months ago

Presumably this used to work and doesn't now? Or do we need better instructions on creating an uberjar that correctly preserves all the registration information?
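Whatever the root cause turns out to be, it helps to check which providers a packaged jar actually exposes. The sketch below uses only `ClassLoader.getResources` to list every provider named under `META-INF/services/<interface>` on the classpath. To use it, put the uber jar on the classpath and pass the registrar interface name as the argument. The interface name `org.apache.beam.sdk.util.construction.CoderTranslatorRegistrar` is an assumption based on the package of `ModelCoderRegistrar` linked earlier; verify it against the Beam version in use.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Lists every provider entry for a service interface that a class loader can
// see, across all META-INF/services files on its classpath.
final class ServiceEntries {
  static List<String> list(ClassLoader loader, String serviceInterface) throws IOException {
    List<String> providers = new ArrayList<>();
    Enumeration<URL> files = loader.getResources("META-INF/services/" + serviceInterface);
    while (files.hasMoreElements()) {
      URL file = files.nextElement();
      try (BufferedReader reader = new BufferedReader(
          new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
          int hash = line.indexOf('#'); // strip comments
          String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
          if (!name.isEmpty()) {
            providers.add(name);
          }
        }
      }
    }
    return providers;
  }

  public static void main(String[] args) throws IOException {
    // args[0]: fully qualified name of the service interface to inspect.
    for (String provider : list(Thread.currentThread().getContextClassLoader(), args[0])) {
      System.out.println(provider);
    }
  }
}
```

If `org.apache.beam.sdk.util.construction.ModelCoderRegistrar` does not appear in the output when only the uber jar is on the classpath, the registration was most likely dropped at packaging time.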

liferoad commented 7 months ago

Please create a doc about how to build an uber jar. We have been getting a couple of customer issues related to this.

Abacn commented 7 months ago

I see. Given that the underlying issue has always existed and the action item is more of a documentation request (the proper way to package an uber jar), I agree this isn't a release blocker. Adjusting the priority tag accordingly.

robertwb commented 7 months ago

Long term, should we try to move away from AutoService for built-in components (at least if standard uber-jar building tools do not handle them correctly)? Is this more feasible now that runners core has been merged? @kennknowles
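One possible reading of this suggestion is to keep `ServiceLoader` discovery for optional extensions, but register built-in components explicitly in code so they do not depend on service files surviving packaging. The sketch below is hypothetical: `UrnRegistrar`, `BuiltInRegistrar`, and `UrnRegistry` are invented names, and the map values are placeholder strings rather than real coder translators. It only shows the shape of such a hybrid.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical registrar contract: maps coder URNs to a description.
interface UrnRegistrar {
  Map<String, String> urns();
}

// Built-in registrations wired up directly in code, so they do not depend on
// META-INF/services files surviving uber-jar packaging.
final class BuiltInRegistrar implements UrnRegistrar {
  @Override
  public Map<String, String> urns() {
    Map<String, String> m = new LinkedHashMap<>();
    m.put("beam:coder:bytes:v1", "bytes");
    m.put("beam:coder:length_prefix:v1", "length_prefix");
    return m;
  }
}

final class UrnRegistry {
  static Map<String, String> load(ClassLoader loader) {
    Map<String, String> all = new LinkedHashMap<>(new BuiltInRegistrar().urns());
    // Optional extensions are still discovered through ServiceLoader; built-in
    // entries win if an extension registers the same URN.
    for (UrnRegistrar extension : ServiceLoader.load(UrnRegistrar.class, loader)) {
      for (Map.Entry<String, String> e : extension.urns().entrySet()) {
        all.putIfAbsent(e.getKey(), e.getValue());
      }
    }
    return all;
  }
}
```

Using `putIfAbsent` keeps the built-in entries authoritative; whether extensions should be allowed to override them is a design choice this sketch does not settle.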

chamikaramj commented 7 months ago

Quoting robertwb: "Presumably this used to work and doesn't now? Or do we need better instructions on creating an uberjar that correctly preserves all the registration information?"

Yeah, I'm not sure what caused the regression. It might be the core-construction merge, but I haven't verified that. +1 for updating the instructions as a workaround while we figure out the root cause.

robertwb commented 7 months ago

I created https://github.com/apache/beam/pull/31042 which should give a clearer error. Perhaps that is worth cherry-picking.
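The change made in that PR is not reproduced here. As a hypothetical illustration of the general idea of a clearer error, the sketch below fails at lookup time and names the missing URN, instead of returning a placeholder that fails later during decoding. `StrictCoderLookup` is an invented name, and the registry is a plain map from URN to a description string.

```java
import java.util.Map;

// Hypothetical strict lookup: fail as soon as the URN is resolved, naming the
// missing URN, instead of deferring the failure to decode time on a worker.
final class StrictCoderLookup {
  static String require(Map<String, String> registered, String urn) {
    String coder = registered.get(urn);
    if (coder == null) {
      throw new IllegalStateException(
          "No coder translator registered for URN " + urn
              + "; known URNs: " + registered.keySet()
              + ". If running from an uber jar, check that its META-INF/services"
              + " files were merged rather than overwritten.");
    }
    return coder;
  }
}
```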