damavis / airflow-hop-plugin

Apache Hop plugin for Apache Airflow - Orchestrate Apache Hop pipelines and workflows from Airflow
Apache License 2.0

Issue running sub workflows and sub pipelines when the project name is different from default #29


epie-godfred commented 9 months ago

Hi, we have been having some issues with the plugin when trying to run workflows created under a project name different from the "default" project that ships with Apache Hop, especially when these workflows contain sub workflows and sub pipelines. To illustrate the problem I used the provided example workflow and pipeline located here; below are the different configurations:

AIRFLOW

docker-compose.yml


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.6.0
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
#                                Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3.8'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  #image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.0}
  build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
    # for other purpose (development, test and especially production usage) build/extend Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /var/run/docker.sock:/var/run/docker.sock
    - ${HOP_CLIENT_HOME}:/home/hop
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    # networks:
    #   - airflowhop
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    # networks:
    #   - airflowhop
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    # networks:
    #   - airflowhop
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    # networks:
    #   - airflowhop
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-admin}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    # networks:
    #   - airflowhop
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  docker-socket-proxy:
    image: tecnativa/docker-socket-proxy:0.1.1
    environment:
      CONTAINERS: 1
      IMAGES: 1
      AUTH: 1
      POST: 1
    privileged: true
    # networks:
    #   - airflowhop
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
    restart: always

volumes:
  postgres-db-volume:

# networks:
#   airflowhop:
#     driver: "bridge"

Make sure to replace ${HOP_CLIENT_HOME} with the path to your Hop client home.
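
For reference, a minimal sketch of that volume line with a concrete host path (the path below is hypothetical) would be:

    volumes:
      - /home/user/hop-client:/home/hop   # hypothetical host path to the Hop client home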

Airflow - HOP SERVER Connection

DAGS

The DAG used is the same as the one provided here; I simply commented out the first two pipeline tasks.

# -*- coding: utf-8 -*-
# Copyright 2022 Aneior Studio, SL
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow import DAG
from datetime import datetime

from airflow_hop.operators import HopPipelineOperator
from airflow_hop.operators import HopWorkflowOperator

with DAG('airflow_plugin_sample_dag', start_date=datetime(2023,12,18),
        schedule_interval='@daily', catchup=False) as dag:

    # fake_users = HopPipelineOperator(
    #     task_id='fake_users',
    #     pipeline='pipelines/fake-data-generate-person-record.hpl',
    #     pipe_config='remote hop server',
    #     project_name='default',
    #     log_level='Basic')

    # get_param = HopPipelineOperator(
    #     task_id='get_param',
    #     pipeline='pipelines/get_param.hpl',
    #     pipe_config='remote hop server',
    #     project_name='default',
    #     log_level='Basic',
    #     params={'DATE':'{{ ds }}'})

    work_test = HopWorkflowOperator(
        task_id='work_test',
        workflow='workflows/workflowTest.hwf',
        project_name='airflow_plugin',
        log_level='Basic',
        params={'DATE':'{{ ds }}'})

    #fake_users >> get_param >> work_test

    work_test

APACHE HOP CLIENT

Installed version 2.6, which can be found here.

CONFIG FOLDER DETAILS

hop_config.json


{
  "variables" : [ {
    "name" : "HOP_MAX_LOG_SIZE_IN_LINES",
    "value" : "0",
    "description" : "The maximum number of log lines that are kept internally by Hop. Set to 0 to keep all rows (default)"
  }, {
    "name" : "HOP_MAX_LOG_TIMEOUT_IN_MINUTES",
    "value" : "1440",
    "description" : "The maximum age (in minutes) of a log line while being kept internally by Hop. Set to 0 to keep all rows indefinitely (default)"
  }, {
    "name" : "HOP_LOG_TAB_REFRESH_DELAY",
    "value" : "1000",
    "description" : "The hop log tab refresh delay."
  }, {
    "name" : "HOP_LOG_TAB_REFRESH_PERIOD",
    "value" : "1000",
    "description" : "The hop log tab refresh period."
  }, {
    "name" : "HOP_USE_NATIVE_FILE_DIALOG",
    "value" : "N",
    "description" : "Set this value to 'Y' if you want to use the system file open/save dialog when browsing files."
  }, {
    "name" : "HOP_LOG_SIZE_LIMIT",
    "value" : "0",
    "description" : "The log size limit for all pipelines and workflows that don't have the \"log size limit\" property set in their respective properties."
  }, {
    "name" : "HOP_SERVER_DETECTION_TIMER",
    "value" : "",
    "description" : "The name of the variable that defines the timer used for detecting server nodes"
  }, {
    "name" : "HOP_EMPTY_STRING_DIFFERS_FROM_NULL",
    "value" : "N",
    "description" : "NULL vs Empty String. If this setting is set to 'Y', an empty string and null are different. Otherwise they are not"
  }, {
    "name" : "HOP_LENIENT_STRING_TO_NUMBER_CONVERSION",
    "value" : "N",
    "description" : "System wide flag to allow lenient string to number conversion for backward compatibility. If this setting is set to 'Y', an string starting with digits will be converted successfully into a number. (example: 192.168.1.1 will be converted into 192 or 192.168 or 192168 depending on the decimal and grouping symbol). The default (N) will be to throw an error if non-numeric symbols are found in the string."
  }, {
    "name" : "HOP_SYSTEM_HOSTNAME",
    "value" : "",
    "description" : "You can use this variable to speed up hostname lookup. Hostname lookup is performed by Hop so that it is capable of logging the server on which a workflow or pipeline is executed."
  }, {
    "name" : "HOP_SERVER_OBJECT_TIMEOUT_MINUTES",
    "value" : "1440",
    "description" : "This project variable will set a time-out after which waiting, completed or stopped pipelines and workflows will be automatically cleaned up. The default value is 1440 (one day)."
  }, {
    "name" : "HOP_TRANSFORM_PERFORMANCE_SNAPSHOT_LIMIT",
    "value" : "0",
    "description" : "The maximum number of transform performance snapshots to keep in memory. Set to 0 to keep all snapshots indefinitely (default)"
  }, {
    "name" : "HOP_MAX_WORKFLOW_TRACKER_SIZE",
    "value" : "5000",
    "description" : "The maximum age (in minutes) of a log line while being kept internally by Hop. Set to 0 to keep all rows indefinitely (default)"
  }, {
    "name" : "HOP_MAX_ACTIONS_LOGGED",
    "value" : "5000",
    "description" : "The maximum number of action results kept in memory for logging purposes."
  }, {
    "name" : "HOP_MAX_LOGGING_REGISTRY_SIZE",
    "value" : "10000",
    "description" : "The maximum number of logging registry entries kept in memory for logging purposes"
  }, {
    "name" : "HOP_PLUGIN_CLASSES",
    "value" : "",
    "description" : "A comma delimited list of classes to scan for plugin annotations"
  }, {
    "name" : "HOP_PIPELINE_ROWSET_SIZE",
    "value" : "",
    "description" : "Name of the environment variable that contains the size of the pipeline rowset size. This overwrites values that you set pipeline settings"
  }, {
    "name" : "HOP_PASSWORD_ENCODER_PLUGIN",
    "value" : "Hop",
    "description" : "Specifies the password encoder plugin to use by ID (Hop is the default)."
  }, {
    "name" : "HOP_ROWSET_GET_TIMEOUT",
    "value" : "50",
    "description" : "The name of the variable that optionally contains an alternative rowset get timeout (in ms). This only makes a difference for extremely short lived pipelines."
  }, {
    "name" : "HOP_ROWSET_PUT_TIMEOUT",
    "value" : "50",
    "description" : "The name of the variable that optionally contains an alternative rowset put timeout (in ms). This only makes a difference for extremely short lived pipelines."
  }, {
    "name" : "HOP_BATCHING_ROWSET",
    "value" : "N",
    "description" : "Set this variable to 'Y' if you want to test a more efficient batching row set."
  }, {
    "name" : "HOP_FILE_OUTPUT_MAX_STREAM_COUNT",
    "value" : "1024",
    "description" : "This project variable is used by the Text File Output transform. It defines the max number of simultaneously open files within the transform. The transform will close/reopen files as necessary to insure the max is not exceeded"
  }, {
    "name" : "HOP_FILE_OUTPUT_MAX_STREAM_LIFE",
    "value" : "0",
    "description" : "This project variable is used by the Text File Output transform. It defines the max number of milliseconds between flushes of files opened by the transform."
  }, {
    "name" : "HOP_DISABLE_CONSOLE_LOGGING",
    "value" : "N",
    "description" : "Set this variable to 'Y' to disable standard Hop logging to the console. (stdout)"
  }, {
    "name" : "HOP_DEFAULT_NUMBER_FORMAT",
    "value" : "",
    "description" : "The name of the variable containing an alternative default number format"
  }, {
    "name" : "HOP_DEFAULT_BIGNUMBER_FORMAT",
    "value" : "",
    "description" : "The name of the variable containing an alternative default bignumber format"
  }, {
    "name" : "HOP_DEFAULT_INTEGER_FORMAT",
    "value" : "",
    "description" : "The name of the variable containing an alternative default integer format"
  }, {
    "name" : "HOP_DEFAULT_DATE_FORMAT",
    "value" : "",
    "description" : "The name of the variable containing an alternative default date format"
  }, {
    "name" : "HOP_AGGREGATION_MIN_NULL_IS_VALUED",
    "value" : "N",
    "description" : "Set this variable to 'Y' to set the minimum to NULL if NULL is within an aggregate. Otherwise by default NULL is ignored by the MIN aggregate and MIN is set to the minimum value that is not NULL. See also the variable HOP_AGGREGATION_ALL_NULLS_ARE_ZERO."
  }, {
    "name" : "HOP_AGGREGATION_ALL_NULLS_ARE_ZERO",
    "value" : "N",
    "description" : "Set this variable to 'Y' to return 0 when all values within an aggregate are NULL. Otherwise by default a NULL is returned when all values are NULL."
  }, {
    "name" : "HOP_DEFAULT_TIMESTAMP_FORMAT",
    "value" : "",
    "description" : "The name of the variable containing an alternative default timestamp format"
  }, {
    "name" : "HOP_SPLIT_FIELDS_REMOVE_ENCLOSURE",
    "value" : "N",
    "description" : "Set this variable to 'N' to preserve enclosure symbol after splitting the string in the Split fields transform. Changing it to true will remove first and last enclosure symbol from the resulting string chunks."
  }, {
    "name" : "HOP_ALLOW_EMPTY_FIELD_NAMES_AND_TYPES",
    "value" : "N",
    "description" : "Set this variable to 'Y' to allow your pipeline to pass 'null' fields and/or empty types."
  }, {
    "name" : "HOP_GLOBAL_LOG_VARIABLES_CLEAR_ON_EXPORT",
    "value" : "N",
    "description" : "Set this variable to 'N' to preserve global log variables defined in pipeline / workflow Properties -> Log panel. Changing it to 'Y' will clear it when export pipeline / workflow."
  }, {
    "name" : "HOP_LOG_MARK_MAPPINGS",
    "value" : "N",
    "description" : "Set this variable to 'Y' to precede transform/action name in log lines with the complete path to the transform/action. Useful to perfectly identify where a problem happened in our process."
  }, {
    "name" : "HOP_SERVER_JETTY_ACCEPTORS",
    "value" : "",
    "description" : "A variable to configure jetty option: acceptors for Hop server"
  }, {
    "name" : "HOP_SERVER_JETTY_ACCEPT_QUEUE_SIZE",
    "value" : "",
    "description" : "A variable to configure jetty option: acceptQueueSize for Hop server"
  }, {
    "name" : "HOP_SERVER_JETTY_RES_MAX_IDLE_TIME",
    "value" : "",
    "description" : "A variable to configure jetty option: lowResourcesMaxIdleTime for Hop server"
  }, {
    "name" : "HOP_DEFAULT_SERVLET_ENCODING",
    "value" : "",
    "description" : "Defines the default encoding for servlets, leave it empty to use Java default encoding"
  }, {
    "name" : "HOP_SERVER_REFRESH_STATUS",
    "value" : "",
    "description" : "A variable to configure refresh for Hop server workflow/pipeline status page"
  }, {
    "name" : "HOP_MAX_TAB_LENGTH",
    "value" : "",
    "description" : "A variable to configure Tab size"
  }, {
    "name" : "HOP_ZIP_MIN_INFLATE_RATIO",
    "value" : "",
    "description" : "A variable to configure the minimum allowed ratio between de- and inflated bytes to detect a zipbomb"
  }, {
    "name" : "HOP_ZIP_MIN_INFLATE_RATIO_DEFAULT_STRING",
    "value" : "",
    "description" : ""
  }, {
    "name" : "HOP_ZIP_MAX_ENTRY_SIZE",
    "value" : "",
    "description" : "A variable to configure the maximum file size of a single zip entry"
  }, {
    "name" : "HOP_ZIP_MAX_ENTRY_SIZE_DEFAULT_STRING",
    "value" : "",
    "description" : ""
  }, {
    "name" : "HOP_ZIP_MAX_TEXT_SIZE",
    "value" : "",
    "description" : "A variable to configure the maximum number of characters of text that are extracted before an exception is thrown during extracting text from documents"
  }, {
    "name" : "HOP_ZIP_MAX_TEXT_SIZE_DEFAULT_STRING",
    "value" : "",
    "description" : ""
  }, {
    "name" : "HOP_LICENSE_HEADER_FILE",
    "value" : "",
    "description" : "This is the name of the variable which when set should contains the path to a file which will be included in the serialization of pipelines and workflows"
  }, {
    "name" : "HOP_JSON_INPUT_INCLUDE_NULLS",
    "value" : "Y",
    "description" : "Name of the variable to set so that Nulls are considered while parsing JSON files. If HOP_JSON_INPUT_INCLUDE_NULLS is \"Y\" then nulls will be included (default behavior) otherwise they will not be included"
  }, {
    "name" : "HOP_CONTEXT_DIALOG_STRICT_SEARCH",
    "value" : "N",
    "description" : "This variable influences how the search is done in the context dialog, when set to Y it will do a strict search (Needed for automated UI testing)"
  }, {
    "name" : "HOP_DEFAULT_BUFFER_POLLING_WAITTIME",
    "value" : "20",
    "description" : "This is the default polling frequency for the transforms input buffer (in ms)"
  }, {
    "name" : "HOP_S3_VFS_PART_SIZE",
    "value" : "",
    "description" : ""
  }, {
    "name" : "NEO4J_LOGGING_CONNECTION",
    "value" : "",
    "description" : "Set this variable to the name of an existing Neo4j connection to enable execution logging to a Neo4j database."
  } ],
  "googleCloud" : {
    "serviceAccountKeyFile" : null
  },
  "LocaleDefault" : "en_US",
  "dropbox" : {
    "accessToken" : null
  },
  "guiProperties" : {
    "FontFixedSize" : "9",
    "MaxUndo" : "100",
    "DarkMode" : "N",
    "FontNoteSize" : "9",
    "ShowCanvasGrid" : "N",
    "MiddlePct" : "35",
    "HideViewport" : "N",
    "FontFixedStyle" : "0",
    "FontNoteName" : "Segoe UI",
    "ContextDialogFixedWidth" : "Y",
    "FontFixedName" : "Consolas",
    "HideMenuBar" : "Y",
    "FontGraphStyle" : "0",
    "FontDefaultSize" : "9",
    "FontGraphSize" : "9",
    "IconSize" : "32",
    "GlobalZoomFactor" : "1.0",
    "FontNoteStyle" : "0",
    "FontGraphName" : "Segoe UI",
    "ShowTableViewToolbar" : "Y",
    "FontDefaultName" : "Segoe UI",
    "FontDefaultStyle" : "0",
    "CanvasGridSize" : "16",
    "ContextDialogShowCategories" : "Y",
    "WorkflowDialogStyle" : "RESIZE,MAX,MIN",
    "LineWidth" : "1",
    "AutoSave" : "Y",
    "CustomParameterUsageInfoParameter" : "N",
    "CustomParameterSetVariableUsageWarning" : "Y"
  },
  "googleDrive" : {
    "credentialsFile" : null,
    "tokensFolder" : null
  },
  "projectsConfig" : {
    "enabled" : true,
    "projectMandatory" : true,
    "environmentMandatory" : false,
    "environmentsForActiveProject" : false,
    "defaultProject" : "default",
    "defaultEnvironment" : null,
    "standardParentProject" : "default",
    "standardProjectsFolder" : null,
    "defaultProjectConfigFile" : "project-config.json",
    "projectConfigurations" : [ {
      "projectName" : "default",
      "projectHome" : "config/projects/default",
      "configFilename" : "project-config.json"
    }, {
      "projectName" : "samples",
      "projectHome" : "config/projects/samples",
      "configFilename" : "project-config.json"
    }, {
      "projectName" : "airflow_plugin",
      "projectHome" : "config/projects/airflow_plugin",
      "configFilename" : "project-config.json"
    } ],
    "lifecycleEnvironments" : [ {
      "name" : "plugin_dev",
      "purpose" : "Testing",
      "projectName" : "airflow_plugin",
      "configurationFiles" : [ "config/projects/dev.json" ]
    } ],
    "projectLifecycles" : [ ]
  },
  "azure" : {
    "account" : null,
    "key" : null,
    "blockIncrement" : "4096"
  },
  "explorer-perspective" : {
    "lazyLoadingDepth" : "0",
    "fileLoadingMaxSize" : "16"
  },
  "doNotShowWelcomeDialog" : false
}

airflow_plugin project folder contents

The contents are exactly those provided here in the repo.

HOP_SERVER

docker-compose.yml

version: '3.9'
services:
  apache-hop:
    image: apache/hop:latest
    ports:
      - 8081:8081
    volumes:
      - HOP_CLIENT_HOME_CONFIG:/opt/hop/config
    networks:
      - airflow_plugin_test_default
    environment:
      HOP_SERVER_USER: cluster
      HOP_SERVER_PASS: cluster
      HOP_SERVER_PORT: 8081
      HOP_SERVER_HOSTNAME: 0.0.0.0

networks:
  airflow_plugin_test_default:
    driver: "bridge"

Notice that the Apache Airflow containers' network is shared with the Hop server container, so in the Airflow-Hop server connection the HOP_SERVER_IP should be the gateway IP of the network shared between the Airflow containers and the Hop server container. Also remember to change HOP_CLIENT_HOME_CONFIG to the path of your Hop client home config folder.
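
One way to actually share that network (a sketch, assuming the Airflow compose project already created a Docker network named airflow_plugin_test_default) is to declare it as external in the Hop server compose file instead of creating a new bridge:

networks:
  airflow_plugin_test_default:
    external: true   # join the existing Airflow network instead of creating a separate one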

AIRFLOW TASK LOG

dag_id=airflow_plugin_sample_dag_run_id=manual__2023-12-18T10_33_09.262213+00_00_task_id=work_test_attempt=1 (1).log

As can be seen, the parent workflow starts properly; the issue is that the Hop server looks for the sub pipelines in the default project instead of the proper project name, which in this case is airflow_plugin as specified in the DAG file. How can we resolve this issue?

duyvb125 commented 9 months ago

I got the same error. This is the cause:

[2023-12-18T10:33:15.402+0000] {operators.py:47} INFO - 2023/12/18 10:33:12 - fake-data-generate-person-record.hpl - ERROR: Unable to run workflow workflowTest. The fake-data-generate-person-record.hpl has an error. The pipeline path config/projects/default/pipelines/fake-data-generate-person-record.hpl is invalid, and will not run successfully.

I think the 'Export linked resources to the server' option in the workflow run configuration isn't sent to the Hop server, so the Hop server looks for that pipeline file in its own server folder, where of course it isn't present.

Why doesn't HopWorkflowOperator have a workflow run configuration input?

PereAL7 commented 8 months ago

Hello, I think the problem you've encountered is due to the Hop server configuration. I've noticed the following lines:

    volumes:
      - HOP_CLIENT_HOME_CONFIG:/opt/hop/config

You are using the path /opt/hop/config; instead, try using the path to the base Hop directory, /opt/hop, since this plugin uses paths relative to that directory because of how Apache Hop resolves them.
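
In other words, the mount would become something like the sketch below, assuming the host-side placeholder points at the full Hop client installation rather than only its config folder:

    volumes:
      - HOP_CLIENT_HOME:/opt/hop   # mount the whole Hop client home, not just config/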

kupelabs commented 8 months ago

That mount just lets me share the config folder of the Hop client installed on my server with the Hop server running in the container, so that both always have the same files. As you rightly stated, this config folder is the one used by the plugin, as described in the article. But, as my question hints, the issue is why the sub workflows and pipelines are always searched for under the default project directory and not under the provided project_name directory.

PereAL7 commented 8 months ago

I've not tested running sub workflows and pipelines with this plugin. I'm no Apache Hop expert myself, so the idea never occurred to me. Right now this is probably a limitation of the plugin, but I'll consider it for a future version. Thank you for your feedback!

pauloricardoferreira commented 8 months ago

Excellent discussion.

I'm here in need of help.

I went the other way.

I made some adaptations so that the plugin works correctly.

pauloricardoferreira commented 8 months ago

1 - In the configuration of the variables file I made the following modification.

Before: (screenshot)

After: (screenshot)

I used the full path.


And my DAG looked like this:

job = HopWorkflowOperator(
    dag=dag,
    task_id='tsk-job-agt-cana-hop',
    workflow='PLANEJAMENTO/JOB_AGT_CANA.hwf',
    project_name='hop_repo',
    environment='hop-repo-prd',
    log_level='Basic')


To make this work with Docker I used the structure below.

A common network between the two containers.

A common volume between the two containers.

(screenshot of the Docker setup)
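
A minimal compose sketch of that structure (service, network and volume names below are illustrative, not taken from the screenshot):

services:
  airflow-worker:
    image: apache/airflow:2.6.0   # in practice, the extended Airflow image from the compose file above
    networks:
      - shared_net
    volumes:
      - hop_projects:/opt/projetos   # the same named volume is mounted in both containers

  apache-hop:
    image: apache/hop:latest
    networks:
      - shared_net
    volumes:
      - hop_projects:/opt/projetos

networks:
  shared_net:      # common network between the two containers
    driver: bridge

volumes:
  hop_projects:    # common volume holding the Hop projects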

And lastly I resolved the hop-config.json issue.

before "projectConfigurations" : [ { "projectName" : "hop_repo", "projectHome" : "/opt/projetos/hop_repo", "configFilename" : "project-config.json" }, { "projectName" : "default", "projectHome" : "config/projects/default", "configFilename" : "project-config.json" }, { "projectName" : "samples", "projectHome" : "config/projects/samples", "configFilename" : "project-config.json" }, { "projectName" : "live_hop", "projectHome" : "/opt/projetos/live_hop", "configFilename" : "project-config.json" }, { "projectName" : "UAG", "projectHome" : "/opt/projetos/UAG", "configFilename" : "project-config.json" } ], "lifecycleEnvironments" : [ { "name" : "hop-repo-prd", "purpose" : "Production", "projectName" : "hop_repo", "configurationFiles" : [ "/opt/projetos/hop_repo/hop-repo-prd-config.json" ] }, { "name" : "hop-repo-dev", "purpose" : "Development", "projectName" : "hop_repo", "configurationFiles" : [ "/opt/projetos/hop_repo/hop-repo-dev-config.json" ] }, { "name" : "hop-live-prd", "purpose" : "Production", "projectName" : "live_hop", "configurationFiles" : [ "/opt/projetos/live_hop/hop-live-prd-config.json" ] }, { "name" : "hop-live-dev", "purpose" : "Development", "projectName" : "live_hop", "configurationFiles" : [ "/opt/projetos/live_hop/hop-live-dev-config.json" ] }, { "name" : "UAG-PRD", "purpose" : "Production", "projectName" : "UAG", "configurationFiles" : [ "/opt/projetos/UAG/UAG-PRD-config.json" ] }, { "name" : "dev", "purpose" : "Development", "projectName" : "live_hop", "configurationFiles" : [ "${PROJECT_HOME}/hop-live-dev-config.json" ] } ],

I used a Linux command (sed) to remove the /opt path prefix.

after

"projectConfigurations" : [ { "projectName" : "hop_repo", "projectHome" : "projetos/hop_repo", "configFilename" : "project-config.json" }, { "projectName" : "default", "projectHome" : "config/projects/default", "configFilename" : "project-config.json" }, { "projectName" : "samples", "projectHome" : "config/projects/samples", "configFilename" : "project-config.json" }, { "projectName" : "live_hop", "projectHome" : "projetos/live_hop", "configFilename" : "project-config.json" }, { "projectName" : "UAG", "projectHome" : "projetos/UAG", "configFilename" : "project-config.json" } ], "lifecycleEnvironments" : [ { "name" : "hop-repo-prd", "purpose" : "Production", "projectName" : "hop_repo", "configurationFiles" : [ "projetos/hop_repo/hop-repo-prd-config.json" ] }, { "name" : "hop-repo-dev", "purpose" : "Development", "projectName" : "hop_repo", "configurationFiles" : [ "projetos/hop_repo/hop-repo-dev-config.json" ] }, { "name" : "hop-live-prd", "purpose" : "Production", "projectName" : "live_hop", "configurationFiles" : [ "projetos/live_hop/hop-live-prd-config.json" ] }, { "name" : "hop-live-dev", "purpose" : "Development", "projectName" : "live_hop", "configurationFiles" : [ "projetos/live_hop/hop-live-dev-config.json" ] }, { "name" : "UAG-PRD", "purpose" : "Production", "projectName" : "UAG", "configurationFiles" : [ "projetos/UAG/UAG-PRD-config.json" ] }, { "name" : "dev", "purpose" : "Development", "projectName" : "live_hop", "configurationFiles" : [ "${PROJECT_HOME}/hop-live-dev-config.json" ] } ],

I have shell scripts that solve this problem.

Thanks.

vegaskyo commented 3 months ago

any solution for this?

piffall commented 3 months ago

any solution for this?

For now, you will need to follow the file structure.

Check out this section of the README: https://github.com/damavis/airflow-hop-plugin?tab=readme-ov-file#3-hop-directory-structure