streamnative / pulsar-flink

Elastic data processing with Apache Pulsar and Apache Flink
Apache License 2.0

[BUG] Fails to insert into an upsert table #412

Open nlu90 opened 3 years ago

nlu90 commented 3 years ago

Describe the bug: Flink fails to create the correct table sink when inserting into an upsert table.

To reproduce: given the table

CREATE TABLE some_table (
  `order_id` STRING,
  `transaction_time` STRING, 
  ....
  PRIMARY KEY (order_id) NOT ENFORCED 
) WITH (
  'connector' = 'upsert-pulsar',
  'topic' = 'persistent://public/ns/topic',
  'value.format' = 'avro',
  'key.format' = 'raw',
  'key.fields' = 'order_id'
)
;

and the query

INSERT INTO `public/ns`.some_table
WITH `theo_grid` AS (
  ....

the following error is returned:

{"errors":["Internal server error.","<Exception on server side:\ncom.ververica.flink.table.gateway.utils.SqlExecutionException: Invalid SQL statement.
...
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'pulsar.public/ns.table'.\n\nTable options are:\n\n'admin-url'='....'\n'default-database'='public/default'\n'format'='json'\n'properties.auth-params'='file:///opt/flink-sql-gateway/secret/pulsar-token/pulsar/token'\n'properties.auth-plugin-classname'='org.apache.pulsar.client.impl.auth.AuthenticationToken'\n'service-url'='pulsar://pulsar-broker.pulsar:6650'\n'type'='pulsar'\n\tat 
...
Caused by: org.apache.flink.table.api.ValidationException: The Pulsar table 'pulsar.public/yab.some_table' with 'json' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.\n\tat org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory.validatePKConstraints(PulsarDynamicTableFactory.java:311)\n\tat org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory.createDynamicTableSink(PulsarDynamicTableFactory.java:121)\n\tat org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:153)\n\t... 69 more\n\nEnd of exception on server side>"]}

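It seems that default options are used: the table options listed in the error ('type'='pulsar', 'format'='json', 'default-database'='public/default', ...) do not match the table's WITH clause, and the stack trace shows the sink being created by PulsarDynamicTableFactory instead of UpsertPulsarDynamicTableFactory.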

Expected behavior: UpsertPulsarDynamicTableFactory should be used to create the table sink.
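One hedged way to narrow this down: the fully qualified name in the error (`pulsar.public/ns...`) suggests the table was resolved through a catalog named `pulsar`, and the options listed there look like catalog settings rather than the table's own WITH clause. Assuming the session still has Flink's built-in in-memory catalog (`default_catalog` / `default_database`), the sketch below declares the same table there, where the WITH options are stored as written. The table name `some_table_check` is hypothetical, the `admin-url` value is a placeholder for the cluster's real admin endpoint, and any authentication options the cluster requires still need to be added. If this INSERT is planned without the PRIMARY KEY validation error, the problem more likely lies in how the table is resolved through the `pulsar` catalog than in the upsert-pulsar connector.

```sql
-- Diagnostic sketch. Assumption: the session still has Flink's built-in
-- in-memory catalog (default_catalog) and database (default_database).
USE CATALOG default_catalog;
USE default_database;

-- Hypothetical table name; same schema and connector options as the failing table.
CREATE TABLE some_table_check (
  `order_id` STRING,
  `transaction_time` STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-pulsar',
  'admin-url' = 'http://localhost:8080',              -- placeholder: use the cluster's admin URL
  'service-url' = 'pulsar://pulsar-broker.pulsar:6650', -- value taken from the error's option list
  'topic' = 'persistent://public/ns/topic',
  'value.format' = 'avro',
  'key.format' = 'raw'
);

-- If this succeeds, the upsert factory was selected for a table defined
-- outside the pulsar catalog.
INSERT INTO some_table_check
VALUES ('1', 'test 1');
```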

jianyun8023 commented 3 years ago

@nlu90 In upsert-pulsar, the key.fields and key.fields-prefix options are no longer used; the key is declared with the PRIMARY KEY constraint instead, for example:

CREATE TABLE some_table (
  `order_id` STRING,
  `transaction_time` STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-pulsar',
  'admin-url' = 'http://localhost:55092',
  'service-url' = 'pulsar://localhost:55093',
  'topic' = 'persistent://public/default/topic54188168',
  'value.format' = 'avro',
  'key.format' = 'raw'
);

INSERT INTO some_table
VALUES
 ('1', 'name 1'),
 ('2', 'name 2'),
 ('3', 'name 3'),
 ('2', 'name 2');

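To check the result of the example above, the table can be read back as a source. This is a sketch that assumes upsert-pulsar interprets the topic as a changelog keyed on the primary key, the way Flink's upsert-kafka connector does; under that assumption the repeated ('2', 'name 2') record is treated as an update of key '2' rather than a new row, so the result holds one row per order_id. On an unbounded topic this is a continuous streaming query and keeps running until it is cancelled.

```sql
-- Assumption: upsert-pulsar reads the topic as an upsert changelog keyed on
-- order_id (as upsert-kafka does), so each key appears once in the result.
SELECT order_id, transaction_time
FROM some_table;
```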
jianyun8023 commented 3 years ago

Another point: the stack trace shows that PulsarDynamicTableFactory, not the upsert factory, was used to create the sink. I suggest rechecking the setup and testing again; I was unable to reproduce the problem.