apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Apache Beam FlinkRunner does not process Kafka messages #32709

Open Jerryporter opened 3 days ago

Jerryporter commented 3 days ago

What happened?

I am working on a pipeline that integrates Apache Beam with Kafka, using Flink as the runner for the pipeline. The pipeline works fine for small, bounded datasets (e.g., the WordCount example), but when I switch to Kafka to process unbounded streams, no output is generated. Despite the pipeline executing without obvious errors, it fails to process Kafka messages.

QUESTION:

Kafka is running inside Docker, and the Flink cluster is started locally using start-cluster.sh. The pipeline runs successfully, but the Kafka messages are not consumed or processed as expected. Below is the relevant code and additional details.

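import argparse
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class ParseKafkaMessage(beam.DoFn):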
    def process(self, kafka_message):
        image_path = kafka_message[1].decode('utf-8')
        yield {"image_path": image_path}

def map_message_image_path(result):
    return result['image_path']

def print_message(record):
    # ReadFromKafka emits (key, value) pairs of bytes, so decode the value.
    print(f"Received message: {record[1].decode('utf8')}")
    return record

def run(argv=None, save_main_session=True):
    """
    Args:
      argv: Command line arguments defined for this example.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', dest='bootstrap_servers', required=True,
                        help='Kafka bootstrap servers.')
    parser.add_argument('--topic', dest='topic', required=True,
                        help='Kafka topic to consume from.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    logging.getLogger().setLevel(logging.INFO)
    print('start!')

    CONSUMER_CONFIG = {
        "bootstrap.servers": "localhost:9092",
        # "auto.offset.reset": "earliest",
        'group.id': 'temp_group'
    }

    CONSUMER_TOPICS = ["test"]

    # Build the options once. Creating a second PipelineOptions object here
    # would discard the save_main_session setting made on the first one.
    pipeline_options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",
        "--environment_type=LOOPBACK",
    ])
    pipeline_options.view_as(
        SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as pipeline:
        consumed = (
            pipeline
            | ReadFromKafka(
                consumer_config=CONSUMER_CONFIG,
                topics=CONSUMER_TOPICS,
                # max_num_records=3,
                # max_read_time=60,
            )
            # | beam.Map(print))
            # | beam.Map(lambda record: convert_kafka_record_to_dictionary(record)))
            | beam.Map(lambda record: print_message(record)))

if __name__ == '__main__':
    # The parser above requires --bootstrap_servers and --topic.
    run("--bootstrap_servers localhost:9092 --topic test".split())

Steps to reproduce:

  1. Set up a local Flink cluster (start-cluster.sh) and a Kafka broker running in Docker.
  2. Create a Kafka topic (test) and publish some messages containing image paths.
  3. Run the Beam pipeline shown above to consume and process messages from Kafka.

Expected behavior:

The Kafka messages should be consumed by the Beam pipeline, and the print_message() function should output the messages to the console.
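
As a side note on the expected output: the sketch below assumes that each element produced by ReadFromKafka is a (key, value) pair of raw bytes, which is also what ParseKafkaMessage above expects. It is a Beam-free illustration only; decode_kafka_record is a hypothetical helper and is not part of the original pipeline.

```python
def decode_kafka_record(record, encoding="utf-8"):
    """Turn a (key, value) pair of bytes into a dict of text fields.

    Assumption: the record is a 2-tuple of bytes and the key may be None.
    """
    key, value = record
    return {
        "key": key.decode(encoding) if key is not None else None,
        "image_path": value.decode(encoding),
    }


if __name__ == "__main__":
    # Example record with no key and an image path as the value.
    print(decode_kafka_record((None, b"/data/images/cat.png")))
```

A function like this could be passed to beam.Map in place of print_message, which keeps the decoding logic testable without a running cluster.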

Environment:

  Apache Beam version: 2.59.0
  Flink version: 1.18.1
  Kafka version: docker:last
  Runner: FlinkRunner

Actual behavior:

The pipeline starts and the Flink job is submitted successfully, but no Kafka messages are consumed or processed and no output is generated. After about two minutes the job fails with the error shown in the log below. I have verified that Kafka is working correctly and that messages are available in the topic.
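
One way to double-check that the services are reachable from the host is a plain TCP probe. The sketch below uses only the standard library; the endpoints (localhost:9092 for Kafka, localhost:8081 for the Flink REST API) are taken from the pipeline above, and is_port_open is a hypothetical helper name.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False


if __name__ == "__main__":
    # Endpoints from the pipeline above; adjust them to your setup.
    endpoints = [
        ("Kafka broker", "localhost", 9092),
        ("Flink REST API", "localhost", 8081),
    ]
    for name, host, port in endpoints:
        state = "reachable" if is_port_open(host, port) else "NOT reachable"
        print(f"{name} at {host}:{port} is {state}")
```

A successful probe only shows that the port accepts connections. It does not prove that the broker's advertised listeners resolve correctly for clients running in another container.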

Log Files:

INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.59.0/beam-sdks-java-io-expansion-service-2.59.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar 
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar' '51799' '--filesToStage=/home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar' '--alsoStartLoopbackWorker']
INFO:apache_beam.utils.subprocess_server:WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
......
INFO:apache_beam.utils.subprocess_server:INFO: Source: Impulse (1/1) (359af0cc3ed2f4c625e558bebd61c64b_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:43293.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService control
INFO:apache_beam.utils.subprocess_server:INFO: Beam Fn Control client connected with id 1-2
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:43 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:48 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:54 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:59 AM 
....
INFO:apache_beam.utils.subprocess_server:SEVERE: Docker container a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5 logs:
INFO:apache_beam.utils.subprocess_server:2024/10/08 03:32:38 Failed to obtain provisioning information: failed to dial server at localhost:34507
INFO:apache_beam.utils.subprocess_server:   caused by:
INFO:apache_beam.utils.subprocess_server:context deadline exceeded
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.taskmanager.Task transitionState
INFO:apache_beam.utils.subprocess_server:WARNING: [3]ReadFromKafka(beam:transform:org.apache.beam:kafka_read_without_metadata:v1)/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/4)#0 (359af0cc3ed2f4c625e558bebd61c64b_24761c05670fa5069ce6a1b3d4c931eb_0_0) switched from INITIALIZING to FAILED with failure cause:
INFO:apache_beam.utils.subprocess_server:org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5
....
INFO:apache_beam.utils.subprocess_server:INFO: Stopping Pekko RPC service.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.pekko.event.slf4j.Slf4jLogger$$anonfun$receive$1 $anonfun$applyOrElse$3
INFO:apache_beam.utils.subprocess_server:INFO: Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService closeAsync
INFO:apache_beam.utils.subprocess_server:INFO: Stopping Pekko RPC service.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService lambda$closeAsync$7
INFO:apache_beam.utils.subprocess_server:INFO: Stopped Pekko RPC service.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.pekko.event.slf4j.Slf4jLogger$$anonfun$receive$1 $anonfun$applyOrElse$3
INFO:apache_beam.utils.subprocess_server:INFO: Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.AbstractBlobCache close
INFO:apache_beam.utils.subprocess_server:INFO: Shutting down BLOB cache
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.AbstractBlobCache close
INFO:apache_beam.utils.subprocess_server:INFO: Shutting down BLOB cache
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.BlobServer close
INFO:apache_beam.utils.subprocess_server:INFO: Stopped BLOB server at 0.0.0.0:43151
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService lambda$closeAsync$7
INFO:apache_beam.utils.subprocess_server:INFO: Stopped Pekko RPC service.
start!
Traceback (most recent call last):
  File "/home/chy/beam-llm/kafka-beam.py", line 196, in <module>
    run()
  File "/home/chy/beam-llm/kafka-beam.py", line 176, in run
    with beam.Pipeline(options=pipeline_options) as pipeline:
  File "/home/chy/beam-llm/beam/lib/python3.10/site-packages/apache_beam/pipeline.py", line 621, in __exit__
    self.result.wait_until_finish()
  File "/home/chy/beam-llm/beam/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-chy-1008033013-53c548ce_c23a0db3-f61e-41fb-b989-f867c0923852 failed in state FAILED: java.lang.IllegalStateException: No container running for id a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5

The error suggests that the Docker container never started. However, docker ps shows that Flink did start a container and that it is still running, so it appears that the container was unable to execute the task. Its own log reports that it failed to dial the server at localhost:34507.
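
Because the log is long, the relevant lines are easy to miss. The following is a rough sketch of a filter that keeps only the lines pointing at the failure. The patterns are copied from the excerpt above and are not an exhaustive list; find_failure_lines is a hypothetical helper. In this excerpt, the matches indicate that the environment being started in Docker is apache/beam_java21_sdk:2.59.0, that is, the Java SDK harness rather than the Python worker.

```python
import re

# Patterns taken from the log excerpt above; extend them as needed.
FAILURE_PATTERNS = [
    re.compile(r"SEVERE"),
    re.compile(r"switched from \w+ to FAILED"),
    re.compile(r"Failed to obtain provisioning information"),
    re.compile(r"Still waiting for startup of environment \S+"),
    re.compile(r"Job state changed to FAILED"),
]


def find_failure_lines(log_text):
    """Return (line_number, line) pairs that match any failure pattern."""
    hits = []
    for number, line in enumerate(log_text.splitlines(), start=1):
        if any(pattern.search(line) for pattern in FAILURE_PATTERNS):
            hits.append((number, line.strip()))
    return hits


if __name__ == "__main__":
    sample = "\n".join([
        "INFO: Source: Impulse (1/1) switched from RUNNING to FINISHED.",
        "INFO: Still waiting for startup of environment "
        "apache/beam_java21_sdk:2.59.0 for worker id 1-1",
        "Failed to obtain provisioning information: "
        "failed to dial server at localhost:34507",
    ])
    for number, line in find_failure_lines(sample):
        print(f"{number}: {line}")
```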

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

liferoad commented 1 day ago

Do you see the error mentioned in https://github.com/apache/beam/issues/32743?