apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: RedisIO#readKeyPatterns failing with OutOfMemory on version 2.39.0 #21825

Open djaneluz opened 2 years ago

djaneluz commented 2 years ago

What happened?

I'm using RedisIO to read and write cache values, and it was working fine with version 2.38.0.

When I moved to version 2.39.0 I started getting errors like:

Execution of work for computation 'P13' for key '<binary key, not printable>' failed with out-of-memory. Work will not be retried locally. Heap dump not written.

With stacktrace:

java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
    at java.base/java.lang.Thread.start0(Native Method)
    at java.base/java.lang.Thread.start(Thread.java:803)
    at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:937)
    at java.base/java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1583)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:346)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
    at java.base/java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:779)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker$ProcessContext.onClaimed(OutputAndTimeBoundedSplittableProcessElementInvoker.java:312)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:60)
    at org.apache.beam.sdk.io.redis.RedisIO$ReadFn.processElement(RedisIO.java:399)

The error happens when calling RedisIO.readKeyPatterns().

The pipeline fails and gets stuck in the step: .../ReadRedis/ParDo(Read)/ParMultiDo(Read)/ProcessKeyedElements.out0

When I move back to version 2.38.0, the problem no longer happens.
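The `unable to create native thread` variant of `OutOfMemoryError` points at thread creation hitting a process or OS limit rather than at heap exhaustion, so a thread count from the failing worker may help narrow this down. Below is a minimal diagnostic sketch using only the JDK; the class name `ThreadCensus` and its helpers are hypothetical, it is not part of Beam or RedisIO, and it does not fix anything. It only prints how many live threads exist, grouped by name prefix.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Beam API): summarizes live JVM threads by name prefix.
public class ThreadCensus {

    // Strips a trailing numeric suffix so that "pool-3-thread-17" and
    // "pool-3-thread-2" are counted in the same bucket.
    static String bucket(String threadName) {
        return threadName.replaceAll("[-#]?\\d+$", "");
    }

    // Counts live threads per bucketed name, sorted by name.
    static Map<String, Integer> countByNamePrefix() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(bucket(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        System.out.println("live threads: " + mx.getThreadCount()
            + ", peak: " + mx.getPeakThreadCount());
        countByNamePrefix().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

Calling `ThreadCensus.countByNamePrefix()` periodically from inside the worker JVM and logging the result on both 2.38.0 and 2.39.0 would show whether one group of threads keeps growing before the failure.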

Issue Priority

Priority: 2

Issue Component

Component: io-java-redis

aromanenko-dev commented 2 years ago

This may be related to #15549, which is the latest big change in RedisIO.

@MiguelAnzoWizeline @benWize Could you take a look, please?

benWize commented 2 years ago

Hi @aromanenko-dev! Miguel, who made the larger changes in RedisIO, is no longer working on this project, but I will sync with him and provide a response in a couple of days.

aromanenko-dev commented 2 years ago

@benWize Many thanks!

aromanenko-dev commented 2 years ago

@benWize Kind ping on this. Do you have any news by chance?

benWize commented 2 years ago

Hi @aromanenko-dev, sorry for the late response. Miguel is busy and can't take the issue, but we will find someone else on our team to take this.

roger-mike commented 2 years ago

Hi, could you give more details about how to reproduce this error?

aromanenko-dev commented 2 years ago

Ping @djaneluz

djaneluz commented 1 year ago

Hello @roger-mike!

I have a streaming pipeline that consumes Pub/Sub messages, extracts keys, groups them by window (FixedWindows of 5 min), and calls Redis to get the values, which are used as a side input to another step.

Something like:

    final PCollection<KV<String, Iterable<MyMessage>>> myMessageKV = pipeline
        .apply("ReadPubSubMessages", PubsubIO.readMessages().fromSubscription(options.getSubscription()))
        .apply("ExtractAndParse", ParDo.of(new ExtractAndParse()))
        .apply("MapMessageWithKey", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
            .via(m -> KV.of(m.getMyKey(), m)))
        .apply("5MinFixedWindow", Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("GroupByKey", GroupByKey.create());

    final PCollectionView<Map<String, Iterable<String>>> myCache = myMessageKV 
            .apply("ExtractKeys", Keys.create())
            .apply("ReadRedis", RedisIO.readKeyPatterns()
                    .withEndpoint(redisHost, REDIS_PORT)
                    .withOutputParallelization(false))
            .apply("ViewAsMultiMapFromCache", View.asMultimap());

    final PCollectionTuple outputs = myMessageKV
            .apply("EnrichMessage", ParDo.of(new EnrichMessages())
                     .withSideInput(REF_CODE_CACHE_TAG_ID, myCache));

    ...

I just ran the pipeline again with Beam version 2.41.0 and got the same error. With version 2.38.0 it works just fine.

Let me know if you need any more information.

Thanks

djaneluz commented 1 year ago

Hello, is there any update on this? Thanks

djaneluz commented 4 months ago

I tested the pipeline with Apache Beam version 2.56.0 and the problem still happens.

djaneluz commented 4 months ago

ping @roger-mike @aromanenko-dev