scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

Connector fails and entry is not inserted if "__ttl" is defined for the entry #37

Closed izhukov1992 closed 2 months ago

izhukov1992 commented 3 years ago

I would like to insert an entry with a TTL:

sudo kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic topic2 \
--property parse.key=true \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator='$' \
--property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"__ttl","type":"int"}]}'

{"id":2}${"id":2,"firstName":"first","lastName":"last","__ttl":10}

But the entry is not inserted, and the connector shows "degraded" in the Confluent web interface. The problem is not reproducible if the "__ttl" field is omitted from both value.schema and the value itself.

Stack trace:

[2021-04-21 13:24:41,031] INFO create() - Adding table test.topic2

        CREATE TABLE test.topic2(
                id int,
                firstName varchar,
                lastName varchar,
                __ttl int,
                PRIMARY KEY(id))
        WITH compression = {'sstable_compression' : ''} (io.connect.scylladb.ScyllaDbSchemaBuilder:341)
[2021-04-21 13:24:41,072] ERROR WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: line 6:2 no viable alternative at input '' (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
com.datastax.driver.core.exceptions.SyntaxError: line 6:2 no viable alternative at input ''
        at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:52)
        at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:22)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35)
        at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58)
        at io.connect.scylladb.ScyllaDbSessionImpl.executeStatement(ScyllaDbSessionImpl.java:56)
        at io.connect.scylladb.ScyllaDbSchemaBuilder.create(ScyllaDbSchemaBuilder.java:342)
        at io.connect.scylladb.ScyllaDbSchemaBuilder.build(ScyllaDbSchemaBuilder.java:245)
        at io.connect.scylladb.ScyllaDbSessionImpl.createOrAlterTable(ScyllaDbSessionImpl.java:97)
        at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:85)
        at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 6:2 no viable alternative at input ''
        at com.datastax.driver.core.Responses$Error.asException(Responses.java:177)
        at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
        at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:229)
        at com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:63)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1011)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:814)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1289)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1207)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
[2021-04-21 13:24:41,087] ERROR WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 6:2 no viable alternative at input ''
        at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:52)
        at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:22)
        at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:35)
        at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:293)
        at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:58)
        at io.connect.scylladb.ScyllaDbSessionImpl.executeStatement(ScyllaDbSessionImpl.java:56)
        at io.connect.scylladb.ScyllaDbSchemaBuilder.create(ScyllaDbSchemaBuilder.java:342)
        at io.connect.scylladb.ScyllaDbSchemaBuilder.build(ScyllaDbSchemaBuilder.java:245)
        at io.connect.scylladb.ScyllaDbSessionImpl.createOrAlterTable(ScyllaDbSessionImpl.java:97)
        at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:85)
        at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        ... 10 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 6:2 no viable alternative at input ''
        at com.datastax.driver.core.Responses$Error.asException(Responses.java:177)
        at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:215)
        at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:229)
        at com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:63)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:1011)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:814)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1289)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1207)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at com.datastax.driver.core.InboundTrafficMeter.channelRead(InboundTrafficMeter.java:38)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
[2021-04-21 13:24:41,089] INFO Closing getValidSession (io.connect.scylladb.ScyllaDbSinkTask:274)
[2021-04-21 13:24:43,116] INFO [Consumer clientId=connector-consumer-scylladb-sink-connector-0, groupId=connect-scylladb-sink-connector] Revoke previously assigned partitions topic1-0, topic2-0, topic3-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
[2021-04-21 13:24:43,116] INFO [Consumer clientId=connector-consumer-scylladb-sink-connector-0, groupId=connect-scylladb-sink-connector] Member connector-consumer-scylladb-sink-connector-0-ab913fb5-f76c-4e92-8b23-0f202aa0d509 sending LeaveGroup request to coordinator ilya-vm:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1038)
[2021-04-21 13:24:43,135] INFO Publish thread interrupted for client_id=connector-consumer-scylladb-sink-connector-0 client_type=CONSUMER session= cluster=cBYPBr45R-aKrrjW8SEZ7g group=connect-scylladb-sink-connector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:285)
[2021-04-21 13:24:43,136] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-scylladb-sink-connector-0 client_type=CONSUMER session= cluster=cBYPBr45R-aKrrjW8SEZ7g group=connect-scylladb-sink-connector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:297)
[2021-04-21 13:24:43,137] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-scylladb-sink-connector-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1205)
[2021-04-21 13:24:43,144] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[2021-04-21 13:24:43,144] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[2021-04-21 13:24:43,144] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[2021-04-21 13:24:43,144] INFO App info kafka.producer for confluent.monitoring.interceptor.connector-consumer-scylladb-sink-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-04-21 13:24:43,144] INFO Closed monitoring interceptor for client_id=connector-consumer-scylladb-sink-connector-0 client_type=CONSUMER session= cluster=cBYPBr45R-aKrrjW8SEZ7g group=connect-scylladb-sink-connector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor:320)
[2021-04-21 13:24:43,146] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[2021-04-21 13:24:43,146] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[2021-04-21 13:24:43,146] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[2021-04-21 13:24:43,147] INFO App info kafka.consumer for connector-consumer-scylladb-sink-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-04-21 13:31:34,274] INFO [Worker clientId=connect-1, groupId=connect-cluster] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1609)
avelanarius commented 3 years ago

If you want to configure the connector to insert the data with a TTL, you must also configure a mapping by adding the following property to the configuration:

Property key:

topic.topic2.test.topic2.mapping

Property value:

id=key.id,firstName=value.firstName,lastName=value.lastName,__ttl=value.__ttl

Without this mapping, the connector was interpreting __ttl as a "normal" column to be added to Scylla, not as an indication to set the TTL of the added row. This mapping format is described in the "Topic to table" section of the documentation.
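For context, a sketch of how that property fits into a full connector configuration. The mapping key and value are taken from above; the connector name, keyspace, and the exact connector class name (inferred from the io.connect.scylladb package in the stack trace) are assumptions, so check them against your own setup:

```json
{
  "name": "scylladb-sink-connector",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "topics": "topic2",
    "scylladb.keyspace": "test",
    "topic.topic2.test.topic2.mapping": "id=key.id,firstName=value.firstName,lastName=value.lastName,__ttl=value.__ttl"
  }
}
```

With this mapping in place, value.__ttl is consumed as the row's TTL instead of being turned into a table column.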

However, if your intention was to have a separate __ttl column in the Scylla table rather than to set row expiration, there seems to be a bug preventing the creation of a table with such a column name.

dkropachev commented 2 months ago

However, if your intention was to have a separate __ttl column in Scylla table and not set expiration of row, there seems to be a bug preventing creating a table with such column name.

All correct, except that it is not a bug: Scylla and Cassandra do not allow column names to start with an underscore.
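This matches the DDL in the log above: the schema builder emits the column names unquoted, and __ttl is not a valid unquoted CQL identifier (unquoted identifiers must start with a letter), which is what produces the "no viable alternative" syntax error. A sketch of the failing statement, with a rename as the practical workaround (the name record_ttl is illustrative, not from the connector):

```sql
-- Rejected: __ttl is not a valid unquoted CQL identifier.
CREATE TABLE test.topic2 (
    id int,
    firstName varchar,
    lastName varchar,
    __ttl int,          -- fails with "no viable alternative at input"
    PRIMARY KEY (id)
);

-- Workaround: use a column name that starts with a letter.
CREATE TABLE test.topic2 (
    id int,
    firstName varchar,
    lastName varchar,
    record_ttl int,
    PRIMARY KEY (id)
);
```

CQL does accept double-quoted identifiers (e.g. "__ttl"), but since the connector generates unquoted DDL, renaming the field is the simpler route.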