mindwm / mindwm-gitops

6 stars 5 forks source link

Change the format of, and normalize, events received from the Neo4j Kafka change-data-capture (CDC) topic #14

Closed metacoma closed 3 months ago

metacoma commented 3 months ago

CDC new node event

☁️   cloudevents.Event                                                                       
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/context-team-a/kafkasources/context-broker-team-a-source#team-a-cdc-topic
  subject: partition:0#42
  id: partition:0/offset:42
  time: 2024-06-10T10:02:18.314Z
Extensions,
  key: "70-0"
  knativearrivaltime: 2024-06-10T10:02:18.554550543Z
  knativekafkaoffset: 42
  knativekafkapartition: 0
  partitionkey: "70-0"
Data,

Payload:

{
  "meta": {
    "timestamp": 1718013738174,
    "username": "neo4j",
    "txId": 70,
    "txEventId": 0,
    "txEventsCount": 1,
    "operation": "created",
    "source": {
      "hostname": "team-a-neo4j-0"
    }
  },
  "payload": {
    "id": "22",
    "before": null,
    "after": {
      "properties": {
        "output": "\n",
        "user_input": "pwd",
        "time": 1718013738.170958,
        "uuid": "de7521af-c55d-4df4-861d-4a3a369501b8",
        "ps1": "bebebeko@mcmp4:~/poc-mindwm-dev$"
      },
      "labels": [
        "IoDocument"
      ]
    },
    "type": "node"
  },
  "schema": {
    "properties": {
      "output": "String",
      "user_input": "String",
      "time": "Double",
      "uuid": "String",
      "ps1": "String"
    },
    "constraints": []
  }
}

CDC new relationship event

☁️   cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  source: /apis/v1/namespaces/context-team-a/kafkasources/context-broker-team-a-source#team-a-cdc-topic
  subject: partition:0#43
  id: partition:0/offset:43
  time: 2024-06-10T10:02:18.413Z
Extensions,
  key: "71-0"
  knativearrivaltime: 2024-06-10T10:02:18.56316508Z
  knativekafkaoffset: 43
  knativekafkapartition: 0
  partitionkey: "71-0"

Payload:

{
  "meta": {
    "timestamp": 1718013738325,
    "username": "neo4j",
    "txId": 71,
    "txEventId": 0,
    "txEventsCount": 1,
    "operation": "created",
    "source": {
      "hostname": "team-a-neo4j-0"
    }
  },
  "payload": {
    "id": "20",
    "start": {
      "id": "13",
      "labels": [
        "TmuxPane"
      ],
      "ids": {}
    },
    "end": {
      "id": "22",
      "labels": [
        "IoDocument"
      ],
      "ids": {}
    },
    "before": null,
    "after": {
      "properties": {}
    },
    "label": "HAS_IO_DOCUMENT",
    "type": "relationship"
  },
  "schema": {
    "properties": {},
    "constraints": []
  }
}
metacoma commented 3 months ago

MVP code (values for the vector/vector Helm chart)

# Helm values for the vector/vector chart (MVP): forward Neo4j CDC records
# from Kafka, and anything POSTed to /event, to the Knative broker as
# binary-mode CloudEvents (Ce-* headers + JSON body).
customConfig:
  data_dir: /vector-data-dir
  api:
    enabled: true
    address: 127.0.0.1:8686
    playground: false
  sources:
    http_server:
      address: 0.0.0.0:6000
      encoding: json
      path: /event
      type: http_server
    # KafkaSource that already consumes the same topic:
    # NAME                           TOPICS                 BOOTSTRAPSERVERS              READY   REASON   AGE
    # context-broker-team-a-source   ["team-a-cdc-topic"]   ["neo4j-cdc.redpanda:9093"]   True             23h
    neo4j-cdc:
      type: kafka
      bootstrap_servers: "neo4j-cdc.redpanda:9093"
      group_id: team-a-vector
      # key_field: message_key
      topics:
        - team-a-cdc-topic
      decoding:
        codec: json
      librdkafka_options:
        api.version.request: 'true'
        partition.assignment.strategy: roundrobin
        log_level: '6'
        session.timeout.ms: '10000'
        max.poll.interval.ms: '300000'
        socket.nagle.disable: 'false'
        socket.keepalive.enable: 'true'
        socket.max.fails: '3'
        fetch.min.bytes: '1000000'
  sinks:
    stdout:
      type: console
      inputs: [http_server, neo4j-cdc]
      encoding:
        codec: json

    context-broker:
      type: http
      inputs: [http_server, neo4j-cdc]
      encoding:
        codec: json
      uri: http://broker-ingress.knative-eventing.svc.cluster.local/context-team-a/context-broker-team-a
      request:
        concurrency: 1
        headers:
          "Content-Type": "application/json"
          # NOTE(review): every event gets the same id; CloudEvents expects
          # source + id to be unique, but this header cannot be templated here.
          "Ce-Id": "1234"
          "Ce-Specversion": "1.0"
          # XXX: Vector does not render templates in http sink headers, so this
          # is sent literally as "{{ .meta.operation }}" once Helm has rendered it.
          "Ce-Type": "{{`{{ .meta.operation }}`}}"
          "Ce-Source": "/mycontext"

image:
  repository: timberio/vector
  # Previously 0.26.0-alpine. The image/service/serviceHeadless keys used to be
  # duplicated; the last copy silently won, so only those values are kept.
  tag: 0.38.0-alpine
service:
  enabled: true
  ports:
    # NOTE(review): no source in customConfig listens on 31399 — confirm this
    # port is still needed.
    - name: aggregator
      port: 31399
      protocol: TCP
      targetPort: 31399
    - name: http-server
      port: 6000
      protocol: TCP
      # Must match the http_server source address (0.0.0.0:6000); the former
      # targetPort 31398 had no listener behind it.
      targetPort: 6000
  type: ClusterIP
serviceHeadless:
  enabled: false

Unfortunately, Vector doesn't support templating inside the http sink's `request.headers` values; more details here: https://github.com/vectordotdev/vector/issues/201

metacoma commented 3 months ago

A simple data converter from the Neo4j CDC Kafka topic data format to the Knative broker data format (CloudEvents)

# Helm values for the vector/vector chart: convert Neo4j CDC records read from
# Kafka into structured-mode CloudEvents and POST them to the Knative broker.
customConfig:
  data_dir: /vector-data-dir
  transforms:
    cloudevent:
      type: "lua"
      version: "2"
      inputs:
        - neo4j-cdc
      hooks:
        process: process
      source: |
        -- Wraps each decoded CDC record in a structured-mode CloudEvent; the
        -- whole record (meta, payload, schema, ...) becomes the event `data`.
        local fallback_seq = 0

        -- Unique CloudEvent id. Prefers "<txId>-<txEventId>" from the CDC meta
        -- block (the Kafka message key has the same form, e.g. "70-0"), then
        -- the Kafka coordinates Vector attaches to the event, then a counter.
        local function event_id(log, meta)
          if meta.txId ~= nil and meta.txEventId ~= nil then
            return tostring(meta.txId) .. "-" .. tostring(meta.txEventId)
          end
          if log.partition ~= nil and log.offset ~= nil then
            return tostring(log.topic) .. "/partition:" .. tostring(log.partition)
              .. "/offset:" .. tostring(log.offset)
          end
          fallback_seq = fallback_seq + 1
          return "vector-" .. tostring(os.time()) .. "-" .. tostring(fallback_seq)
        end

        -- RFC 3339 UTC time built from meta.timestamp (epoch milliseconds);
        -- nil (attribute omitted) when the timestamp is missing.
        local function event_time(meta)
          local ts = meta.timestamp
          if type(ts) ~= "number" then
            return nil
          end
          return os.date("!%Y-%m-%dT%H:%M:%S", math.floor(ts / 1000))
            .. string.format(".%03dZ", math.floor(ts % 1000))
        end

        function process(event, emit)
          local meta = event.log.meta
          if type(meta) ~= "table" then
            meta = {}
          end
          emit({
            log = {
              id = event_id(event.log, meta),
              data = event.log,
              source = "graph",
              specversion = "1.0",
              datacontenttype = "application/json",
              -- CDC operation ("created", ...); records without
              -- meta.operation keep the previous "created" type.
              type = meta.operation or "created",
              time = event_time(meta),
              -- The fixed traceparent/tracestate (W3C spec example values)
              -- were dropped: they put every event in one fake trace.
            },
          })
        end

  api:
    enabled: true
    address: 127.0.0.1:8686
    playground: false
  sources:
    http_server:
      address: 0.0.0.0:6000
      encoding: json
      path: /event
      type: http_server
    # KafkaSource that already consumes the same topic:
    # NAME                           TOPICS                 BOOTSTRAPSERVERS              READY   REASON   AGE
    # context-broker-team-a-source   ["team-a-cdc-topic"]   ["neo4j-cdc.redpanda:9093"]   True             23h
    neo4j-cdc:
      type: kafka
      bootstrap_servers: "neo4j-cdc.redpanda:9093"
      group_id: team-a-vector
      # key_field: message_key
      topics:
        - team-a-cdc-topic
      decoding:
        codec: json
      librdkafka_options:
        api.version.request: 'true'
        partition.assignment.strategy: roundrobin
        log_level: '6'
        session.timeout.ms: '10000'
        max.poll.interval.ms: '300000'
        socket.nagle.disable: 'false'
        socket.keepalive.enable: 'true'
        socket.max.fails: '3'
        fetch.min.bytes: '1000000'
  sinks:
    stdout:
      type: console
      inputs: [cloudevent]
      encoding:
        codec: json

    # Debug sink, disabled: it POSTed every CDC record — shell input, output
    # and prompt included — to a public request-catcher endpoint. Re-enable
    # only against a private endpoint or with non-sensitive data.
    # test:
    #   type: http
    #   inputs: [cloudevent]
    #   encoding:
    #     codec: json
    #   method: post
    #   batch:
    #     max_events: 1
    #   framing:
    #     method: "newline_delimited"
    #   #uri: http://broker-ingress.knative-eventing.svc.cluster.local/context-team-a/context-broker-team-a
    #   uri: https://knative.requestcatcher.com/test
    #   request:
    #     concurrency: 1
    #     headers:
    #       "Content-Type": "application/cloudevents+json"

    context-broker:
      type: http
      inputs: [cloudevent]
      encoding:
        codec: json
      method: post
      # One CloudEvent per request: a structured-mode body carries one event.
      batch:
        max_events: 1
      framing:
        method: "newline_delimited"
      uri: http://broker-ingress.knative-eventing.svc.cluster.local/context-team-a/context-broker-team-a
      request:
        concurrency: 1
        headers:
          "Content-Type": "application/cloudevents+json"