lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

Cassandra source connector produces wrong values for UDTs #720

Open cnmaia opened 3 years ago

cnmaia commented 3 years ago

I'm running the fast-data-dev stack with a Cassandra database in Docker and testing the Cassandra Source Connector. It works great for most use cases, but I have a table with a UDT, and I expected it to be streamed to Kafka with the value of the UDT as a string type, as stated in the docs. Instead of the UDT's value, I'm seeing a JSON string that describes the schema of the UDT.

What version of the Stream Reactor are you reporting this issue for?

Kafka Connect Version : 2.2.2-L0

{
"class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
"type": "source",
"version": "2.1.3"
}

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

cqlsh> show version;
[cqlsh 5.0.1 | Cassandra 3.11.6 | CQL spec 3.4.4 | Native protocol v4]

Have you read the docs? Yes

What is the expected behaviour?

It should output the value of the column UDT.

What was observed?

The UDT schema appeared in place of what I presumed would be the value.

What is your connector properties configuration (my-connector.properties)?

{
    "name": "cassandra-test-source-connector-2",
    "config": {
        "task.max": "10",
        "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
        "connect.cassandra.contact.points": "contract_api_load_test",
        "connect.cassandra.port": 9042,
        "connect.cassandra.username": "contract_adm",
        "connect.cassandra.password": "123",
        "connect.cassandra.consistency.level": "LOCAL_ONE",
        "connect.cassandra.key.space": "test_adm",
        "connect.cassandra.import.mode": "incremental",
        "connect.cassandra.kcql": "INSERT INTO test.udt.data.topic SELECT * FROM TEST_WITH_CUSTOM_TYPE PK DAT_CREATION INCREMENTALMODE=TIMESTAMP",
        "connect.cassandra.import.poll.interval": 500,
        "connect.cassandra.slice.duration": 1000,
        "connect.cassandra.slice.delay.ms": 3000,
        "connect.cassandra.task.buffer.size": 1000,
        "connect.cassandra.batch.size": 1000
    }
}
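
For reference, this is roughly how I push a config like the one above to the Connect worker's REST API; a minimal Scala sketch (Java 11+), assuming fast-data-dev's default Connect REST endpoint on localhost:8083 and that the JSON above is saved as cassandra-test-source-connector-2.json (endpoint and file name are illustrative, not confirmed parts of my setup):

    import java.net.URI
    import java.net.http.{HttpClient, HttpRequest, HttpResponse}
    import java.nio.file.{Files, Paths}

    object RegisterConnector extends App {
      // Read the connector JSON shown above (hypothetical file name).
      val body = Files.readString(Paths.get("cassandra-test-source-connector-2.json"))

      // POST it to the Connect worker's /connectors endpoint.
      val request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8083/connectors"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()

      val response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString())

      // 201 means the connector was created; the response body echoes the config.
      println(s"${response.statusCode()} ${response.body()}")
    }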

Please provide full log files (redact any sensitive information)

1. Table creation:

    
    cqlsh> CREATE KEYSPACE test_adm WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
    cqlsh> CREATE TYPE test_adm.TEST_TYPE(
    ...     COD_SOMETHING text,
    ...     COD_OTHER text
    ... );
    cqlsh> CREATE TABLE test_adm.TEST_WITH_CUSTOM_TYPE(
    ...     ID text,
    ...     DAT_CREATION timestamp,
    ...     LST_SOMETHING LIST<TEXT>,
    ...     LST_TEST_TYPE LIST<FROZEN<TEST_TYPE>>,
    ...     PRIMARY KEY((ID), DAT_CREATION)
    ... );
    cqlsh> INSERT INTO test_adm.TEST_WITH_CUSTOM_TYPE(ID, DAT_CREATION, LST_SOMETHING, LST_TEST_TYPE) VALUES (
    ...     'some-id-value',
    ...     toTimestamp(now()),
    ...     ['8268ca56-f395-463b-bb54-857bc5aac5cc'],
    ...     [{COD_SOMETHING: 'udt_custom_value', COD_OTHER: 'other_udt_custom_value'}]
    ... );
    cqlsh> SELECT * FROM TEST_WITH_CUSTOM_TYPE;
    InvalidRequest: Error from server: code=2200 [Invalid query] message="No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename"
    cqlsh> SELECT * FROM test_adm.TEST_WITH_CUSTOM_TYPE;
    
    id            | dat_creation                    | lst_something                            | lst_test_type
    ---------------+---------------------------------+------------------------------------------+----------------------------------------------------------------------------
    some-id-value | 2020-12-10 02:44:13.820000+0000 | ['8268ca56-f395-463b-bb54-857bc5aac5cc'] | [{cod_something: 'udt_custom_value', cod_other: 'other_udt_custom_value'}]

(1 rows)


2. Topic creation:
`docker run --rm -it --net=host landoop/fast-data-dev kafka-topics --zookeeper localhost:2181 --create --topic test.udt.data.topic --partitions 4 --replication-factor 1`

3. Schema auto-created in schema-registry:

{ "type": "record", "name": "TEST_WITH_CUSTOM_TYPE", "namespace": "test_adm", "fields": [ { "name": "id", "type": [ "null", "string" ], "default": null }, { "name": "dat_creation", "type": [ "null", { "type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp", "logicalType": "timestamp-millis" } ], "default": null }, { "name": "lst_something", "type": [ "null", "string" ], "default": null }, { "name": "lst_test_type", "type": [ "null", "string" ], "default": null } ], "connect.name": "test_adm.TEST_WITH_CUSTOM_TYPE" }


4. Data that got into the Kafka topic [**HERE'S THE PROBLEM: FIELD `LST_TEST_TYPE`**]:

[ { "topic": "test.udt.data.topic", "key": null, "value": { "id": { "string": "some-id-value" }, "dat_creation": { "long": 1607568253820 }, "lst_something": { "string": "[\"8268ca56-f395-463b-bb54-857bc5aac5cc\"]" }, "lst_test_type": { "string": "[{\"type\":{\"name\":\"UDT\",\"keyspace\":\"test_adm\",\"typeName\":\"test_type\",\"frozen\":false,\"fieldNames\":[\"cod_something\",\"cod_other\"],\"typeArguments\":[],\"collection\":false}}]" } }, "partition": 3, "offset": 0 } ]


I expected to see something like this for the UDT field above: 
`[{cod_something: 'udt_custom_value', cod_other: 'other_udt_custom_value'}]`.
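
For what it's worth, the metadata that did show up (keyspace, typeName, fieldNames, frozen, ...) looks like a serialisation of the column's UserType, i.e. what `UDTValue.getType` returns, rather than of the `UDTValue` itself. Below is a minimal read-back sketch using the DataStax 3.x driver (the `com.datastax.driver.core` classes the connector logs reference); the contact point, credentials and row come from the config and data above and are used purely for illustration, and the object name is just a placeholder — this is not the connector's actual code path:

    import com.datastax.driver.core.{Cluster, UDTValue}
    import scala.collection.JavaConverters._

    object ReadBackUdt extends App {
      // Contact point and credentials copied from the connector config above.
      val cluster = Cluster.builder()
        .addContactPoint("contract_api_load_test")
        .withCredentials("contract_adm", "123")
        .build()
      val session = cluster.connect("test_adm")

      val row = session
        .execute("SELECT * FROM TEST_WITH_CUSTOM_TYPE WHERE ID = 'some-id-value'")
        .one()

      // The driver hands the column back as java.util.List[UDTValue].
      row.getList("lst_test_type", classOf[UDTValue]).asScala.foreach { udt =>
        // The value itself -- roughly what I expected to see on the topic:
        // {cod_something:'udt_custom_value',cod_other:'other_udt_custom_value'}
        println(udt)

        // Individual fields are readable too (both fields are text here).
        udt.getType.getFieldNames.asScala.foreach { f =>
          println(s"$f = ${udt.getString(f)}")
        }

        // Serialising udt.getType (the UserType: keyspace, typeName, fieldNames,
        // frozen, ...) instead of the value would produce the shape that actually
        // landed in Kafka.
      }

      cluster.close()
    }

Reading the row back this way shows the field values are stored correctly, so the data looks fine on the Cassandra side; the problem seems to happen when the connector converts the row into a Connect record.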

----

Connector logs:

    By Andrew Stevenson.2.1.3 (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceTask:63)
    [2020-12-10 02:48:42,365] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)
    [2020-12-10 02:48:42,365] INFO StreamReactor-Version: 2.1.3 Kafka-Version: 2.5.0 Git-Repo: Git-Commit-Hash: 15a5cbaf1fe005643d45cdcf2c46e5953acafb76 Git-Tag: 2.1.3 StreamReactor-Docs: https://docs.lenses.io/connectors/ (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceTask:64)
    [2020-12-10 02:48:42,367] INFO CassandraConfigSource values:
        connect.cassandra.assigned.tables = TEST_WITH_CUSTOM_TYPE
        connect.cassandra.batch.size = 1000
        connect.cassandra.bucket.timeseries.field.name = bucket
        connect.cassandra.bucket.timeseries.format =
        connect.cassandra.bucket.timeseries.mode =
        connect.cassandra.consistency.level = LOCAL_ONE
        connect.cassandra.contact.points = contract_api_load_test
        connect.cassandra.error.policy = THROW
        connect.cassandra.fetch.size = 5000
        connect.cassandra.import.allow.filtering = true
        connect.cassandra.import.poll.interval = 500
        connect.cassandra.initial.offset = 1900-01-01 00:00:00.0000000Z
        connect.cassandra.kcql = INSERT INTO test.udt.data.topic SELECT * FROM TEST_WITH_CUSTOM_TYPE PK DAT_CREATION INCREMENTALMODE=TIMESTAMP
        connect.cassandra.key.space = test_adm
        connect.cassandra.key.store.password = [hidden]
        connect.cassandra.key.store.path =
        connect.cassandra.key.store.type = JKS
        connect.cassandra.load.balancing.policy = TOKEN_AWARE
        connect.cassandra.mapping.collection.to.json = true
        connect.cassandra.max.retries = 20
        connect.cassandra.password = [hidden]
        connect.cassandra.port = 9042
        connect.cassandra.retry.interval = 60000
        connect.cassandra.slice.delay.ms = 3000
        connect.cassandra.slice.duration = 1000
        connect.cassandra.ssl.client.cert.auth = false
        connect.cassandra.ssl.enabled = false
        connect.cassandra.task.buffer.size = 1000
        connect.cassandra.time.slice.ms = 10000
        connect.cassandra.trust.store.password = [hidden]
        connect.cassandra.trust.store.path =
        connect.cassandra.trust.store.type = JKS
        connect.cassandra.username = contract_adm
     (com.datamountaineer.streamreactor.connect.cassandra.config.CassandraConfigSource:279)
    [2020-12-10 02:48:42,367] INFO Using native clock to generate timestamps. (com.datastax.driver.core.ClockFactory:57)
    [2020-12-10 02:48:42,419] INFO Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (com.datastax.driver.core.policies.DCAwareRoundRobinPolicy:110)
    [2020-12-10 02:48:42,420] INFO New Cassandra host contract_api_load_test/172.31.0.3:9042 added (com.datastax.driver.core.Cluster:1621)
    [2020-12-10 02:48:42,444] INFO Cluster ID: WWjGRFe-QjuJNLwtSHuhuQ (org.apache.kafka.clients.Metadata:379)
    [2020-12-10 02:48:42,457] INFO Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor) (com.datastax.driver.core.policies.DCAwareRoundRobinPolicy:110)
    [2020-12-10 02:48:42,457] INFO New Cassandra host contract_api_load_test/172.31.0.3:9042 added (com.datastax.driver.core.Cluster:1621)
    [2020-12-10 02:48:42,458] INFO Cluster ID: WWjGRFe-QjuJNLwtSHuhuQ (org.apache.kafka.clients.Metadata:379)
    [2020-12-10 02:48:42,459] INFO Cluster ID: WWjGRFe-QjuJNLwtSHuhuQ (org.apache.kafka.clients.Metadata:379)
    [2020-12-10 02:48:42,463] INFO Connection to Cassandra established. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceTask:82)
    [2020-12-10 02:48:42,468] INFO the increment mode is TIMESTAMP (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:73)
    [2020-12-10 02:48:42,468] INFO checking CQL for PK: DAT_CREATION (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:130)
    [2020-12-10 02:48:42,468] INFO generated CQL: <> (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:83)
    [2020-12-10 02:48:42,479] INFO The increment mode is TIMESTAMP (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:47)
    [2020-12-10 02:48:42,479] INFO checking CQL for PK: DAT_CREATION (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:130)
    [2020-12-10 02:48:42,480] INFO Generated CQL: <(com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:57)
    [2020-12-10 02:48:42,482] WARN Re-preparing already prepared query is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once. Query='' (com.datastax.driver.core.Cluster:2345)
    [2020-12-10 02:48:42,484] INFO Connection to Cassandra established. (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceTask:82)
    [2020-12-10 02:48:42,488] INFO the increment mode is TIMESTAMP (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:73)
    [2020-12-10 02:48:42,489] INFO checking CQL for PK: DAT_CREATION (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:130)
    [2020-12-10 02:48:42,489] INFO generated CQL: SELECT * FROM test_adm.TEST_WITH_CUSTOM_TYPE WHERE DAT_CREATION > ? AND DAT_CREATION <= ? ALLOW FILTERING (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:83)
    [2020-12-10 02:48:42,506] INFO The increment mode is TIMESTAMP (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:47)
    [2020-12-10 02:48:42,506] INFO checking CQL for PK: DAT_CREATION (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:130)
    [2020-12-10 02:48:42,507] INFO Generated CQL: SELECT * FROM test_adm.TEST_WITH_CUSTOM_TYPE WHERE DAT_CREATION > ? AND DAT_CREATION <= ? ALLOW FILTERING (com.datamountaineer.streamreactor.connect.cassandra.source.CqlGenerator:57)
    [2020-12-10 02:48:42,512] WARN Re-preparing already prepared query is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once. Query='SELECT * FROM test_adm.TEST_WITH_CUSTOM_TYPE WHERE DAT_CREATION > ? AND DAT_CREATION <= ? ALLOW FILTERING' (com.datastax.driver.core.Cluster:2345)
    [2020-12-10 02:48:42,787] INFO Recovered offset 2020-12-10 02:31:59.228Z for connector cassandra-test-source-connector (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraTableReader:92)
    [2020-12-10 02:48:42,788] INFO WorkerSourceTask{id=cassandra-test-source-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:203)
    [2020-12-10 02:48:42,788] INFO WorkerSourceTask{id=cassandra-test-source-connector-2-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:203)
    [2020-12-10 02:48:43,327] INFO Sending 1 records for connector cassandra-test-source-connector-2 (com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceTask:169)
    [2020-12-10 02:49:42,282] INFO WorkerSourceTask{id=logs-broker-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:410)
    [2020-12-10 02:49:42,282] INFO WorkerSourceTask{id=logs-broker-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:427)



Am I configuring something wrong, or is this another problem? I did not find any docs about UDTs and this connector apart from the type translation in the official docs.

antomys commented 2 days ago

I have faced this issue, too. Has there been any progress in finding a solution?