apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[New Feature][Go SDK]: Calling Python transforms from Go pipeline results #22931

Open ronoaldo opened 2 years ago

ronoaldo commented 2 years ago

What happened?

Following the cross-language documentation for Beam 2.41, I am trying to write a simple Go pipeline that uses an external transform defined in Python.

I followed the docs with my own "word count" code but ended up with a very obscure error. I then decided to run the Python cross-language test suite (installed by pip at apache_beam/runners/portability/expansion_service_test.py) as the expansion service entrypoint, and to call a sample from the Go SDK tree, specifically the xlang/wordcount sample. To my surprise, it failed with the same kind of error.

After several rounds of trial and error, I still could not get a Go pipeline to call an external Python transform, either by following the docs or by reading the code referenced in the stack traces.

Here is the output from the Python expansion service:

(env) $ python ../env/lib/python3.9/site-packages/apache_beam/runners/portability/expansion_service_test.py -p 12345
INFO:apache_beam.runners.portability.stager:Executing command: ['/home/ronoaldo/workspace/micro-beam/env/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/tmp/tmpomfse_4t/tmp_requirements.txt', '--exists-action', 'i', '--no-deps', '--implementation', 'cp', '--abi', 'cp39', '--platform', 'manylinux2014_x86_64']
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.41.0
INFO:root:No image given, using default Python SDK image
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.41.0
INFO:root:Python SDK container image set to "apache/beam_python3.9_sdk:2.41.0" for Docker environment
INFO:__main__:Listening for expansion requests at 12345
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.

This is the error from the pipeline. It is very similar to the one I got with my own code, which complained about 'n5' instead, but raised the same KeyError at the same place:

(env) $ wordcount --expansion_addr=localhost:12345
panic:  tried cross-language for beam:transforms:xlang:count against localhost:12345 and failed
        expanding external transform
        expanding transform with ExpansionRequest: components:{pcollections:{key:"n3" value:{unique_name:"n3" coder_id:"c0@OfMQSsncEv" is_bounded:BOUNDED windowing_strategy_id:"w0@OfMQSsncEv"}} windowing_strategies:{key:"w0@OfMQSsncEv" value:{window_fn:{urn:"beam:window_fn:global_windows:v1"} merge_status:NON_MERGING window_coder_id:"c1@OfMQSsncEv" trigger:{default:{}} accumulation_mode:DISCARDING output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY on_time_behavior:FIRE_IF_NONEMPTY environment_id:"go"}} coders:{key:"c0@OfMQSsncEv" value:{spec:{urn:"beam:coder:string_utf8:v1"}}} coders:{key:"c1@OfMQSsncEv" value:{spec:{urn:"beam:coder:global_window:v1"}}} environments:{key:"go" value:{}}} transform:{unique_name:"External" spec:{urn:"beam:transforms:xlang:count"} inputs:{key:"i0" value:"n3"} environment_id:"go"} namespace:"OfMQSsncEv"
expansion failed
        caused by:
Traceback (most recent call last):
  File "/home/ronoaldo/workspace/micro-beam/env/lib/python3.9/site-packages/apache_beam/runners/portability/expansion_service.py", line 77, in Expand
    inputs = transform._pvaluish_from_dict({
  File "/home/ronoaldo/workspace/micro-beam/env/lib/python3.9/site-packages/apache_beam/runners/portability/expansion_service.py", line 79, in <dictcomp>
    with_pipeline(context.pcollections.get_by_id(pcoll_id), pcoll_id)
  File "/home/ronoaldo/workspace/micro-beam/env/lib/python3.9/site-packages/apache_beam/runners/portability/expansion_service.py", line 49, in with_pipeline
    component.producer, component.tag = producers[pcoll_id]
KeyError: 'n3'

goroutine 1 [running]:
github.com/apache/beam/sdks/v2/go/pkg/beam.CrossLanguage({0xc000480ac0?, 0xc0004b1090?}, {0xfdbd06, 0x1b}, {0x0?, 0x8?, 0x7f39103faa68?}, {0x7fffd9812cdc, 0xf}, 0xc0006bfc68, ...)
        /home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.41.0/go/pkg/beam/xlang.go:162 +0x136
github.com/apache/beam/sdks/v2/go/examples/xlang.Count({0xc0004809c0?, 0xc0004b1090?}, {0x7fffd9812cdc, 0xf}, {0x0?})
        /home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.41.0/go/examples/xlang/transforms.go:113 +0x3d4
main.main()
        /home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.41.0/go/examples/xlang/wordcount/wordcount.go:94 +0x22e

I tested this using a Python virtual env with apache-beam[gcp] installed, and installed this example with go install: github.com/apache/beam/sdks/v2/go/examples/xlang/wordcount@latest. I launched the expansion service in one terminal session and the wordcount Go program in another.
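The KeyError in the traceback is raised while the Python expansion service looks up which transform produced each input PCollection (`producers[pcoll_id]`). The ExpansionRequest logged above lists PCollections, windowing strategies, coders, and environments in its components, but no transforms, so a lookup keyed on producing transforms would find nothing for "n3". The Go sketch below models that lookup under this assumption. `Transform`, `Producer`, `indexProducers`, and `resolveInputs` are hypothetical simplified types and helpers, not Beam's protos or the expansion service's actual code.

```go
package main

import (
	"fmt"
	"sort"
)

// Transform is a hypothetical stand-in for a transform in the request's
// components; it keeps only its outputs (output tag -> PCollection ID).
type Transform struct {
	Outputs map[string]string
}

// Producer records which transform, and which of its output tags,
// produced a given PCollection.
type Producer struct {
	TransformID string
	Tag         string
}

// indexProducers builds a PCollection ID -> producer map from the
// transforms carried in the request components.
func indexProducers(transforms map[string]Transform) map[string]Producer {
	producers := make(map[string]Producer)
	for tID, t := range transforms {
		for tag, pcollID := range t.Outputs {
			producers[pcollID] = Producer{TransformID: tID, Tag: tag}
		}
	}
	return producers
}

// resolveInputs finds the producer of every input PCollection. A missing
// entry is the Go analogue of the KeyError in the Python traceback.
func resolveInputs(transforms map[string]Transform, inputs map[string]string) (map[string]Producer, error) {
	producers := indexProducers(transforms)

	// Visit input tags in sorted order so the reported error is deterministic.
	tags := make([]string, 0, len(inputs))
	for tag := range inputs {
		tags = append(tags, tag)
	}
	sort.Strings(tags)

	resolved := make(map[string]Producer, len(inputs))
	for _, tag := range tags {
		pcollID := inputs[tag]
		p, ok := producers[pcollID]
		if !ok {
			return nil, fmt.Errorf("no producer for input %q (PCollection %q)", tag, pcollID)
		}
		resolved[pcollID] = p
	}
	return resolved, nil
}

func main() {
	// The external transform's single input, as in the logged request.
	inputs := map[string]string{"i0": "n3"}

	// Case 1: components carry no transforms, like the logged ExpansionRequest.
	if _, err := resolveInputs(map[string]Transform{}, inputs); err != nil {
		fmt.Println("error:", err)
	}

	// Case 2: a hypothetical producing transform "e2" is included.
	withProducer := map[string]Transform{"e2": {Outputs: map[string]string{"out": "n3"}}}
	if r, err := resolveInputs(withProducer, inputs); err == nil {
		fmt.Println("resolved:", r["n3"].TransformID, r["n3"].Tag)
	}
}
```

In this model the first case fails with a missing-producer error for "n3", and the second resolves once a producing transform is present in the components.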

Issue Priority

Priority: 2

Issue Component

Component: sdk-go

ronoaldo commented 2 years ago

I'm curious about what I am missing from the docs regarding calling xlang transforms.

Following chapter 13 of the Beam Programming Guide, I wrote three pipelines, one in each language. The Python and Java ones expose their SplitWords implementations through an expansion service.

I can run each pipeline, both locally and on Google Cloud Dataflow workers, to make sure each one works properly on its own. Each main function takes some arguments that configure the cross-language call.

I then tested several combinations of xlang calls, and only one worked. To summarize, my experiments yielded these results:

Caller \ Callee    Java       Python      Go
Java               -          Untested    Unsupported
Python             Works      -           Unsupported
Go                 Fail       Fail        -

These are my sources:
- Go pipeline: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/go/pipeline.go
- Python pipeline: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/pipeline.py
- Java pipeline: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/java/src/main/java/com/ronoaldo/WordCountPipeline.java
- Java exported PTransform: https://github.com/ronoaldo/micro-beam/blob/main/05_xlang/java/src/main/java/com/ronoaldo/SplitWordsFromJava.java

Am I missing something?

This is the Go output after I launch the Java expansion server:

2022/09/01 18:02:49 Using external transform SplitWordsFromJava at localhost:12345
2022/09/01 18:02:50 Executing pipeline with the direct runner.
2022/09/01 18:02:50 Pipeline:
2022/09/01 18:02:50 Nodes: {1: []uint8/bytes GLO}
{2: string/string GLO}
{3: string/string GLO}
{4: KV<string,int64>/KV<string,varint> GLO}
{5: string/string GLO}
{6: string/string GLO}
{7: KV<string,int>/KV<string,int[varintz]> GLO}
{8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}
{9: KV<string,int>/KV<string,int[varintz]> GLO}
{10: main.CountedWord/R[main.CountedWord] GLO}
{11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}
{12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}
{13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}
{14: []main.CountedWord/[]main.CountedWord[json] GLO}
{15: string/string GLO}
{16: KV<int,string>/KV<int[varintz],string> GLO}
{17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string GLO}]
3: ParDo [In(Main): string <- {2: string/string GLO}] -> [Out: string -> {3: string/string GLO}]
4: ParDo [In(Main): string <- {3: string/string GLO}] -> [Out: KV<string,int64> -> {4: KV<string,int64>/KV<string,varint> GLO}]
5: ParDo [In(Main): KV<string,int64> <- {4: KV<string,int64>/KV<string,varint> GLO}] -> [Out: string -> {5: string/string GLO}]
6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
7: ParDo [In(Main): T <- {6: string/string GLO}] -> [Out: KV<T,int> -> {7: KV<string,int>/KV<string,int[varintz]> GLO}]
8: CoGBK [In(Main): KV<string,int> <- {7: KV<string,int>/KV<string,int[varintz]> GLO}] -> [Out: CoGBK<string,int> -> {8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}]
9: Combine [In(Main): int <- {8: CoGBK<string,int>/CoGBK<string,int[varintz]> GLO}] -> [Out: KV<string,int> -> {9: KV<string,int>/KV<string,int[varintz]> GLO}]
10: ParDo [In(Main): KV<string,int> <- {9: KV<string,int>/KV<string,int[varintz]> GLO}] -> [Out: main.CountedWord -> {10: main.CountedWord/R[main.CountedWord] GLO}]
11: ParDo [In(Main): T <- {10: main.CountedWord/R[main.CountedWord] GLO}] -> [Out: KV<int,T> -> {11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}]
12: CoGBK [In(Main): KV<int,main.CountedWord> <- {11: KV<int,main.CountedWord>/KV<int[varintz],R[main.CountedWord]> GLO}] -> [Out: CoGBK<int,main.CountedWord> -> {12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}]
13: Combine [In(Main): T <- {12: CoGBK<int,main.CountedWord>/CoGBK<int[varintz],R[main.CountedWord]> GLO}] -> [Out: KV<int,[]T> -> {13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}]
14: ParDo [In(Main): KV<X,Y> <- {13: KV<int,[]main.CountedWord>/KV<int[varintz],[]main.CountedWord[json]> GLO}] -> [Out: Y -> {14: []main.CountedWord/[]main.CountedWord[json] GLO}]
15: ParDo [In(Main): []main.CountedWord <- {14: []main.CountedWord/[]main.CountedWord[json] GLO}] -> [Out: string -> {15: string/string GLO}]
16: ParDo [In(Main): T <- {15: string/string GLO}] -> [Out: KV<int,T> -> {16: KV<int,string>/KV<int[varintz],string> GLO}]
17: CoGBK [In(Main): KV<int,string> <- {16: KV<int,string>/KV<int[varintz],string> GLO}] -> [Out: CoGBK<int,string> -> {17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}]
18: ParDo [In(Main): CoGBK<int,string> <- {17: CoGBK<int,string>/CoGBK<int[varintz],string> GLO}] -> []
2022/09/01 18:02:50 Failed to execute job: translation failed
    caused by:
unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]
panic: Failed to execute job: translation failed
    caused by:
unexpected edge: 6: External [In(Main): string <- {5: string/string GLO}] -> [Out: string -> {6: string/string GLO}]

goroutine 1 [running]:
github.com/apache/beam/sdks/v2/go/pkg/beam/log.Fatalf({0x114d0e0, 0xc0000420e8}, {0xfeb8b2?, 0x18?}, {0xc0006bff60?, 0x0?, 0x464f1b?})
    /home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.41.0/go/pkg/beam/log/log.go:153 +0xa5
main.main()
    /home/ronoaldo/workspace/micro-beam/05_xlang/go/pipeline.go:144 +0x2a8
exit status 2

chamikaramj commented 2 years ago

I don't think using Python transforms from the Go SDK is fully supported yet. I believe @riteshghorse is looking into this.

riteshghorse commented 2 years ago

Yes, calling Python transforms from Go should be available in the Go SDK at HEAD by mid-September, or we will try to get it into the 2.42 release.

Note: Cross-language transforms don't work on the Go direct runner. You have to use the Python portable, Flink, or Dataflow runner.

Here is example code for running KafkaIO in the Go SDK via a cross-language call into the Java SDK: https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go
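The "unexpected edge: 6: External" failure reported earlier comes from running a cross-language transform on the Go direct runner. As a hedged sketch, a pipeline program could fail fast on that configuration before construction starts. The `--runner` flag defined here and the `checkXlangRunner` helper are hypothetical standalone stdlib code, not Beam API, and the default value "direct" is an assumption; the only rule encoded is the one stated in this comment, that the Go direct runner cannot execute cross-language transforms. A real Beam program likely already registers its own `--runner` flag, so the flag definition should not be copied into one as-is.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// checkXlangRunner fails fast when the selected runner is known, from this
// thread, not to execute cross-language transforms. Only the Go direct
// runner is rejected; any other runner name is accepted without checks.
func checkXlangRunner(runner string) error {
	if runner == "direct" {
		return fmt.Errorf("runner %q cannot execute cross-language transforms; use e.g. the Dataflow, Flink, or Python portable runner", runner)
	}
	return nil
}

func main() {
	// Hypothetical standalone flag; "direct" as the default is an assumption.
	runner := flag.String("runner", "direct", "pipeline runner")
	flag.Parse()

	if err := checkXlangRunner(*runner); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("runner accepted for cross-language use:", *runner)
}
```

This produces a readable error at startup instead of a translation failure after the pipeline graph has been built.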

ronoaldo commented 2 years ago

> Yes, calling Python transforms from Go should be available in the Go SDK at HEAD by mid-September, or we will try to get it into the 2.42 release.

Yay! Super! Thanks

> Note: Cross-language transforms don't work on the Go direct runner. You have to use the Python portable, Flink, or Dataflow runner.
>
> Here is example code for running KafkaIO in the Go SDK via a cross-language call into the Java SDK: https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go

Oh, that is interesting. While reading that example, I didn't notice it was not using the Go direct runner. Just to make sure I understood properly: if I use a different runner, such as Dataflow, my sample in which Go calls Java should work, right? I mean using a custom-made jar on the Java side instead of a transform from the Java Beam SDK.

Calling Python from Go would be very nice, since it would give access to some very handy transforms such as DataframeTransform.

To avoid cluttering the issue tracker, feel free to close this issue or link it to the one tracking the implementation of Go -> Python xlang calls.

lostluck commented 2 years ago

@ronoaldo Yes, Dataflow, Flink, and the Python portable runner support executing cross-language transforms. The Go direct runner doesn't, and it is going to be replaced with something more useful in the next 6 months.

I think we currently plan to wrap the Dataframe transforms too, along with RunInference.

At this point, though, we're aiming for the 2.43.0 or 2.44.0 release, as 2.42.0 has already been cut.

I don't think we have a public issue tracking this, so I'll assign it to @riteshghorse, and he can close this issue after referencing any existing issues. Thank you for the clear reports!

lostluck commented 2 years ago

(Swapped some labels and edited the title for bookkeeping.)

ronoaldo commented 2 years ago

Thanks @lostluck!

A small follow-up: using the Dataflow runner I could call Java from Python, but I could not call Java from Go in a sample/custom pipeline. The error in Dataflow was... odd! I could not decipher it. I'll open a separate report to track this use case (Go -> Java xlang for a custom Java pipeline).

ronoaldo commented 2 years ago

I would also point out that the docs currently describe this as a possible workflow, which is what confused me in the beginning:

> 13.2.3. Using cross-language transforms in a Go pipeline
>
> If a Go-specific wrapper for a cross-language is available, use that. Otherwise, you have to use the lower-level CrossLanguage function to access the transform.
>
> Expansion Services
>
> The Go SDK supports automatically starting Java expansion services if an expansion address is not provided, although this is slower than providing a persistent expansion service. Many wrapped Java transforms manage perform this automatically; if you wish to do this manually, use the xlangx package’s UseAutomatedJavaExpansionService() function. In order to use Python cross-language transforms, you must manually start any necessary expansion services on your local machine and ensure they are accessible to your code during pipeline construction.

If the 2.42 docs can still be updated, it would be worth mentioning that calling Python from Go is unsupported, instead of saying that we must start the expansion service manually.