apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Failing Test]: #31081

Open szxyks opened 6 months ago

szxyks commented 6 months ago

What happened?

I tried running this script: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang.py

$ python wordcount_xlang.py --output ./ --expansion_service_jar beam-sdks-java-io-expansion-service-2.55.0.jar

I downloaded the JAR file from: https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.55.0/

Here is the output:

$ python wordcount_xlang.py --output ./ --expansion_service_jar beam-sdks-java-io-expansion-service-2.55.0.jar
Starting expansion service at localhost:8096
Apr 23, 2024 1:35:21 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_write:v2, beam:transform:group_into_batches:v1, beam:transform:org.apache.beam:kafka_read_with_metadata:v2, beam:transform:org.apache.beam:kafka_write:v1, beam:transform:combine_grouped_values:v1, beam:transform:group_into_batches_with_sharded_key:v1, beam:transform:create_view:v1, beam:transform:teststream:v1, beam:transform:sdf_process_keyed_elements:v1, beam:transform:combine_globally:v1, beam:external:java:generate_sequence:v1, beam:transform:window_into:v1, beam:transform:flatten:v1, beam:transform:impulse:v1, beam:runners_core:transforms:splittable_process:v1, beam:transform:write_files:v1, beam:transform:combine_per_key:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:group_by_key:v1, beam:transform:reshuffle:v1]

Registered transforms:
        beam:transform:org.apache.beam:kafka_write:v2: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@c430e6c
        beam:transform:group_into_batches:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@312aa7c
        beam:transform:org.apache.beam:kafka_read_with_metadata:v2: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@536f2a7e
        beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@72bc6553
        beam:transform:combine_grouped_values:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@66982506
        beam:transform:group_into_batches_with_sharded_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@70cf32e3
        beam:transform:create_view:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5a59ca5e
        beam:transform:teststream:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4bdeaabb
        beam:transform:sdf_process_keyed_elements:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6c4906d3
        beam:transform:combine_globally:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@65987993
        beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@71075444
        beam:transform:window_into:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4f32a3ad
        beam:transform:flatten:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6b695b06
        beam:transform:impulse:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@4d1bf319
        beam:runners_core:transforms:splittable_process:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@6f53b8a
        beam:transform:write_files:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@5c80cf32
        beam:transform:combine_per_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@7d900ecf
        beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@6f01b95f
        beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForBuilder@4007f65e
        beam:transform:group_by_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@1a245833
        beam:transform:reshuffle:v1: org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator@673fdbce

Registered SchemaTransformProviders:
        beam:schematransform:org.apache.beam:yaml:filter-java:v1
        beam:schematransform:org.apache.beam:yaml:flatten:v1
        beam:schematransform:org.apache.beam:kafka_read:v1
        beam:schematransform:org.apache.beam:kafka_write:v1
        beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1
        beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1
        beam:schematransform:org.apache.beam:generate_sequence:v1
        beam:schematransform:org.apache.beam:yaml:log_for_testing:v1
        beam:schematransform:org.apache.beam:yaml:explode:v1
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
Apr 23, 2024 1:35:27 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO: Expanding 'count' with URN 'beam:transforms:xlang:count'
Dependencies list: {}
Traceback (most recent call last):
  File "/home/steeve/wordcount_xlang.py", line 129, in <module>
    main()
  File "/home/steeve/wordcount_xlang.py", line 122, in main
    build_pipeline(p, known_args.input, known_args.output)
  File "/home/steeve/wordcount_xlang.py", line 62, in build_pipeline
    lines
  File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pvalue.py", line 138, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 667, in apply
    return self.apply(
  File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 678, in apply
    return self.apply(transform, pvalueish)
  File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 731, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/runners/runner.py", line 203, in apply
    return self.apply_PTransform(transform, input, options)
  File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/runners/runner.py", line 207, in apply_PTransform
    return transform.expand(input)
  File "/home/steeve/.local/lib/python3.10/site-packages/apache_beam/transforms/external.py", line 753, in expand
    raise RuntimeError(response.error)
RuntimeError: java.lang.UnsupportedOperationException: Unknown urn: beam:transforms:xlang:count
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:599)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:710)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
        at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:351)
        at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
        at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

Can anyone please help me figure this out? The failure appears to come from: RuntimeError: java.lang.UnsupportedOperationException: Unknown urn: beam:transforms:xlang:count

Issue Failure

Failure: Test is flaky

Issue Priority

Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be sure the product is healthy)

liferoad commented 6 months ago

I think the documentation for this part is lacking. The Java external transform for that URN is defined in https://github.com/apache/beam/blob/master/sdks/java/testing/expansion-service/src/test/java/org/apache/beam/sdk/testing/expansion/TestExpansionService.java#L80, and the expansion service you started does not list it among its registered transforms. You need an expansion service that includes that transform, along the lines of https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service.

Alternatively, follow https://github.com/apache/beam/tree/master/examples/multi-language and use the Maven JAR that Beam releases: beam-examples-multi-language*
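
For anyone hitting the same error, a quick way to confirm the diagnosis is to compare the requested URN against the list the expansion service prints at startup. The snippet below is only a minimal sketch: the helper name `registered_urns` is hypothetical (it is not part of Beam), and `SAMPLE_LOG` is an abbreviated copy of the "Registering external transforms" line from the output above, not the full list.

```python
import re


def registered_urns(log_text):
    """Return the URNs listed on the 'Registering external transforms' log line.

    Hypothetical helper for illustration; it only parses text and does not
    talk to a running expansion service.
    """
    match = re.search(r"Registering external transforms: \[(.*?)\]", log_text)
    if match is None:
        return set()
    return {urn.strip() for urn in match.group(1).split(",") if urn.strip()}


# Abbreviated copy of the startup log line from the report (assumption: in
# practice you would paste the complete line from your own console output).
SAMPLE_LOG = (
    "INFO: Registering external transforms: ["
    "beam:transform:org.apache.beam:kafka_write:v2, "
    "beam:transform:group_into_batches:v1, "
    "beam:external:java:generate_sequence:v1, "
    "beam:transform:combine_per_key:v1]"
)

# URN that wordcount_xlang.py asked the service to expand, per the traceback.
REQUESTED_URN = "beam:transforms:xlang:count"

urns = registered_urns(SAMPLE_LOG)
is_registered = REQUESTED_URN in urns

if is_registered:
    print(f"{REQUESTED_URN} is registered by this expansion service")
else:
    print(f"{REQUESTED_URN} is NOT registered; use a different expansion service")
```

If the URN is missing from the list, as it is in the report, the "Unknown urn" error is expected with that JAR, and the fix is to point `--expansion_service_jar` at a JAR whose service does register the transform.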