GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Unable to create Aggregator on release-2.0.0-beta2 #553

Closed gowrishankarsundersc closed 7 years ago

gowrishankarsundersc commented 7 years ago

I am trying to create a custom aggregator that sums long values, similar to Sum.SumLongFn() in Dataflow, but I am getting the error below. I am also attaching the code I used to create the aggregator. This is on release 2.0.0-beta2, and I am running the pipeline on Google Cloud Dataflow.

Running the same pipeline with the DirectRunner does not raise this error, although I am not sure whether the aggregator even takes effect when running with the DirectRunner.


public class SumLongAggregator extends Combine.BinaryCombineLongFn {
    @Override
    public long apply(long left, long right) {
        return left + right;
    }

    @Override
    public long identity() {
        return 0;
    }
}

static class StatefulDoFn extends DoFn<KV<String, String>, String> {
    // Counters for elements processed and elements skipped as duplicates.
    final Aggregator<Long, Long> processedElements =
            createAggregator("processed", new SumLongAggregator());
    final Aggregator<Long, Long> skippedElements =
            createAggregator("skipped", new SumLongAggregator());

    @StateId("tracker")
    private final StateSpec<Object, ValueState<Integer>> trackerSpec =
            StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void processElement(
            ProcessContext context,
            @StateId("tracker") ValueState<Integer> tracker) {
        processedElements.addValue(1L);
        int wasSeen = firstNonNull(tracker.read(), 0);
        final String id = context.element().getKey();
        if (wasSeen == 0) {
            // First time this key is seen: mark it and emit the value.
            tracker.write(1);
            context.output(context.element().getValue());
            logger.info("Writing id first time: {}", id);
        } else {
            skippedElements.addValue(1L);
        }
    }
}
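For context, the semantics that Combine.BinaryCombineLongFn expresses are just a binary operation plus an identity, folded over the input values. The sketch below is a plain-Java analogue of the aggregator above (the names `identity`, `apply`, and `combine` mirror the Beam methods but are illustrative, not the Beam API):

```java
import java.util.List;

// Plain-Java sketch of the semantics behind Combine.BinaryCombineLongFn:
// an identity element and an associative binary combiner, reduced over
// the inputs. Illustrative only; not Beam or Dataflow worker code.
public class SumLongSketch {
    // Mirrors identity(): the result for an empty set of inputs.
    static long identity() {
        return 0L;
    }

    // Mirrors apply(left, right): the binary combiner.
    static long apply(long left, long right) {
        return left + right;
    }

    // A runner conceptually reduces each bundle's values like this.
    static long combine(List<Long> values) {
        long acc = identity();
        for (long v : values) {
            acc = apply(acc, v);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(combine(List.of(1L, 2L, 3L))); // prints 6
    }
}
```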

2017-02-26 18:10:56 ERROR MonitoringUtil$LoggingHandler:97 - 2017-02-27T02:10:55.848Z: (d2440485b08b19a3): java.lang.IllegalArgumentException: Unsupported CombineFn: com.company.analytics.beam.dataflow.SumLongAggregator
    at com.google.cloud.dataflow.worker.counters.CounterAggregator.constructCounter(CounterAggregator.java:142)
    at com.google.cloud.dataflow.worker.counters.CounterAggregator.<init>(CounterAggregator.java:95)
    at com.google.cloud.dataflow.worker.counters.CounterAggregator$CounterAggregatorFactory.createAggregatorForDoFn(CounterAggregator.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.createAggregator(SimpleDoFnRunner.java:415)
    at org.apache.beam.sdk.transforms.DoFn$Context.setupDelegateAggregator(DoFn.java:249)
    at org.apache.beam.sdk.transforms.DoFn$Context.setupDelegateAggregators(DoFn.java:240)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.<init>(SimpleDoFnRunner.java:280)
    at org.apache.beam.runners.core.SimpleDoFnRunner.<init>(SimpleDoFnRunner.java:140)
    at org.apache.beam.runners.core.DoFnRunners.simpleRunner(DoFnRunners.java:63)
    at com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:200)
    at com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:235)
    at com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:198)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:321)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:280)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:227)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:213)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:180)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
gowrishankarsundersc commented 7 years ago

I realised that Beam itself packages this under the Sum transforms, so I tried using createAggregator("processed", Sum.ofLongs()); and it actually works, although I am not sure what was different from mine, since I am also extending BinaryCombineFn. Anyway, that worked, but I would definitely like to know the delta that caused the earlier error.

davorbonaci commented 7 years ago

Thanks @gowrishankarsundersc. Indeed, as you observed, aggregators work with a set of pre-defined CombineFns only. Glad to see you were able to resolve the issue yourself!

Thanks and let us know if we can help in any way!
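The "Unsupported CombineFn" error is consistent with the worker dispatching on concrete, recognized CombineFn classes when wiring aggregators to counters, so a custom subclass with identical behavior can still be rejected. The toy sketch below illustrates that failure mode; it is a guess at the shape of the check, not the actual CounterAggregator code, and all class names in it are stand-ins:

```java
// Toy illustration of class-based dispatch rejecting a behaviorally
// equivalent subclass. Illustrative names only; not Dataflow worker code.
public class WhitelistSketch {
    // Stand-ins for the SDK's combine classes.
    static class BinaryCombineLongFn {}
    static class SumLongFn extends BinaryCombineLongFn {}    // what Sum.ofLongs() returns
    static class CustomSumFn extends BinaryCombineLongFn {}  // the user-defined subclass

    // Dispatch on known concrete classes, not on behavior.
    static String counterKind(BinaryCombineLongFn fn) {
        if (fn instanceof SumLongFn) {
            return "SUM";
        }
        throw new IllegalArgumentException(
                "Unsupported CombineFn: " + fn.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(counterKind(new SumLongFn())); // prints SUM
        try {
            counterKind(new CustomSumFn());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why swapping the custom class for the built-in Sum.ofLongs() resolved the error even though both compute the same sum.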