damavis / airflow-pentaho-plugin

Pentaho plugin for Apache Airflow - Orchestrate Pentaho transformations and jobs from Airflow
Apache License 2.0

Check for empty params and extraction value for version starting Airflow 2.2 #22

Closed adolnik closed 1 year ago

adolnik commented 1 year ago

Check for empty params and extract the value, for versions starting with Airflow 2.2: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/models/param.html

sonarcloud[bot] commented 1 year ago

Kudos, SonarCloud Quality Gate passed!

Bugs: 0 (rating A)
Vulnerabilities: 0 (rating A)
Security Hotspots: 0 (rating A)
Code Smells: 0 (rating A)

No coverage information
No duplication information

piffall commented 1 year ago

Hi @adolnik

I've reviewed your PR and I think we should discuss your proposal. I see that there is a problem when params is equal to None, but I don't see the use case for using the Param class to hold the parameters in this plugin.

Actually, you can already use Param, because PentahoClient.build_command already receives a regular dict extracted from the Param holder. That is not covered by tests, sorry.
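
As a rough sketch of the params-is-None case (not the plugin's actual code), assuming the entries end up as the -param:NAME=value flags visible in the kitchen.sh command later in this thread:

    # Hypothetical sketch: treat params=None the same as an empty dict
    # before turning entries into Kitchen's -param:NAME=value flags.
    def build_params(params):
        params = params or {}
        return ['-param:{}={}'.format(k, v) for k, v in params.items()]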

Please, let me know if you think that I'm wrong.

Thanks.

adolnik commented 1 year ago

@piffall I use the following command

from airflow.models import Variable
from airflow_pentaho.operators.kettle import KitchenOperator
...
    job1 = KitchenOperator(
        dag=dag,
        task_id='pentaho_job',
        directory='pentaho_scripts',
        job='main_test',
        xcom_push=True,
        file='main.kjb',
        params={
            'date': "'{{ ds }}'",
            'empty_param': '',
            'DB_HOST': default_db_host,
            'DB_NAME': Variable.get('DEFAULT_DB_NAME'),
            'DB_PWD': Variable.get('DEFAULT_DB_PWD'),
            'DB_USER': Variable.get('DEFAULT_DB_USER'),
            'default_path': Variable.get('DEFAULT_PMPLAN_MEDIA_PATH'),
        },
    )

In my case this sample doesn't work, because it generates something like -param:DB_NAME=<airflow.models.param.Param object at 0x7f12865c78d0>
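
For context, a minimal reproduction sketch (the 'main_db' value is made up): since Airflow 2.2, values in params get wrapped in Param objects, and in the Airflow version from the log below, formatting one into a string prints its repr instead of the value:

    from airflow.models.param import Param

    p = Param('main_db', type='string')
    print('-param:DB_NAME={}'.format(p))            # <airflow.models.param.Param object at 0x...>
    print('-param:DB_NAME={}'.format(p.resolve()))  # main_db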

piffall commented 1 year ago

I see, @adolnik. Have you considered using templates?

It would look similar to this:

    job1 = KitchenOperator(
        dag=dag,
        task_id='pentaho_job',
        directory='pentaho_scripts',
        job='main_test',
        xcom_push=True,
        file='main.kjb',
        params={
            'date': "'{{ ds }}'",
            'empty_param': '',
            'DB_HOST': default_db_host,
            'DB_NAME': '{{ var.value.get("DEFAULT_DB_NAME") }}',
            'DB_PWD': '{{ var.value.get("DEFAULT_DB_PWD") }}',
            'DB_USER': '{{ var.value.get("DEFAULT_DB_USER") }}',
            'default_path': '{{ var.value.get("DEFAULT_PMPLAN_MEDIA_PATH") }}',
        },
    )

You can take a look at the Airflow docs on Variables in Templates.

adolnik commented 1 year ago

@piffall yes, I did, and "{{ ds }}" should work by default. It works perfectly for the previous step, but for KitchenOperator I got the following in the logs:

[2022-12-01, 11:59:32 UTC] {kettle.py:82} INFO - /tmp/***tmpj57tdgcu/pentaho_job57m6u3b9: line 1: `/opt/pentaho/data-integration/kitchen.sh -rep= -user= -pass=None -dir=/opt/***/pentaho_scripts -job=main_test -level=Basic -logfile=/dev/stdout -safemode=false -maxloglines=0 -maxlogtimeout=0 -file=/opt/***/pentaho_scripts/main.kjb -norep=true -param:date=<***.models.param.Param object at 0x7fe1b3253f10> -param:empty_param=<***.models.param.Param object at 0x7fe1b3253d50> -param:DB_HOST=<***.models.param.Param object at 0x7fe1b3253ed0> -param:DB_NAME=<***.models.param.Param object at 0x7fe1b3253b10> -param:DB_PWD=<***.models.param.Param object at 0x7fe1b3253110> -param:DB_USER=<***.models.param.Param object at 0x7fe1b3253a90> -param:default_path=<***.models.param.Param object at 0x7fe1b3253ad0>'

That is why I added the check above.

According to https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/param/index.html, we need to call the resolve() method to get the value.
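
A sketch of the shape of the fix (illustrative only, not the PR's actual diff; the helper name is made up):

    from airflow.models.param import Param

    # Illustrative helper: ensure the command builder always receives
    # plain values, covering both params=None and wrapped Param objects.
    def resolve_params(params):
        if not params:
            return {}
        return {k: v.resolve() if isinstance(v, Param) else v
                for k, v in params.items()}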

piffall commented 1 year ago

Ok, I'll merge your code and tag a new version. I'll need to add some tests to cover this. Thanks for your contribution.