apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
Bug: apache_beam.io.gcp.pubsublite.ReadFromPubSubLite not working #28326

Open TimKleinlein opened 1 year ago

TimKleinlein commented 1 year ago

What happened?

Problem description

The transform apache_beam.io.gcp.pubsublite.ReadFromPubSubLite does not appear to work. A simple pipeline that reads messages from a Google Cloud Pub/Sub Lite subscription and logs their content produces no output with apache-beam 2.50.0 and google-cloud-pubsublite 1.8.3.

Code to reproduce issue

This is the Python file defining the pipeline:

import logging
import apache_beam
from apache_beam.io.gcp import pubsublite
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions

def run(pipeline_args=None):
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=False
    )

    with Pipeline(options=pipeline_options) as pipeline:
        elements = (
            pipeline
            | "Read from Pub/Sub" >> pubsublite.ReadFromPubSubLite(subscription_path="projects/<PROJECT_ID>/locations/<LOCATION>/subscriptions/<SUBSCRIPTION_NAME>")
            | "Print Elements" >> apache_beam.Map(lambda x: logging.info(x.getMessage()) or x)
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    run()
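
The `logging.info(...) or x` idiom in the lambda above works because `logging.info` returns `None`, so the expression evaluates to `x`. A named function may be easier to debug. The sketch below is one possible variant, not part of the original report: it logs the element's repr rather than calling a method on it, so it makes no assumption about the element type. The name `log_and_pass` is illustrative and is not a Beam API.

```python
import logging


def log_and_pass(element):
    """Log an element and return it unchanged so downstream steps still receive it."""
    # %r avoids any assumption about the element's attributes or methods.
    logging.info("Received element: %r", element)
    return element
```

It could replace the lambda in the pipeline as `apache_beam.Map(log_and_pass)`.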

When running the pipeline locally the only logs printed are the following:

INFO:root:severity: INFO
timestamp {
  seconds: 1694002848
  nanos: 949000000
}
message: "Fn Harness started"
log_location: "org.apache.beam.fn.harness.FnHarness"
thread: "1"

INFO:root:severity: INFO
timestamp {
  seconds: 1694002849
  nanos: 579000000
}
message: "Entering instruction processing loop"
log_location: "org.apache.beam.fn.harness.FnHarness"
thread: "1"
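
The `timestamp` fields in these harness records are Unix epoch seconds plus nanoseconds. A small helper, sketched here under a hypothetical name, converts them to UTC so they can be lined up with the time a message was published.

```python
from datetime import datetime, timedelta, timezone


def harness_timestamp_to_utc(seconds: int, nanos: int = 0) -> datetime:
    """Convert a (seconds, nanos) pair from a harness log record to a UTC datetime."""
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # datetime has microsecond resolution, so sub-microsecond digits are dropped.
    return base + timedelta(microseconds=nanos // 1000)


print(harness_timestamp_to_utc(1694002848, 949000000).isoformat())
# 2023-09-06T12:20:48.949000+00:00
```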

I then publish a message to the Pub/Sub Lite topic that the subscription is attached to:

gcloud pubsub lite-topics publish <TOPIC_NAME> --location <LOCATION> --project=<PROJECT_ID> --message="Hello world"

The message is never printed, even though the pipeline process is still running.

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

liferoad commented 1 year ago

@dpcollins-google, can you check this?

liferoad commented 1 year ago

@TimKleinlein Does the same code work with the previous Beam version?

dpcollins-google commented 1 year ago

Please respond to @liferoad's question. If this has never worked before, it is likely a permissions or connectivity issue. If it has worked previously, please send the full topic and subscription paths along with a timeline to pubsub-lite-help@google.com
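
One way to separate a permissions or connectivity problem from a Beam problem is to read from the subscription directly with the Pub/Sub Lite Python client, bypassing Beam. The following is a minimal sketch under stated assumptions: google-cloud-pubsublite is installed, application default credentials are configured, and the helper names `check_subscription_path` and `pull_directly` are hypothetical. The flow control values are arbitrary.

```python
import re
from concurrent.futures import TimeoutError

# Expected shape of a Pub/Sub Lite subscription path.
_SUBSCRIPTION_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/subscriptions/(?P<name>[^/]+)$"
)


def check_subscription_path(path: str) -> dict:
    """Return the path components, or raise ValueError if the path is malformed."""
    match = _SUBSCRIPTION_RE.match(path)
    if match is None:
        raise ValueError(f"Not a Pub/Sub Lite subscription path: {path!r}")
    return match.groupdict()


def pull_directly(subscription_path: str, timeout_seconds: float = 60.0) -> None:
    """Print messages read straight from the subscription, without Beam."""
    # Imported lazily so the path check above works without the client installed.
    from google.cloud.pubsublite.cloudpubsub import SubscriberClient
    from google.cloud.pubsublite.types import FlowControlSettings

    check_subscription_path(subscription_path)
    flow_control = FlowControlSettings(
        messages_outstanding=1000,
        bytes_outstanding=10 * 1024 * 1024,
    )

    def callback(message):
        print(f"Received: {message.data!r}")
        message.ack()

    with SubscriberClient() as subscriber:
        future = subscriber.subscribe(
            subscription_path,
            callback=callback,
            per_partition_flow_control_settings=flow_control,
        )
        try:
            # Block until the timeout, then stop the streaming pull.
            future.result(timeout=timeout_seconds)
        except TimeoutError:
            future.cancel()
```

If messages arrive here but not in the pipeline, the subscription and credentials are probably fine and the problem lies elsewhere. Acknowledging messages advances the subscription's position, so a throwaway subscription is the safer choice for this test.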

p13rr0m commented 1 year ago

Thank you for your help. We have not been able to get the pipeline working with previous Beam versions either. However, since we are both using Docker on Mac, we think this might be the problem. There is already an issue that sounds related: https://github.com/apache/beam/issues/23440. Unfortunately, the solution

# By default Beam expects workers (of the SDK worker pool) to connect to a Spark worker on `localhost`. When running
# the worker pool in docker on a Mac this isn't possible due to the lack of `host` networking. Using
# BEAM_WORKER_POOL_IN_DOCKER_VM=1, Beam will use `host.docker.internal` to communicate via the docker host instead.
export BEAM_WORKER_POOL_IN_DOCKER_VM=1

# DOCKER_MAC_CONTAINER=1 limits the ports on a Spark worker for communication with SDK workers to the range 8100 - 8200
# instead of using random ports. Ports of the range are used in a round-robin fashion and have to be published.
export DOCKER_MAC_CONTAINER=1

didn't work for us.

This is the full log:

python main.py
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
/Users/rome/.conda/envs/beam-test/lib/python3.9/site-packages/apache_beam/transforms/external.py:676: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  self._expansion_service, pipeline.options)
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.50.0/beam-sdks-java-io-google-cloud-platform-expansion-service-2.50.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /Users/rome/.apache_beam/cache/jars/beam-sdks-java-io-google-cloud-platform-expansion-service-2.50.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/Users/rome/.apache_beam/cache/jars/beam-sdks-java-io-google-cloud-platform-expansion-service-2.50.0.jar' '65272' '--filesToStage=/Users/rome/.apache_beam/cache/jars/beam-sdks-java-io-google-cloud-platform-expansion-service-2.50.0.jar']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:65272
INFO:apache_beam.utils.subprocess_server:Sep 07, 2023 12:20:22 PM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:pubsub_read:v1, beam:transform:org.apache.beam:pubsub_write:v1, beam:transform:org.apache.beam:pubsublite_write:v1, beam:transform:org.apache.beam:pubsublite_read:v1, beam:transform:org.apache.beam:spanner_insert:v1, beam:transform:org.apache.beam:spanner_update:v1, beam:transform:org.apache.beam:spanner_replace:v1, beam:transform:org.apache.beam:spanner_insert_or_update:v1, beam:transform:org.apache.beam:spanner_delete:v1, beam:transform:org.apache.beam:spanner_read:v1, beam:transform:org.apache.beam:schemaio_jdbc_read:v1, beam:transform:org.apache.beam:schemaio_jdbc_write:v1, beam:transform:org.apache.beam:schemaio_pubsub_read:v1, beam:transform:org.apache.beam:schemaio_avro_write:v1, beam:transform:org.apache.beam:schemaio_bigquery_write:v1, beam:transform:org.apache.beam:schemaio_pubsub_write:v1, beam:transform:org.apache.beam:schemaio_bigquery_read:v1, beam:transform:org.apache.beam:schemaio_avro_read:v1, beam:transform:org.apache.beam:schemaio_datastoreV1_write:v1, beam:transform:org.apache.beam:schemaio_datastoreV1_read:v1, beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:pubsub_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@265adfad
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:pubsub_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@a3d9978
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:pubsublite_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@61544ae6
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:pubsublite_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@4b41dd5c
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:spanner_insert:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@3b96c42e
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:spanner_update:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5d066c7d
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:spanner_replace:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1e461e41
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:spanner_insert_or_update:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5ba88be8
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:spanner_delete:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@56928307
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:spanner_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@3899782c
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_jdbc_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1603cd68
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_jdbc_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@9ebe38b
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_pubsub_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@4b23c30a
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_avro_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@38089a5a
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_bigquery_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@30e868be
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_pubsub_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@66c92293
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_bigquery_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@332796d3
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_avro_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@4f0100a7
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_datastoreV1_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@3cdf2c61
INFO:apache_beam.utils.subprocess_server:   beam:transform:org.apache.beam:schemaio_datastoreV1_read:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@13ad5cd3
INFO:apache_beam.utils.subprocess_server:   beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1c33c17b
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:jdbc_write:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:spanner_cdc_read:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:bigtable_write:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:spanner_write:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:pubsublite_read:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:bigquery_storage_read:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:bigquery_export_read:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:jdbc_read:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:bigquery_storage_write:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:pubsub_read:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:pubsub_write:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:bigtable_read:v1
INFO:apache_beam.utils.subprocess_server:   beam:schematransform:org.apache.beam:pubsublite_write:v1
WARNING:root:Waiting for grpc channel to be ready at localhost:65272.
INFO:apache_beam.utils.subprocess_server:Sep 07, 2023 12:20:26 PM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Pub/Sub/_ReadExternal(beam:transform:org.apache.beam:pubsublite_read:v1)' with URN 'beam:transform:org.apache.beam:pubsublite_read:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
INFO:apache_beam.utils.subprocess_server:Sep 07, 2023 12:20:26 PM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.gcp.pubsublite.internal.ExternalTransformConfig$ReadConfig' has no schema registered. Attempting to construct with setter approach.
INFO:apache_beam.utils.subprocess_server:Sep 07, 2023 12:20:27 PM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.gcp.pubsublite.internal.ExternalTransformConfig$ReadConfig' has no schema registered. Attempting to construct with setter approach.
INFO:apache_beam.typehints.native_type_compatibility:Converting string literal type hint to Any: "Message"
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x116409b80> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x116409ca0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x11640a1f0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x11640a280> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x11640a430> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x11640a4c0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x11640a5e0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x11640a670> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x11640a700> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x11640a790> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x11640a9d0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function add_impulse_to_dangling_transforms at 0x11640aaf0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x11640a940> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x11640aa60> ====================
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting control server on port 65290
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting data server on port 65291
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting state server on port 65292
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting logging server on port 65293
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.DockerSdkWorkerHandler object at 0x11670bbb0> for environment external_1beam:env:docker:v1 (beam:env:docker:v1, b'\n\x1dapache/beam_java11_sdk:2.50.0')
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Attempting to pull image apache/beam_java11_sdk:2.50.0
2.50.0: Pulling from apache/beam_java11_sdk
Digest: sha256:2205d75d227f9d1eb816fded539bb0f4c6b138f4f1e3baafadf10cafe835e248
Status: Image is up to date for apache/beam_java11_sdk:2.50.0
docker.io/apache/beam_java11_sdk:2.50.0
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Waiting for docker to start up. Current status is running
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Docker container is running. container_id = b'e2b119f64211d7d86fb99fc25f2bb3b0927eda10e17229c82e84e532b88cfe02', worker_id = worker_0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x116725ca0> for environment ref_Environment_default_environment_2 (beam:env:embedded_python:v1, b'')
INFO:root:severity: INFO
timestamp {
  seconds: 1694082043
  nanos: 225000000
}
message: "Fn Harness started"
log_location: "org.apache.beam.fn.harness.FnHarness"
thread: "1"

INFO:root:severity: INFO
timestamp {
  seconds: 1694082043
  nanos: 535000000
}
message: "Entering instruction processing loop"
log_location: "org.apache.beam.fn.harness.FnHarness"
thread: "1"

tvalentyn commented 1 year ago

Have you by chance tried running this on the Dataflow runner? I wonder whether this is a Beam Direct Runner problem or a local connectivity issue.
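
A minimal sketch of the arguments such a Dataflow run might use, assuming the `run(pipeline_args=...)` function from the reproduction script. The helper name `dataflow_args` is hypothetical and every value passed to it is a placeholder; a real run also needs a GCP project with Dataflow enabled and a staging bucket.

```python
def dataflow_args(project: str, region: str, temp_location: str) -> list:
    """Build command-line style pipeline arguments for a Dataflow run."""
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--temp_location={temp_location}",
    ]


# Placeholder values; replace with a real project, region, and bucket.
args = dataflow_args("<PROJECT_ID>", "<REGION>", "gs://<BUCKET>/tmp")
print(args)
```

The list could then be passed as `run(pipeline_args=args)`. Streaming mode is already set inside `run`, so it is not repeated here.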