apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
[Bug]: Python expansion with multiple SqlTransforms is extremely slow #31227

Open gergely-g opened 1 month ago

gergely-g commented 1 month ago

What happened?

When building a Pipeline with multiple SqlTransforms from Beam Python, the expansion that happens in SqlTransforms is currently (Beam 2.55.0) extremely inefficient.

This inefficiency has multiple sources.

  1. By default, a new BeamJarExpansionService() is started for each SqlTransform.
  2. The ResolveArtifacts call will unconditionally download the 300MB beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar to a temporary directory for each step (!).

The latter dominates execution time. For example, when running Beam on a 4 vCPU, 2 core, 16 GB memory machine (standard Dataflow workbench setup), a Pipeline with 31 trivial SQL transforms takes 200 seconds to execute. (See the example below.)

We found a somewhat dirty workaround to speed things up by skipping the SqlTransform._resolve_artifacts() altogether when working from inside Jupyter.

This brings the execution time down from 200s to 22s.

I suspect these inefficiencies also contribute to beam_sql being extremely slow even for trivial queries.

apache_beam/runners/portability/artifact_service.py contains this code snippet that might be one of the culprits for this inefficiency (note the and False):

    if os.path.exists(
        payload.path) and payload.sha256 and payload.sha256 == sha256(
            payload.path) and False:
      return artifact
    else:
      return store_artifact(artifact, service, dest_dir)
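The effect of the trailing and False can be checked in isolation from Beam. The following is a minimal sketch of the boolean logic only, not the Beam implementation: file_sha256, cache_hit_as_written, and cache_hit_without_and_false are hypothetical helper names, and the temporary file merely stands in for an already-downloaded jar.

```python
import hashlib
import os
import tempfile


def file_sha256(path):
    """Return the hex SHA-256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def cache_hit_as_written(path, expected_sha256):
    # Same shape as the quoted condition, including the trailing
    # `and False`, which makes the whole expression falsy.
    return bool(
        os.path.exists(path) and expected_sha256
        and expected_sha256 == file_sha256(path) and False)


def cache_hit_without_and_false(path, expected_sha256):
    # Hypothetical variant with the trailing `and False` removed.
    return bool(
        os.path.exists(path) and expected_sha256
        and expected_sha256 == file_sha256(path))


# Stand-in for an artifact that is already present on disk.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"pretend this is a jar")
    artifact_path = tmp.name

expected = file_sha256(artifact_path)
as_written = cache_hit_as_written(artifact_path, expected)
without_and_false = cache_hit_without_and_false(artifact_path, expected)
os.remove(artifact_path)

print(as_written, without_and_false)
```

With a matching hash and an existing file, only the variant without and False reports a hit. Whether this branch is actually reached for the SQL expansion jar is a separate question that the sketch does not answer.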

In addition, once the ExpansionService is cached it only takes 100-200ms to perform the actual SQL expansion, but the ArtifactRetrievalService.ResolveArtifacts() call takes 1.5s per SQL query even without the downloading of the actual files. This dominates the expansion time, which dominates the overall time of launching and running a pipeline.
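One way to confirm where the time goes is to wrap the suspected calls and accumulate wall-clock time per call. The sketch below is generic Python and does not touch Beam: timed, TIMINGS, and fake_resolve_artifacts are hypothetical names, and the sleep only stands in for a slow RPC.

```python
import functools
import time
from collections import defaultdict

# Maps function name -> list of observed durations in seconds.
TIMINGS = defaultdict(list)


def timed(fn):
    """Record the wall-clock duration of every call to `fn`."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            TIMINGS[fn.__name__].append(time.perf_counter() - start)
    return wrapper


@timed
def fake_resolve_artifacts(components):
    # Hypothetical stand-in for a slow call; returns its input unchanged.
    time.sleep(0.01)
    return components


for _ in range(3):
    fake_resolve_artifacts({"step": "sql"})

for name, samples in TIMINGS.items():
    print(f"{name}: {len(samples)} calls, {sum(samples):.3f}s total")
```

The same decorator could in principle be applied to an overridden SqlTransform._resolve_artifacts in a subclass, with the caveat that this is a private method and may change between Beam releases.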

So the hotspot call sequence is something like:

The times may not sound like much, but latency is bad enough to ruin the Jupyter REPL experience when combining Python + SQL.

Code to repro and demonstrate the workaround.

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
from apache_beam.transforms.external import BeamJarExpansionService
import time

# By default this pipeline of 31 SQL transforms takes 200s to execute.
# With the short-circuiting below execution time is reduced to 22s.

class FasterSqlTransform(SqlTransform):
    def _resolve_artifacts(self, components, service, dest):
        # Short circuit unnecessary call that results in the downloading of the 300MB
        # beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar for each transform.
        return components

start_time = time.time()

# FasterSqlTransform with naive but shared BeamJarExpansionService
with beam.Pipeline() as p:
    with BeamJarExpansionService(':sdks:java:extensions:sql:expansion-service:shadowJar') as shared_expansion_service:
        sql_result = (p |
                      "Begin SQL" >> FasterSqlTransform("SELECT 'hello' AS message, 1 AS counter",
                                                           expansion_service=shared_expansion_service))
        for i in range(30):
            sql_result = (sql_result
                          | f"SQL {i}" >>
                          FasterSqlTransform("SELECT message, counter + 1 AS counter FROM PCOLLECTION",
                                                expansion_service=shared_expansion_service))
        sql_result | beam.LogElements()

print(f"Pipeline took {time.time() - start_time} s to execute")

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

liferoad commented 1 month ago

The workaround looks good for the Python Direct Runner. @tvalentyn

tvalentyn commented 1 month ago

cc: @chamikaramj

chamikaramj commented 1 month ago

"The ResolveArtifacts call will unconditionally download the 300MB beam-sdks-java-extensions-sql-expansion-service-2.55.0.jar to a temporary directory for each step"

The downloaded jars should be cached. Perhaps this caching doesn't work in your environment?

You also have the option of manually specifying the jar [1] or manually starting up an expansion service [2].

[1] --beam_services="{\":sdks:java:extensions:sql:expansion-service:shadowJar\": \"$EXPANSION_SERVICE_JAR\"}"

[2] https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service
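The shell quoting in option [1] is easy to get wrong, so the flag value can also be built in Python. This is a minimal sketch: beam_services_flag is a hypothetical helper name and the jar path is a placeholder, while the --beam_services flag and the Gradle target key are taken from the comment above.

```python
import json

# Gradle target name used as the key in the --beam_services mapping.
SQL_EXPANSION_TARGET = ":sdks:java:extensions:sql:expansion-service:shadowJar"


def beam_services_flag(jar_path):
    """Build a --beam_services flag that maps the SQL target to a local jar."""
    return "--beam_services=" + json.dumps({SQL_EXPANSION_TARGET: jar_path})


# Placeholder path; replace with the location of a locally available jar.
flag = beam_services_flag("/tmp/beam-sql-expansion-service.jar")
print(flag)
```

The resulting string can be passed alongside the other pipeline options. json.dumps takes care of the inner quotes that need backslash escaping on the command line.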

gergely-g commented 1 month ago

@chamikaramj The cache hit will never be detected for the downloaded JARs because of this line: https://github.com/apache/beam/blob/bb51380f1b29a2b69ab82ef795a8895ebd89f87e/sdks/python/apache_beam/runners/portability/artifact_service.py#L294

It always evaluates to False.

A worse problem, though, is that, as mentioned above, the ArtifactRetrievalService.ResolveArtifacts() call takes 1.5s per SQL query even without downloading the actual files.

liferoad commented 1 month ago

@robertwb can you check this?

wollowizard commented 1 month ago

Hi, any news? I have also encountered this exact same issue.

chamikaramj commented 1 month ago

The "and False" part does seem like a bug, but I don't think that branch actually gets hit, since the Java expansion response serves Beam artifacts as DEFERRED artifacts that are retrieved from the locally available expansion service (so the URN is DEFERRED, not FILE).

https://github.com/apache/beam/blob/fed6489124000b3f222dc444136009ab22e4846e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java#L452C31-L452C48

https://github.com/apache/beam/blob/fed6489124000b3f222dc444136009ab22e4846e/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1437

The expansion service jar is cached elsewhere when starting up the expansion service and is served to the Python side through the ArtifactRetrievalService.ResolveArtifacts() API. Unfortunately, this might be what adds the O(seconds) per-query delay you are observing.

https://github.com/apache/beam/blob/fed6489124000b3f222dc444136009ab22e4846e/sdks/python/apache_beam/utils/subprocess_server.py#L366