neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

(Impact on BC/DR - Data loss) - High priority: Nodes with a 2D Point property value throw an exception and cannot be sunk into Neo4j #611

Open yhhongyang opened 8 months ago

yhhongyang commented 8 months ago

Description

In the source instance, creating a node with a property whose value is a 2D point generates a Kafka payload like this:

{
    "payload":{
        "id":"11",
        "before":null,
        "after":{
            "properties":{
                "address":{
                    "crs":"wgs-84",
                    "latitude":56.7,
                    "longitude":12.78,
                    "height":null
                }
            },
            "labels":[
                "Location"
            ]
        },
        "type":"node"
    },
    "schema":{
        "properties":{
            "address":"PointValue"
        },
        "constraints":[

        ]
    }
}

Because the value of height is null, the sink instance cannot deserialize it into a valid PointValue and throws an exception like this:

2023-12-22 07:58:36.476+0000 INFO  [neo4j/2b281484] [Sink] Registering the Streams Sink procedures
org.neo4j.exceptions.InvalidArgumentException: Cannot assign NO_VALUE to field height
    at org.neo4j.values.storable.PointValue$PointBuilder.assignFloatingPoint(PointValue.java:667)
    at org.neo4j.values.storable.PointValue$PointBuilder.assign(PointValue.java:610)
    at org.neo4j.values.storable.PointValue.lambda$fromMap$0(PointValue.java:413)
    at org.neo4j.values.virtual.MapValue$MapWrappingMapValue.foreach(MapValue.java:127)
    at org.neo4j.values.storable.PointValue.fromMap(PointValue.java:413)
    at streams.utils.StreamsTransactionEventDeserializer.convertPoints(JSONUtils.kt:239)
    at streams.utils.StreamsTransactionEventDeserializer.deserialize(JSONUtils.kt:226)
    at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:200)
    at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:181)
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:4444)
    at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:4390)
    at streams.utils.JSONUtils.asStreamsTransactionEvent(JSONUtils.kt:388)
    at streams.utils.SchemaUtils.toStreamsTransactionEvent(SchemaUtils.kt:36)
    at streams.service.sink.strategy.SourceIdIngestionStrategy.mergeNodeEvents(SourceIdIngestionStrategy.kt:72)
        .......
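Since PointValue.fromMap rejects a key whose value is null, one possible sink-side workaround is to drop null-valued entries before the map reaches it. The sketch below only shows that cleanup step; the function name dropNullCoordinates is hypothetical and is not part of neo4j-streams, and the actual call into PointValue is left out.

```kotlin
// Hypothetical helper (not part of neo4j-streams): removes entries whose value is null,
// so a 2D point map such as {crs, latitude, longitude, height=null} keeps only real fields.
fun dropNullCoordinates(point: Map<String, Any?>): Map<String, Any> =
    point.entries
        .mapNotNull { (key, value) -> value?.let { key to it } }
        .toMap()

fun main() {
    val received: Map<String, Any?> = mapOf(
        "crs" to "wgs-84",
        "latitude" to 56.7,
        "longitude" to 12.78,
        "height" to null
    )
    // Prints {crs=wgs-84, latitude=56.7, longitude=12.78}
    println(dropNullCoordinates(received))
}
```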

Perhaps, on the source side, the Kafka payload should omit the height field when the point is 2D.
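A source-side version of that idea could emit only the coordinates a point actually has. This is a sketch under assumptions: a point is described by its CRS name ("cartesian", "cartesian-3d", "wgs-84", "wgs-84-3d") and a list of 2 or 3 coordinates in Neo4j order (longitude, latitude, height for WGS-84), and pointToPayload is a hypothetical name, not the connector's serializer.

```kotlin
// Hypothetical source-side serializer (not the connector's real code): builds the payload
// map for a point and includes height/z only when the point really has a third coordinate.
fun pointToPayload(crs: String, coordinates: List<Double>): Map<String, Any> {
    require(coordinates.size == 2 || coordinates.size == 3) {
        "Expected 2 or 3 coordinates, got ${coordinates.size}"
    }
    // Geographic CRS names start with "wgs-84"; everything else is treated as cartesian.
    val keys = if (crs.startsWith("wgs-84")) listOf("longitude", "latitude", "height")
               else listOf("x", "y", "z")
    val payload = linkedMapOf<String, Any>("crs" to crs)
    coordinates.forEachIndexed { i, value -> payload[keys[i]] = value }
    return payload
}

fun main() {
    // A 2D WGS-84 point: no "height" key is emitted.
    println(pointToPayload("wgs-84", listOf(12.78, 56.7)))         // {crs=wgs-84, longitude=12.78, latitude=56.7}
    // A 3D cartesian point keeps its "z" coordinate.
    println(pointToPayload("cartesian-3d", listOf(1.0, 2.0, 3.0))) // {crs=cartesian-3d, x=1.0, y=2.0, z=3.0}
}
```

With this shape the sink never sees "height": null for a 2D point, so there is nothing for PointValue to reject.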

Expected Behavior (Mandatory)

Nodes that have a property whose value is a 2D point can be successfully sunk into another Neo4j instance.

Actual Behavior (Mandatory)

Currently, the sink instance throws exceptions like the following when it handles a Kafka event that contains a 2D point:

to missing (therefore NULL) value for creator parameter start which is a non-nullable type
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: streams.events.RelationshipPayload["start"]), key="431-0", value={"meta":{"timestamp":1703232298105,"username":"neo4j","txId":431,"txEventId":0,"txEventsCount":1,"operation":"created","source":{"hostname":"dhcp-9-245-199-179.e2y-cn.ibmmobiledemo.com"}},"payload":{", executingClass=class streams.kafka.KafkaAutoCommitEventConsumer)
    at streams.service.errors.KafkaErrorService.report(KafkaErrorService.kt:37) ~[neo4j-streams-local-4.1.3.jar:?]
    at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:98) ~[neo4j-streams-local-4.1.3.jar:?]
    at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:89) ~[neo4j-streams-local-4.1.3.jar:?]
    at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:119) ~[neo4j-streams-local-4.1.3.jar:?]
    at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:163) [neo4j-streams-local-4.1.3.jar:?]
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) [neo4j-streams-local-4.1.3.jar:?]
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108) [neo4j-streams-local-4.1.3.jar:?]
    at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115) [neo4j-streams-local-4.1.3.jar:?]
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103) [neo4j-streams-local-4.1.3.jar:?]
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584) [neo4j-streams-local-4.1.3.jar:?]
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793) [neo4j-streams-local-4.1.3.jar:?]
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697) [neo4j-streams-local-4.1.3.jar:?]
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684) [neo4j-streams-local-4.1.3.jar:?]

The root cause of this error is the following exception:

2023-12-22 07:58:36.476+0000 INFO  [neo4j/2b281484] [Sink] Registering the Streams Sink procedures
org.neo4j.exceptions.InvalidArgumentException: Cannot assign NO_VALUE to field height
    at org.neo4j.values.storable.PointValue$PointBuilder.assignFloatingPoint(PointValue.java:667)
    at org.neo4j.values.storable.PointValue$PointBuilder.assign(PointValue.java:610)
    at org.neo4j.values.storable.PointValue.lambda$fromMap$0(PointValue.java:413)
    at org.neo4j.values.virtual.MapValue$MapWrappingMapValue.foreach(MapValue.java:127)
    at org.neo4j.values.storable.PointValue.fromMap(PointValue.java:413)
    at streams.utils.StreamsTransactionEventDeserializer.convertPoints(JSONUtils.kt:239)
    at streams.utils.StreamsTransactionEventDeserializer.deserialize(JSONUtils.kt:226)
    at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:200)
    at streams.utils.StreamsTransactionNodeEventDeserializer.deserialize(JSONUtils.kt:181)
    at com.fasterxml.jackson.databind.ObjectMapper._convert(ObjectMapper.java:4444)
    at com.fasterxml.jackson.databind.ObjectMapper.convertValue(ObjectMapper.java:4390)
    at streams.utils.JSONUtils.asStreamsTransactionEvent(JSONUtils.kt:388)
    at streams.utils.SchemaUtils.toStreamsTransactionEvent(SchemaUtils.kt:36)
    at streams.service.sink.strategy.SourceIdIngestionStrategy.mergeNodeEvents(SourceIdIngestionStrategy.kt:72)
    at streams.service.StreamsSinkService.writeWithStrategy(StreamsSinkService.kt:32)
    at streams.service.StreamsSinkService.writeForTopic(StreamsSinkService.kt:40)
    at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:167)
    at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:163)
    at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:95)
    at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:89)
    at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:119)
    at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:163)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
    at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

Currently used versions

Versions

yhhongyang commented 8 months ago

Possible solution: perhaps add more data structures for PointValue, with separate 2D and 3D classes so that a 2D point has no height (or z) field at all:

data class StreamsPointCartesian(override val crs: String, val x: Double, val y: Double): StreamsPoint()
data class StreamsPointCartesian3D(override val crs: String, val x: Double, val y: Double, val z: Double): StreamsPoint()
data class StreamsPointWgs(override val crs: String, val latitude: Double, val longitude: Double): StreamsPoint()
data class StreamsPointWgs3D(override val crs: String, val latitude: Double, val longitude: Double, val height: Double): StreamsPoint()
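To illustrate how the sink side might use these classes, here is a sketch that picks the matching class from a payload map. It redeclares a minimal StreamsPoint base (assumed here to be a sealed class exposing crs) so the snippet compiles on its own, and toStreamsPoint is a hypothetical helper, not existing connector code.

```kotlin
// Minimal stand-in for the StreamsPoint base type (assumption: a sealed class exposing crs).
sealed class StreamsPoint { abstract val crs: String }

data class StreamsPointCartesian(override val crs: String, val x: Double, val y: Double) : StreamsPoint()
data class StreamsPointCartesian3D(override val crs: String, val x: Double, val y: Double, val z: Double) : StreamsPoint()
data class StreamsPointWgs(override val crs: String, val latitude: Double, val longitude: Double) : StreamsPoint()
data class StreamsPointWgs3D(override val crs: String, val latitude: Double, val longitude: Double, val height: Double) : StreamsPoint()

// Hypothetical helper: chooses the 2D or 3D class depending on whether the third
// coordinate is present and non-null, so a 2D point never needs a height value.
fun toStreamsPoint(map: Map<String, Any?>): StreamsPoint {
    val crs = map["crs"] as String
    fun coord(key: String): Double? = (map[key] as Number?)?.toDouble()
    return if (crs.startsWith("wgs-84")) {
        val lat = requireNotNull(coord("latitude")) { "latitude is missing" }
        val lon = requireNotNull(coord("longitude")) { "longitude is missing" }
        coord("height")?.let { StreamsPointWgs3D(crs, lat, lon, it) } ?: StreamsPointWgs(crs, lat, lon)
    } else {
        val x = requireNotNull(coord("x")) { "x is missing" }
        val y = requireNotNull(coord("y")) { "y is missing" }
        coord("z")?.let { StreamsPointCartesian3D(crs, x, y, it) } ?: StreamsPointCartesian(crs, x, y)
    }
}

fun main() {
    // The payload from this issue: height is null, so the 2D class is chosen.
    val payload: Map<String, Any?> = mapOf("crs" to "wgs-84", "latitude" to 56.7, "longitude" to 12.78, "height" to null)
    println(toStreamsPoint(payload)) // StreamsPointWgs(crs=wgs-84, latitude=56.7, longitude=12.78)
}
```

Treating a null third coordinate the same as a missing one keeps the sink tolerant of payloads produced by the current source code as well as any future source that omits the field.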