Closed evgdol closed 1 year ago
Hi @evgdol ,
I found a limitation in PDI Carte server. Transformation run ID is not returned by the Carte server, so is required to ask the status and logs by transformation name. I guess that, as you are running both transformations concurrently, you are getting logs from just the second transformation run.
My recommendation is, wrap that transformations inside a Job, and always use CarteJobOperator
.
I'll check if this can be mitigated somehow, but the error will always be possible because of concurrency.
@evgdol Can you update the plugin and try to run again, please?
Hi @piffall,
Unfortunately, this was not helpful
Hi @evgdol , I have one question. Are you running both transformations in parallel?
Yes. The full data flow in one transformation does not work because of memory issue (the original transformation is hard). But if split the flow to ranges and run the transformation by these ranges in parallel, it works fine.
Ok @evgdol . I recommend you to use those transformations inside a Job, and run using CarteJobOperator
. That will work, because ID is returned correctly.
Hi @piffall,
Thank you. Inside job it works properly.
Scenario:
I have a transformation that should be run several times according to previous step results. I am using expand_kwargs feature to do this. Simplified example:
t1 = CarteTransOperator.partial( task_id='tst_trans_params', pdi_conn_id='pentaho', trans='/public/test/tr_tst_input_params' ).expand_kwargs([{"params": {"PARAM": 1}},{"params": {"PARAM": 2}}])
The transformation tr_tst_input_params just outputps the input parameter into the log.Result:
All works fine. But the log for both map indexes gathers from the last parameter value only
On Pentaho side:
Expected result:
Log should be valid
Environment
Airflow 2.5.1 + Python 3.10 + Pentaho server CE 8.0