GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Cannot launch template with repeated parameters #632

Open dadrian opened 6 years ago

dadrian commented 6 years ago

It is not possible to launch a templated dataflow that accepts an option in the form of ValueProvider<List<String>>, despite support for Lists within the ValueProvider class. I'm not sure if this is an SDK or a platform issue.

ValueProvider supports List<String> for accepting repeated options, or options in the form of a list.

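import java.util.List;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

public interface MyOptions extends PipelineOptions {
    @Description("A repeated option, can also be passed in as a list")
    ValueProvider<List<String>> getRepeated();
    void setRepeated(ValueProvider<List<String>> value);
}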

This accepts command-line arguments in the form --repeated=a,b,c, and --repeated=a --repeated=b --repeated=c. Both yield a list of the form ["a", "b", "c"].
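To make the equivalence of the two command-line forms concrete, here is a minimal stand-alone sketch. It is not Beam's actual argument parser; the class `RepeatedArgs` and its `collect` method are hypothetical names used only for illustration, and the sketch assumes values are split on plain commas.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only; this is not Beam's actual argument parser. */
final class RepeatedArgs {
    private RepeatedArgs() {}

    /** Collects every value given for --name, splitting each value on commas. */
    static List<String> collect(String[] args, String name) {
        String prefix = "--" + name + "=";
        List<String> values = new ArrayList<>();
        for (String arg : args) {
            if (arg.startsWith(prefix)) {
                // A single flag may carry several comma-separated items.
                for (String part : arg.substring(prefix.length()).split(",")) {
                    values.add(part);
                }
            }
        }
        return values;
    }
}
```

With this sketch, `collect(new String[] {"--repeated=a,b,c"}, "repeated")` and `collect(new String[] {"--repeated=a", "--repeated=b", "--repeated=c"}, "repeated")` produce equal lists.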

However, if I try to launch a template that uses a ValueProvider<List<String>> with the gcloud tool, the value for --repeated is always encoded as a single string, and I always get a runtime JSON deserialization exception (got string, expected array). If I instead call the API directly using the Python API client and explicitly pass a JSON array, the API responds with: "Invalid value at 'launch_parameters.parameters[0].value' (Map), Cannot have repeated items ('repeated') within a map."
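Since the template launch path appears to deliver each parameter as one string, a possible workaround (not confirmed anywhere in this thread) is to declare the option as ValueProvider<String> and split it at runtime. The sketch below shows only the splitting step in plain Java; `RepeatedParam` and `parse` are hypothetical names, and it assumes that individual items never contain commas.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of the Beam or Dataflow SDK. */
final class RepeatedParam {
    private RepeatedParam() {}

    /** Splits a comma-separated parameter value into trimmed, non-empty items. */
    static List<String> parse(String raw) {
        List<String> items = new ArrayList<>();
        if (raw == null) {
            return items; // an unset parameter becomes an empty list
        }
        for (String part : raw.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                items.add(trimmed);
            }
        }
        return items;
    }
}
```

Under that assumption, a DoFn could call something like `RepeatedParam.parse(provider.get())` in its start-bundle method, where `provider` is a ValueProvider<String> taken from the options.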

Traceback from command-line launch below:

Caused by: java.lang.RuntimeException: Unable to parse representation
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:503)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:154)
    at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:242)
    at com.example.MyDoFn.startBundle(MyDoFn.java:31)
    at com.example.MyDoFn$DoFnInvoker.invokeStartBundle(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.startBundle(SimpleDoFnRunner.java:127)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:300)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:310)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:67)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:361)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:333)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:264)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:133)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:113)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:100)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.util.ArrayList` out of VALUE_STRING token
 at [Source: (String)""a""; line: 1, column: 1]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092)
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.handleNonArray(StringCollectionDeserializer.java:266)
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:179)
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:169)
    at com.fasterxml.jackson.databind.deser.std.StringCollectionDeserializer.deserialize(StringCollectionDeserializer.java:21)
    at org.apache.beam.sdk.options.ValueProvider$Deserializer.deserialize(ValueProvider.java:334)
    at org.apache.beam.sdk.options.ValueProvider$Deserializer.deserialize(ValueProvider.java:299)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3030)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getValueFromJson(ProxyInvocationHandler.java:501)
    ... 22 more

lllewellyn commented 5 years ago

I can confirm this is still happening with Apache Beam 2.9.0. My Dataflow job works fine if pushed and run manually with Java/Maven, but if I create a template from the job and run it, I see a similar exception.

lllewellyn commented 5 years ago

I experimented with several Jackson annotations on the option, but none of them worked. I'm using jackson-databind 2.9.5.

@JsonFormat(shape = JsonFormat.Shape.ARRAY)
ValueProvider<List<String>> getRepeated();
void setRepeated(ValueProvider<List<String>> value);

@JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
ValueProvider<List<String>> getRepeated();
void setRepeated(ValueProvider<List<String>> value);

@JsonFormat(shape = JsonFormat.Shape.ARRAY, with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
ValueProvider<List<String>> getRepeated();
void setRepeated(ValueProvider<List<String>> value);

jklukas commented 5 years ago

Seeing this with Beam 2.12 as well.

connorlwilkes commented 4 years ago

Is this being tracked as a Jira item?