neo4j-contrib / neo4j-streams

Neo4j Kafka Connector
https://neo4j.com/docs/kafka
Apache License 2.0

How to send filtered data from Neo4j to Kafka using Neo4j Streams? #386

Open vishnu2497 opened 3 years ago

vishnu2497 commented 3 years ago

Hi guys,

I am trying to send data from Neo4j to Kafka using Neo4j Streams. Here is my configuration:

streams.source.enabled=true
streams.sink.enabled=false
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=
kafka.topic.discovery.polling.interval=5000
streams.source.topic.nodes.nodeatest=NodeA{*}
streams.source.schema.polling.interval=5000
streams.source.topic.nodes.nodedtest=NodeD{*}
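For reference, each routing entry above follows the documented form streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>, where the pattern names the label to match and the properties to publish. Two illustrative variants (the nodeafiltered topic name is hypothetical):

# publish every property of nodes labeled NodeA to the nodeatest topic
streams.source.topic.nodes.nodeatest=NodeA{*}
# publish only selected properties of NodeA nodes
streams.source.topic.nodes.nodeafiltered=NodeA{name,validRecord}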

But I have one scenario:

In NodeA I need to send data to Kafka based on a property value, e.g.

NodeA{name:a, validRecord:true}
NodeA{name:b, validRecord:false}

I need to send only the validRecord=true records.

Is it possible to do this? If yes, can you explain how?

conker84 commented 3 years ago

Hi @vishnu2497, we don't allow the check to be based on a property value, but you can change your data model to add a label which has the same function:

ValidRecord{...........}

conker84 commented 3 years ago

So your node will have NodeA:ValidRecord as labels, and the ones which must not be pushed will have only NodeA as a label.
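A minimal Cypher sketch of this label-based workaround, assuming the validRecord property from the question above (the ValidRecord label follows the suggestion here):

// tag only the nodes that should be published
MATCH (n:NodeA) WHERE n.validRecord = true
SET n:ValidRecord;

// then route only the labeled nodes, e.g. in the plugin configuration:
// streams.source.topic.nodes.nodeatest=ValidRecord{*}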

vishnu2497 commented 3 years ago

Thanks a lot @conker84,

it helps.

I will try it and update.

conker84 commented 3 years ago

@vishnu2497 any update? Please let me know if we can close this.

vishnu2497 commented 3 years ago

I think it is not working, @conker84:

streams.source.topic.nodes.nodegtest=NodeG:validone

create(n:NodeG{name:'dfdsfdfffgfd',aaa:5444})

Yet the following event was still published, even though the node carries only the NodeG label:

{
  "meta": {
    "timestamp": 1607687761622,
    "username": "neo4j",
    "txId": 57,
    "txEventId": 0,
    "txEventsCount": 1,
    "operation": "created",
    "source": {
      "hostname": "NSTN-DT-064"
    }
  },
  "payload": {
    "id": "19",
    "before": null,
    "after": {
      "properties": {
        "aaa": 5444,
        "name": "dfdsfdfffgfd"
      },
      "labels": [
        "NodeG"
      ]
    },
    "type": "node"
  },
  "schema": {
    "properties": {
      "aaa": "Long",
      "name": "String"
    },
    "constraints": []
  }
}

conker84 commented 3 years ago

Please try with:

streams.source.topic.nodes.nodegtest=validone
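A quick sketch of how the routing behaves with that setting, assuming the label-based workaround (the routed/not-routed comments are my reading of the fix, not plugin output):

// routed to topic nodegtest: the node carries the validone label
CREATE (n:NodeG:validone {name: 'dfdsfdfffgfd', aaa: 5444});

// not routed: only NodeG, no validone label
CREATE (m:NodeG {name: 'other', aaa: 1});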

vishnu2497 commented 3 years ago

Thanks @conker84,

it is working.

But I need help with this scenario:

streams.source.topic.nodes.nodevalidtest=validtwo

create(n:Member:validtwo{name:'check 1',aaa:2});
create(n:Member:validtwo{name:'check 2',aaa:3});

Assume that after this I am going to remove the label from the node created by the 2nd statement, using

match(n:Member) where n.aaa=3 remove n:validtwo return n

We need to capture this UPDATE, right? If we don't update it in Kafka, only the old data is available there, so a problem will occur if we proceed with that.

So is there a separate strategy available to handle this situation?