confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
128 stars 1.04k forks source link

IFNULL should support data types other than VARCHAR #1181

Closed satybald closed 4 years ago

satybald commented 6 years ago

At the current moment, there's no method how to map NULL value to another scalar value. However, this quite common operation while working with SQL. The function is ISO SQL-92 compatible.

Coalesce — Map null to Another Value
SQL’s coalesce turns a null value into another value.

COALESCE ( expression1, expression2 [ , … ] )
Coalesce takes an arbitrary number of arguments and returns the first not null value or null if all arguments are null

This function is not allowed in a GROUP BY clause
Arguments to this function cannot be query expressions
satybald commented 6 years ago

I've written a small design doc for this feature in order to solicit the feedback from the community. I would highly appreciate a feedback. https://docs.google.com/document/d/1Ag2QiBa3YpPSzSqLeWqp6fDWpQG_GVeti0QKb1zuMRc/edit?usp=sharing

apurvam commented 6 years ago

Thanks @satybald , we will take a look.

cc @confluentinc/ksql

satybald commented 6 years ago

before trying to implement the feature, I'd be interested what community thinks about it? Does anyone find it useful and needed?

I also would love to hear feedback on the approach that I took in the design doc cc: @confluentinc/ksql

rmoff commented 6 years ago

Related : #620 (although I would +1 explicit support for COALESCE)

rmoff commented 6 years ago

One workaround to use is using INSERT INTO. It's not very elegant, and certainly not as flexible as a function such as COALESCE. But until we do have such function, it may be useful…

# Set up some sample data, run this from bash
# For more info about kafkacat see
#    https://docs.confluent.io/current/app-development/kafkacat-usage.html
    kafkacat -b kafka-broker:9092 \
            -t topic_with_nulls \
            -P <<EOF
{"col1":1,"col2":16000,"col3":"foo"}
{"col1":2,"col2":42000}
{"col1":3,"col2":94000,"col3":"bar"}
{"col1":4,"col2":12345}
EOF

Here's the KSQL workaround for handling NULLs in col3:

-- Register the topic
CREATE STREAM topic_with_nulls (COL1 INT, COL2 INT, COL3 VARCHAR) \
  WITH (KAFKA_TOPIC='topic_with_nulls',VALUE_FORMAT='JSON');

-- Query the topic to show there are some null values
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT COL1, COL2, COL3 FROM topic_with_nulls;
1 | 16000 | foo
2 | 42000 | null
3 | 94000 | bar
4 | 12345 | null

-- Create a derived stream, with just records with no NULLs in COL3
CREATE STREAM NULL_WORKAROUND AS \
  SELECT COL1, COL2, COL3 FROM topic_with_nulls WHERE COL3 IS NOT NULL;

-- Insert into the derived stream any records where COL3 *is* NULL, replacing it with a fixed string
INSERT INTO NULL_WORKAROUND \
  SELECT COL1, COL2, 'N/A' AS COL3 FROM topic_with_nulls WHERE COL3 IS NULL;

-- Confirm that the NULL substitution worked
ksql> SELECT COL1, COL2, COL3 FROM NULL_WORKAROUND;
1 | 16000 | foo
2 | 42000 | N/A
3 | 94000 | bar
4 | 12345 | N/A
thirstyfish commented 6 years ago

For anyone looking for a solution for this, I found IFNULL(x,y) works in 5.0.1. Not sure if this is a new addition as I haven't found any documentation or reference to this, only found it in list functions; But it works, hopefully this is not something that will be deprecated?

satybald commented 6 years ago

I guess it's a hidden easter egg in KSQL. It might be useful to document a function ;)

miguno commented 6 years ago

cc @JimGalasyn (also to double-check)

mtpatter commented 6 years ago

@thirstyfish do you have a special setup or anything? I just tried e.g., SELECT IFNULL(NULL,0); using confluentinc/ksql Docker images tagged 5.0.1 and get

io.confluent.ksql.parser.exception.ParseFailedException
Caused by: java.lang.NullPointerException

I do see it when running list functions; however. (Not sure if I'm just using it wrong.)

aculver28 commented 6 years ago

Just FYI...the IFNULL function seems to only work for VARCHAR fields. Everything else is giving me the following error:

io.confluent.ksql.util.KsqlException: Function 'IFNULL' does not accept parameters of types:[DOUBLE, DOUBLE]
    at io.confluent.ksql.function.UdfFactory.createNoMatchingFunctionException(UdfFactory.java:137)
    at io.confluent.ksql.function.UdfFactory.getFunction(UdfFactory.java:127)
    at io.confluent.ksql.util.ExpressionTypeManager.visitFunctionCall(ExpressionTypeManager.java:233)
    at io.confluent.ksql.util.ExpressionTypeManager.visitFunctionCall(ExpressionTypeManager.java:48)
    at io.confluent.ksql.parser.tree.FunctionCall.accept(FunctionCall.java:91)
    at io.confluent.ksql.parser.tree.DefaultAstVisitor.process(DefaultAstVisitor.java:25)
    at io.confluent.ksql.util.ExpressionTypeManager.getExpressionSchema(ExpressionTypeManager.java:61)
    at io.confluent.ksql.planner.LogicalPlanner.buildProjectNode(LogicalPlanner.java:174)
    at io.confluent.ksql.planner.LogicalPlanner.buildPlan(LogicalPlanner.java:80)
    at io.confluent.ksql.QueryEngine.buildQueryLogicalPlan(QueryEngine.java:117)
    at io.confluent.ksql.QueryEngine.buildLogicalPlans(QueryEngine.java:89)
    at io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:221)
    at io.confluent.ksql.KsqlEngine.createQueries(KsqlEngine.java:618)
    at io.confluent.ksql.rest.server.StandaloneExecutor.executeStatements(StandaloneExecutor.java:118)
    at io.confluent.ksql.rest.server.StandaloneExecutor.start(StandaloneExecutor.java:67)
    at io.confluent.ksql.rest.server.KsqlServerMain.tryStartApp(KsqlServerMain.java:60)
    at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:46)
thirstyfish commented 6 years ago

@thirstyfish do you have a special setup or anything? I just tried e.g., SELECT IFNULL(NULL,0); using confluentinc/ksql Docker images tagged 5.0.1 and get

io.confluent.ksql.parser.exception.ParseFailedException
Caused by: java.lang.NullPointerException

I do see it when running list functions; however. (Not sure if I'm just using it wrong.)

No special setup, also developing using the Docker image atm. I was testing IFNULL on a JSON input stream to extract the message type field value which exists at different object hierarchy levels in different message types. Like so:

CREATE STREAM CSAS_LOAD AS SELECT IFNULL(EXTRACTJSONFIELD(body, '$.message.type'),EXTRACTJSONFIELD(body, '$.type')) as msg_type, body from CSAS_EXTRACT;

I'm guessing that this is the actual UDF source: https://github.com/confluentinc/ksql/blob/master/ksql-engine/src/main/java/io/confluent/ksql/function/udf/string/IfNullKudf.java

satybald commented 6 years ago

@aculver28 yes, it's indeed defined only for VARCHARs.

final KsqlFunction ifNull = new KsqlFunction(Schema.OPTIONAL_STRING_SCHEMA,
        Arrays.asList(Schema.OPTIONAL_STRING_SCHEMA,
            Schema.OPTIONAL_STRING_SCHEMA),
        "IFNULL", IfNullKudf.class);

https://github.com/confluentinc/ksql/blob/2ff99faafeb99e342d4520bc57750d53f6daebd4/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java#L199

aculver28 commented 6 years ago

@satybald yeah I noticed that before I saw your comment. I wrote a custom UDF for something I had to push out earlier. Gonna implement a generic solution for this enhancement tomorrow. I'll take a look at the design doc today, but it should be a pretty straightforward fix.

Also, for what it's worth, I think it makes sense to have both a properly defined IFNULL() function that supports all KSQL supported datatypes, as well as a COALESCE() function. Thoughts?

JimGalasyn commented 6 years ago

Created internal ticket KSQL-1912 to track.

satybald commented 6 years ago

@aculver28 I think IFNULL is a handy function to have also.

rmoff commented 5 years ago

I've updated the title of this ticket to reflect the changing state. In summary:

  1. We should improve IFNULL to support data types other than VARCHAR
  2. Personally, I think we should add a synonym for COALESCE to IFNULL, since the former is a very standard SQL term that people will expect to find.
sunilkpokkalla commented 5 years ago

[2019-07-02 20:46:15,018] INFO Attempting to open connection #1 to Generic (io.confluent.connect.jdbc.util.CachedConnectionProvider:86) [2019-07-02 20:46:15,157] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:48) [2019-07-02 20:46:15,286] INFO Checking Generic dialect for existence of table "EVENT_ADMISSION_HX_Kafka2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:499) [2019-07-02 20:46:15,385] INFO Using Generic dialect table "EVENT_ADMISSION_HX_Kafka2" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:507) [2019-07-02 20:46:16,845] INFO Setting metadata for table "EVENT_ADMISSION_HX_Kafka2" to Table{name='"EVENT_ADMISSION_HX_Kafka2"', columns=[Column{'LAST_SYNC_SEQ', isPrimaryKey=false, allowsNull=true, sqlType=INTEGER}, Column{'MEMBER_ID', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'EVENT_SEQ', isPrimaryKey=false, allowsNull=false, sqlType=INTEGER}, Column{'UPDATED_VALUE', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'MODIFICATION_DATE', isPrimaryKey=false, allowsNull=false, sqlType=TIMESTAMP}, Column{'COLUMN_NAME', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'MODIFIED_BY', isPrimaryKey=false, allowsNull=true, sqlType=SMALLINT}, Column{'CODE_VERSION', isPrimaryKey=false, allowsNull=true, sqlType=SMALLINT}, Column{'LAST_UPDATE_DBID', isPrimaryKey=false, allowsNull=true, sqlType=INTEGER}]} (io.confluent.connect.jdbc.util.TableDefinitions:63) [2019-07-02 20:46:16,845] INFO Closing BufferedRecords with preparedStatement: null (io.confluent.connect.jdbc.sink.BufferedRecords:184) [2019-07-02 20:46:16,935] WARN Write of 12 records failed, remainingRetries=0 (io.confluent.connect.jdbc.sink.JdbcSinkTask:75) java.sql.SQLException: [Teradata JDBC Driver] [TeraJDBC 16.00.00.34] [Error 1063] [SQLState HY000] null is not supported as a data value with this variant of the setObject method; use the setNull method or the setObject method with a targetSqlType parameter at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDriverJDBCException(ErrorFactory.java:95) at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDriverJDBCException(ErrorFactory.java:65) at com.teradata.jdbc.jdbc_4.TDPreparedStatement.setObject(TDPreparedStatement.java:1629) at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.bindField(GenericDatabaseDialect.java:1417) at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindField(PreparedStatementBinder.java:140) at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindNonKeyFields(PreparedStatementBinder.java:134) at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:70) at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:138) at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:71) at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) [2019-07-02 20:46:16,936] ERROR WorkerSinkTask{id=ccms-production-event-admission-hx-teradatasink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585) org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.SQLException: [Teradata JDBC Driver] [TeraJDBC 16.00.00.34] [Error 1063] [SQLState HY000] null is not supported as a data value with this variant of the setObject method; use the setNull method or the setObject method with a targetSqlType parameter

at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.sql.SQLException: java.sql.SQLException: [Teradata JDBC Driver] [TeraJDBC 16.00.00.34] [Error 1063] [SQLState HY000] null is not supported as a data value with this variant of the setObject method; use the setNull method or the setObject method with a targetSqlType parameter

... 12 more

[2019-07-02 20:46:16,936] ERROR WorkerSinkTask{id=ccms-production-event-admission-hx-teradatasink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.SQLException: [Teradata JDBC Driver] [TeraJDBC 16.00.00.34] [Error 1063] [SQLState HY000] null is not supported as a data value with this variant of the setObject method; use the setNull method or the setObject method with a targetSqlType parameter

at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more

Caused by: java.sql.SQLException: java.sql.SQLException: [Teradata JDBC Driver] [TeraJDBC 16.00.00.34] [Error 1063] [SQLState HY000] null is not supported as a data value with this variant of the setObject method; use the setNull method or the setObject method with a targetSqlType parameter

... 12 more

[2019-07-02 20:46:16,936] ERROR WorkerSinkTask{id=ccms-production-event-admission-hx-teradatasink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178) [2019-07-02 20:46:16,936] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:104) [2019-07-02 20:46:16,937] INFO Closing connection #1 to Generic (io.confluent.connect.jdbc.util.CachedConnectionProvider:112) [2019-07-02 20:46:16,983] INFO [Consumer clientId=ccms-production-event-admission-hx-teradatasink, groupId=connect-ccms-production-event-admission-hx-teradatasink] Sending LeaveGroup request to coordinator cilhdkfd0308.sys.cigna.com:9093 (id: 2147483169 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)

I'm having the similar error in sink - JDBC kafka connector (confluent) kindly help

Deninc commented 4 years ago

Dear @rmoff, any plan to support coalesce soon?

Also, I just tried your insert into workaround, and the result is not the same. The original order is not preserved.

Setting up:

CREATE STREAM topic_with_nulls (COL1 INT, COL2 INT, COL3 VARCHAR) \
  WITH (KAFKA_TOPIC='topic_with_nulls',VALUE_FORMAT='JSON', PARTITIONS=1);

INSERT INTO topic_with_nulls (COL1, COL2, COL3) VALUES (1, 16000, 'foo');
INSERT INTO topic_with_nulls (COL1, COL2) VALUES (2, 42000);
INSERT INTO topic_with_nulls (COL1, COL2, COL3) VALUES (3, 94000, 'bar');
INSERT INTO topic_with_nulls (COL1, COL2) VALUES (4, 12345);

CREATE STREAM NULL_WORKAROUND AS \
  SELECT COL1, COL2, COL3 FROM topic_with_nulls WHERE COL3 IS NOT NULL;

INSERT INTO NULL_WORKAROUND \
  SELECT COL1, COL2, 'N/A' AS COL3 FROM topic_with_nulls WHERE COL3 IS NULL;

Result (order should be 1,2,3,4 but here it is 1,3,2,4):

ksql> SELECT COL1, COL2, COL3 FROM NULL_WORKAROUND emit changes;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|COL1                                                     |COL2                                                     |COL3                                                     |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1                                                        |16000                                                    |foo                                                      |
|3                                                        |94000                                                    |bar                                                      |
|2                                                        |42000                                                    |N/A                                                      |
|4                                                        |12345                                                    |N/A                                                      |

ksqlDB 0.7.1

aculver28 commented 4 years ago

@rmoff - I may have some time to work on this at some point in the next week, so wanted to make sure there are no outstanding concerns (e.g. number of params to accept, default behavior, mixed datatypes, etc.) before diving in.

rbk1993 commented 4 years ago

Just FYI...the IFNULL function seems to only work for VARCHAR fields. Everything else is giving me the following error:

io.confluent.ksql.util.KsqlException: Function 'IFNULL' does not accept parameters of types:[DOUBLE, DOUBLE]
  at io.confluent.ksql.function.UdfFactory.createNoMatchingFunctionException(UdfFactory.java:137)
  at io.confluent.ksql.function.UdfFactory.getFunction(UdfFactory.java:127)
  at io.confluent.ksql.util.ExpressionTypeManager.visitFunctionCall(ExpressionTypeManager.java:233)
  at io.confluent.ksql.util.ExpressionTypeManager.visitFunctionCall(ExpressionTypeManager.java:48)
  at io.confluent.ksql.parser.tree.FunctionCall.accept(FunctionCall.java:91)
  at io.confluent.ksql.parser.tree.DefaultAstVisitor.process(DefaultAstVisitor.java:25)
  at io.confluent.ksql.util.ExpressionTypeManager.getExpressionSchema(ExpressionTypeManager.java:61)
  at io.confluent.ksql.planner.LogicalPlanner.buildProjectNode(LogicalPlanner.java:174)
  at io.confluent.ksql.planner.LogicalPlanner.buildPlan(LogicalPlanner.java:80)
  at io.confluent.ksql.QueryEngine.buildQueryLogicalPlan(QueryEngine.java:117)
  at io.confluent.ksql.QueryEngine.buildLogicalPlans(QueryEngine.java:89)
  at io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:221)
  at io.confluent.ksql.KsqlEngine.createQueries(KsqlEngine.java:618)
  at io.confluent.ksql.rest.server.StandaloneExecutor.executeStatements(StandaloneExecutor.java:118)
  at io.confluent.ksql.rest.server.StandaloneExecutor.start(StandaloneExecutor.java:67)
  at io.confluent.ksql.rest.server.KsqlServerMain.tryStartApp(KsqlServerMain.java:60)
  at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:46)

Hi, did you fix the "createNoMatchingFunctionException" ? I have the same problem with a customer UDF that I try to create.

big-andy-coates commented 4 years ago

Closing as ksqlDB now supports IFNULL on any SQL type and also has COALESCE.