kestra-io / plugin-neo4j

Data Not Inserting into Neo4j Using Kestra's neo4j.Batch Task #84

Status: Open · opened by bipurevpn 2 months ago

bipurevpn commented 2 months ago

Description:

I'm facing an issue where data is not being inserted into Neo4j when using the io.kestra.plugin.neo4j.Batch task in a Kestra workflow. The task executes without errors, but the Neo4j database remains empty.

Steps to Reproduce:

Set Up Docker Environment: Use the following docker-compose.yml file (a quick connectivity check is sketched after it):

version: "3"

networks:
  default:
    external:
      name: merged_network

  merged_network:
    driver: bridge
    name: merged_network
    ipam:
      config:
        - subnet: 172.27.0.0/24

services:
  postgres_kestra:
    container_name: postgres_kestra
    hostname: postgres
    image: postgres:15.4
    restart: unless-stopped
    environment:
      POSTGRES_DB: kestra
      POSTGRES_USER: kestra
      POSTGRES_PASSWORD: k3str4
    networks:
      merged_network:
        ipv4_address: 172.27.0.3
    ports:
      - "5432:5432"
    volumes:
      - postgres_kestra_data:/var/lib/postgresql/data

  kestra:
    container_name: kestra
    image: kestra/kestra:latest-full
    restart: unless-stopped
    command: server standalone --worker-thread=128
    user: "root"
    networks:
      merged_network:
        ipv4_address: 172.27.0.4
    environment:
      KESTRA_CONFIGURATION: |
        datasources:
          postgres:
            url: jdbc:postgresql://postgres:5432/kestra
            driverClassName: org.postgresql.Driver
            username: kestra
            password: k3str4
        kestra:
          server:
            basic-auth:
              enabled: false
              username: "admin@kestra.io"
              password: kestra
          repository:
            type: postgres
          storage:
            type: local
            local:
              base-path: "/app/storage"
          queue:
            type: postgres
          tasks:
            tmp-dir:
              path: /tmp/kestra-wd/tmp
          url: http://localhost:8080/
    ports:
      - "8090:8080"
      - "8091:8081"
    volumes:
      - kestra_data:/app/storage
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/kestra-wd:/tmp/kestra-wd
    depends_on:
      postgres_kestra:
        condition: service_started

  neo4j:
    container_name: neo4j
    image: neo4j:latest
    restart: unless-stopped
    environment:
      - NEO4J_AUTH=neo4j/test12345678
    networks:
      merged_network:
        ipv4_address: 172.27.0.5
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - neo4j_data:/data

  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      - xpack.security.enabled=false  # Disable security
    networks:
      merged_network:
        ipv4_address: 172.27.0.6
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  kibana:
    container_name: kibana
    image: docker.elastic.co/kibana/kibana:8.5.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    networks:
      merged_network:
        ipv4_address: 172.27.0.7
    ports:
      - "5601:5601"  # Kibana's web UI port
    depends_on:
      - elasticsearch

volumes:
  postgres_kestra_data:
    driver: local
  kestra_data:
    driver: local
  neo4j_data:
    driver: local
    name: neo4j_data
  es_data:
    driver: local
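
As a quick sanity check outside Kestra, the Bolt endpoint exposed by this compose file can be probed from the host with the official neo4j Python driver. This is only a sketch, assuming the driver is installed (pip install neo4j) and using the URL and credentials from the compose file above:

# Minimal connectivity probe for the neo4j service defined above.
# Assumes `pip install neo4j`; URL and credentials match docker-compose.yml
# (port 7687 is published to the host, NEO4J_AUTH=neo4j/test12345678).
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "test12345678"))
with driver.session() as session:
    # RETURN 1 is enough to confirm that authentication and the Bolt port work.
    print("Neo4j reachable:", session.run("RETURN 1 AS ok").single()["ok"] == 1)
driver.close()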

Create a Kestra Workflow:

id: ELK_To_Neo4j
namespace: ai.team

tasks:
  - id: ELK
    type: io.kestra.plugin.elasticsearch.Search
    description: Extract data from Elasticsearch
    connection:
      headers: []
      hosts:
        - http://172.27.0.6:9200/
    indexes:
      - people
    request:
      query:
        match_all: {}

  - id: "transform_data"
    type: io.kestra.plugin.scripts.python.Commands
    inputFiles:
      data.json: |
        {{ outputs.ELK | json }}
      transform_script.py: |
        from kestra import Kestra
        import json
        import sys

        # Load Elasticsearch data
        with open(sys.argv[1], 'r') as f:
            raw_data = f.read()

        es_data = json.loads(raw_data)
        print("es_data:", json.dumps(es_data))
        transformed_data = []
        for person in es_data.get('rows', []):
            name = person.get('name')
            if not name:
                print(f"Skipping entry with missing name: {person}") 
                continue
            transformed_person = {
                'name': name.upper(),
                'age': person.get('age', 0) + 1
            }
            transformed_data.append(transformed_person)

        print("Filtered transformed_data:", json.dumps(transformed_data))

        output_data = {'props': transformed_data}

        Kestra.outputs({'transformed_data': transformed_data})

        with open('transformed_data.json', 'w') as f:
            json.dump(output_data, f)

    beforeCommands:
      - pip install requests kestra > /dev/null
    commands:
      - python transform_script.py data.json
    outputFiles:
      - transformed_data.json

  - id: "neo4j"
    type: io.kestra.plugin.neo4j.Batch
    description: "Insert transformed data into Neo4j"
    url: bolt://172.27.0.5:7687
    username: neo4j
    password: test12345678
    chunk: 1000
    from: "{{ outputs.transform_data.outputFiles['transformed_data.json'] }}"
    query: |
      UNWIND $props AS row
      WITH row
      WHERE row.name IS NOT NULL
      MERGE (p:Person {name: row.name})
      ON CREATE SET p.age = row.age, p.created_at = timestamp()
      ON MATCH SET p.age = row.age, p.updated_at = timestamp()
      RETURN p.name AS name, p.age AS age, p.created_at, p.updated_at
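
For reference, the file handed to the Batch task via from is the transformed_data.json written by the transform script above, i.e. a single JSON document of the shape {"props": [{"name": ..., "age": ...}, ...]}. A small sketch to double-check its contents (the path is a hypothetical local copy downloaded from the execution's outputs):

# Inspect the file referenced by the Batch task's `from` property.
# "transformed_data.json" is a hypothetical local copy downloaded from the
# Kestra execution outputs; its shape follows from the transform script above.
import json

with open("transformed_data.json") as f:
    payload = json.load(f)

print(type(payload).__name__, list(payload.keys()))  # dict ['props']
print("rows:", len(payload["props"]))
print("sample:", payload["props"][:2])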

Expected Behavior:

The neo4j task should insert the transformed data into the Neo4j database, creating or updating nodes with the label Person and the appropriate properties.

Actual Behavior:

The neo4j task executes without errors but reports: Successfully bulk 1 queries with 0 updated rows

Additional Information:

A neo4j_test task using io.kestra.plugin.neo4j.Query (shown below) works and can insert data:

- id: "neo4j_test"
  type: io.kestra.plugin.neo4j.Query
  description: "Test Neo4j connectivity"
  url: bolt://172.27.0.5:7687
  username: neo4j
  password: test12345678
  query: |
    CREATE (n:TestNode {name: 'Test', timestamp: timestamp()})
    RETURN n.name AS name, n.timestamp AS timestamp

However, using the Batch task does not insert data into Neo4j.
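
To rule out a display or browsing issue on the Neo4j side, the Person label can also be counted directly after the flow has finished. A minimal sketch with the official Python driver (pip install neo4j), using the same credentials as above:

# Count Person nodes after the flow has run.
# Assumes `pip install neo4j`; credentials match the compose file.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "test12345678"))
with driver.session() as session:
    count = session.run("MATCH (p:Person) RETURN count(p) AS c").single()["c"]
    # Per the behaviour described in this issue, this stays 0 even though
    # the Batch task reports success.
    print("Person nodes:", count)
driver.close()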

Attempts to Resolve:

  1. Ensured that the transformed_data.json file is correctly generated and accessible.
  2. Confirmed that the from parameter correctly references the file URI.
  3. Tested the Neo4j connection using the Query task, which works.
  4. Tried different configurations and queries, but the issue persists (a standalone run of the same query outside the plugin is sketched below).
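
For anyone triaging this: the same UNWIND/MERGE statement can be run outside the plugin with the official neo4j Python driver, feeding it the props list from transformed_data.json. This is only a sketch, assuming the file has been copied locally and pip install neo4j; the final RETURN is collapsed to a count for brevity. If this inserts Person nodes while the Batch task still reports 0 updated rows, the difference would lie in how the plugin reads the from file rather than in the query or the data.

# Run the same UNWIND/MERGE statement outside Kestra, using the props list
# from a hypothetical local copy of transformed_data.json.
# Assumes `pip install neo4j`; credentials match the compose file.
import json
from neo4j import GraphDatabase

QUERY = """
UNWIND $props AS row
WITH row
WHERE row.name IS NOT NULL
MERGE (p:Person {name: row.name})
ON CREATE SET p.age = row.age, p.created_at = timestamp()
ON MATCH SET p.age = row.age, p.updated_at = timestamp()
RETURN count(p) AS merged
"""

with open("transformed_data.json") as f:
    props = json.load(f)["props"]

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "test12345678"))
with driver.session() as session:
    merged = session.run(QUERY, props=props).single()["merged"]
    print("merged Person nodes:", merged)
driver.close()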

Ben8t commented 2 months ago

Thanks for this full reproducer! We will try to tackle that one soon 👍