neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

org.apache.kafka.connect.errors.SchemaBuilderException Error with Neo4jSource connector 5.0.3+ but not with 5.0.2 #625

Closed aonamrata closed 4 months ago

aonamrata commented 5 months ago

Hi,

I was updating the Neo4j source connector to pick up the latest security updates and noticed errors when moving from connector version 5.0.2 to 5.0.3+.

Expected Behavior (as it is with 5.0.2)

It reads the new updates from Neo4j and posts them to the Kafka topic properly. Connector logs:

2024-05-16 12:01:07 [2024-05-16 06:31:07,387] DEBUG Poll returns 0 results (streams.kafka.connect.source.Neo4jSourceService)
2024-05-16 12:01:08 [2024-05-16 06:31:08,388] DEBUG Poll returns 0 results (streams.kafka.connect.source.Neo4jSourceService)
2024-05-16 12:01:09 [2024-05-16 06:31:09,388] DEBUG Poll returns 0 results (streams.kafka.connect.source.Neo4jSourceService)
2024-05-16 12:01:09 [2024-05-16 06:31:09,428] INFO Poll returns 1 result(s) (streams.kafka.connect.source.Neo4jSourceService)
2024-05-16 12:01:11 [2024-05-16 06:31:11,483] DEBUG Poll returns 0 results (streams.kafka.connect.source.Neo4jSourceService)

All the optional fields are posted as null because that is how I return them from Cypher (full query below). Example of a message in the Kafka topic with version 5.0.2:

{
  "uuid": "83222244-0862-499a-9f3b-d424994ee31b",
  "name": "Foo",
  "vendorId": 32936,
  "vendorUUID": "111115bd-03eb-414e-a226-851b35c0b12a",
  "subaccountId": 62781,
  "subaccountUUID": "83222244-0862-499a-9f3b-d424994ee31b",
  "labelType": "Subaccount",
  "vendorStatus": null,
  "vendorType": null,
  "isDistributor": null,
  "isDeleted": false,
  "lastModifiedAt": "2024-05-16T00:33:57.286Z"
}

Actual Behavior (Error with version 5.0.3)

I get this error in the connector logs and no message is posted to the Kafka topic.

2024-05-16 12:04:50 [2024-05-16 06:34:50,157] DEBUG Poll returns 0 results (streams.kafka.connect.source.Neo4jSourceService)
2024-05-16 12:04:50 [2024-05-16 06:34:50,584] ERROR Error: (streams.kafka.connect.source.Neo4jSourceService)
2024-05-16 12:04:50 org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field vendorStatus cannot be null.
2024-05-16 12:04:50     at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:327)
2024-05-16 12:04:50     at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.schema(ConnectExtensionFunctions.kt:33)
2024-05-16 12:04:50     at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.asStruct(ConnectExtensionFunctions.kt:39)
2024-05-16 12:04:50     at streams.kafka.connect.source.SourceRecordBuilder.build(SourceRecordBuilder.kt:59)
2024-05-16 12:04:50     at streams.kafka.connect.source.Neo4jSourceService.toSourceRecord(Neo4jSourceService.kt:107)
2024-05-16 12:04:50     at streams.kafka.connect.source.Neo4jSourceService.access$toSourceRecord(Neo4jSourceService.kt:28)
2024-05-16 12:04:50     at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend$lambda$1(Neo4jSourceService.kt:84)
2024-05-16 12:04:50     at org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)
2024-05-16 12:04:50     at org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)
2024-05-16 12:04:50     at org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)
2024-05-16 12:04:50     at org.neo4j.driver.internal.InternalSession.readTransaction(InternalSession.java:103)
2024-05-16 12:04:50     at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend(Neo4jSourceService.kt:79)
2024-05-16 12:04:50     at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
2024-05-16 12:04:50     at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
2024-05-16 12:04:50     at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
2024-05-16 12:04:50     at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
2024-05-16 12:04:50     at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
2024-05-16 12:04:50     at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
2024-05-16 12:04:50     at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
2024-05-16 12:04:50     at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
2024-05-16 12:04:51 [2024-05-16 06:34:51,158] DEBUG Poll returns 0 results (streams.kafka.connect.source.Neo4jSourceService)
2024-05-16 12:04:51 [2024-05-16 06:34:51,158] DEBUG WorkerSourceTask{id=neo4j_source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
2024-05-16 12:04:51 [2024-05-16 06:34:51,159] DEBUG WorkerSourceTask{id=neo4j_source-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask)
2024-05-16 12:04:51 [2024-05-16 06:34:51,160] DEBUG WorkerSourceTask{id=neo4j_source-0} Finished offset commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
2024-05-16 12:04:51 [2024-05-16 06:34:51,160] ERROR WorkerSourceTask{id=neo4j_source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
2024-05-16 12:04:51 org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field vendorStatus cannot be null.
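
The message in the trace is raised by Kafka Connect's SchemaBuilder.field, which rejects a null field schema. One plausible reading, not confirmed anywhere in this issue, is that the struct schema is inferred from the values of the returned row, and a bare NULL carries no type to infer from. The Python sketch below is only a toy model of that idea: every name in it (SchemaBuilderError, infer_field_schema, infer_struct_schema, the null_as_optional_string flag) is hypothetical and none of it is the connector's code.

```python
class SchemaBuilderError(Exception):
    """Hypothetical stand-in for Kafka Connect's SchemaBuilderException."""


def infer_field_schema(value):
    """Return a type name for a value, or None when no type can be inferred."""
    if value is None:
        return None  # a bare null gives nothing to infer a type from
    if isinstance(value, bool):  # bool must be checked before int
        return "boolean"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "float64"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def infer_struct_schema(row, null_as_optional_string=False):
    """Build a field-name -> type-name mapping from one result row.

    With null_as_optional_string=False a null value aborts the build,
    mirroring the error text in the log. With True, nulls fall back to an
    optional string, which is just one illustrative fallback.
    """
    schema = {}
    for name, value in row.items():
        field_schema = infer_field_schema(value)
        if field_schema is None:
            if not null_as_optional_string:
                raise SchemaBuilderError(
                    f"fieldSchema for field {name} cannot be null."
                )
            field_schema = "optional string"
        schema[name] = field_schema
    return schema


# Row shaped like the Subaccount message shown earlier in the issue.
row = {
    "uuid": "83222244-0862-499a-9f3b-d424994ee31b",
    "name": "Foo",
    "vendorId": 32936,
    "labelType": "Subaccount",
    "vendorStatus": None,
    "vendorType": None,
    "isDistributor": None,
    "isDeleted": False,
}

try:
    infer_struct_schema(row)
except SchemaBuilderError as err:
    print("strict inference failed:", err)

print(infer_struct_schema(row, null_as_optional_string=True))
```

In this model the failure is reported for the first null column in row order, which matches the log naming vendorStatus rather than vendorType or isDistributor.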

Most of the changes in the release log https://github.com/neo4j-contrib/neo4j-streams/releases/tag/5.0.3 are dependency updates, except this change, which updates the streams class. Do you know what I should change to work with the latest version of the connector? Below are details about the connector and the Cypher query.

This is what my connector configuration looks like:

    "neo4j.encryption.enabled": "true",
    "neo4j.enforce.schema": "true",   //--- this is because I have some EXTRACT_VALUE_TO_KEY transformations and need the value to be a struct
    "topic": "cdc.test1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.batch.size": "1000",
    "neo4j.source.query": "..."

neo4j.source.query

WITH datetime({epochMillis: toInteger($lastCheck)}) AS lastCheck_dt
CALL {
    WITH lastCheck_dt
    MATCH (v:Vendor)
    WHERE v.lastModifiedAt > lastCheck_dt
    RETURN
        v.uuid AS uuid,
        v.name AS name,
        v.vendorId AS vendorId,
        v.uuid AS vendorUUID,
        0 AS subaccountId,
        NULL AS subaccountUUID,
        cb.uuid AS companyBrandUUID,
        \"Vendor\" AS labelType,
        v.status AS vendorStatus,
        v.labelIdentifier AS vendorType,
        v.isDistributor AS isDistributor,
        CASE WHEN v.status = 'deletion' THEN True ELSE False END AS isDeleted,
        v.lastModifiedAt AS lastModifiedAt

    UNION

    WITH lastCheck_dt
    MATCH (sa:SubAccount)
    WHERE sa.lastModifiedAt > lastCheck_dt 
    RETURN
        sa.uuid AS uuid,
        sa.name AS name,
        v.vendorId AS vendorId,
        v.uuid AS vendorUUID,
        sa.id AS subaccountId,
        sa.uuid AS subaccountUUID,
        cb.uuid AS companyBrandUUID,
        \"Subaccount\" AS labelType,
        NULL AS vendorStatus,
        NULL AS vendorType,
        NULL AS isDistributor,
        sa.isDeleted AS isDeleted,
        sa.lastModifiedAt AS lastModifiedAt
}
RETURN
    uuid,
    name,
    vendorId,
    vendorUUID,
    subaccountId,
    subaccountUUID,
    companyBrandUUID,
    labelType,
    vendorStatus,
    vendorType,
    isDistributor,
    isDeleted,
    lastModifiedAt
;
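
The first line of the query turns $lastCheck into a datetime by treating it as epoch milliseconds, and each branch keeps only nodes whose lastModifiedAt is later than that value. As a rough illustration of that filter outside Cypher, here is a small Python sketch. The function names to_datetime and changed_since are made up for this example, and the rows are plain dicts standing in for query results.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_datetime(last_check_millis):
    """Rough equivalent of datetime({epochMillis: toInteger($lastCheck)})."""
    return EPOCH + timedelta(milliseconds=int(last_check_millis))


def parse_iso_utc(text):
    """Parse timestamps like 2024-05-16T00:33:57.286Z into aware datetimes."""
    return datetime.fromisoformat(text.replace("Z", "+00:00"))


def changed_since(rows, last_check_millis):
    """Keep rows whose lastModifiedAt is strictly after the last check."""
    last_check_dt = to_datetime(last_check_millis)
    return [
        row for row in rows
        if parse_iso_utc(row["lastModifiedAt"]) > last_check_dt
    ]


rows = [
    {"name": "Foo", "lastModifiedAt": "2024-05-16T00:33:57.286Z"},
    {"name": "Old", "lastModifiedAt": "2024-05-15T00:00:00.000Z"},
]

# 1715819637000 ms is 2024-05-16T00:33:57Z, just before Foo's timestamp.
print([row["name"] for row in changed_since(rows, 1715819637000)])
```

The comparison is strict, as in the query's WHERE clause, so a node modified exactly at the last-check instant is not returned again.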

Currently used versions

Versions