apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Unable to infer a coder for JdbcIO.read() transform when using a custom class with @DefaultCoder annotation #26003

Open j1cs opened 1 year ago

j1cs commented 1 year ago

What happened?

I am using Apache Beam's JdbcIO to read data from a PostgreSQL database and log the retrieved data. I'm using a custom UserData class and have applied the @DefaultCoder(AvroCoder.class) annotation to it. However, I am still encountering an error related to the coder.

Error message:

java.lang.IllegalStateException: Unable to infer a coder for JdbcIO.read() transform.

Here's the UserData class:

package me.jics;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@Jacksonized
@Builder
@Getter
@Value
@EqualsAndHashCode
@ToString
@DefaultCoder(AvroCoder.class)
public class UserData {
    String name;
    String lastname;
}

And here's the main part of the pipeline in the AppCommand class:

public void run() {
    AppOptions options = PipelineOptionsFactory
            .fromArgs("--runner=DirectRunner")
            .as(AppOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("Selecting", JdbcIO.<UserData>read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                            "org.postgresql.Driver",
                            "jdbc:postgresql://localhost:5432/mydb")
                    .withUsername("myuser")
                    .withPassword("mypassword"))
            .withQuery("select name, lastname from user where name = 'john'")
            .withRowMapper(resultSet -> UserData.builder()
                    .name(resultSet.getString(1))
                    .lastname(resultSet.getString(2))
                    .build())
            .withOutputParallelization(false)
    )
            .apply("next step", MapElements.via(new SimpleFunction<UserData, Integer>() {
                @Override
                public Integer apply(UserData input) {
                    log.info(input.toString());
                    return 1;
                }
            }));

    p.run();
}
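
[Editor's note] The usual way around failed inference here is to set the coder explicitly on the read. A lambda RowMapper has its type parameters erased at runtime, so Beam cannot recover the element type from it, even with @DefaultCoder on the class. A minimal sketch, assuming the Beam Java SDK's JdbcIO.Read.withCoder and AvroCoder.of together with the UserData class above:

```java
// Sketch: supply the coder explicitly so JdbcIO does not have to infer it
// from the lambda RowMapper (whose type parameters are erased at runtime).
p.apply("Selecting", JdbcIO.<UserData>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver",
                        "jdbc:postgresql://localhost:5432/mydb")
                .withUsername("myuser")
                .withPassword("mypassword"))
        .withQuery("select name, lastname from user where name = 'john'")
        .withRowMapper(resultSet -> UserData.builder()
                .name(resultSet.getString(1))
                .lastname(resultSet.getString(2))
                .build())
        .withCoder(AvroCoder.of(UserData.class)) // explicit coder, no inference
        .withOutputParallelization(false));
```

An anonymous RowMapper subclass (instead of a lambda) would also preserve enough type information for inference, but the explicit coder is the more direct fix.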

I would appreciate any guidance on the recommended way to set the coder for JdbcIO in this scenario, considering I've already applied the @DefaultCoder annotation to my custom UserData class.

Here is a sample project: https://github.com/j1cs/coder-error-beam. The mailing-list thread where the discussion started: https://lists.apache.org/thread/7wxmr3s7vcrll3mvb07rmj5cmco4wtn8

Issue Priority

Priority: 3 (minor)

Issue Components

Abacn commented 1 year ago

Could you please provide the full traceback? Also, have you tried JdbcIO.readRows? That returns Beam Rows and does not need a custom class for each row.

j1cs commented 1 year ago

@Abacn

java.lang.IllegalStateException: Unable to infer a coder for JdbcIO.readAll() transform. Provide a coder via withCoder, or ensure that one can be inferred from the provided RowMapper.
        at org.apache.beam.sdk.util.Preconditions.checkStateNotNull(Preconditions.java:471)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadAll.expand(JdbcIO.java:1094)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadAll.expand(JdbcIO.java:941)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:360)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:920)
        at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:783)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
        at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
        at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:192)
        at me.jics.AppCommand.run(AppCommand.java:31)
        at picocli.CommandLine.executeUserObject(CommandLine.java:1939)
        at picocli.CommandLine.access$1300(CommandLine.java:145)
        at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2358)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2352)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2314)
        at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
        at picocli.CommandLine$RunLast.execute(CommandLine.java:2316)
        at picocli.CommandLine.execute(CommandLine.java:2078)
        at io.micronaut.configuration.picocli.PicocliRunner.execute(PicocliRunner.java:202)
        at io.micronaut.configuration.picocli.PicocliRunner.execute(PicocliRunner.java:179)
        at me.jics.AppCommand.main(AppCommand.java:22)

Let me try readRows.

j1cs commented 1 year ago

I created a branch and it gave me:

java.lang.IllegalStateException: Unable to return a default Coder for next step/Map/ParMultiDo(Anonymous).output [PCollection@801996095]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for java.lang.Object.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
        at org.apache.beam.sdk.util.Preconditions.checkStateNotNull(Preconditions.java:471)
        at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:284)
        at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:115)
        at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:226)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:212)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
        at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:598)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
        at me.jics.AppCommand.run(AppCommand.java:48)
        at picocli.CommandLine.executeUserObject(CommandLine.java:1939)
        at picocli.CommandLine.access$1300(CommandLine.java:145)
        at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2358)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2352)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2314)
        at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
        at picocli.CommandLine$RunLast.execute(CommandLine.java:2316)
        at picocli.CommandLine.execute(CommandLine.java:2078)
        at io.micronaut.configuration.picocli.PicocliRunner.execute(PicocliRunner.java:202)
        at io.micronaut.configuration.picocli.PicocliRunner.execute(PicocliRunner.java:179)
        at me.jics.AppCommand.main(AppCommand.java:23)

My code:

public class AppCommand implements Runnable {

    public static void main(String[] args) throws Exception {
        int code = PicocliRunner.execute(AppCommand.class, args);
        System.exit(code);
    }

    public void run() {
        AppOptions options = PipelineOptionsFactory
                .fromArgs("--runner=DirectRunner")
                .as(AppOptions.class);
        Pipeline p = Pipeline.create(options);
        p.apply("Selecting", JdbcIO.readRows()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                                "org.postgresql.Driver",
                                "jdbc:postgresql://localhost:5432/main")
                        .withUsername("myuser")
                        .withPassword("mypassword"))
                .withQuery("select first_name, last_name from person where first_name = 'john'")
                .withOutputParallelization(false)
        )
                .apply("next step", MapElements.via(new SimpleFunction<Row, Object>() {
                    @Override
                    public Object apply(Row input) {
                        return super.apply(input);
                    }
                }));

        p.run();
    }
}

Edit: I'm not that familiar with Apache Beam, and I don't know how to simply print the Row without passing another object to the next step of the pipeline.
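
[Editor's note] For just printing, one option is to map each Row to its toString and keep the output type concrete. This is a sketch against Beam's public MapElements and TypeDescriptors API; the step label is arbitrary:

```java
// Sketch: map Row -> String (StringUtf8Coder is always registered),
// logging each row on the way through.
.apply("print rows", MapElements
        .into(TypeDescriptors.strings())
        .via((Row row) -> {
            System.out.println(row.toString());
            return row.toString();
        }));
```

The failure above comes from declaring the output type as Object, for which no coder can be found; any concrete output type with a known coder (String, Integer, ...) avoids it.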

j1cs commented 1 year ago

I was playing around with the following:

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@DefaultSchema(JavaBeanSchema.class)
@DefaultCoder(AvroCoder.class)
public class UserData {
    private String name;
    private String lastname;

    public UserData(String name, String lastname) {
        this.name = name;
        this.lastname = lastname;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getLastname() {
        return lastname;
    }

    public void setLastname(String lastname) {
        this.lastname = lastname;
    }
}

and now I triggered this:

java.lang.IllegalArgumentException: unable to serialize SchemaCoder<Schema: Fields:
Field{name=name, description=, type=STRING NOT NULL, options={{}}}
Field{name=lastname, description=, type=STRING NOT NULL, options={{}}}
Encoding positions:
{name=0, lastname=1}
Options:{{}}UUID: 88e1e516-b9be-4fe0-8984-1d861baf55a2  UUID: 88e1e516-b9be-4fe0-8984-1d861baf55a2 delegateCoder: org.apache.beam.sdk.coders.Coder$ByteBuddy$mXw5Syaw@6537ac
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:59)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:133)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:102)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:284)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:35)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:239)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.translateAppliedPTransform(PTransformTranslation.java:492)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation$ParDoTranslator.translate(ParDoTranslation.java:186)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:248)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:799)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:814)
        at org.apache.beam.repackaged.direct_java.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:274)
        at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:290)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
        at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:268)
        at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:218)
        at org.apache.beam.runners.direct.DirectRunner.performRewrites(DirectRunner.java:254)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:175)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
        at me.jics.AppCommand.run(AppCommand.java:49)
        at picocli.CommandLine.executeUserObject(CommandLine.java:1939)
        at picocli.CommandLine.access$1300(CommandLine.java:145)
        at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2358)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2352)
        at picocli.CommandLine$RunLast.handle(CommandLine.java:2314)
        at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
        at picocli.CommandLine$RunLast.execute(CommandLine.java:2316)
        at picocli.CommandLine.execute(CommandLine.java:2078)
        at io.micronaut.configuration.picocli.PicocliRunner.execute(PicocliRunner.java:202)
        at io.micronaut.configuration.picocli.PicocliRunner.execute(PicocliRunner.java:179)
        at me.jics.AppCommand.main(AppCommand.java:23)
Caused by: java.io.NotSerializableException: me.jics.AppCommand
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1572)
        at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1572)
        at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1572)
        at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
        ... 38 more

If anyone can help me get the right result, I'll appreciate it. Thanks.
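
[Editor's note] The root cause in this trace is the last line: `Caused by: java.io.NotSerializableException: me.jics.AppCommand`. An anonymous SimpleFunction defined inside AppCommand keeps a hidden reference to its enclosing instance, so when Beam serializes the function it tries to serialize AppCommand too. A minimal stand-alone sketch of the mechanism (class names here are illustrative, not from the project):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    interface Fn extends Serializable {
        int apply(String s);
    }

    // An anonymous inner class keeps a hidden reference to CaptureDemo.this,
    // so serializing it also tries to serialize the (non-Serializable) outer object.
    Fn anonymousFn() {
        return new Fn() {
            @Override public int apply(String s) { return s.length(); }
        };
    }

    // A static nested class has no outer reference and serializes cleanly.
    static class NamedFn implements Fn {
        @Override public int apply(String s) { return s.length(); }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the captured enclosing instance is not Serializable
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous: " + serializes(new CaptureDemo().anonymousFn()));
        System.out.println("named:     " + serializes(new NamedFn()));
    }
}
```

Moving the function into a named static class, as in the next comment, removes the capture, which is why that version works.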

j1cs commented 1 year ago

It works fine with Row:

    public void run() {
        AppOptions options = PipelineOptionsFactory
                .fromArgs("--runner=DirectRunner")
                .as(AppOptions.class);
        Pipeline p = Pipeline.create(options);
        p.apply("Selecting", JdbcIO.readRows()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                                "org.postgresql.Driver",
                                "jdbc:postgresql://localhost:5432/main")
                        .withUsername("myuser")
                        .withPassword("mypassword"))
                .withQuery("select first_name, last_name from person where first_name = 'john'")
                .withOutputParallelization(false)
        )
        .apply("next step", MapElements.via(new RowToStringFunction()));

        p.run();
    }
    @Slf4j
    static class RowToStringFunction extends SimpleFunction<Row, Integer> {
        @Override
        public Integer apply(Row input) {
            log.info("Row value: {}", input.getValue(1).toString());
            return 1;
        }
    }

Output:

    18:14:47.650 [main] INFO  i.m.context.env.DefaultEnvironment - Established active environments: [cli]
    18:14:50.355 [direct-runner-worker] INFO  org.apache.beam.sdk.io.jdbc.JdbcIO - Autocommit has been disabled
    18:14:50.513 [direct-runner-worker] INFO  m.j.AppCommand$RowToStringFunction - Row value: doe