StarRocks / starrocks-connector-for-kafka

Default value of a column not getting set #22

Open swapkh91 opened 6 months ago

swapkh91 commented 6 months ago

I'm using the connector to load data into a table that has a column rowtime with a default value of CURRENT_TIMESTAMP.

When I send JSON data through the connector, this column is not populated with the current timestamp; NULL is stored instead.

table structure

CREATE TABLE `test` (
  `type` varchar(65533) NULL COMMENT "",
  `clientId` varchar(65533) NULL COMMENT "",
  `rowtime` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT "",
  INDEX buyer_clientId_idx (`clientId`) USING BITMAP
) ENGINE=OLAP 
DUPLICATE KEY(`type`, `clientId`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`clientId`)
PROPERTIES (
"replication_num" = "1",
"datacache.partition_duration" = "3 days",
"datacache.enable" = "true",
"storage_volume" = "starrocks_qa_volume",
"enable_async_write_back" = "false",
"enable_persistent_index" = "true",
"persistent_index_type" = "LOCAL",
"compression" = "LZ4"
);
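
To confirm the symptom on this table, a query along the following lines can be run after the connector has loaded some messages. It is only a sketch for checking the result and uses nothing beyond the table and column names from the definition above.

```sql
-- Rows that were loaded without a rowtime value.
-- If the column default had been applied, this would return no rows.
SELECT `type`, `clientId`, `rowtime`
FROM `test`
WHERE `rowtime` IS NULL;
```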

connector yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: kafka-starrocks-test-sink-qa
  namespace: kafka
  annotations:
    strimzi.io/restart: "true"
  labels:
    strimzi.io/cluster: dp-kafka-connect-cluster 
spec:
 class: com.starrocks.connector.kafka.StarRocksSinkConnector
 tasksMax: 15
 autoRestart:
  enabled: true
  maxRestarts: 5
 config:
  key.converter: org.apache.kafka.connect.json.JsonConverter
  value.converter: org.apache.kafka.connect.json.JsonConverter
  header.converter: org.apache.kafka.connect.storage.SimpleHeaderConverter
  topics: realtime-test
  key.converter.schemas.enable: true
  value.converter.schemas.enable: false
  fetch.max.wait.ms: 500
  starrocks.http.url: <ip>:8030
  starrocks.topic2table.map: realtime-test:test
  starrocks.username: <user>
  starrocks.password: <pass>
  starrocks.database.name: test_db
  sink.properties.strip_outer_array: true
  bootstrap.servers: dp-kafka-cluster-kafka-0.dp-kafka-cluster-kafka-brokers.kafka.svc:9092,dp-kafka-cluster-kafka-1.dp-kafka-cluster-kafka-brokers.kafka.svc:9092,dp-kafka-cluster-kafka-2.dp-kafka-cluster-kafka-brokers.kafka.svc:9092
  kafka.partition.count: 30

ROUTINE LOAD works fine in this case: the default value is applied.

sample json

{"type": "OPEN", "clientId": "0000-asas-sadasd"}

fuzing commented 1 month ago

Same issue here. I've also opened an issue on the main StarRocks repo: "Creating a starrocks table with a DATETIME column that defaults to current_timestamp does not populate the DATETIME column when inserting via kafka connector"

Steps to reproduce the behavior (Required)

CREATE TABLE IF NOT EXISTS events (
  timestamp DATETIME DEFAULT current_timestamp,
  id STRING NOT NULL,
  event STRING NOT NULL,
  meta JSON NOT NULL
) ENGINE=OLAP
ORDER BY (timestamp, id, event);

Then perform one or more inserts via the Kafka connector that omit the "timestamp" column. The timestamp is stored as NULL. Please note that this does NOT happen when performing inserts directly (via a MySQL client).
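
For comparison, the direct-insert case that behaves correctly can be sketched as follows. The row values ('evt-1', 'signup', and the JSON payload) are made up for illustration; only the table and column names come from the statement above.

```sql
-- Direct insert through a MySQL client, omitting the timestamp column.
-- The values here are hypothetical sample data.
INSERT INTO events (id, event, meta)
VALUES ('evt-1', 'signup', parse_json('{"source": "manual"}'));

-- The DEFAULT is expected to apply here, so timestamp should not be NULL.
SELECT `timestamp`, id, event
FROM events
WHERE id = 'evt-1';
```

Sending the same record through the Kafka connector and re-running the SELECT is what shows the difference described in this report.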

Expected behavior (Required)

Insertions via the Kafka connector that omit the DATETIME field should store the DEFAULT (i.e. the current date/time).

Real behavior (Required)

When inserting from the Kafka connector without providing the "timestamp" field, the column does not default to the current date/time. NULL is stored instead.

StarRocks version (Required)

Docker container: starrocks/allin1-ubuntu:3.3-latest
starrocks-kafka-connector: built from the GitHub repo as of 10-16-2024