apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Allow override beam version for PythonExternalTransform via pipeline option #31691

Closed Abacn closed 3 months ago

Abacn commented 3 months ago

Fix #31680

This was already exposed in bootstrap_beam_venv.py, including the ability to pass a tarball:

https://github.com/apache/beam/blob/90d3f8a177ff935a09852fd85892bb1399b85c91/sdks/java/extensions/python/src/main/resources/org/apache/beam/sdk/extensions/python/bootstrap_beam_venv.py#L70-L71

However, it is not exposed in PythonExternalTransform that uses it.
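For illustration, here is a minimal sketch of how such an override flag can work. The flag name mirrors the linked bootstrap script, but the logic is a hypothetical reconstruction, not the actual implementation:

```python
# Hypothetical sketch: how a bootstrap script might turn a --beam_version
# override into a pip requirement. Accepts either a plain version string
# or a path to a local artifact (sdist/wheel).
import argparse


def beam_requirement(beam_version):
    """Map a version string or local artifact path to a pip requirement."""
    if beam_version.endswith((".tar.gz", ".zip", ".whl")):
        # A local artifact: pip can install the file directly.
        return beam_version
    # Otherwise pin the apache_beam release from PyPI.
    return "apache_beam==" + beam_version


parser = argparse.ArgumentParser()
parser.add_argument("--beam_version", default=None)
args = parser.parse_args(["--beam_version=2.55.0"])
print(beam_requirement(args.beam_version))  # apache_beam==2.55.0
```

The point of the PR is to let PythonExternalTransform feed a value into this flag via a pipeline option, instead of always bootstrapping the venv with the default version.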

Tested by manually adding a test (after adding a direct-java dependency to the test runtime classpath):

  @Test
  public void testCustomBeamRequirement() {
    PythonExternalTransformOptions options =
        PipelineOptionsFactory.fromArgs("--customBeamRequirement=2.55.0")
            .create()
            .as(PythonExternalTransformOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("a")).apply(PythonExternalTransform
        .<PCollection<String>, PCollection<String>>from("DummyTransform"));
  }

then the log shows:

[Test worker] INFO org.apache.beam.sdk.extensions.python.PythonService - Running bootstrap command [python3, /var/folders/wg/hwmcqjwd4zz75mjs0r5z_3f400y2yj/T/bootstrap_beam_venv4367253676595224584.py, --beam_version=2.55.0]
...
[Test worker] INFO org.apache.beam.sdk.extensions.python.PythonService -   Created wheel for apache_beam: filename=apache_beam-2.55.0-cp311-cp311-macosx_12_0_arm64.whl size=5128626 sha256=c737e750a451fddb5718cbb518ea92b44d7575817fd2f94438fe2b481d0af658
[Test worker] INFO org.apache.beam.sdk.extensions.python.PythonService -   Stored in directory: /Users/yathu/Library/Caches/pip/wheels/1f/20/49/92e3d6469697bbae302bab858f6fb07a89ea897f9afc95298f
[Test worker] INFO org.apache.beam.sdk.extensions.python.PythonService - Successfully built apache_beam
[Test worker] INFO org.apache.beam.sdk.extensions.python.PythonService - Starting python service with arguments [/Users/yathu/.apache_beam/cache/venvs/py-3.11-beam-2.55.0-da39a3ee5e6b4b0d3255bfef95601890afd80709/bin/python, -m, apache_beam.runners.portability.expansion_service_main, --port=54179, --fully_qualified_name_glob=*, --pickle_library=cloudpickle]
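As an aside, the trailing hex digest in the cached venv path (py-3.11-beam-2.55.0-&lt;digest&gt;) looks like a content hash over the extra requirements; with none supplied, it equals the SHA-1 of the empty string. This is an observation from the log, not confirmed against the source:

```python
import hashlib

# SHA-1 of an empty byte string matches the suffix of the venv directory
# name in the log above, consistent with a cache key derived from the
# (here empty) set of extra requirements.
print(hashlib.sha1(b"").hexdigest())
# da39a3ee5e6b4b0d3255bfef95601890afd80709
```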

However, this is hard to cover with a unit test, since it would either require communicating with the real PyPI service or having a Beam Python SDK tarball in place.
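One hypothetical way to make such a test hermetic (not part of this PR) follows from the tarball support mentioned above: the override could point at a local file instead of a PyPI version, so no network access is needed. A sketch:

```python
# Hypothetical offline-test strategy: since the bootstrap flag also accepts
# a local tarball, a test could pass a file path as the custom requirement
# so the bootstrapped venv installs from disk rather than PyPI.
import os
import tarfile
import tempfile

tmp = tempfile.mkdtemp()
# An empty stand-in for a real SDK sdist; a real test would stage an
# actual apache_beam source distribution here.
sdist = os.path.join(tmp, "apache_beam-2.55.0.tar.gz")
with tarfile.open(sdist, "w:gz"):
    pass

# A value like this could then be supplied as --customBeamRequirement.
print(os.path.isfile(sdist))  # True
```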


Abacn commented 3 months ago

R: @chamikaramj

github-actions[bot] commented 3 months ago

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

chamikaramj commented 3 months ago

You can override this whole setup by starting up an expansion service manually and specifying it via the expansionService constructor parameter of PythonExternalTransform. Did you consider that ?

https://github.com/apache/beam/blob/2f517188fa88a81b2709974189349c8fd02e0fc1/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java#L98

Also, note that for this to work end-to-end, the ExpansionService started here should also produce expanded transforms that point to an Environment with a valid container.

Abacn commented 3 months ago

starting up an expansion service manually and specifying it via the expansionService constructor parameter of PythonExternalTransform.

Thanks for pointing that out.

In the case of release candidate validation, e.g.,

https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/1681#issuecomment-2187900474

it would be great to require only a minimal change (like flipping a flag) to run the same workflow against a release candidate versus a released version. A manually started expansion service works in theory, but it no longer validates the automatic spin-up of an expansion service during pipeline expansion, which is how external transforms work by default with released Beam.

Also, note that for this to work end-to-end, the ExpansionService started here should also produce expanded transforms that point to an Environment with a valid container.

We do publish the 2.xx.0RC1 container to Docker Hub, so it should be valid.

chamikaramj commented 3 months ago

Agree that it's better if we can use the auto-started expansion service for validation. That would be the closest to what most customers will use.

So LGTM :)

chamikaramj commented 3 months ago

(please run a test to make sure that this works end-to-end for RCs)

Abacn commented 3 months ago

(please run a test to make sure that this works end-to-end for RCs)

Tested. Steps:

Patched v2.57.0 with this PR:

  git checkout tags/v2.57.0 -b tag-v2.57.0
  git cherry-pick cd5bd291fc4b59e1d951e077fa51074202870377
  ./gradlew -Ppublishing :sdks:java:extensions:python:publishToMavenLocal

Then created a standalone project and copy-pasted PythonExternalTransformTest.trivialPythonTransform into it.

Direct runner (pipeline expansion succeeded; pipeline execution hit an expected runtime error):

Exception in thread "main" java.lang.NullPointerException: No evaluator for PTransform "beam:transform:external:v1"
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:1010)
    at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:157)

Dataflow runner, with pipeline option "--customBeamRequirement=2.56.0" (2.57.0RC1 has already been deleted from PyPI):

[main] INFO org.apache.beam.sdk.extensions.python.PythonService - Running bootstrap command [python3, /var/folders/wg/hwmcqjwd4zz75mjs0r5z_3f400y2yj/T/bootstrap_beam_venv1386699985397287694.py, --beam_version=2.56.0]
[main] INFO org.apache.beam.sdk.extensions.python.PythonService - /Users/.../.apache_beam/cache/venvs/py-3.11-beam-2.56.0-da39a3ee5e6b4b0d3255bfef95601890afd80709/bin/python
[main] INFO org.apache.beam.sdk.extensions.python.PythonService - Starting python service with arguments [/Users/yathu/.apache_beam/cache/venvs/py-3.11-beam-2.56.0-da39a3ee5e6b4b0d3255bfef95601890afd80709/bin/python, -m, apache_beam.runners.portability.expansion_service_main, --port=61984, --fully_qualified_name_glob=*, --pickle_library=cloudpickle]

Dataflow worker log:

argv[12]: '--sdk_harness_ids=sdk-0-0,gcr.io/cloud-dataflow/v1beta3/beam_python3.11_sdk:2.56.0,sdk-1-0,gcr.io/cloud-dataflow/v1beta3/beam_java11_sdk:2.57.0'

and pipeline run succeeded.