apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.87k stars 4.26k forks source link

[Bug]: [Go SDK] Memory seems to be leaking on 2.49.0 with Dataflow #28142

Closed boolangery closed 9 months ago

boolangery commented 1 year ago

What happened?

Hi,

We updated a Pubsub streaming job on Dataflow from 2.46.0 to 2.49.0. See these memory diagrams:

2.46.0 memory utilisation:

Capture d’écran 2023-08-24 à 09 44 18

2.49.0 memory utilisation:

Capture d’écran 2023-08-24 à 09 44 04

We sent back on the 2.46.0 for this job as workers were running out of memory and a lot of lag was introduced.

Do you have an explanation? What changed on memory management between 2.46.0 and 2.49.0?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

Abacn commented 1 year ago

Hi, could you please raise a customer issue to Dataflow, as jobId / job graph are needed for triaging ?

The symptom reported here is general and hard to find cause without the job info available

Abacn commented 1 year ago

one thing at least could check is to see if 2.47 and 2.48 had the same symptom thus narrow down the issue

boolangery commented 1 year ago

Hi, could you please raise a customer issue to Dataflow, as jobId / job graph are needed for triaging ?

The symptom reported here is general and hard to find cause without the job info available

Sure, where I can submit this? Can't find anything in GCP, do you have a link? Thanks

scwhittle commented 1 year ago

This issue appears to occur in 2.48 as well with a pipeline just consuming from Cloud Pubsub.

    _ = pipeline | "Read pubsub" >> io.ReadFromPubSub(
        subscription=sub, with_attributes=True
    )
liferoad commented 1 year ago

Hi, could you please raise a customer issue to Dataflow, as jobId / job graph are needed for triaging ? The symptom reported here is general and hard to find cause without the job info available

Sure, where I can submit this? Can't find anything in GCP, do you have a link? Thanks

Please check this: https://cloud.google.com/dataflow/docs/support/getting-support#file-bugs-or-feature-requests

tvalentyn commented 1 year ago

@boolangery to confirm, was this a Go or Python pipeline?

chleech commented 1 year ago

I’ve been experiencing the same issue. To validate, we also set up a pipeline that only reads from a single sub, ran it for 2 weeks and the mem is constantly increasing.

Got a response from DF team and their suggestion was to try 2.46.0. Will update here once we manage to test.

IMG_1092

boolangery commented 1 year ago

@boolangery to confirm, was this a Go or Python pipeline?

A Go one

boolangery commented 1 year ago

Issue has been created: https://issuetracker.google.com/issues/297918533

lostluck commented 1 year ago

Adding the following service option when starting the job will let you get / provide CPU and HEAP profiles of the SDK worker in dataflow:

--dataflow_service_options=enable_google_cloud_profiler

From https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#enable_for_pipelines

tvalentyn commented 1 year ago

FYI, we have observed a memory leak in Python SDK, which we correlated with a protobuf dependency upgrade: https://github.com/apache/beam/issues/28246. This issue may or may not be similar in nature.

kennknowles commented 1 year ago

If this makes the Go SDK unusable in 2.49.0 and beyond then per https://beam.apache.org/contribute/issue-priorities/ I would agree with P1. If it is usable in some cases then P2 is appropriate.

kennknowles commented 1 year ago

And if P1 it should not be unassigned and should have ~daily updates and block releases.

boolangery commented 9 months ago

This issue is still here in 2.53

lostluck commented 9 months ago

@boolangery where does the heap profile show the memory is being held? The heap profile can be collected as described in the earlier comment:

https://github.com/apache/beam/issues/28142#issuecomment-1698281500

Otherwise, additional information would be useful for me to replicate the issue. A rough throughput, and message size would be very useful.

lostluck commented 9 months ago

Ah I see https://github.com/apache/beam/issues/28142#issuecomment-1696952206 has been updated with profiles! Thank you.

lostluck commented 9 months ago

The allocation is in makeChannels, which likely means it's the map from instruction/bundle ids to element channels. Something isn't getting cleaned up for some reason.

I believe it's a quick fix, and as the 2.54.0 release manager, I'm going to cherry pick it in once I've got it, since we're still in the "stabilization" phase of the new release. Thank you for your patience and cooperation.

lostluck commented 9 months ago

I've successfully locally reproduced the issue locally using a lightly adjusted local prism runner, executing the pipeline in loopback mode and pprof, and narrowed down the leak to the channel cache in the read loop. It's not as aware of finished instructions as it should be.

Very localized as a fix at least.

lostluck commented 9 months ago

The root cause is a subtle thing from the design of the Beam FnAPI protocol, but otherwise going to be on an SDK to SDK basis.

Essentially, the data channel and the control channel are coordinated. But they are independant. The data could come in before the bundle that processes that data, but we need to hold onto it. Similarly, the ProcessBundle request could come in earlier, and it needs to wait until the data is ready. Or any particular interleaving of the two.

The leak in the code is from that former case, where we're able to pull in all the data before ProcessBundle even starts up. Unfortunately, the Data channel doesn't know if it may close the Go Channel (elementsChan in the code) that sends the elements to the execution layer, until it knows what BundleDescriptor is being used so it can see if the Bundle uses Timers or not, and if so, how many transforms. In practice, there's likely to only be 2 streams, One for data, and the other for timers, but per the protocol, it could be arbitrary, so the SDK can't make an assumptions.

So the flow causing the leak is: See Data for an unseen instruction. Create and cache a elementChan in the read loop. Get all the data. Marks off how many "is-last" signals we see. (Once we have all the IsLasts, the read loop never sees a reference to that instruction ever again). Receives the ProcessBundle request. Know we have everything, close the channel, so the ProcessBundle can terminate.

But leak is because the read loop never "learns" that the data is complete and it can evict that reader from its cache, since the read loop never sees the instructionID again.

PubSub ends up triggering this behavior because outside of backlog catch up, each bundle is for a single element, so this causes a great deal of readers in the cache.

I should have a PR shortly.

boolangery commented 9 months ago

Thank you for the explanation and the fix!

lostluck commented 8 months ago

FYI, 2.54.0 is now available. While I'm pretty sure this issue is now resolved, it's good to get confirmation from affected users too.