neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

Kafka Connect Source connector problems with AVRO serialization #616

Closed Romsick closed 3 months ago

Romsick commented 8 months ago

Description

When attempting to use the Neo4j Kafka Connect Source connector configured with an AVRO value converter and corresponding schema in Schema Registry, the connector is unable to correctly produce records due to serialization errors.

Expected Behavior (Mandatory)

When the connector is configured as Source in QUERY mode with a valid AVRO schema, the messages should be correctly serialized and produced to the configured topic.

Actual Behavior (Mandatory)

The connector throws an error because it cannot serialize messages against the configured AVRO schema. While testing different configurations, a few issues surfaced:

  1. When the AVRO schema has fields configured as enum, and an incoming message has a valid value for the field, the connector fails with org.apache.avro.AvroTypeException: Not an enum: {EXPECTED_VALUE} for schema.

  2. When disabling the neo4j.enforce.schema option, and having a valid incoming message, the connector fails with java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.IndexedRecord.

  3. When one of the returned fields by the Cypher query is null, the connector fails with org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field {FIELD_NAME} cannot be null.

How to Reproduce the Problem

Dataset and configurations

Connector config

{
   "name": "Neo4jSourceConnectorAVRO",
   "config": {
     "topic": "neo4j.test.avro",
     "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "key.converter.schemas.enable": "false",
     "value.converter": "io.confluent.connect.avro.AvroConverter",
     "value.converter.schema.registry.url": "http://schema-registry:8081",
     "value.converter.use.latest.version": "true",
     "value.converter.enhanced.avro.schema.support": "true",
     "value.converter.latest.compatibility.strict": "false",
     "value.converter.auto.register.schemas": "false",
     "value.converter.connect.meta.data": "true",
     "neo4j.server.uri": "bolt://neo4j:7687",
     "neo4j.authentication.basic.username": "neo4j",
     "neo4j.authentication.basic.password": "password",
     "neo4j.streaming.poll.interval.msecs": 5000,
     "neo4j.streaming.from": "LAST_COMMITTED",
     "neo4j.enforce.schema": "true",
     "neo4j.source.query": "MATCH (n:TestNode) WHERE n.lastModifiedAt > datetime({epochMillis: toInteger($lastCheck)}) RETURN n.uuid AS uuid, n.name AS name, n.company AS company, n.numericId AS numericId, n.entityType AS entityType, n.status AS status, n.isDeleted AS isDeleted;"
   }
}
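
For reference, this JSON is exactly the payload shape the Kafka Connect REST API expects, so the connector can be created with a plain POST. A minimal sketch, assuming Connect listens on http://localhost:8083 and the config above is saved as neo4j-source-avro.json (both placeholders for the actual environment):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Creates the connector by POSTing the config above to the Connect REST API.
public class CreateConnector {
    public static void main(String[] args) throws Exception {
        String config = Files.readString(Path.of("neo4j-source-avro.json"));
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(config))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Connect answers 201 Created and echoes the connector info on success.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```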

AVRO schema

{
   "type": "record",
   "name": "myNode",
   "namespace": "com.mycompany.neo4j.avro",
   "fields": [
       {
           "name": "uuid",
           "type": {
               "type": "string",
               "logicalType": "uuid"
           },
           "doc": "Mandatory node UUID."
       },
       {
           "name": "name",
           "type": "string",
           "doc": "Mandatory node Name."
       },
       {
           "name": "company",
           "type": ["null", "string"],
           "doc": "Optional node Company."
       },
       {
           "name": "numericId",
           "type": "int"
       },
       {
           "name": "entityType",
           "type": {
               "name": "entityTypeTypes",
               "type": "enum",
               "symbols": [
                   "Company",
                   "Subcompany"
               ]
           },
           "doc": "Mandatory node entityType."
       },
       {
           "name": "status",
           "type": [
               "null",
               {
                   "type": "enum",
                   "name": "statusTypes",
                   "symbols": [
                       "approved",
                       "reviewed",
                       "deleted"
                   ]
               }
           ],
           "doc": "Optional node status."
       },
       {
           "name": "isDeleted",
           "type": "boolean",
           "doc": "Mandatory node isDeleted."
       }
   ]
}
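
Since value.converter.auto.register.schemas is false, this schema must be registered before the connector produces anything. A minimal sketch of that step, assuming the default TopicNameStrategy (subject neo4j.test.avro-value), the registry reachable at http://localhost:8081 from where this runs (inside the compose network it is http://schema-registry:8081), and the schema saved locally as myNode.avsc; Jackson is used only to JSON-escape the schema string:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

// Registers the Avro schema above with Confluent Schema Registry.
public class RegisterSchema {
    public static void main(String[] args) throws Exception {
        String schema = Files.readString(Path.of("myNode.avsc"));
        // The registry expects a JSON body of the form {"schema": "<escaped schema>"}.
        String body = new ObjectMapper().writeValueAsString(Map.of("schema", schema));
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/subjects/neo4j.test.avro-value/versions"))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body()); // {"id":...} on success
    }
}
```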

Sample test Cypher to produce records for the connector

CREATE (n:TestNode)
SET
    n.uuid = "3bfef4ba-564e-4ce1-b3af-616651f90aff",
    n.name = "Node Name",
    n.company = "Company Name",
    n.numericId = 10001,
    n.entityType = "Company",
    n.status = "reviewed",
    n.isDeleted = false,
    n.lastModifiedAt = datetime()
RETURN n

Steps (Mandatory)

  1. Set up the environment with Docker as per the Quickstart: Neo4j Kafka integration docs
  2. Create a topic and an AVRO schema for it, with the following field types:
    • A mandatory STRING field
    • A mandatory STRING field of uuid logicalType
    • An optional STRING field [null, string]
    • A mandatory INT field
    • A mandatory ENUM field
    • An optional ENUM field (null, [enum1, enum2])
    • A mandatory boolean field
  3. Create a Neo4j connector with the provided configuration
  4. Create or update a TestNode to stream the expected values matching the defined schema

Test cases and stacktraces

These are the results when (a standalone repro sketch for each failure follows this list):

  1. I create a node with all properties correctly populated (as in the provided sample): the connector fails with the stacktrace below, claiming that the value Company is not a valid enum symbol for the entityType field, even though the schema declares exactly Company and Subcompany:

    connect  | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic neo4j.test.avro :
    connect  |      at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    connect  |      at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$9(AbstractWorkerSourceTask.java:495)
    connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
    connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
    connect  |      ... 12 more
    connect  | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
    connect  |      at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
    connect  |      at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
    connect  |      ... 16 more
    connect  | Caused by: org.apache.avro.AvroTypeException: Not an enum: Company for schema: {"type":"enum","name":"entityTypeTypes","namespace":"com.mycompany.neo4j.avro","symbols":["Company","Subcompany"]} in field entityType
    connect  |      at org.apache.avro.generic.GenericDatumWriter.addAvroTypeMsg(GenericDatumWriter.java:198)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:231)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:180)
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
    connect  |      ... 18 more
    connect  | Caused by: org.apache.avro.AvroTypeException: Not an enum: Company for schema: {"type":"enum","name":"entityTypeTypes","namespace":"com.mycompany.neo4j.avro","symbols":["Company","Subcompany"]}
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:241)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:134)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
    connect  |      ... 24 more
  2. With the same configuration and the same node update, but with neo4j.enforce.schema set to false, the connector fails with a different stack trace:

    connect  | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic neo4j.test.avro :
    connect  |      at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    connect  |      at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$9(AbstractWorkerSourceTask.java:495)
    connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
    connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
    connect  |      ... 12 more
    connect  | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
    connect  |      at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
    connect  |      at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
    connect  |      ... 16 more
    connect  | Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.IndexedRecord (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.avro.generic.IndexedRecord is in unnamed module of loader 'app')
    connect  |      at org.apache.avro.generic.GenericData.getField(GenericData.java:846)
    connect  |      at org.apache.avro.generic.GenericData.getField(GenericData.java:865)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:219)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:180)
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
    connect  |      ... 18 more
  3. When I set the company property on the node to null, with neo4j.enforce.schema still true, the connector fails with:

    connect  | org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field company cannot be null.
    connect  |      at streams.kafka.connect.source.Neo4jSourceService.checkError(Neo4jSourceService.kt:131)
    connect  |      at streams.kafka.connect.source.Neo4jSourceService.poll(Neo4jSourceService.kt:139)
    connect  |      at streams.kafka.connect.source.Neo4jSourceTask.poll(Neo4jSourceTask.kt:31)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:470)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:349)
    connect  |      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
    connect  |      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    connect  |      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    connect  |      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    connect  |      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    connect  |      at java.base/java.lang.Thread.run(Thread.java:829)
    connect  | Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: fieldSchema for field company cannot be null.
    connect  |      at org.apache.kafka.connect.data.SchemaBuilder.field(SchemaBuilder.java:327)
    connect  |      at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.schema(ConnectExtensionFunctions.kt:33)
    connect  |      at streams.kafka.connect.utils.ConnectExtensionFunctionsKt.asStruct(ConnectExtensionFunctions.kt:39)
    connect  |      at streams.kafka.connect.source.SourceRecordBuilder.build(SourceRecordBuilder.kt:59)
    connect  |      at streams.kafka.connect.source.Neo4jSourceService.toSourceRecord(Neo4jSourceService.kt:107)
    connect  |      at streams.kafka.connect.source.Neo4jSourceService.access$toSourceRecord(Neo4jSourceService.kt:28)
    connect  |      at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend$lambda$1(Neo4jSourceService.kt:84)
    connect  |      at org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)
    connect  |      at org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)
    connect  |      at org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)
    connect  |      at org.neo4j.driver.internal.InternalSession.readTransaction(InternalSession.java:103)
    connect  |      at streams.kafka.connect.source.Neo4jSourceService$job$1.invokeSuspend(Neo4jSourceService.kt:79)
    connect  |      at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    connect  |      at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    connect  |      at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
    connect  |      at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
    connect  |      at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    connect  |      at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    connect  |      at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    connect  |      at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

  4. With the same update as in the previous point, but with neo4j.enforce.schema set to false, the connector fails with the same ClassCastException as in case 2:

    connect  | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic neo4j.test.avro :
    connect  |      at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    connect  |      at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
    connect  |      at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$9(AbstractWorkerSourceTask.java:495)
    connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
    connect  |      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
    connect  |      ... 12 more
    connect  | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
    connect  |      at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
    connect  |      at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
    connect  |      ... 16 more
    connect  | Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.IndexedRecord (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.avro.generic.IndexedRecord is in unnamed module of loader 'app')
    connect  |      at org.apache.avro.generic.GenericData.getField(GenericData.java:846)
    connect  |      at org.apache.avro.generic.GenericData.getField(GenericData.java:865)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:219)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
    connect  |      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:180)
    connect  |      at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
    connect  |      ... 18 more
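
For what it's worth, each of these exceptions can be reproduced outside the connector with a few lines of plain Avro and Kafka Connect code. The sketch below is my reconstruction of what apparently reaches the serializer, not the connector's actual code path; it assumes a recent org.apache.avro:avro and org.apache.kafka:connect-api on the classpath:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.SchemaBuilderException;

public class ReproSketches {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

        // Case 1: Avro only accepts a GenericEnumSymbol for an enum field; a plain
        // java.lang.String is rejected even when its text matches a declared symbol.
        Schema enumSchema = new Schema.Parser().parse(
            "{\"type\":\"enum\",\"name\":\"entityTypeTypes\",\"symbols\":[\"Company\",\"Subcompany\"]}");
        new GenericDatumWriter<Object>(enumSchema)
            .write(new GenericData.EnumSymbol(enumSchema, "Company"), encoder); // works
        try {
            new GenericDatumWriter<Object>(enumSchema).write("Company", encoder);
        } catch (AvroTypeException e) {
            System.out.println(e.getMessage()); // Not an enum: Company for schema: {...}
        }

        // Cases 2 and 4: writing a String datum against a *record* schema makes the
        // writer cast it to IndexedRecord in order to read its fields.
        Schema recordSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"myNode\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        try {
            new GenericDatumWriter<Object>(recordSchema).write("not a record", encoder);
        } catch (ClassCastException e) {
            System.out.println(e.getMessage()); // String cannot be cast to ... IndexedRecord
        }

        // Case 3: Kafka Connect's SchemaBuilder rejects a null field schema, which is
        // apparently what the connector derives for a column that comes back null.
        try {
            SchemaBuilder.struct().field("company", null).build();
        } catch (SchemaBuilderException e) {
            System.out.println(e.getMessage()); // fieldSchema for field company cannot be null.
        }
    }
}
```

In other words, the converter appears to receive plain strings where the registered schema expects enum symbols or a record, which points at the connector's Connect-schema mapping rather than at the registry or the schema itself.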



Specifications and Versions (Mandatory)

The initial issues with the AVRO schema were found with the following specs:

- Neo4j Enterprise edition on version 4.4.19
- Neo4j Kafka Connect connector on version 5.0.0-oss over `confluentinc/cp-kafka-connect-base:7.3.1`
- Schema Registry on version 7.0.0
- Kafka cluster on version 3.5.1

To replicate the issue and rule out problems tied to older versions, I used the config provided above with the docker-compose file from the Quickstart docs:

- Neo4j Enterprise edition on latest version (5.16.0)
- Neo4j Kafka Connect connector on latest version (5.0.3) from Confluent Hub
- Schema Registry on version 7.3.0

I have also tested the schema separately, with other connectors and with a custom producer, without any issues.

venikkin commented 8 months ago

Hi @Romsick. Thank you for flagging it. Unfortunately, the Kafka Connect Neo4j Source Connector doesn't support the Avro enum type. I would recommend converting enum fields to string and implementing validation on the consumer side.

Regarding nullable properties and the error you receive on a null company property: Neo4j is a schema-less database. The schema is derived from individual messages, and can therefore conflict when nodes or relationships with the same label or type carry different sets of properties. I would recommend ensuring all nodes have the same set of properties. In your example, you could treat null properties as empty strings and filter them out on the consumer side, as in the sketch below. You could also leverage existence and type constraints, although some of these may require upgrading to a later Neo4j Enterprise version. Alternatively, you can try a schema-less format that does not require a Schema Registry, such as JSON.
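
For example, a sketch of that suggestion (untested, adapting your source query): coalesce the nullable properties so every row always carries the full field set, and model status and entityType as plain string fields in the schema:

```json
"neo4j.source.query": "MATCH (n:TestNode) WHERE n.lastModifiedAt > datetime({epochMillis: toInteger($lastCheck)}) RETURN n.uuid AS uuid, n.name AS name, coalesce(n.company, '') AS company, n.numericId AS numericId, n.entityType AS entityType, coalesce(n.status, '') AS status, n.isDeleted AS isDeleted;"
```

The consumer can then treat the empty string as absent, and the company and status fields no longer need null unions or enum types in the Avro schema.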

Edit: The main reason the Kafka Connect Neo4j Source Connector doesn't support the Avro enum type is that the Kafka Connect schema itself has no enum type.