damavis / airflow-pentaho-plugin

Pentaho plugin for Apache Airflow - Orchestrate Pentaho transformations and jobs from Airflow
Apache License 2.0

Return code 0 PanOperator for failed transformations and Xcom #33

Closed 3pm closed 9 months ago

3pm commented 10 months ago

Hello,

In some cases Pentaho's spoon.sh (used by pan.sh) returns exit code 0 even though an inner transformation has failed. E.g. create trans.ktr with a failing sub-transformation in it. In such a case we cannot determine that it failed except by looking into the logs. Sometimes it is allowed for some sub-transformation, or a transformation in a job, to be in a failed state, so it is ok for spoon.sh to return 0 because the overall job or transformation is successful. In such cases we go line by line and detect whether the log has an error line, like:

    contains_error = False
    for line in iter(self.sub_process.stdout.readline, b''):
        line = line.decode('utf-8').rstrip()
        if not contains_error:
            if re.search(r'-\s+ERROR\s+\(version', line):
                contains_error = True
        self.log.info(line)
    self.sub_process.wait()

And then put it to XCom so that Airflow tasks are able to detect such errors:


    if self.xcom_push_flag:
        if not contains_error:
            return line
        else:
            return 'Untracked Error'

Can we add something like that (an extra return key, maybe) to airflow-pentaho-plugin for tracking such errors?
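To make the idea concrete, here is a minimal self-contained sketch of that log-scanning step (the sample log lines below are made up for illustration, not taken from a real Pentaho run):

    import re

    def log_contains_error(lines):
        # Pan/Kitchen error lines look like "... - ERROR (version 9.x, ...)",
        # so the same regex as above is enough to flag them.
        return any(re.search(r'-\s+ERROR\s+\(version', line) for line in lines)

    # Hypothetical log excerpt:
    sample = [
        '2023/05/01 12:00:00 - trans - Dispatching started for transformation',
        '2023/05/01 12:00:01 - step1 - ERROR (version 9.3.0.0) : Row conversion failed',
    ]

Running log_contains_error(sample) on this excerpt returns True, while a log without ERROR lines yields False.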

piffall commented 10 months ago

Hi @3pm ,

I appreciate your suggestion. Unfortunately, I think that is not a good idea.

First of all, it's common practice to log non-fatal errors that are not actual process errors, because they are well handled and just logged.

Secondly, I think that it breaks the "Airflow way", so it would make the operator work in a way that is not expected.

I think that the best way to solve this is to create a job (run with Kitchen) that fails when there is any error in any transformation, which is the default behavior.

Thanks.

3pm commented 9 months ago

Hello,

Agree on first.

How does it break Airflow if we just add an extra XCom key, for example 'has_errors'? One can later just choose not to use this XCom key. I am not proposing the exact code change above, just adding an extra XCom key/value based on the parsed log.

Third, it is not good to mark the Pentaho job as failed every time some inner error happens; the default behavior is ok. But with an extra XCom key indicating an error, we can choose whether a specific DAG task has to be marked as failed.

piffall commented 9 months ago

Hi @3pm

The main concern is that we would need to change the behavior to implement this change, and that means breaking changes. Users that are now expecting what is currently returned (the last log line) would find something else (an object with a has_errors key set to True or False).

This could end in errors in production deployments of this plugin, so this is not a trivial change.

Thanks.

3pm commented 9 months ago

Hello,

As I wrote, I am not suggesting the exact code changes above (yes, they would break things). I suggest leaving the return_value key as it is, but adding an extra key in XCom and putting a value there by parsing the log lines.

This will not break any production environments if we use XCom as Airflow intended it to be used, e.g. for metadata exchange between tasks.

Because we get the XCom value by passing the key parameter:

    def get_xcom_error_val(ti):
        val = ti.xcom_pull(task_ids=['mytrans'],
                           dag_id='tracked_failing_subtransformation',
                           key='THIS_IS_MY_EXTRA_KEY')
        if val and val[0] == "Untracked Error":
            raise ValueError("Upstream task has ERRORS!")

How could it break existing deployments?

piffall commented 9 months ago

Hi @3pm

I've just read again some docs and code. And you are actually right. That would not break anything. It may take some time for me to implement this. Please, in case you have a working fork, you are welcome to open a Pull Request.

Thanks.

piffall commented 9 months ago

Hi @3pm

Can you share an example dummy transformation with me, that returns some errors in logs without failing, please?

Thanks.

piffall commented 9 months ago

Hi @3pm

I've just released a new version that adds an err_count key to XCom, which returns the number of errors found in the logs. I would appreciate it if you could test it and share your feedback.
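For example (a sketch only; the task id my_trans and the zero-tolerance threshold are assumptions, not from this thread), a downstream task could pull the new err_count key and decide for itself whether to fail:

    def fail_on_logged_errors(ti):
        # 'err_count' is the XCom key added in 1.1.0; 'my_trans' is a
        # hypothetical upstream PanOperator/KitchenOperator task id.
        err_count = ti.xcom_pull(task_ids='my_trans', key='err_count')
        if err_count and int(err_count) > 0:
            raise ValueError(
                'Upstream Pentaho run logged %s error(s)' % err_count)

This keeps the default operator behavior intact: a run only fails when a DAG explicitly opts in via a check task like this.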

I will also be looking forward to including this in the Carte operators.

https://github.com/damavis/airflow-pentaho-plugin/compare/1.0.22...1.1.0

Thanks.

3pm commented 9 months ago

Hello, I guess it's no longer needed, but here it is: a failing transformation with a sub-transformation. failing_trans.zip

piffall commented 9 months ago

Thanks @3pm. Please try the new version; I hope this works for you.

3pm commented 9 months ago

Hello, for my Airflow version to load the plugin, I had to change kettle.py like this:

    from airflow.models import BaseOperator  # , XCOM_RETURN_KEY
    from airflow.models.xcom import XCOM_RETURN_KEY

piffall commented 9 months ago

Hi @3pm

Are you getting an error? Please, let me know which is your Airflow version.

3pm commented 9 months ago

2.6.0

    Traceback (most recent call last):
      File "/home/xxx/.local/lib/python3.10/site-packages/airflow/plugins_manager.py", line 229, in load_entrypoint_plugins
        plugin_class = entry_point.load()
      File "/usr/lib/python3/dist-packages/importlib_metadata/__init__.py", line 168, in load
        module = import_module(match.group('module'))
      File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
      File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
      File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
      File "<frozen importlib._bootstrap_external>", line 883, in exec_module
      File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
      File "/usr/local/lib/python3.10/dist-packages/airflow_pentaho/plugin.py", line 23, in <module>
        from airflow_pentaho.operators.kettle import KitchenOperator
      File "/usr/local/lib/python3.10/dist-packages/airflow_pentaho/operators/kettle.py", line 26, in <module>
        from airflow.models import BaseOperator, XCOM_RETURN_KEY
    ImportError: cannot import name 'XCOM_RETURN_KEY' from 'airflow.models' (/home/xxx/.local/lib/python3.10/site-packages/airflow/models/__init__.py)

piffall commented 9 months ago

I've just published the v1.1.1 with a fix.
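For reference, a common way to handle this kind of moved-import error (a sketch of the general pattern; I have not checked the actual v1.1.1 diff) is a try/except fallback, wrapped in a function here so the Airflow import is only attempted when called:

    def get_xcom_return_key():
        """Locate XCOM_RETURN_KEY across Airflow versions (illustrative helper)."""
        try:
            # Older Airflow versions re-exported the constant from airflow.models.
            from airflow.models import XCOM_RETURN_KEY
        except ImportError:
            # Newer versions (e.g. 2.6.x) expose it in airflow.models.xcom.
            from airflow.models.xcom import XCOM_RETURN_KEY
        return XCOM_RETURN_KEY
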

3pm commented 9 months ago

Thank you for the work. It is very helpful now.