GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Dataflow combine globally default coder #630

Closed lboudard closed 6 years ago

lboudard commented 6 years ago

Hi,

I'm trying to write a simple custom combiner that aggregates strings into a set, following https://beam.apache.org/documentation/programming-guide/#combine

    public class CombineSetFn extends Combine.CombineFn<String, CombineSetFn.Accum, Set<String>> {

        @DefaultCoder(SetCoder.class)
        public static class Accum {
            Set<String> stringSet;

            public Accum() {
                stringSet = new HashSet<>();
            }
        }

        @Override
        public Accum createAccumulator() {
            return new Accum();
        }

        @Override
        public Accum addInput(Accum accum, String input) {
            accum.stringSet.add(input);
            return accum;
        }

        @Override
        public Accum mergeAccumulators(Iterable<Accum> accums) {
            Accum merged = createAccumulator();
            for (Accum accum : accums) {
                merged.stringSet.addAll(accum.stringSet);
            }
            return merged;
        }

        @Override
        public Set<String> extractOutput(Accum accum) {
            return accum.stringSet;
        }
    }

    PCollection<String> firstNamesDS = p.apply(TextIO.read().from(
            "gs://" + options.getBucket() + options.getFirstNamesFile()));

    final PCollection<Set<String>> firstNames = firstNamesDS.apply(
            Combine.globally(new CombineSetFn()));

While initializing the pipeline's transforms, I get the following exception:

    java.lang.IllegalStateException: Unable to return a default Coder for Combine.globally(CombineSet)/Combine.perKey(CombineSet)/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes:
      No Coder has been manually specified; you may do so using .setCoder().
      Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a Coder for K.
      Building a Coder using a registered CoderProvider failed.
      See suppressed exceptions for detailed failures.
      Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
        at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
        at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:259)
        at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:107)
        at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:190)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:536)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
        at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1158)
        at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1025)

I've tried simplifying down to the documented example, AverageFn, but I still get the same error, so I don't think the problem comes from the combiner's input, output, or accumulator not being serializable.

Could you help with this issue?

Thanks!

lboudard commented 6 years ago

Okay, found the solution to this. There is indeed a specific coder to implement whenever you implement a new accumulator class, which wasn't documented. I found an example here: https://github.com/apache/beam/blob/v2.4.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
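For anyone hitting the same error: a minimal sketch of that pattern, adapted by me rather than copied from Mean.java. Using `Set<String>` directly as the accumulator type (instead of a wrapper class) lets `getAccumulatorCoder` return a ready-made `SetCoder`, so nothing has to be inferred:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;

// Sketch: Set<String> is used directly as the accumulator, so its coder
// can be assembled from stock coders instead of relying on inference.
public class CombineSetFn extends Combine.CombineFn<String, Set<String>, Set<String>> {

    @Override
    public Set<String> createAccumulator() {
        return new HashSet<>();
    }

    @Override
    public Set<String> addInput(Set<String> accum, String input) {
        accum.add(input);
        return accum;
    }

    @Override
    public Set<String> mergeAccumulators(Iterable<Set<String>> accums) {
        Set<String> merged = new HashSet<>();
        for (Set<String> accum : accums) {
            merged.addAll(accum);
        }
        return merged;
    }

    @Override
    public Set<String> extractOutput(Set<String> accum) {
        return accum;
    }

    // The missing piece in the original code: an explicit accumulator coder.
    @Override
    public Coder<Set<String>> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder) {
        return SetCoder.of(StringUtf8Coder.of());
    }
}
```

With this in place, `Combine.globally(new CombineSetFn())` should no longer trigger the "Unable to return a default Coder" check.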

ricktjwong commented 5 years ago

Hi @lboudard, I'm struggling with this issue too, can I see your code for combining strings?

My coder works, but only on the Dataflow runner; on the direct runner it always throws the error "Forbidden IOException when reading from InputStream".

jmnavarro commented 5 years ago

@ricktjwong Assuming you're combining several strings into one single string, you can try this:

First you need your accumulator:

    public static class StringAccum implements Combine.AccumulatingCombineFn.Accumulator<String, StringAccum, String> {

        private List<String> strings = new ArrayList<>();

        public StringAccum() {
            super();
        }

        public StringAccum(String initialValue) {
            super();
            addInput(initialValue);
        }

        @Override
        public void addInput(String input) {
            strings.add(input);
        }

        @Override
        public void mergeAccumulator(StringAccum other) {
            strings.addAll(other.strings);
        }

        @Override
        public String extractOutput() {
            StringBuilder buf = new StringBuilder();
            strings.forEach(buf::append);
            return buf.toString();
        }
    }

Next, you need the coder for that accumulator:

    static class StringAccumCoder extends AtomicCoder<StringAccum> {

        private static final Coder<String> STRING_CODER = StringUtf8Coder.of();

        @Override
        public void encode(StringAccum value, OutputStream outStream) throws IOException {
            // Serializes the accumulator as its concatenated output; element
            // boundaries are not preserved, which is fine for this concat use case.
            STRING_CODER.encode(value.extractOutput(), outStream);
        }

        @Override
        public StringAccum decode(InputStream inStream) throws IOException {
            // Rebuilds an accumulator holding the whole concatenated string
            // as a single element.
            return new StringAccum(STRING_CODER.decode(inStream));
        }
    }

And finally, the combine function for that accumulator (and coder):

    public static class ConcatCombine extends Combine.AccumulatingCombineFn<String, StringAccum, String> {
        @Override
        public StringAccum createAccumulator() {
            return new StringAccum();
        }

        @Override
        public Coder<StringAccum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder) {
            return new StringAccumCoder();
        }
    }

Use it as usual, transforming a PCollection<String> with several strings into a new PCollection<String> containing one single string:

    PCollection<String> input = ...;
    PCollection<String> output = input.apply(Combine.globally(new ConcatCombine()));

Notice this is a straightforward approach. I'm pretty sure this can be simplified using some SDK helper classes!
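One such simplification, as a sketch of my own rather than something from the thread: when the output type equals the input type and the operation is associative, `Combine.globally` also accepts a plain `SerializableFunction<Iterable<V>, V>`, so no custom accumulator or coder is needed at all. Note that the order of elements in the `Iterable` is not guaranteed, so this only suits order-insensitive concatenation. `ConcatFn` is a hypothetical name, not an SDK class:

```java
import org.apache.beam.sdk.transforms.SerializableFunction;

// Sketch: concatenates all strings handed to one combine call into a
// single string; Beam may apply this function repeatedly to partial results.
public class ConcatFn implements SerializableFunction<Iterable<String>, String> {
    @Override
    public String apply(Iterable<String> input) {
        StringBuilder sb = new StringBuilder();
        for (String s : input) {
            sb.append(s);
        }
        return sb.toString();
    }
}
```

Usage would then be `input.apply(Combine.globally(new ConcatFn()))`, with the output coder inferred from the `String` output type as usual.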