confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Unable to read table with NUMERIC column #480

Open somasundaramsekar opened 6 years ago

somasundaramsekar commented 6 years ago

[screenshot: definition of the sales.salesorderdetail table, including its NUMERIC column]

I'm trying to set up a source connector for the AdventureWorks database in Postgres; the table is described in the image above, and the source configuration is given below. When the connector runs, it is unable to process the column with the NUMERIC value and skips all the rows, reporting a bad value.

WARN trace of a skipped record:

[2018-09-04 14:48:03,324] WARN Ignoring record due to SQL error: (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
org.postgresql.util.PSQLException: Bad value for type byte : 183.9382
    at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getByte(AbstractJdbc2ResultSet.java:2093)
    at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$columnConverterFor$18(GenericDatabaseDialect.java:1166)
    at io.confluent.connect.jdbc.source.SchemaMapping$FieldSetter.setField(SchemaMapping.java:160)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:176)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:297)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Connector configuration

{
    "name": "jdbc_source_sales.salesorderdetail",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "topics": "sales.salesorderdetail",
        "connection.url": "jdbc:postgresql://172.18.0.1/adventureworks?user=postgres&password=postgres",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "modifieddate",
        "incrementing.column.name": "salesorderid",
        "topic.prefix": "jdbc_source_sales_",
        "table.whitelist": "sales.salesorderdetail",
        "transforms": "CastUnitPrice, InsertKey, ExtractId, CastLong, AddNamespace",
        "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.InsertKey.fields": "salesorderid",
        "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.ExtractId.field": "salesorderid",
        "transforms.CastLong.type": "org.apache.kafka.connect.transforms.Cast$Key",
        "transforms.CastLong.spec": "int64", 
        "transforms.AddNamespace.type": "de.smava.kafka.connect.transforms.Namespacefy",
        "transforms.AddNamespace.record.namespace": "com.company.data.vault20",
        "transforms.CastUnitPrice.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.CastUnitPrice.spec": "unitprice:float64",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "numeric.mapping": "best_fit"

    }
}
avocader commented 6 years ago

Hi @somasundaramsekar ,

Looks like it is a bug in the JDBC source: it cannot handle columns of user-specified precision when the user omits the precision and scale in the column definition. Both NUMERIC and DECIMAL are affected. The root cause is that some JDBC drivers return 0,0 as the precision and scale of a column declared without explicit precision and scale, and we then fall into this branch: https://github.com/confluentinc/kafka-connect-jdbc/blob/v5.0.0/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java#L1166. To resolve it, we should treat a column with precision and scale of 0 as a Java Double, and only treat it as a Byte when the precision is exactly 1.
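
To make the failure mode concrete, here is a minimal sketch, not the connector's actual code and with illustrative thresholds, of a precision/scale-driven mapping in the spirit of numeric.mapping=best_fit; it shows how a driver-reported precision of 0 lands on the byte path and produces the "Bad value for type byte" error above:

import java.sql.ResultSet;
import java.sql.SQLException;

// Minimal sketch only: readNumeric() and its thresholds are illustrative,
// not the connector's real implementation.
public class NumericMappingSketch {

  static Object readNumeric(ResultSet rs, int col, int precision, int scale) throws SQLException {
    if (scale == 0 && precision < 19) {   // column is treated as integral
      if (precision > 9) {
        return rs.getLong(col);
      } else if (precision > 4) {
        return rs.getInt(col);
      } else if (precision > 2) {
        return rs.getShort(col);
      }
      // An unconstrained NUMERIC reported as precision 0 ends up here, so a value
      // like 183.9382 fails with "Bad value for type byte".
      return rs.getByte(col);
    }
    if (precision < 19) {                 // scale > 0 and small enough for a double
      return rs.getDouble(col);
    }
    return rs.getBigDecimal(col);         // exact decimal for everything else
  }
}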

Thanks.

avocader commented 6 years ago

@somasundaramsekar As a workaround, you can define your NUMERIC columns with explicit precision and scale, e.g. NUMERIC(16,2).

GiannisDimitriou commented 5 years ago

Hi @avocader ,

I am currently experimenting with how PostgreSQL data types are converted when using the kafka-connect-jdbc connector, and I see slightly different behaviour for the NUMERIC data type.

In my test case, NUMERIC is converted into int8 in the Kafka topic, losing the decimal part, while NUMERIC(20,10) is converted into bytes, probably because of this line: https://github.com/confluentinc/kafka-connect-jdbc/blob/3c19049fdf9af562b58efca5bfefbb31ba2801f5/src/main/java/io/confluent/connect/jdbc/dialect/GenericDatabaseDialect.java#L1157

NUMERIC(15,10) is converted to double as expected.

Do you know when we might get the fix you suggested for the NUMERIC(0,0) case? Also, why can't NUMERIC with precision >= 19 be converted to double as well?
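
(For what it's worth, here is a quick self-contained check, under the assumption that the >= 19 threshold exists because a Java double only holds roughly 15-17 significant decimal digits; the class name is mine.)

import java.math.BigDecimal;

// Quick illustration, not connector code: a 20-digit NUMERIC value does not
// survive a round trip through double.
public class DoublePrecisionCheck {
  public static void main(String[] args) {
    BigDecimal exact = new BigDecimal("1234567890.1234567890"); // 20 significant digits, like numeric(20,10)
    double approx = exact.doubleValue();
    System.out.println(exact);                  // the exact value
    System.out.println(new BigDecimal(approx)); // agrees only for the first ~16 significant digits
  }
}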

Here are my table DDL and connector configuration:

PostgreSQL table:

CREATE TABLE postgres_source2 (
    id serial NOT NULL,
    updated_at timestamp NOT NULL DEFAULT timezone('UTC'::text, now()),
    string_test1 varchar NULL,
    string_test2 varchar(10) NULL,
    numeric_test1 numeric NULL,
    numeric_test2 numeric(20,10) NULL,
    int_test1 int4 NULL,
    int_test2 int2 NULL,
    int_test3 int8 NULL,
    int_test4 float4 NULL,
    int_test5 float8 NULL,
    numeric_test3 numeric(15,10) NULL,
    CONSTRAINT postgres_source2_pkey PRIMARY KEY (id)
)

INSERT INTO postgres_source2 (string_test1,string_test2,NUMERIC_test1,NUMERIC_test2,int_test1,int_test2,int_test3,int_test4,int_test5,numeric_test3) VALUES ('test','test',123.1234567, 123.1234567, 1000000, 1,100000000000, 123.12345, 123.1234567,123.1234567)

{ "name": "Postgres-source_test", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "topic.prefix": "test-", "connection.url": "jdbc:postgresql://*****/****?user=****&password=****&preparedStatementCacheQueries=0", "table.whitelist": "postgres_source2", "numeric.mapping": "best_fit", "mode": "timestamp+incrementing", "incrementing.column.name": "id", "timestamp.column.name": "updated_at", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":"false", "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://schema-registry-1:8081", "value.converter.schemas.enable":"true" } }

Schema { "type": "record", "name": "postgres_source2", "fields": [ { "name": "id", "type": "int" }, { "name": "updated_at", "type": { "type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp", "logicalType": "timestamp-millis" } }, { "name": "string_test1", "type": [ "null", "string" ], "default": null }, { "name": "string_test2", "type": [ "null", "string" ], "default": null }, { "name": "numeric_test1", "type": [ "null", { "type": "int", "connect.type": "int8" } ], "default": null }, { "name": "numeric_test2", "type": [ "null", { "type": "bytes", "scale": 10, "precision": 64, "connect.version": 1, "connect.parameters": { "scale": "10" }, "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal" } ], "default": null }, { "name": "int_test1", "type": [ "null", "int" ], "default": null }, { "name": "int_test2", "type": [ "null", { "type": "int", "connect.type": "int16" } ], "default": null }, { "name": "int_test3", "type": [ "null", "long" ], "default": null }, { "name": "int_test4", "type": [ "null", "float" ], "default": null }, { "name": "int_test5", "type": [ "null", "double" ], "default": null }, { "name": "numeric_test3", "type": [ "null", "double" ], "default": null } ], "connect.name": "postgres_source2" }

Value { "id": 35, "updated_at": 1553777012820, "string_test1": { "string": "test" }, "string_test2": { "string": "test" }, "numeric_test1": { "int": 123 }, "numeric_test2": { "bytes": "\u0001\u001e«L‹X" }, "int_test1": { "int": 1000000 }, "int_test2": { "int": 1 }, "int_test3": { "long": 100000000000 }, "int_test4": { "float": 123.12345 }, "int_test5": { "double": 123.1234567 }, "numeric_test3": { "double": 123.1234567 } }

Thanks

gameofdatas commented 5 years ago

@GiannisDimitriou Were you able to solve the issue?

I am also facing the same challenge, as the schema of my table is similar to this:

requested_price     numeric(24, 8),
executed_price      numeric(24, 8),
old_executed_price      numeric(24, 8)

and because of the issue mentioned above, the type is not converted to double even though the scale is more than 0 (the precision is greater than 18, so it becomes a Decimal backed by bytes), and the JSON converter serializes it as

"requested_price": "IwzS2MA=",
"executed_price": "IwylEgA=",
"old_executed_price": "IwzS2MA="

With the StringConverter, I get the exact decimal values. Does anyone have a suggestion for getting the data in the correct format?
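
For what it's worth, those strings look like the base64-encoded unscaled bytes of Connect's Decimal logical type, so the data is there, just not human-readable. Below is a small decoding sketch; the class name is mine, and the scale of 8 is an assumption taken from the numeric(24,8) column definition:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Decodes a base64 string that JsonConverter produced for a Connect Decimal field.
public class DecodeConnectDecimal {
  public static void main(String[] args) {
    String encoded = "IwzS2MA=";  // the requested_price value from above
    int scale = 8;                // assumed: the scale of numeric(24,8)
    byte[] unscaledBytes = Base64.getDecoder().decode(encoded);
    BigDecimal value = new BigDecimal(new BigInteger(unscaledBytes), scale);
    System.out.println(value);    // should print 1505.39000000 if these assumptions hold
  }
}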

GiannisDimitriou commented 5 years ago

@Rahul7794

Unfortunately I haven't.

@avocader @rhauch
Since it's a bug, can you take a look and see whether it can be resolved in the near future?

Schmidt96u commented 4 years ago

Hi, if nobody is working on this, I will be happy to take this issue and try to solve it!

Schmidt96u commented 4 years ago

After looking into the issue, I'm not sure I understand how NUMERIC columns are handled. The main goal of NUMERIC is to carry an exact value (more documentation here). I would be really happy if someone could explain the logic behind the current "parser" (link to the parser).

For me there are three options (a rough sketch follows below):

1) The type NUMERIC(N,0): here we have an integer, and the way it is currently handled in the JDBC connector is fine. When the scale is 0, we know we get an int.
2) All NUMERIC(N,S) with S > 0: why not just use Decimal? It is supported by Avro as a logical type backed by Avro bytes (doc) and by Kafka Connect (doc) through Java's BigDecimal (doc).
3) Finally, NUMERIC without any specification: we can handle it the same way as 2).
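
A rough sketch of what that proposal could look like in Kafka Connect terms; this is an illustration rather than a patch, and the class and method names are mine:

import java.math.BigDecimal;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

// Illustration of the three cases above: integral NUMERIC stays an integer,
// everything else (including unconstrained NUMERIC) becomes a Connect Decimal.
public class ProposedNumericMapping {

  static Schema schemaFor(int precision, int scale) {
    if (scale == 0 && precision > 0 && precision < 19) {
      return Schema.INT64_SCHEMA;   // case 1: NUMERIC(N,0)
    }
    // Cases 2 and 3: NUMERIC(N,S) with S > 0, and unconstrained NUMERIC.
    // For the unconstrained form the driver reports scale 0, so a real fix
    // would still need to agree on a scale for the Decimal schema.
    return Decimal.builder(Math.max(scale, 0)).build();
  }

  static Object valueFor(BigDecimal dbValue, int precision, int scale) {
    if (scale == 0 && precision > 0 && precision < 19) {
      return dbValue.longValueExact();  // exact integral value
    }
    return dbValue;                     // the Decimal logical type carries a BigDecimal as-is
  }
}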

So I just want to get another point of view and see whether I'm getting something wrong.