neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

Objects inside an array coming from a source connector have their types set to string in the generated Schema #639

Closed farhadnowzari closed 3 weeks ago

farhadnowzari commented 1 month ago

Expected Behavior (Mandatory)

We have a Cypher query that produces output like this when it is run against Neo4j:

{
    "name": "Movie#1",
    "resources": [
        {
            "name": "Camera",
            "isUsed": false
        },
        {
            "name": "Light",
            "isUsed": true
        }
    ]
}

If the objects in resources contain a value of type boolean or integer, the connector fails after reading one node into the topic, with this error: Invalid Java object for schema with type STRING: class java.lang.Long

We are using neo4j-connector version 5.0.2 against Neo4j 5.19 with confluentinc/cp-kafka-connect-base:6.2.1

Actual Behavior (Mandatory)

The objects inside an array should keep their types and not be converted to String, because Neo4j Cypher returns boolean, long, or other types that cannot be cast to STRING.

How to Reproduce the Problem

The actual query is much bigger, but the following one gives an idea of what our case looks like.

MATCH (m:Movie)
WITH m

CALL {
    WITH m
    OPTIONAL MATCH (m)-[:HAS]->(r:Resource)
    WITH m,r
    RETURN collect({ id: r.id, name: r.name, isUsed: COALESCE(r.isUsed, false) }) as resources
}

WITH *
RETURN
    m.id as id,
    resources

Steps (Mandatory)

  1. Install the above Cypher query in a source connector with a config like this:
    {
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "errors.log.include.messages": "true",
    "topic.creation.default.partitions": "-1",
    "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "neo4j.authentication.basic.password": "<pass>",
    "transforms": "ValueToKey,extractKeyFromStruct",
    "transforms.extractKeyFromStruct.field": "id",
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "neo4j.database": "<db-name>",
    "neo4j.streaming.property": "modifiedUtc",
    "topic.creation.applicationlogs.retention.ms": "10000",
    "neo4j.enforce.schema": "true",
    "topic.creation.default.replication.factor": "3",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "errors.retry.timeout": "-1",
    "neo4j.server.uri": "<neo4j-address>",
    "neo4j.source.query": "",
    "errors.retry.delay.max.ms": "30000",
    "transforms.ValueToKey.fields": "id",
    "key.converter.schemas.enable": "true",
    "topic.creation.inventory.delete.retention.ms": "10000",
    "name": "<name>",
    "value.converter.schemas.enable": "false",
    "neo4j.authentication.basic.username": "<neo-user>",
    "topic": "topic",
    "errors.tolerance": "all",
    "neo4j.streaming.poll.interval.msecs": "200",
    "neo4j.streaming.from": "LAST_COMMITTED"
    }
  2. After one read it fails and cannot read the rest, with the error mentioned at the beginning. The stack trace is at the end.

Specifications (Mandatory)

The schema registry service from kafka is not deployed.

Versions

Stacktrace

org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type STRING: class java.lang.Long
    at streams.kafka.connect.source.Neo4jSourceService.checkError(Neo4jSourceService.kt:131)
    at streams.kafka.connect.source.Neo4jSourceService.poll(Neo4jSourceService.kt:139)
    at streams.kafka.connect.source.Neo4jSourceTask.poll(Neo4jSourceTask.kt:31)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:296)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:253)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type STRING: class java.lang.Long
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:261)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.asStruct(ConnectExtensionFunctions.kt:42)
    at streams.kafka.connect.source.SourceRecordBuilder.build(SourceRecordBuilder.kt:59)
    at streams.kafka.connect.source.Neo4jSourceService.toSourceRecord(Neo4jSourceService.kt:107)
    at streams.kafka.connect.source.Neo4jSourceService.access$toSourceRecord(Neo4jSourceService.kt:28)
    at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend$lambda-1(Neo4jSourceService.kt:84)
    at org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)
    at org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)
    at org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)
    at org.neo4j.driver.internal.InternalSession.readTransaction(InternalSession.java:103)
    at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend(Neo4jSourceService.kt:79)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at kotlinx.coroutines.internal.LimitedDispatcher.run(LimitedDispatcher.kt:42)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:95)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)

farhadnowzari commented 1 month ago

I updated to 5.0.5 and now the error changed to: For Schema.Type.STRUCT we support only Map and Point

Emrehzl94 commented 1 month ago

Hi @farhadnowzari, I tested with version 5.0.5 and got the expected result. Could you please provide the whole query? I also recommend using the confluentinc/cp-server-connect:7.3.0 container image for Connect if you don't depend on the current one.

farhadnowzari commented 1 month ago

I updated to cp-server-connect:7.3.0, but I still get the error. I have attached the data I'm trying to send to this message.

records.json

Edit

It works if I return the location like this in the Neo4j query:

    location{
        .id,
        .name, 
        addressLine: COALESCE(location.addressLine, null), 
        postalCode: COALESCE(location.postalCode, null), 
        city: COALESCE(location.city, null),
        country: COALESCE(location.country, null), 
        phoneNumber: COALESCE(location.phoneNumber, null)
    } as location,

Emrehzl94 commented 3 weeks ago

It looks like there is a bug when the query returns a list of objects and the first element of the list has a null value, as in the example below:

{
    "name": "Movie#1",
    "resources": [
        {
            "name": null,
            "isUsed": false
        },
        {
            "name": "Light",
            "isUsed": true
        }
    ]
}

In that case, the connector is not able to correctly determine the schema for the data.
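
To illustrate why a null in the first element can lead to an error like the one reported, here is a simplified Python model. It is not the connector's code. It assumes that the schema of a list is inferred from its first element only and that a null value falls back to STRING; both are assumptions made to mirror the error message. The helper names and the sample data are hypothetical.

```python
# Simplified model of the failure, NOT the connector's actual code.
# Assumptions: the element schema of a list is inferred from the first
# element only, and a null value falls back to STRING.

def infer_type(value):
    """Map a Python value to a Connect-like type name (hypothetical helper)."""
    if value is None:
        return "STRING"  # assumed fallback for null
    if isinstance(value, bool):  # bool must be checked before int
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT64"
    if isinstance(value, float):
        return "FLOAT64"
    if isinstance(value, str):
        return "STRING"
    raise TypeError(f"unsupported value: {value!r}")


def infer_element_schema(items):
    """Infer a field -> type mapping by looking at the first element only."""
    return {field: infer_type(value) for field, value in items[0].items()}


def validate(items, schema):
    """Raise if any non-null value does not match the inferred type."""
    for item in items:
        for field, value in item.items():
            if value is not None and infer_type(value) != schema[field]:
                raise ValueError(
                    f"Invalid value for schema with type {schema[field]}: "
                    f"field {field!r} holds {type(value).__name__}"
                )


# Illustrative data: the first element has a null id, a later one an integer.
resources = [
    {"id": None, "isUsed": False},
    {"id": 42, "isUsed": True},
]

schema = infer_element_schema(resources)
try:
    validate(resources, schema)
    outcome = "ok"
except ValueError as err:
    outcome = str(err)

print(schema)
print(outcome)
```

In this model the id field is typed as STRING because of the leading null, so the integer in the second element is rejected, which resembles the STRING versus java.lang.Long mismatch in the stack trace.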

For now, as a workaround, you can either remove the "neo4j.enforce.schema": "true" property from the configuration (in which case the value schema will be of type String), or provide a non-null default value for each property returned by the query by using COALESCE.
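
To find which properties might need a COALESCE default, a small check over a sample of the query output can help. The sketch below is a hypothetical helper, not part of the connector. It assumes the output is available as JSON and only flags null fields in the first object of each list, following the diagnosis above.

```python
import json


def find_null_first_fields(value, path="$"):
    """Return paths of null fields in the first object of every list of objects."""
    hits = []
    if isinstance(value, dict):
        for key, child in value.items():
            hits.extend(find_null_first_fields(child, f"{path}.{key}"))
    elif isinstance(value, list) and value:
        first = value[0]
        if isinstance(first, dict):
            # Null fields here are the ones suspected of breaking schema inference.
            hits.extend(f"{path}[0].{k}" for k, v in first.items() if v is None)
        for index, child in enumerate(value):
            # Recurse so that nested lists of objects are checked as well.
            hits.extend(find_null_first_fields(child, f"{path}[{index}]"))
    return hits


# Sample record shaped like the example in this thread.
sample = json.loads("""
{
  "name": "Movie#1",
  "resources": [
    {"name": null, "isUsed": false},
    {"name": "Light", "isUsed": true}
  ]
}
""")

flagged = find_null_first_fields(sample)
print(flagged)
```

Each flagged path points at a property that could be given a non-null default in the query.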

Another solution would be to use the latest connector version, 5.1.0-rc02, but you will need to do a small migration of the configuration.

This is the public repo for the latest version of the connector: https://github.com/neo4j/neo4j-kafka-connector

Emrehzl94 commented 3 weeks ago

I'm closing this issue as it is no longer reproducible in the latest version. Please feel free to reopen it if the problem occurs again.