apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: BeamRunJavaPipelineOperator issue #23288

Closed Rajkante closed 1 year ago

Rajkante commented 2 years ago

What happened?

I was using 'BeamRunJavaPipelineOperator' in an Airflow DAG to run a Java jar that ingests data from Google Cloud Storage into BigQuery using Dataflow. The Dataflow job is submitted successfully, but I want to wait until it finishes successfully in the background before moving on to the next task. I am planning to handle this with 'DataflowJobStatusSensor', which checks the status of the job in the background. The sensor requires the ID of the job to check, which is supposed to be returned as an XCom by 'BeamRunJavaPipelineOperator', but the operator does not return the desired XCom.

    start_java_pipeline = BeamRunJavaPipelineOperator(
        task_id="start_java_pipeline",
        runner='dataflow',
        jar="<path-to-java-jar>",
        pipeline_options={
            'airflowBucket': '<bucket-path>',
            'jobName': '<job-name>',
            'inputfileBucket': '<input-file-path>',
            'maxNumWorkers': '10',
            'targetTableProject': '<Project-name>',
            'datasetName': '<dataset-name>',
            'serviceAccount': '<service-account>',
            'runConfig': '<path-config-files>',
            'project': '<project-name>',
            'workerMachineType': 'n1-standard-2',
            'region': '<region>',
            'subnetwork': "<subnetwork>",
            'usePublicIps': 'false',
            'stagingLocation': '<staging-location>',
            'tempLocation': '<temp-location>',
        },
        job_class='<class-name-in-jar>',
        do_xcom_push=True,
        dag=dag,
    )

    wait_for_done = DataflowJobStatusSensor(
        task_id="wait-for-java-dataflow",
        job_id="task_instance.xcom_pull('Get_job_id')",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        project_id="xxx-xx-xxx",
        gcp_conn_id='google_cloud_default',
        location='us-central1',
    )
    start_java_pipeline >> wait_for_done

Using "DataFlowJavaOperator", I am able to push the job ID to XCom and fetch it with "DataflowJobStatusSensor" without any issues. However, that operator is deprecated.
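
One thing that may be worth checking in the snippet above: 'job_id' is passed as a plain string with no Jinja delimiters, so Airflow would most likely hand that literal text to the sensor instead of evaluating it, and 'Get_job_id' does not match any task id shown. Below is a minimal sketch of building a Jinja expression for the sensor's 'job_id', written in plain Python so that it runs without Airflow installed. The helper name 'xcom_job_id_template' is hypothetical, and the 'dataflow_job_id' key is an assumption about what the operator pushes (which is exactly what this issue is about), so verify it in the XCom view for your provider version.

```python
# Sketch only: builds the Jinja expression as a string; it does not import Airflow.
# Assumption: the upstream task pushes a dict containing "dataflow_job_id"
# as its return-value XCom. Confirm this for your provider version.


def xcom_job_id_template(task_id: str, key: str = "dataflow_job_id") -> str:
    """Return a Jinja expression that Airflow can render for a templated field."""
    return "{{ task_instance.xcom_pull(task_ids='" + task_id + "')['" + key + "'] }}"


# task_id taken from the DAG snippet in this issue.
JOB_ID_TEMPLATE = xcom_job_id_template("start_java_pipeline")
print(JOB_ID_TEMPLATE)
```

The resulting string would then be passed as 'job_id=JOB_ID_TEMPLATE' to 'DataflowJobStatusSensor', assuming 'job_id' is a templated field in the installed provider version.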

Issue Priority

Priority: 3

Issue Component

Component: beam-community

aromanenko-dev commented 1 year ago

It looks like a Dataflow issue, not a Beam one.

@aaltay @pabloem Could you confirm and handle it properly if needed?

aaltay commented 1 year ago

I think this is actually an airflow operator issue.

@aijamalnk - do you know who in airflow might be working on dataflow operators?

Thank you for reporting @Rajkante and thank you @aromanenko-dev for tagging us.

Rajkante commented 1 year ago

You're welcome

kennknowles commented 1 year ago

Agree this is an airflow issue. I suggest filing a ticket over there. In Beam you can use waitUntilFinish on the pipeline result so this is just about how to integrate with Airflow.

Once you have filed an airflow issue, can you link it here and close this?

kennknowles commented 1 year ago

Actually, since I am cleaning up Beam issues and this does not appear to be a Beam issue, I am going to close it. You can still comment here with the Airflow issue number if you want.