googleapis / google-cloud-dotnet

Google Cloud Client Libraries for .NET
https://cloud.google.com/dotnet/docs/reference/
Apache License 2.0

[BigQuery] High Number of Memory Allocations when migrating from legacy insertAll to Storage Write API. #10034

Closed indy-singh closed 1 year ago

indy-singh commented 1 year ago

Environment details

Steps to reproduce

  1. https://github.com/indy-singh/bigquery_storage_write_api_high_memory_allocation - note that I can't get this to leak, but it isn't 1:1 with production code. Just a test harness in which to try different scenarios.

Additional context: https://github.com/protocolbuffers/protobuf/issues/12217

We have recently migrated from the legacy insertAll to the Storage Write API and we've found a marked increase in memory usage (see screenshot in link above). The migration is as close to 1:1 as we can make it.

I've diff'ed the methods ExpensiveStreamingInsert and StorageWriteApi from this file https://github.com/indy-singh/bigquery_storage_write_api_high_memory_allocation/blob/c3b50881b61ccaa8fddfea01945d7f67563cab15/bigquery_storage_write_api_high_memory_allocation/WatchTowerBigQuerySaver.cs (this is the actual production code) and nothing looks super suspicious. Online diff: https://www.diffchecker.com/gAJLZUKz/

Which leads me to believe we are using the Storage Write APIs incorrectly (at construction).

We basically keep one AppendRowsStream open/alive for the duration of the application lifetime (which is typically 12+ hours).

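// Note: despite the variable name, Build() returns a BigQueryWriteClient, not a builder.
var bigQueryWriteClientBuilder = new BigQueryWriteClientBuilder
{
    Credential = googleCredential,
}.Build();

// A single AppendRowsStream is then kept open for the application's lifetime.
_appendRowsStream = bigQueryWriteClientBuilder.AppendRows();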

Is this incorrect? https://cloud.google.com/bigquery/docs/write-api-best-practices seems to imply what we are doing is correct.

Thanks for any help.

Cheers, Indy

jskeet commented 1 year ago

(Admittedly I still wouldn't expect a leak - I'd expect those byte arrays to be reusable - so it's really weird that there are tens of thousands of them available...)

indy-singh commented 1 year ago

This is definitely whack a mole, and made harder by the fact our platform monitoring is built around 472 and dotnetnew isn't backwards compatible with the perf counters we monitor :(

Something else I noticed, it looks like it is all going to Gen1:-

Leaky box:-

image

Non-leaky box:- image

I also noticed this on the leaky box:- image

Which brings us back to your suggestion that maybe we need to consume the response?

One way of testing whether this is causing the problem would be to take your batch size down from 400 (which you estimated to be "between 60K and 70K") to 300 (which should then be ~45K-52.5K).

Yeah can do, I'll knock it down to 200 items per batch to be extra safe.
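
As a rough check on the arithmetic in the quoted estimate, here is a minimal sketch. The 150 and 175 bytes-per-record bounds are an assumption derived from that estimate (60,000 / 400 and 70,000 / 400), not measured values, and EstimateBatchBytes is a hypothetical helper.

```csharp
using System;

// Per-record size bounds implied by the "between 60K and 70K" estimate for 400 records.
// These are assumptions derived from that estimate, not measurements.
const int MinRecordBytes = 150; // 60,000 / 400
const int MaxRecordBytes = 175; // 70,000 / 400

foreach (var batchSize in new[] { 400, 300, 200 })
{
    var (low, high) = EstimateBatchBytes(batchSize, MinRecordBytes, MaxRecordBytes);
    Console.WriteLine($"{batchSize} records: {low}-{high} bytes");
}
// prints:
// 400 records: 60000-70000 bytes
// 300 records: 45000-52500 bytes
// 200 records: 30000-35000 bytes

// Hypothetical helper: returns the estimated (low, high) payload size in bytes for a batch.
static (int Low, int High) EstimateBatchBytes(int batchSize, int minRecordBytes, int maxRecordBytes) =>
    (batchSize * minRecordBytes, batchSize * maxRecordBytes);
```

So a batch of 200 should land at roughly half of the original 400-record payload.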

Cheers, Indy

jskeet commented 1 year ago

That's interesting - I'll look at the Grpc.Core.Internal types. Are you easily able to try consuming the response? That would make some sense... but it's not an area that I'm sufficiently familiar with to speak with confidence on.

indy-singh commented 1 year ago

Are you easily able to try consuming the response?

Yes, but I want to give the chunk change from 400 -> 200 a few hours before I make another change. The slow leak means my feedback loop is pretty lengthy :(

Cheers, Indy

jskeet commented 1 year ago

Yes, but I want to give the chunk change from 400 -> 200 a few hours before I make another change.

Absolutely! I'm very much a fan of "one change at a time" :)

Btw, GAX doesn't use MemoryStream at all. Google.Protobuf does, but only in ByteString.FromStream/FromStreamAsync, which I don't think will be in use here. Admittedly my MemoryStream theory is purely speculative at the moment anyway :)

indy-singh commented 1 year ago

So while we wait for hours to pass by, I'm poking about the Google.Protobuf src and looking for factors of 131072.

And I can only really find:-

https://github.com/protocolbuffers/protobuf/blob/main/csharp/src/Google.Protobuf/CodedInputStream.cs#L82

https://github.com/protocolbuffers/protobuf/blob/main/csharp/src/Google.Protobuf/CodedOutputStream.cs#L63

Neither of which is multiplied at all, from what I can see.

indy-singh commented 1 year ago

Poking around the leaky box memory dump using dotnet-dump and running finalizequeue

finalizequeue.log

I have no idea if any of that is relevant, or if it is just confirmation bias. But do we expect Grpc.Core.Internal.BatchContextSafeHandle to be at the top?

indy-singh commented 1 year ago

gchandleleaks.log

I'm on the edge of my knowledge here, so take all of this with a truck load of salt. But I find it sus that it finds 802 handles, and then can't find them. Weird.

jskeet commented 1 year ago

I have no idea if any of that is relevant

Me either! Weirdly enough, I can't even find that (or CallSafeHandle) in the new code. They're both here though for 2.46: https://github.com/grpc/grpc/tree/v1.46.x/src/csharp/Grpc.Core/Internal

Just to check - was this memory dump taken when using Grpc.Core, or Grpc.Net.Client (the default)? (Don't be fooled by the name of Grpc.Core.Internal - that can be seen with both implementations.)

indy-singh commented 1 year ago

The 400 -> 200 batch size change was deployed to our DEMO environment from 16:15.

image

The slow leak is still there; it doesn't look like it has changed at all. I've no idea why the shapes (I'm calling them sawtooths) have elongated after 18:00.

Are you easily able to try consuming the response?

Yes, I've made this change, and only enabled it on one box. So I'm leaving 104 as completely disabled, 103 is consuming the response, 102 & 101 are left as is.

Just to check - was this memory dump taken when using Grpc.Core, or Grpc.Net.Client (the default)?

Taken with <PackageReference Include="Grpc.Core" Version="2.46.6" /> which I've also now ripped out as it made no discernible difference.

Cheers, Indy

jskeet commented 1 year ago

Just to check, when you added the Grpc.Core reference, you did also set the GrpcAdapter when building the client, right?

indy-singh commented 1 year ago

Just to check, when you added the Grpc.Core reference, you did also set the GrpcAdapter when building the client, right?

Yes.

image

Another thing which would be useful if you could do it would be to change the "off" box to still think it's writing the records - but use a fake gRPC implementation which just discards all requests. I can help write that code if it would be useful. (There are a couple of different options there.) If we see the memory go up even with a fake, then we should be able to work on something that reproduces the issue without ever having to touch GCP, which would really help. My expectation is that you'd be able to have the same code on all machines, and just a change to how you construct the BigQueryWriteClientBuilder for the "off" machine.

Totally missed this - this sounds like a great thing to try tomorrow.

Cheers, Indy

indy-singh commented 1 year ago

I think we have a winner;

image

101 - as normal
102 - as normal
103 - consume grpc response
104 - BQ saver disabled

Next steps are to bump back up to chunks of 400 and roll out response consumption to all boxes.

It's rather black box at the moment, I'd love to know where this slow leak is actually occurring.

Cheers, Indy

jskeet commented 1 year ago

Okay, so presumably our working theory is that if we don't consume the responses, then in the real implementation they gradually build up and are never garbage collected. In your fake implementation presumably the response never actually gets populated, or is eligible for garbage collection.

I suspect it should be possible to repro this with just a local gRPC client and server (with a simple streaming API definition) - but that's definitely not something I'd expect you to look into. If just consuming the gRPC response works, I'd stick with that :) I can ask my colleagues in the gRPC team to take a look.

indy-singh commented 1 year ago

In your fake implementation presumably the response never actually gets populated, or is eligible for garbage collection.

No, as far as I can tell the response is there and can be read. But the main factor here is time: none of the benchmarks we ran have been over six hours. I would like to fire and forget; I don't necessarily care about the response in this circumstance. It's been interesting!

Cheers, Indy

indy-singh commented 1 year ago

Oh, I just noticed that appendRowsStream.GetResponseStream() is a disposable.

So currently I'm doing this to consume the response stream:-

await foreach (var appendRowsResponse in appendRowsStream.GetResponseStream())
{

}

I'm wondering if I should actually be doing this?

await using (var asyncResponseStream = appendRowsStream.GetResponseStream())
{
    await foreach (var appendRowsResponse in asyncResponseStream)
    {

    }
}

I'm also wondering if the simple act of foreach'ing over it is implicitly disposing of it?

jskeet commented 1 year ago

Hmm... I don't think it's actually disposing of it, but I suspect that iterating over everything has the same effect. The latter approach is probably better until I can confirm for sure that the former is okay... although I'd really hope it is, just from a usability perspective.
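
On the language side of that question, a minimal sketch may help. In C#, await foreach disposes the enumerator it obtains from GetAsyncEnumerator, but it does not dispose the sequence object itself; await using does. FakeResponseStream below is a hypothetical stand-in written for this illustration. It is not the GAX response stream type, and it makes no claim about how that type wires up its own disposal.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var stream = new FakeResponseStream();

// await foreach disposes the enumerator when the loop ends, not the sequence object.
await foreach (var item in stream)
{
}
Console.WriteLine($"After await foreach: enumerator disposed = {stream.EnumeratorDisposed}, sequence disposed = {stream.SequenceDisposed}");
// prints: After await foreach: enumerator disposed = True, sequence disposed = False

// await using calls DisposeAsync on the sequence object itself.
await using (stream)
{
}
Console.WriteLine($"After await using: sequence disposed = {stream.SequenceDisposed}");
// prints: After await using: sequence disposed = True

// Hypothetical stand-in for a disposable response stream (not a library type).
sealed class FakeResponseStream : IAsyncEnumerable<int>, IAsyncDisposable
{
    public bool SequenceDisposed { get; private set; }
    public bool EnumeratorDisposed { get; private set; }

    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => new Enumerator(this);

    public ValueTask DisposeAsync()
    {
        SequenceDisposed = true;
        return default;
    }

    private sealed class Enumerator : IAsyncEnumerator<int>
    {
        private readonly FakeResponseStream _owner;
        private int _remaining = 3;

        public Enumerator(FakeResponseStream owner) => _owner = owner;

        public int Current { get; private set; }

        // Yields 3, 2, 1 and then completes.
        public ValueTask<bool> MoveNextAsync()
        {
            if (_remaining == 0) return new ValueTask<bool>(false);
            Current = _remaining--;
            return new ValueTask<bool>(true);
        }

        public ValueTask DisposeAsync()
        {
            _owner.EnumeratorDisposed = true;
            return default;
        }
    }
}
```

Whether iterating alone is enough for a given type therefore depends on what that type's enumerator does when disposed, which is why wrapping the stream in await using is the conservative choice.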

indy-singh commented 1 year ago

This is with all the boxes consuming the response:-

image

I've just made the change to wrap the consumption in a using too.

Cheers, Indy

jskeet commented 1 year ago

That's very good to see :) I'm planning on trying to reproduce the problem with a local server and local client next week - will you have graphs from over the weekend just to confirm that it's stayed this way?

indy-singh commented 1 year ago

Yeah mate, we have graphs going back four years 😄 I'm confident that this is the fix as it normally only required 4-6 hours for the memory baseline to start drifting.

Cheers for your help 👍🏽, Indy

indy-singh commented 1 year ago

Morning - this is the weekend.

DEMO: image

LIVE: image

You can see the slow ramp at the very beginning but once the response consumption fix was deployed you can see that the slow ramp was eliminated. There is a lot of churn in Gen0 with all the byte arrays so it is still more spiky than insertAll. But the leak is fixed as far as I can tell! 👌

Cheers, Indy

jskeet commented 1 year ago

Excellent! I'm glad that the immediate issue is fixed - now we're a little clearer about it, I'll work on reproducing it for the gRPC team.

jskeet commented 1 year ago

I've managed to reproduce this remarkably easily. I'm talking with the gRPC team about what the next steps are - but fundamentally, reading the responses is worth doing. Disposing of the underlying gRPC call prevents the leak, but appears to be "unfriendly" to the connection in some way. I'd recommend code like this:

var stream = client.AppendRows();
// With this in place, the call will be disposed even if something fails.
using var grpcCall = stream.GrpcCall;

// Make your writes here
await stream.WriteAsync(request);
await stream.WriteCompleteAsync();

// And read the responses at the end
await using var responses = stream.GetResponseStream();
await foreach (var response in responses) {}

That's making the assumption that writes can succeed without previous responses having been read. I expect that's the case, but I don't have concrete evidence for that.

indy-singh commented 1 year ago

I think perhaps this is dependent on use case; in this case (and actually all cases where we use BQ and gRPC) we don't care about the response at all. It is very much fire and forget.

await using var responses = stream.GetResponseStream();
await foreach (var response in responses) {}

I'm fine to do that, but maybe the docs need to be updated to explicitly warn about what happens if response consumption is absent?

Alternatively, if I could configure the BQ client to be "fire-and-forget" that would be ideal too.
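
In the absence of such an option, one way to keep call sites close to fire-and-forget is to hide the response draining in a small helper. This is only a sketch under assumptions: AppendRowsHelper and AppendAndDrainAsync are hypothetical names, not part of the library, and the body simply packages the calls recommended earlier in this thread (GrpcCall, WriteAsync, WriteCompleteAsync, GetResponseStream).

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Google.Cloud.BigQuery.Storage.V1;

// Hypothetical helper, not part of Google.Cloud.BigQuery.Storage.V1.
public static class AppendRowsHelper
{
    // Writes all requests, then drains and discards the responses so that
    // nothing is left unread on the underlying gRPC call.
    public static async Task AppendAndDrainAsync(
        BigQueryWriteClient client,
        IEnumerable<AppendRowsRequest> requests)
    {
        var stream = client.AppendRows();
        // Disposes the underlying call even if a write or read throws.
        using var grpcCall = stream.GrpcCall;

        foreach (var request in requests)
        {
            await stream.WriteAsync(request).ConfigureAwait(false);
        }
        await stream.WriteCompleteAsync().ConfigureAwait(false);

        // The responses are read only to be discarded.
        await using var responses = stream.GetResponseStream();
        await foreach (var response in responses)
        {
        }
    }
}
```

Like the recommended snippet, this assumes writes can succeed without earlier responses having been read.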

jskeet commented 1 year ago

We'll absolutely be doing some combination of documentation and code changes.

Ideally gRPC would be able to clean up in the background aside from anything else, but if we at least make the type returned by AppendRows implement IDisposable, then it would be an improvement.

indy-singh commented 1 year ago

using var appendRowsStream = _bigQueryWriteClientBuilder.AppendRows(); would be perfect as it would fall into the path of success with static code analyzers and other tooling that warns about undisposed resources.

Yeah I gotta admit I was of the opinion that the gRPC in the background would clean itself up after the method invocation had finished. Ah well.

Do you mind tagging me when the documentation is updated? No rush, just keen to understand more of how we should be using BQ gRPC calls.

jskeet commented 1 year ago

Do you mind tagging me when the documentation is updated? No rush, just keen to understand more of how we should be using BQ gRPC calls.

Sure. It may well be in multiple stages though, as each individual component or piece of documentation is updated.

indy-singh commented 1 year ago

Hmmm, the mystery continues! It's been rock solid for a few days and I'm just checking the graphs post-easter-eggs and the slow growth has reappeared. To be clear, the code around consuming the response and disposal is still in and enabled.

image

I'll dig into it more post-lamb :)

Hope you are having a nice Easter.

Cheers, Indy

indy-singh commented 1 year ago

It looks like all the growth is in gen 2 - we've only just rolled out querying dotnet counters so the graphs are a little patchy.

image

indy-singh commented 1 year ago

Ahhh, it seems like BQ was "down" and the exception path in that scenario somehow leaks? We have a few logs on our DEMO environment that seem to line up at first glance. I'm used to the legacy insertAll method logging a lot and timing out (5 seconds). The gRPC stuff has been relatively rock solid and highly performant thus far.

I've attached the raw logs.

logs.zip

Looks like all the exceptions fall into one of the following seven:-

Grpc.Core.RpcException: Status(StatusCode="Aborted", Detail="Closing the stream because server is restarted. This is expected and client is advised to reconnect. Entity: projects/indy-test-watchtower/datasets/DEMO_watchtower/tables/watchtower_v2/streams/_default")
Grpc.Core.RpcException: Status(StatusCode="Cancelled", Detail="Call canceled by the client.")
Grpc.Core.RpcException: Status(StatusCode="Cancelled", Detail="CANCELLED", DebugException="Grpc.Core.Internal.CoreErrorDetailException: "CANCELLED"")
Grpc.Core.RpcException: Status(StatusCode="Internal", Detail="Internal error encountered. Entity: projects/indy-test-watchtower/datasets/DEMO_watchtower/tables/watchtower_v2/streams/_default")
Grpc.Core.RpcException: Status(StatusCode="Unavailable", Detail="502:Bad Gateway")
Grpc.Core.RpcException: Status(StatusCode="Unavailable", Detail="Error starting gRPC call. HttpRequestException: An error occurred while sending the request. IOException: The request was aborted. IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host.. SocketException: An existing connection was forcibly closed by the remote host.", DebugException="System.Net.Http.HttpRequestException: An error occurred while sending the request.
Grpc.Core.RpcException: Status(StatusCode="Unavailable", Detail="Request 'AppendRows' from role 'cloud-dataengine-globalrouting' throttled: Task is overloaded (memory-protection) go/tr-t.")

I guess in the case of any RpcException I should be overwriting the existing _bigQueryWriteClientBuilder with a new instance.

Cheers, Indy

jskeet commented 1 year ago

I guess in the case of any RpcException I should be overwriting the existing _bigQueryWriteClientBuilder with a new instance.

I wouldn't expect that to help, although I guess it could.

Do you definitely have the code that disposes the underlying gRPC call, even if there's an error while reading the responses? This line of code should do it:

using var grpcCall = stream.GrpcCall;

If you haven't already got that, I'd add it. (We'll make stream itself implement IDisposable soon.)

If you do already have that, I don't know where the leak has come from...

indy-singh commented 1 year ago

Do you definitely have the code that disposes the underlying gRPC call, even if there's an error while reading the responses?

No we don't. I've added it now 👍🏽

This is what we have post-add:-

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var appendRowsStream = _bigQueryWriteClientBuilder.AppendRows(CallSettings.FromCancellationToken(cancellationTokenSource.Token));
// as per https://github.com/googleapis/google-cloud-dotnet/issues/10034#issuecomment-1502784359
using var grpcCall = appendRowsStream.GrpcCall;
var sizes = new List<int>();

// ToByteString creates a lot of memory churn on Gen0
// The average watchtower record, when serialized as protobuf, is around 162.4876265161838294 bytes (sample size of 115215).
// So we now chunk over the list in chunks of 400, which would put us at around 64,800 bytes for this list in its entirety.
// Previously the average list size was 453047.083047893235 bytes (sample size of 115319),
// which was going onto the LOH
foreach (var records in list.Chunk(400))
{
    sizes.Add(records.Sum(x => x.CalculateSize()));

    var protoData = new AppendRowsRequest.Types.ProtoData
    {
        WriterSchema = _writerSchema,
        Rows = new ProtoRows
        {
            SerializedRows = { records.Select(x => x.ToByteString()) },
        },
    };

    await appendRowsStream.WriteAsync(new AppendRowsRequest
    {
        ProtoRows = protoData,
        WriteStream = _writeStreamName,
    }).ConfigureAwait(false);
}

await appendRowsStream.WriteCompleteAsync().ConfigureAwait(false);

// this is disposable and recommended here: https://github.com/googleapis/google-cloud-dotnet/issues/10034#issuecomment-1491902289
await using (var asyncResponseStream = appendRowsStream.GetResponseStream())
{
    // need to consume the response even if we don't do anything with it
    // otherwise we get a slow memory leak: https://github.com/googleapis/google-cloud-dotnet/issues/10034#issuecomment-1491449182
    await foreach (var appendRowsResponse in asyncResponseStream)
    {

    }
}

Cheers, Indy