apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Java expansion service does not respect the namespace for all environments. #30876

Open robertwb opened 6 months ago

robertwb commented 6 months ago

What happened?

This seems to primarily leak an un-namespaced "default" environment for the global windowing strategy. It slides under the radar because the environment is not always inspected in this case, but it would be good to clean this up and add a check.
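A minimal sketch of the kind of check meant here, assuming the expanded components are available as plain dicts. The dict layout, the function name `find_unnamespaced_environments`, and the example ids are illustrative stand-ins, not the expansion service's actual API or data:

```python
def find_unnamespaced_environments(components, namespace):
    """Return the sorted environment ids that do not start with `namespace`.

    `components` is a hypothetical dict stand-in for the expanded pipeline
    components: 'environments' maps environment id -> spec, while
    'transforms' and 'windowing_strategies' map id -> a dict that may carry
    an 'environment_id' key.
    """
    # Start from the environments that are declared directly.
    seen = set(components.get('environments', {}))
    # Add environments that are only referenced, e.g. by a windowing strategy.
    for section in ('transforms', 'windowing_strategies'):
        for spec in components.get(section, {}).values():
            env_id = spec.get('environment_id')
            if env_id:
                seen.add(env_id)
    return sorted(e for e in seen if not e.startswith(namespace))


# Illustrative data only: the windowing strategy refers to a bare "default".
example_components = {
    'environments': {'external_1_env': {}},
    'transforms': {'external_1_Sql1': {'environment_id': 'external_1_env'}},
    'windowing_strategies': {
        'external_1_global': {'environment_id': 'default'},
    },
}

leaked = find_unnamespaced_environments(example_components, 'external_1')
```

Walking the referencing objects as well as the declared environments is deliberate: the leak described above shows up through the windowing strategy, so a check limited to the declared environments could miss it.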

Issue Priority

Priority: 3 (minor)

robertwb commented 6 months ago

This was uncovered by https://github.com/apache/beam/pull/30864

It can be reproduced with the following pipeline:

import logging

import apache_beam as beam
from apache_beam.transforms import external
from apache_beam.utils import subprocess_server

logging.getLogger().setLevel(logging.INFO)

with beam.Pipeline('DirectRunner') as p:
  i1 = p | "i1" >> beam.Create([beam.Row(name='john', id=1)])
  i2 = p | "i2" >> beam.Create([beam.Row(name='jane', id=1)])
  # Two external transforms, each given its own JavaJarExpansionService.
  result = (
      {'i1': i1, 'i2': i2}
      | 'Sql1' >> external.ExternalTransform(
          'beam:external:java:sql:v1',
          external.ImplicitSchemaPayloadBuilder(
              {'query': 'SELECT * FROM i1 INNER JOIN i2 ON i1.id = i2.id'}
          ).payload(),
          external.JavaJarExpansionService(
              subprocess_server.JavaJarServer.path_to_beam_jar(
                  gradle_target='sdks:java:extensions:sql:expansion-service:shadowJar',
                  artifact_id=None)))
      | 'LogForTesting' >> external.SchemaAwareExternalTransform(
          'beam:schematransform:org.apache.beam:yaml:log_for_testing:v1',
          external.JavaJarExpansionService(
              subprocess_server.JavaJarServer.path_to_beam_jar(
                  gradle_target='sdks:java:extensions:sql:expansion-service:shadowJar',
                  artifact_id=None)),
          rearrange_based_on_discovery=True))