tosun-si / asgarde

Asgarde simplifies error handling with Apache Beam Java, with less code that is more concise and expressive.
MIT License

asgarde doesn't support DoFn "simple" @ProcessElement annotated method #9

Open Riduidel opened 5 months ago

Riduidel commented 5 months ago

While Asgarde allows using a DoFn with a processElement(ProcessContext ctx) method, it doesn't support the version where the @ProcessElement-annotated method takes the input type directly as a parameter (see, for example, Creating a DoFn).

I'm currently searching for a workaround ...

Riduidel commented 5 months ago

Code is invoked through SimpleDoFnRunner.invokeProcessElement(...) and a generated inner class called MyClass$DoFnInvoker.invokeProcessElement(...). Is it possible to add a kind of "interceptor" to this dynamic invocation?

tosun-si commented 1 month ago

Hello @Riduidel

Really sorry for the late answer, I missed your message.

I hope you solved your issue.

With Asgarde, it is also possible to have full control over a DoFn class.

To do so, create a class that extends BaseElementFn, as also described in the README file:

import fr.groupbees.asgarde.Failure;

final PCollection<WordStats> resCustomDoFn = CollectionComposer.of(input)
        .apply("PaDo", new WordStatsFn(sideInputs), Collections.singleton(sideInputs))
        .getResult()
        .output();

// Custom DoFn class.
public class WordStatsFn extends BaseElementFn<String, WordStats> {

    private PCollectionView<String> sideInputs;

    public WordStatsFn(final PCollectionView<String> sideInputs) {
        // Do not forget to call this!
        super();

        this.sideInputs = sideInputs;
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
        try {
            ctx.output(toWordStats(sideInputs, ctx));
        } catch (Throwable throwable) {
            final Failure failure = Failure.from("step", ctx.element(), throwable);
            ctx.output(failuresTag, failure);
        }
    }
}

In this case, you can apply your own logic in the processElement method and specify all the needed input types.
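The try/catch routing used in processElement above can be illustrated outside of Beam. The following is only a minimal plain-Java sketch of the idea, not Asgarde's or Beam's implementation: `FailureRoutingSketch`, `CapturedFailure`, `Result` and `processAll` are hypothetical names introduced for illustration. Each element goes through a user function; successes land in one list and failures (step name, offending input, exception) in another, much like the main output and the failures tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FailureRoutingSketch {

    /** Hypothetical stand-in for a failure record: step name, input element, exception. */
    static final class CapturedFailure {
        final String step;
        final String inputElement;
        final Throwable exception;

        CapturedFailure(String step, String inputElement, Throwable exception) {
            this.step = step;
            this.inputElement = inputElement;
            this.exception = exception;
        }
    }

    /** Holds both outputs, analogous to a main output plus a failure side output. */
    static final class Result<O> {
        final List<O> outputs = new ArrayList<>();
        final List<CapturedFailure> failures = new ArrayList<>();
    }

    /** Applies fn to every input; any exception is captured instead of being rethrown. */
    static <I, O> Result<O> processAll(String step, List<I> inputs, Function<I, O> fn) {
        Result<O> result = new Result<>();
        for (I input : inputs) {
            try {
                result.outputs.add(fn.apply(input));
            } catch (Exception e) {
                result.failures.add(new CapturedFailure(step, String.valueOf(input), e));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "two" cannot be parsed, so it is routed to the failures list.
        Result<Integer> result =
                processAll("parse", Arrays.asList("1", "two", "3"), Integer::parseInt);
        System.out.println(result.outputs);          // prints [1, 3]
        System.out.println(result.failures.size());  // prints 1
    }
}
```

In a real pipeline the two lists would correspond to the main output PCollection and the failure output; the sketch only shows the control flow that a custom DoFn body follows.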

Please let me know if this helps you solve your issue.