GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider #650

Closed · davidscottturner closed this issue 4 years ago

davidscottturner commented 4 years ago

Hi,

I am fairly new to Dataflow and tried to upload a streaming job template. I am having problems executing a Dataflow template that uses a RuntimeValueProvider.

NOTE - This all worked with hardcoded values as a test

My code looks as follows:

Options:

import org.apache.beam.sdk.options.Default
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.StreamingOptions
import org.apache.beam.sdk.options.ValueProvider

interface OrderPipelineOptions : PipelineOptions, StreamingOptions {
    @get:Description("Kafka Bootstrap Servers")
    @get:Default.String("127.0.0.1:9092")
    var bootstrapServers: ValueProvider<String>

    @get:Description("Kafka Topic")
    @get:Default.String("order")
    var inputTopic: ValueProvider<String>

    @get:Description("Kafka Group Id")
    @get:Default.String("local")
    var consumerGroupId: ValueProvider<String>
}

Job:

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.Values
import org.apache.beam.sdk.transforms.windowing.FixedWindows
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.kafka.common.serialization.StringDeserializer

object OrderKafkaToElasticsearchPipeline {
    @JvmStatic
    fun main(args: Array<String>) {
        val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(OrderPipelineOptions::class.java)
        val pipeline = Pipeline.create(options)
        pipeline
                /*
                 * Step #1: Read messages in from Kafka
                 */
                .apply("Read from Kafka",
                        KafkaIO.read<String, String>()
                                // Note: these two .get() calls resolve the ValueProviders at
                                // pipeline construction time; this is where the exception below originates.
                                .withBootstrapServers(options.bootstrapServers.get())
                                .withTopic(options.inputTopic.get())
                                .withKeyDeserializer(StringDeserializer::class.java)
                                .withValueDeserializer(StringDeserializer::class.java)
                                .commitOffsetsInFinalize()
                                .withoutMetadata())
                /*
                 * Step #2: Get the values from the stream after 1 second
                 */
                .apply("Get Order Events from stream", Values.create<String>())
                .apply("Wait a bit", Window.into<String>(FixedWindows.of(org.joda.time.Duration.standardSeconds(1))))
                .apply("Deserialize", ParDo.of(ExtractOrderDataFn()))
                .apply("Serialize", ParDo.of(ExtractJsonFromOrderEventFn()))
                .apply(
                        "Send to elastic search",
                        ElasticsearchIO.write()
                                .withIdFn(ElasticsearchIO.Write.FieldValueExtractFn { input ->
                                    input.path("id").asText()
                                })
                                .withConnectionConfiguration(
                                        ElasticsearchIO.ConnectionConfiguration.create(arrayOf("<elastichost>"), "test", "doc")
                                                .withUsername("<user>")
                                                .withPassword("<password>")
                                )
                )
        pipeline.run() // .waitUntilFinish()
    }
}

This seems correct according to the Dataflow documentation. I'm also trying Kotlin for this. This all ran with hardcoded values before, so I know the execution code works fine.

I then build the project and execute it with Gradle:

build.gradle.kts:

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
    // Apply the Kotlin JVM plugin to add support for Kotlin.
    kotlin("jvm") version "1.3.70"
    java
    id("com.github.johnrengelman.shadow") version "5.2.0"
}

repositories {
    // Use jcenter for resolving dependencies.
    // You can declare any Maven/Ivy/file repository here.
    jcenter()
}

java {
    sourceCompatibility = JavaVersion.VERSION_1_8
    targetCompatibility = JavaVersion.VERSION_1_8
}

dependencies {

    implementation(platform(kotlin("bom")))
    implementation(kotlin("stdlib-jdk8"))
    implementation("org.slf4j:slf4j-simple:1.7.30")
    implementation("org.apache.beam:beam-sdks-java-core:2.19.0")
    implementation("org.apache.beam:beam-runners-direct-java:2.19.0")
    implementation("org.apache.beam:beam-runners-google-cloud-dataflow-java:2.19.0")
    implementation("org.apache.beam:beam-sdks-java-io-kafka:2.19.0")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.10.3")
    implementation("org.apache.beam:beam-sdks-java-io-elasticsearch:2.19.0")
    implementation("org.apache.kafka:kafka-clients:2.4.1")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.10.3")
    implementation("org.apache.beam:beam-sdks-java-io-elasticsearch:2.19.0")
    implementation("org.apache.commons:commons-lang3:3.9")
    implementation("commons-cli:commons-cli:1.4")
}

tasks.withType<ShadowJar>() {
    manifest {
        attributes["Main-Class"] = "com.example.OrderKafkaToElasticsearchPipeline"
    }
    mergeServiceFiles()
}

tasks.register("execute", JavaExec::class.java) {
    // The main class comes from the -DmainClass system property used in the command below.
    main = System.getProperty("mainClass") ?: "NULL"
    classpath(sourceSets["main"].runtimeClasspath)
    args((System.getProperty("exec.args") ?: "").split("\\s".toRegex()))
}

I then use gradle clean build followed by:

gradle execute -DmainClass="com.example.orders.OrderKafkaToElasticsearchPipeline" -Dexec.args="--runner=DataflowRunner --templateLocation=gs://<PATH>/OrderKafkaToElasticsearchPipeline --project=<PROJECT> --tempLocation=gs://<PATH>/temp" -Pdataflow-runner --info
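For reference, once the template stages successfully, the intent is to launch jobs from it with runtime values for the templated parameters, along these lines (the job name and parameter values here are placeholders):

gcloud dataflow jobs run order-kafka-to-es \
    --gcs-location=gs://<PATH>/OrderKafkaToElasticsearchPipeline \
    --parameters=bootstrapServers=broker-1:9092,inputTopic=order,consumerGroupId=prod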

Before trying to use the RuntimeValueProvider this all worked fine and the template was uploaded. Now that I'm trying to pass the Kafka configuration in, I get the following error:

Exception in thread "main" java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=bootstrapServers, default=127.0.0.1:9092}
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:250)
        at com.imburse.beams.orders.OrderKafkaToElasticsearchPipeline.main(OrderKafkaToElasticsearchPipeline.kt:99)

Any help or suggestions as to where I am going wrong would be very welcome.

lukecwik commented 4 years ago

Calling #get on a ValueProvider resolves its value, so if you do that at pipeline construction time, as you're doing above, you get this error.

Only transforms / IO connectors whose public API takes in a ValueProvider support supplying those parameters at template execution time.

See https://stackoverflow.com/questions/43992120/valueprovider-issue for more details.
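
For illustration, here is a minimal sketch of the distinction (the DoFn and its names are hypothetical, not part of KafkaIO's API):

import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.DoFn

// A DoFn may hold the ValueProvider itself and defer resolution to the worker.
class TagWithTopicFn(private val topic: ValueProvider<String>) : DoFn<String, String>() {
    @ProcessElement
    fun processElement(ctx: ProcessContext) {
        // Safe: .get() runs at pipeline execution time, when the runtime value exists.
        ctx.output("${topic.get()}: ${ctx.element()}")
    }
}

// At construction time, pass the provider, never its resolved value:
//   .apply(ParDo.of(TagWithTopicFn(options.inputTopic)))   // OK in a template
//   options.inputTopic.get()                               // throws during template creation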

davidscottturner commented 4 years ago

Thanks for the answer, and I see you closed it. I guess your answer is fairly unhelpful due to the fact that the KafkaIO I'm using is the Apache Beam KafkaIO (https://github.com/apache/beam/tree/master/sdks/java/io/kafka). Are you saying that because its API doesn't support a ValueProvider I am unable to overcome this issue? Should I log an issue there?

lukecwik commented 4 years ago

https://issues.apache.org/jira/browse/BEAM-3925 already exists.

You cannot overcome this issue unless you implement ValueProvider support in KafkaIO itself.
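
The shape such support would take is roughly the following: the builder accepts and stores the ValueProvider unresolved, and only worker-side code calls .get(). This is a generic sketch of the pattern, not KafkaIO's actual builder:

import org.apache.beam.sdk.options.ValueProvider

// Hypothetical template-friendly read builder.
class HypotheticalKafkaRead {
    private var bootstrapServers: ValueProvider<String>? = null

    // The public API takes the ValueProvider itself...
    fun withBootstrapServers(servers: ValueProvider<String>): HypotheticalKafkaRead {
        bootstrapServers = servers // stored unresolved; no .get() at construction time
        return this
    }

    // ...and only code that executes on the workers resolves it.
    internal fun consumerConfig(): Map<String, Any> =
        mapOf("bootstrap.servers" to bootstrapServers!!.get())
}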

davidscottturner commented 4 years ago

Thanks Luke, appreciate the help on this.