spring-attic / jdbc

Apache License 2.0

JDBC Source -> JDBC Sink binding errors #54

Closed: sduvan closed this issue 4 years ago

sduvan commented 4 years ago

I am trying to build a pipeline from a JDBC (Postgres) source to a Snowflake (JDBC-compliant) sink.

To integrate Snowflake, I had to patch the out-of-the-box (OOB) JDBC sink app as described in https://docs.spring.io/spring-cloud-stream-app-starters/docs/Einstein.SR6/reference/htmlsingle/#_patching_pre_built_applications (I simply embedded the Snowflake JDBC driver dependency), and then I registered the patched app on my local SCDF stack.
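A quick sanity check for the patching step is to confirm that the driver actually ended up inside the patched app. The sketch below assumes the patched sink is built as a standard Spring Boot executable jar (dependencies nested under `BOOT-INF/lib/`) and that the driver artifact's file name contains `snowflake-jdbc`; `BundledJars` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

public class BundledJars {

    // True for a nested dependency jar in a Spring Boot executable jar
    // (entries under BOOT-INF/lib/) whose name contains the given fragment.
    static boolean isNestedLib(String entryName, String nameFragment) {
        return entryName.startsWith("BOOT-INF/lib/")
                && entryName.endsWith(".jar")
                && entryName.contains(nameFragment);
    }

    // Lists matching nested jars; the stream is fully consumed before the JarFile closes.
    static List<String> findNestedLibs(String fatJarPath, String nameFragment) throws IOException {
        try (JarFile jar = new JarFile(fatJarPath)) {
            return jar.stream()
                    .map(JarEntry::getName)
                    .filter(name -> isNestedLib(name, nameFragment))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: java BundledJars <path-to-patched-sink.jar>");
            return;
        }
        System.out.println(findNestedLibs(args[0], "snowflake-jdbc"));
    }
}
```

An empty list would suggest the dependency was not packaged, in which case `net.snowflake.client.jdbc.SnowflakeDriver` could not be loaded at runtime.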

However, I am getting "binding errors" on the sink side of this simple setup:

2020-06-22 18:59:29.077  INFO 15850 --- [           main] c.u.p.s.s.SnowflakeSinkApplication       : Started SnowflakeSinkApplication in 11.592 seconds (JVM running for 12.385)
2020-06-22 19:02:09.646  INFO 15850 --- [container-0-C-1] o.s.c.s.a.j.sink.JdbcSinkConfiguration   : Could not find value for column 'id': EL1008E: Property or field 'id' cannot be found on object of type 'byte[]' - maybe not public or not valid?
2020-06-22 19:02:09.646  INFO 15850 --- [container-0-C-1] o.s.c.s.a.j.sink.JdbcSinkConfiguration   : Could not find value for column 'first_name': EL1008E: Property or field 'first_name' cannot be found on object of type 'byte[]' - maybe not public or not valid?
2020-06-22 19:02:09.646  INFO 15850 --- [container-0-C-1] o.s.c.s.a.j.sink.JdbcSinkConfiguration   : Could not find value for column 'last_name': EL1008E: Property or field 'last_name' cannot be found on object of type 'byte[]' - maybe not public or not valid?
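The log lines suggest that the sink looks up each column name as a property of the message payload, and that the payload arrived as a raw `byte[]` rather than as a decoded object. The sketch below is a simplified, hypothetical model of that lookup (it is not the sink's actual code, and `ColumnLookup.resolveColumn` is an illustrative name): a `Map` payload yields the column value, while a `byte[]` has no `id` property at all.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

public class ColumnLookup {

    // Hypothetical, simplified stand-in for evaluating "payload.<column>":
    // only Map payloads expose named values; anything else, such as a raw
    // byte[], has no property with that name. (A null value also yields empty here.)
    static Optional<Object> resolveColumn(Object payload, String column) {
        if (payload instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) payload;
            Object value = map.get(column);
            return Optional.ofNullable(value);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        byte[] raw = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        Map<String, Object> row = Map.of("id", 1L, "first_name", "Ada");
        System.out.println(resolveColumn(raw, "id")); // Optional.empty
        System.out.println(resolveColumn(row, "id")); // Optional[1]
    }
}
```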

Here is my stream definition:

source: jdbc --trigger.time-unit=SECONDS --spring.datasource.username='******' --trigger.fixed-delay=1 --spring.datasource.url='******' --jdbc.query='SELECT * FROM public.source_table_1 WHERE active=false' --jdbc.update='UPDATE public.source_table_1 SET active=true WHERE id in (:id)' --spring.cloud.stream.bindings.output.contentType=application/json --spring.datasource.password='******' | snowflake-sink --jdbc.columns=id,first_name,last_name --spring.datasource.driver-class-name=net.snowflake.client.jdbc.SnowflakeDriver --spring.cloud.stream.bindings.input.contentType=application/json --jdbc.table-name=target_table_1
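In this definition, both the source's output binding and the sink's input binding declare `application/json`, so each row the source emits travels to the sink as JSON text encoded as bytes. Unless something converts it back into an object, the sink's expressions see a `byte[]`, which is consistent with the errors in the log. A toy encoder illustrates what such a payload looks like; `RowEncoding.toJsonBytes` is a hypothetical stand-in for the real message converter and handles only flat, non-null String and Number values without escaping.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class RowEncoding {

    // Toy JSON encoder for a flat row (non-null String and Number values only).
    // It does not escape special characters; a real converter would.
    static byte[] toJsonBytes(Map<String, Object> row) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : row.entrySet()) {
            Object v = e.getValue();
            String value = (v instanceof Number) ? v.toString() : "\"" + v + "\"";
            fields.add("\"" + e.getKey() + "\":" + value);
        }
        return fields.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1L);
        row.put("first_name", "Ada");
        row.put("last_name", "Lovelace");
        byte[] payload = toJsonBytes(row);
        // The receiving side just gets bytes; byte[] has no "id" property.
        System.out.println(payload.getClass().getSimpleName()); // byte[]
        System.out.println(new String(payload, StandardCharsets.UTF_8));
    }
}
```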

Changing jdbc.columns to --jdbc.columns='id=new String(payload).id,first_name=new String(payload).first_name....' did not help either.
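Judging by the working configuration later in this thread, the key difference is the separator: the sink's `jdbc.columns` property is documented as comma-separated, colon-based pairs of column names and SpEL expressions, so `=` is not treated as a separator. The hypothetical parser below is modeled on that documented format (it is not the sink's code, and treating a bare name as its own expression is an assumption). It shows that `id=new String(payload).id` becomes a single column literally named `id=new String(payload).id`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ColumnsSpec {

    // Hypothetical parser for "col1:expr1,col2:expr2". A bare column name maps
    // to itself as the expression. Naive: expressions must not contain commas.
    static Map<String, String> parse(String spec) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String pair : spec.split(",")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.indexOf(':');
            if (colon < 0) {
                columns.put(trimmed, trimmed);
            } else {
                columns.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 1).trim());
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(parse("id:new String(payload).id,first_name:new String(payload).first_name").keySet());
        // [id, first_name]
        System.out.println(parse("id=new String(payload).id").keySet());
        // [id=new String(payload).id]
    }
}
```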

My target table in Snowflake (created manually for now):

CREATE TABLE target_table_1 ( id bigint NOT NULL, first_name varchar, last_name varchar, primary key (id) )
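The sink writes each incoming message as a row of this table. A hypothetical helper (`InsertSql.build`) shows the general shape of a parameterized INSERT for these columns; the statement the sink actually generates may differ, for example in its placeholder style.

```java
import java.util.Collections;
import java.util.List;

public class InsertSql {

    // Hypothetical helper: one positional placeholder per column, in column order.
    // Table and column names are inserted verbatim, so they must come from trusted config.
    static String build(String table, List<String> columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(build("target_table_1", List.of("id", "first_name", "last_name")));
        // INSERT INTO target_table_1 (id, first_name, last_name) VALUES (?, ?, ?)
    }
}
```

The values would then be bound in the same column order, for example with `PreparedStatement.setObject(index, value)`.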

The source table DDL is the same. Versions: Data Flow 2.5.1, Skipper 2.4.1.

Any help/pointers are appreciated.

sduvan commented 4 years ago

Replying to myself, since this might also be useful for others in the universe:

I added the contentType property to both the source and the sink, and declared the jdbc.columns property as shown below, unlike the reference guide (https://github.com/spring-cloud-stream-app-starters/jdbc/tree/master/spring-cloud-starter-stream-sink-jdbc):

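import java.util.Map;

import org.springframework.cloud.dataflow.rest.client.dsl.StreamApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// The enclosing class name is illustrative; the bean methods are as in my setup.
@Configuration
public class PipelineConfiguration {

    // Source: polls Postgres with a 1-second fixed delay, then marks the fetched rows (by id) as imported.
    @Bean
    public StreamApplication jdbcSource() {
        return new StreamApplication("source: jdbc")
                .addProperties(Map.of("jdbc.query", "'SELECT * FROM public.source_table_1 WHERE imported=false'"))
                .addProperties(Map.of("jdbc.update", "'UPDATE public.source_table_1 SET imported=true WHERE id in (:id)'"))
                .addProperties(Map.of("spring.datasource.username", "postgres"))
                .addProperties(Map.of("spring.datasource.password", "mysecretpassword"))
                .addProperties(Map.of("spring.datasource.url", "'jdbc:postgresql://localhost:5432/content_types'"))
                .addProperties(Map.of("trigger.time-unit", "SECONDS"))
                .addProperties(Map.of("trigger.fixed-delay", "1"))
                // Emit the rows as JSON.
                .addProperties(Map.of("spring.cloud.stream.bindings.output.contentType", "application/json"));
    }

    // Sink: the patched JDBC sink with the Snowflake driver embedded.
    @Bean
    public StreamApplication sfSink() {
        return new StreamApplication("snowflake-sink")
                .addProperties(Map.of("jdbc.table-name", "target_table_1"))
                .addProperties(Map.of("jdbc.initialize", "true"))
                .addProperties(Map.of("spring.cloud.stream.bindings.input.contentType", "application/json"))
                // Colon-separated column:expression pairs; each expression turns the byte[]
                // payload into a String and then reads the corresponding JSON field.
                .addProperties(Map.of("jdbc.columns", "'id:new String(payload).id,first_name:new String(payload).first_name,last_name:new String(payload).last_name'"))
                .addProperties(Map.of("spring.datasource.driver-class-name", "net.snowflake.client.jdbc.SnowflakeDriver"));
    }
}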
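The working mapping `new String(payload).id` first decodes the raw `byte[]` payload into JSON text and then reads a field from it. Below is a toy, stdlib-only illustration of those two steps. The regex reader handles only flat objects without escaped quotes and stands in for a real JSON parser, and `JsonField.read` is a hypothetical name. Note that `new String(payload)` uses the JVM's default charset, so the sketch decodes as UTF-8 explicitly, which is safer if the default might differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonField {

    // Toy reader for a top-level field of a flat JSON object: matches "name":
    // followed by a quoted string (no escaped quotes) or a bare token such as a number.
    static Optional<String> read(byte[] payload, String field) {
        // Step 1: bytes -> text, decoded explicitly as UTF-8.
        String json = new String(payload, StandardCharsets.UTF_8);
        // Step 2: text -> field value.
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(?:\"([^\"]*)\"|([^,}\\s]+))");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(m.group(1) != null ? m.group(1) : m.group(2));
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1,\"first_name\":\"Ada\",\"last_name\":\"Lovelace\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(read(payload, "id"));         // Optional[1]
        System.out.println(read(payload, "first_name")); // Optional[Ada]
        System.out.println(read(payload, "email"));      // Optional.empty
    }
}
```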