apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: ReadFromKafka not forwarding in streaming mode version on portable runners #25114

Open jihad-akl opened 1 year ago

jihad-akl commented 1 year ago

What happened?

ReadFromKafka does not forward messages in streaming mode. I am using apache-beam 2.44.0.

beam_options = PipelineOptions(streaming=True)
pipeline = beam.Pipeline(options=beam_options)

messages = (
    pipeline
    | 'Read from Kafka' >> ReadFromKafka(
        consumer_config=json.load(open("config/consumer_config_beam.json")),
        topics=topic,
    )
    | 'Print messages' >> beam.Map(lambda message: print("received!"))
)

Hello, in the code above, the pipeline gets stuck at ReadFromKafka. Adding max_num_records only makes it wait for that number of records, then forward them to the next step and end the pipeline. (I am using the DirectRunner because I need to run the code locally.)
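To make the two behaviours described above concrete, here is a minimal sketch that builds the `ReadFromKafka` arguments with and without `max_num_records`. The helper names `kafka_read_kwargs` and `build_pipeline` are hypothetical and exist only for illustration; the broker address and topic are taken from the reproduction later in this thread. `build_pipeline` is not called here because it needs Beam, Java and a running broker.

```python
def kafka_read_kwargs(consumer_config, topics, max_num_records=None):
    """Build keyword arguments for ReadFromKafka (hypothetical helper).

    With max_num_records set, the read is bounded: it waits for that many
    records, forwards them, and the pipeline ends. Without it, the read is
    unbounded, which is the case reported as stuck in this issue.
    """
    kwargs = {"consumer_config": dict(consumer_config), "topics": list(topics)}
    if max_num_records is not None:
        kwargs["max_num_records"] = max_num_records
    return kwargs


def build_pipeline(kwargs, streaming=True):
    """Assemble the pipeline from the report (hypothetical helper)."""
    # Imported lazily so kafka_read_kwargs can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    pipeline = beam.Pipeline(options=PipelineOptions(streaming=streaming))
    _ = (
        pipeline
        | "Read from Kafka" >> ReadFromKafka(**kwargs)
        | "Print messages" >> beam.Map(print)
    )
    return pipeline


config = {"bootstrap.servers": "127.0.0.1:9092"}
bounded = kafka_read_kwargs(config, ["multi-video-stream"], max_num_records=10)
unbounded = kafka_read_kwargs(config, ["multi-video-stream"])
```

The bounded variant is useful as a local smoke test; it does not address the per-message forwarding that the unbounded case needs.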

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

Abacn commented 1 year ago

Streaming by definition will not end. Besides, the Python DirectRunner is not meant for production and does not have full support for streaming. This is most likely working as intended.

jihad-akl commented 1 year ago

True, so how can I use an Apache Beam pipeline in streaming mode if it only gathers data and does not send it to the next step? The "received!" print does not trigger each time I receive a message from Kafka locally. Isn't this an important bug? Doesn't the DirectRunner need some fixes?

tvalentyn commented 1 year ago

https://github.com/apache/beam/issues/24528 tracks various issues related to the streaming DirectRunner. I am not sure whether it is able to run a simple KafkaIO pipeline. Are you able to use a portable Flink Runner, by chance?

jihad-akl commented 1 year ago

I am trying to implement it, but so far I am facing the same issue. I can see my pipeline in the Apache Flink UI at localhost:8081, but nothing happens. I am debugging it to see whether I made a mistake.

jihad-akl commented 1 year ago

So after researching and testing, I found that the Flink Runner does not help, because Apache Flink gathers a lot of data before releasing it. My use case is that every message I receive from Kafka needs to be forwarded to the next step in the pipeline (locally).

Abacn commented 1 year ago

FYI, if the Flink runner has the same issue, it may be hitting #22809; the issue on the Python side may still persist. CC: @johnjcasey. I will also take a look.

jihad-akl commented 1 year ago

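To reproduce:

consumer_config.json:

{ "bootstrap.servers": "127.0.0.1:9092" }

main.py:

import json

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

topic = ["multi-video-stream"]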

beam_options = PipelineOptions(["--runner=FlinkRunner","--flink_version=1.15","--flink_master=localhost:8081"
                                ,"--environment_type=LOOPBACK","--streaming"])
with beam.Pipeline(options=beam_options) as p:
    messages = p | 'Read from kafka' >> ReadFromKafka(consumer_config=json.load(open("consumer_config.json"))
                                                   ,topics=topic)
    messages | 'Print Messages' >> beam.Map(print)
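As a reading aid for the options used in this reproduction, the sketch below rebuilds the same flag list with a comment on each flag. The helper name `flink_runner_flags` is hypothetical; the flag names and values are the ones from the snippet above.

```python
def flink_runner_flags(flink_master, flink_version, streaming=True):
    """Return the PipelineOptions flags used in the reproduction (hypothetical helper)."""
    flags = [
        # Submit through the portable Flink runner instead of the DirectRunner.
        "--runner=FlinkRunner",
        # Selects which Flink job-server jar Beam starts.
        f"--flink_version={flink_version}",
        # Address of the running Flink cluster's REST endpoint.
        f"--flink_master={flink_master}",
        # Run the Python SDK worker inside the submitting process (local testing).
        "--environment_type=LOOPBACK",
    ]
    if streaming:
        # Treat the pipeline as unbounded.
        flags.append("--streaming")
    return flags


flags = flink_runner_flags("localhost:8081", "1.15")
```

The resulting list can be passed to `PipelineOptions(flags)` exactly as in the reproduction.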
jihad-akl commented 1 year ago

I am using this producer_config:

{
  "bootstrap.servers": "localhost:9092",
  "enable.idempotence": true,
  "retries": 100,
  "max.in.flight.requests.per.connection": 5,
  "compression.type": "snappy",
  "linger.ms": 5,
  "batch.num.messages": 1,
  "queue.buffering.max.ms": 0,
  "queue.buffering.max.messages": 10
}

and this Kafka docker-compose yml: https://github.com/conduktor/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml

For the producer code:

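import json
import time

# Assumption: the confluent-kafka client (the config keys above are librdkafka settings).
from confluent_kafka import Producer

producer = Producer(json.load(open("producer_config.json")))
frame_no = 0
while True: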

    frame_bytes = "hello" + str(frame_no)
    producer.produce(
        topic="multi-video-stream", 
        value=frame_bytes, 
        on_delivery=delivery_report,
        timestamp=frame_no,
        headers={
            "test": str.encode("test")
        }
    )
    frame_no+=1
    # producer.poll(1)
    producer.flush()

    time.sleep(0.1)
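The producer snippet above passes `on_delivery=delivery_report`, but the callback itself was not posted. A minimal sketch of what such a callback could look like is given below, assuming the confluent-kafka convention of calling it with `(err, msg)`. The message format and the returned string are illustrative choices, and `_FakeMessage` is a hypothetical stand-in so the callback can be exercised without a broker.

```python
def delivery_report(err, msg):
    """Delivery callback sketch: invoked once per produced message."""
    if err is not None:
        line = f"delivery failed: {err}"
    else:
        line = f"delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
    print(line)
    # The client ignores the return value; it is returned here only so the
    # sketch can be checked without a broker.
    return line


class _FakeMessage:
    """Hypothetical stand-in exposing the accessors used by the callback."""

    def topic(self):
        return "multi-video-stream"

    def partition(self):
        return 0

    def offset(self):
        return 7


ok_line = delivery_report(None, _FakeMessage())
failed_line = delivery_report("broker unavailable", None)
```

Because the producer calls `flush()` after every `produce()`, a callback like this would fire once per message, which confirms that records reach the topic even when the Beam pipeline prints nothing.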
jihad-akl commented 1 year ago

Please note that if I use Flink runner 1.14, I get:

ERROR:apache_beam.utils.subprocess_server:Starting job service with ['java', '-jar', '/root/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.44.0.jar', '--flink-master', 'http://localhost:8081', '--artifacts-dir', '/tmp/beam-temp1of29sbe/artifactsz8je29uo', '--job-port', '33755', '--artifact-port', '0', '--expansion-port', '0']
ERROR:apache_beam.utils.subprocess_server:Error bringing up service
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/utils/subprocess_server.py", line 88, in start
    raise RuntimeError(
RuntimeError: Service failed to start up with error 1
Traceback (most recent call last):
  File "main.py", line 38, in <module>
    with beam.Pipeline(options=beam_options) as p:
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/flink_runner.py", line 45, in run_pipeline
    return super().run_pipeline(pipeline, options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/portable_runner.py", line 439, in run_pipeline
    job_service_handle = self.create_job_service(options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/portable_runner.py", line 318, in create_job_service
    return self.create_job_service_handle(server.start(), options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/job_server.py", line 81, in start
    self._endpoint = self._job_server.start()
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/job_server.py", line 110, in start
    return self._server.start()
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/utils/subprocess_server.py", line 88, in start
    raise RuntimeError(
RuntimeError: Service failed to start up with error

Abacn commented 1 year ago

Per #22809, the cause is likely #20979, a feature missing from the Python portable runner. The Dataflow runner is not affected. What I am not sure about is why the unbounded reader is also not working.

vjixy commented 1 year ago

So there are bugs in the portable runners? :(

Abacn commented 1 year ago

yes, or feature missing

alexmreis commented 1 year ago

The implementation of Kafka in the Python SDK + Portable Runner is unfortunately rather broken for streaming use cases. I don't understand why there isn't a native python implementation based on https://github.com/confluentinc/confluent-kafka-python that doesn't have to deal with the portability layer. It would be much more reliable, even if maybe less capable of parallel compute.

Our company has abandoned Beam and Dataflow for this very reason. The last bug I opened, #22809 in August 2022, was closed today but still depends on 2 other issues, one of which, #25114, remains unsolved half a year later. The Python SDK is clearly not a priority for the core team. Maybe they're too busy focusing on GCP-specific products like PubSub to put in the effort to make open source tools, like Kafka, work properly in Beam's Python SDK. There isn't even a single unit test in the test suite for an unbounded Kafka stream being windowed and keyed.

As someone who really believes in Beam as a great portable standard for data engineering, it's sad to see the lack of interest from the core team in anything that is not making Google money (although we would still be paying for Dataflow if it worked).
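The native-client approach suggested in this comment can be sketched as a plain polling loop that hands each message to a handler as soon as it arrives, which is the per-message forwarding the original report asks for. This is only a sketch under assumptions: `forward_messages` is a hypothetical name, the loop relies on the confluent-kafka consumer convention that `poll(timeout)` returns a message or `None`, and the `_Fake*` classes are stand-ins so the loop can run without a broker.

```python
def forward_messages(consumer, handler, max_messages=None, poll_timeout=1.0):
    """Poll a consumer and pass every message value to handler immediately."""
    forwarded = 0
    while max_messages is None or forwarded < max_messages:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            # Nothing arrived within the timeout; keep polling.
            continue
        if msg.error():
            # Error or broker event; this sketch simply skips it.
            continue
        handler(msg.value())
        forwarded += 1
    return forwarded


class _FakeMessage:
    """Hypothetical stand-in for a consumed message."""

    def __init__(self, value):
        self._value = value

    def error(self):
        return None

    def value(self):
        return self._value


class _FakeConsumer:
    """Hypothetical stand-in: returns one empty poll, then the queued messages."""

    def __init__(self, values):
        self._queue = [None] + [_FakeMessage(v) for v in values]

    def poll(self, timeout):
        return self._queue.pop(0) if self._queue else None


received = []
count = forward_messages(
    _FakeConsumer([b"hello0", b"hello1"]), received.append, max_messages=2
)
```

With the real client, the consumer would be created from a config that includes `bootstrap.servers` and a `group.id`, then subscribed to the topic before the loop starts. The trade-off is the one named in the comment: this bypasses the portability layer but also gives up Beam's windowing and parallelism.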

Abacn commented 1 year ago

Hi @alexmreis, sorry if there is any misunderstanding. #22809 was closed because the issue on the KafkaIO side was fixed by #24205 (a comment on it says it closes #22809: https://github.com/apache/beam/pull/24205#issuecomment-1353257737). That said, the Dataflow Runner use case should be fixed in the upcoming Beam v2.45.0.

That it still experiences issues on portable runners (Flink, direct streaming) is a problem not limited to the Kafka source. It affects every streaming source built on a "splittable DoFn". This functionality is not yet supported by the portable runner (#20979). I also get bitten by this issue quite often (for example when I was validating the fix in #24205; see my comments on #22809). The gap between Dataflow and local runners is definitely something important that needs improvement. It has a direct impact on developers.

Besides, the absence of unit tests in the Python Kafka IO is intentional. Within the cross-language framework, the code that runs the Kafka read is Java's KafkaIO, and the unit tests are exercised there. We have CrossLanguage Validation Runner (XVR) Tests for each xlang IO and each SDK, exercised on a schedule. I also recently added a Python KafkaIO performance test. That said, KafkaIO in both Java and Python is our team's priority.
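To illustrate why an unbounded "splittable DoFn" source depends on runner support, here is a toy model of the idea, not Beam's actual API. It assumes the gap tracked in #20979 concerns the runner checkpointing a running restriction: the claimed part is finished and its output released, while the residual is rescheduled. All names (`OffsetRangeTracker`, `run_bundle`, `bundle_size`) are hypothetical.

```python
class OffsetRangeTracker:
    """Toy restriction tracker over the half-open offset range [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = None

    def try_claim(self, offset):
        """Claim an offset; fails once the restriction has been cut short."""
        if offset >= self.stop:
            return False
        self.last_claimed = offset
        return True

    def checkpoint(self):
        """Shrink this restriction to the claimed part and return the residual."""
        split_at = self.start if self.last_claimed is None else self.last_claimed + 1
        residual = (split_at, self.stop)
        self.stop = split_at
        return residual


def process(tracker, emit):
    """Source loop: emit every offset it manages to claim."""
    offset = tracker.start
    while tracker.try_claim(offset):
        emit(offset)
        offset += 1


def run_bundle(restriction, bundle_size):
    """Toy runner: checkpoint after bundle_size outputs so they can be released."""
    tracker = OffsetRangeTracker(*restriction)
    outputs = []
    residual = None

    def emit(offset):
        nonlocal residual
        outputs.append(offset)
        if len(outputs) == bundle_size:
            residual = tracker.checkpoint()

    process(tracker, emit)
    return outputs, residual


first_outputs, first_residual = run_bundle((0, float("inf")), bundle_size=3)
second_outputs, second_residual = run_bundle(first_residual, bundle_size=3)
```

In this model, a runner that never calls `checkpoint()` on an unbounded restriction would leave `process` looping forever without ever finishing a bundle, which matches the symptom reported in this issue: records are read but nothing reaches the next step.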

hadikoub commented 1 year ago

Was this issue addressed in the new version 2.45?

Abacn commented 1 year ago

> Was this issue addressed in the new version 2.45?

Not yet. This is a feature gap in the portable runner and may need substantial effort. I am currently trying to work on it, though.

jihad-akl commented 1 year ago

Any update for this issue in version 2.47?

jihad-akl commented 1 year ago

Any update for this issue in version 2.48?

jihad-akl commented 1 year ago

> Was this issue addressed in the new version 2.45?
>
> Not yet. This is the feature gap in portable runner. May need substantial effort. I am trying to work on it currently though

Any update?

Abacn commented 1 year ago

Not able to get into this.

jihad-akl commented 1 year ago

Any update for this issue in version 2.49?

jihad-akl commented 1 year ago

> Not able to get into this.

Any idea where the problem is, so that I can try to make a workaround?

jihad-akl commented 1 year ago

Any news for version 2.50?

jihad-akl commented 11 months ago

It has been almost 1 year and I have not received any clear response on whether this bug will be fixed. Seven versions, from 2.44 to 2.51, and the bug remains. Will this bug be fixed?

MrYanMYN commented 2 weeks ago

For anybody stumbling upon this issue, a year later this bug is still present

liferoad commented 2 weeks ago

There is no plan to fix this for the Python DirectRunner. We are moving to the Prism Runner (https://github.com/apache/beam/issues/29650). The goal is to make it the default runner for all SDKs, so that users can do local testing and development with it. This work is currently ongoing.

@kennknowles FYI. For Beam on Flink.