Closed philvmx1 closed 5 months ago
Can you please share a minimal but complete project that would reproduce the problem? A console application should be fine. Note I understand your theory requires something specific to happen to trigger the issue, so I understand this repro project won't be able to consistently reproduce the problem. That's fine; I'd still expect it to reproduce the issue if we run it long enough. We need to see exactly what your code looks like. For instance, you mention StartNew, but we don't have a StartNew method or type in Google.Cloud.PubSub.V1.
My apologies, I meant StartAsync. Will follow up with a sample console app.
We also did some metrics analysis and this seems to happen while there is a sharp spike in ModifyAckDeadline requests across all of our services. This specific service is shown below - it's dropping Number of open streaming pull requests.
This specific topic uses message ordering. At this time, the message ordering key is unique per message (filename since we're using Object Create triggers in GCS). It is also the only subscription where we see this issue. We have a change coming soon to make Ordering Key per file prefix instead.
Sample Code:
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
const string PROJECT_NAME = "myproject";
const string SUBSCRIPTION_NAME = "mysubscription";
var subscriberClientBuilder = new SubscriberClientBuilder
{
    Settings = new ()
    {
        FlowControlSettings = new FlowControlSettings(10,null),
    },
    SubscriptionName = new (PROJECT_NAME, SUBSCRIPTION_NAME)
};
var cts = new CancellationTokenSource();
var subscriber = await subscriberClientBuilder.BuildAsync(cts.Token);
var task = subscriber.StartAsync(async (message, ct) =>
{
    Console.WriteLine($"Message Received {message.MessageId}");
    if (ct.IsCancellationRequested) Console.WriteLine("Cancellation Requested");
    return await Task.FromResult(SubscriberClient.Reply.Ack);
});
Console.WriteLine("Waiting for Messages, Press any key to cancel and exit.");
Console.ReadKey();
cts.Cancel();
// Oddly, without StopAsync, StartAsync doesn't stop even though we called cancel on the ambient CTS.
await subscriber.StopAsync(cts.Token);
await task;
if (cts.Token.IsCancellationRequested) Console.WriteLine("StartAsync done because Cancelled!");
Console.WriteLine("DONE!");
relevant terraform:
resource "google_pubsub_subscription" "thesubscription" {
  name                         = "thetopicname"
  topic                        = "thetopic"
  ack_deadline_seconds         = 600
  enable_message_ordering      = true
  enable_exactly_once_delivery = true
  retry_policy {
    minimum_backoff = "60s"
  }
  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.thetopic-dlq.id
    max_delivery_attempts = 15
  }
}
Metrics showing dropoff of Streaming Pull connections over time until it finally dies all the way and we have to restart the service. The service runs as a GKE Deployment with 2 Replicas.
Streaming Pull dropping off over time. All subscriptions ModifyingAckDeadline:
Filtered modify ack deadline to the one subscription_id:
Dropoff and Ack Message count:
Given that last chart, I wonder if it's related to the unique OrderingKey per message more than anything. I saw that the code had a queue for Ordered messages, but it doesn't make sense that it would behave this way. When one message got pulled in with "File123" and is ACK'd then another comes with "File234" the first one is ACK'd so should no longer have an impact on ordering.
FWIW: we process 9 distinct files each day. Each has a distinct name from day to day. Each one may take 10-20 minutes to complete, hence the deadline extension.
Thanks for the code and the detailed explanation. One remaining question: exactly which Google.Cloud.PubSub.V1 version are you using?
I'll try to get to this later today or tomorrow.
3.7.0
Thanks!
We deployed the change to OrderingKey yesterday, usually takes a day to see the StreamingPull drop off. Will be checking for the remainder of the week to see if it follows the same pattern as before.
It seems we are still in the same situation even after changing the OrderingKey to file prefix "FileTypeA", "FileTypeB", ... instead of "FileTypeA_20240228" "FileTypeA_20240229", "FileTypeB_20240228", "FileTypeB_20240229", ...
Every time the daily batch messages are processed (approx 9 individual files per day with 5 different file prefixes, originating from a batch of files uploaded to bucket), there are fewer open streaming pulls until eventually they stop altogether.
If it helps, some of the message processing requires a ModifyAckDeadline due to taking longer than 10 mins. Perhaps this is part of the problem, perhaps not.
Thanks for the extra info. I'll take a look as soon as I can but just for expectations it will probably be early next week.
Assigning to @jskeet as he's more familiar with this library than I am.
Okay, I'm looking into this now - I'm afraid my experience with diagnosing Pub/Sub issues is that there'll be a lot of back and forth with various questions and tests before we get to the root problem. Thanks so much for all the info you've already provided.
I can explain one thing off the bat: the reason that cts.Cancel() isn't doing anything is that the cancellation token passed into BuildAsync is only used for the build task itself. Building a client involves obtaining a credential, which can involve network requests to a metadata server - the cancellation token allows you to cancel that. After the client has been created, that cancellation token is irrelevant.
Now, to understand your situation a bit better:
(... Task.Delay to simulate that, if necessary)
All of this is just trying to play divide-and-conquer at the moment (and giving me enough information to reproduce the issue myself, which will massively reduce the back-and-forth). I don't have any concrete ideas about what's happening at the moment - but if I can reproduce the issue myself, I can easily add a load of logging etc.
I'm going to try a simple repro to start with of just starting a single client on my Windows box, using the same topic configuration that you've got, and occasionally adding messages to it, with a mixture of "needs modify ack deadline" and "ack quickly".
Okay, my simplest tests didn't show anything - which is pretty much what I expected. I'll try leaving the code running for multiple hours (if I can remember not to shut my laptop down!) before moving onto the next scenarios...
Nearly 24 hours later, the subscriber is still running fine, and the number of open streaming pull requests is stable (goes up and down a bit, as expected, but returns to the right level). I've sent large and small batches of messages, with a mixture of different simulated processing times. No joy reproducing the problem yet :(
(I'll stop this test now, as it's not getting anywhere.)
Dropping in to add a very similar case we're encountering. We've got a few .NET apps on GKE that use the Pub/Sub queues. They're fairly high-volume queues - processing ~110k messages a day, peaking at around 3-4k messages an hour. It works fine on weekdays, but on weekends the number of messages published drops down a lot and we might get 1-2 an hour.
We recently updated the PubSub NuGet package from 2.9.x to 3.7.x. Since updating, we've noticed that the application seems to drop the connection to the Pub/Sub queue if there are no messages published in the last ~15 minutes or so. It seems fairly similar to this issue on the NodeJS lib - https://github.com/googleapis/nodejs-pubsub/issues/1135.
So far our workaround is to just restart it, but we are looking at something more automated if we can't land on a fix in the upstream library. I hope this additional context might help with determining the cause of the issue.
@Mihier-Roy: Are you able to reproduce the issue outside GKE? Given the mention of Docker boundaries in the linked NodeJS issue, I'm wondering whether I might be able to reproduce this in a "simple" Docker container. (Rather than having to deploy to GKE.) Will give it a try. Any more information you can provide would be really useful - see my earlier comments for the sort of thing I'm interested in. (If you could let us know your topic configuration too, that'd be great. Things like message ordering and exactly-once delivery have some interesting effects in terms of how the clients behave.)
@jskeet: I haven't tried to re-produce outside of GKE just yet, but I'll take a stab at it today/tomorrow! I hope it's something we can reproduce on a Docker container.
Some additional info:
As for topic configuration:
Getting a break at last from the day-to-day to respond @jskeet - your name is familiar, I think I've read your writing or watched some videos in the past. Anyhow, here's some info you requested and some findings of our own:
We're definitely running in linux containers with .NET 6 runtime. mcr.microsoft.com/dotnet/aspnet:6.0 in fact.
Thanks for clarifying about the Build vs Run context for the cancellation token, it seems relevant in our workaround. As we pored over the code, that token was quite difficult to track down. There are many cancellation tokens in play and tokens created from others IIRC (it was many lines of code ago).
We found a workaround that seems stable. In a shared library, we've had a method that sets up the subscriber.
IMessageBusSubscriber<TEventType>
Task ProcessMessages<TEventHandler>(CancellationToken cancellationToken);
Previously:
var subscriber = await subscriberClientBuilder.BuildAsync(cancellationToken);
await subscriber.StartAsync(
    async (message, cancelToken) =>
    {
        try
        {
            using var scope = _serviceScopeFactory.CreateScope();
            var processor = scope.ServiceProvider.GetService<IEventPullProcessor<TEventType>>();
            if (scope.ServiceProvider.GetService<TEventHandler>() is not IEventHandler<TEventType> handler)
                throw new NotImplementedException();
            return await processor.ProcessMessage(message, handler.Handle, cancelToken);
        }
        catch (Exception e)
        {
            _logger.LogError(e, e.Message);
            throw;
        }
    });
We passed the ambient cancellationToken because, if the process was being shut down gracefully (during deployment, for example), we'd want the subscriber to shut down gracefully as well.
We now have the following implementation in which we no longer pass the ambient cancellationToken to BuildAsync.
// DO NOT USE THE CANCELLATION TOKEN THAT IS PASSED INTO THIS METHOD.
// We need a new cancellation token because the SubscriberClient can cancel the token itself. If the
// SubscriberClient cancels, we want to restart the subscription. The caller of ProcessMessages should re-invoke this
// method when it completes.
var subscriberCancellationToken = new CancellationToken();
var subscriber = await subscriberClientBuilder.BuildAsync(subscriberCancellationToken);
await subscriber.StartAsync(
    async (message, cancelToken) =>
    {
        try
        {
            using var scope = _serviceScopeFactory.CreateScope();
            var processor = scope.ServiceProvider.GetService<IEventPullProcessor<TEventType>>();
            if (scope.ServiceProvider.GetService<TEventHandler>() is not IEventHandler<TEventType> handler)
                throw new NotImplementedException();
            return await processor.ProcessMessage(message, handler.Handle, cancelToken);
        }
        catch (Exception e)
        {
            _logger.LogError(e, e.Message);
            throw;
        }
    });
However, since StartAsync doesn't take a cancellation token and we're awaiting, there doesn't seem to be a way to tell the subscriber client to stop.
Our consumer is a BackgroundService
public class EventWorker<TEventHandler, TEventType> : BackgroundService
{
    private readonly ILogger<EventWorker<TEventHandler, TEventType>> _logger;
    private readonly IMessageBusSubscriber<TEventType> _messageBusSubscriber;
    public EventWorker(
        ILogger<EventWorker<TEventHandler, TEventType>> logger,
        IMessageBusSubscriber<TEventType> messageBusSubscriber)
    {
        _logger = logger;
        _messageBusSubscriber = messageBusSubscriber;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await _messageBusSubscriber.ProcessMessages<TEventHandler>(stoppingToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, e.Message);
            }
            await Task.Delay(1000, stoppingToken);
        }
    }
}
Background service is registered as a Singleton service
services.AddTransient<TEventHandler>();
services.AddSingleton<IHostedService>(x =>
    ActivatorUtilities.CreateInstance<EventWorker<TEventHandler, TEventType>>(x,
        ActivatorUtilities.CreateInstance<GcpPubSubMessageBusSubscriber<TEventType>>(x,
            Options.Create(gcpPubSubSubscriberSettings)))
);
As I said, this is in a library that we use for several services to wire up subscribers so it uses generics. A more concrete implementation might be easier to debug.
Our fix works, so...I think what was happening is this:
Since we were passing the stoppingToken to BuildAsync, I'm thinking something somewhere inside was switching stoppingToken.IsCancellationRequested to true, which broke us out of our re-subscribe loop and caused it to stop altogether.
This is speculation on my part, but it led to a solution that has been working for some time now.
However... something isn't quite right with this scenario. What's causing it to stop? This single change to NOT pass that token into BuildAsync fixed the problem.
one error was logged recently which may (or not) be related, this was after our fix was in place. We don't retain logs long enough to have any from before our fix.
System.IO.IOException: The request was aborted.
---> System.IO.IOException: Unable to read data from the transport connection: Connection reset by peer.
---> System.Net.Sockets.SocketException (104): Connection reset by peer
--- End of inner exception stack trace ---
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource<System.Int32>.GetResult(Int16 token)
at System.Net.Security.SslStream.EnsureFullTlsFrameAsync[TIOAdapter](TIOAdapter adapter)
at System.Net.Security.SslStream.ReadAsyncInternal[TIOAdapter](TIOAdapter adapter, Memory`1 buffer)
at System.Net.Http.Http2Connection.ProcessIncomingFramesAsync()
--- End of inner exception stack trace ---
at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandleRpcFailure(Exception e)
at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandlePullMoveNext(Task initTask)
at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.StartAsync()
at Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d.MoveNext()
@philvmx1: Your new code is still somewhat confusing.
Firstly, the cancellation token here is pointless:
var subscriberCancellationToken = new CancellationToken();
var subscriber = await subscriberClientBuilder.BuildAsync(subscriberCancellationToken);
That's just equivalent to not passing in a cancellation token:
var subscriber = await subscriberClientBuilder.BuildAsync();
Secondly, this comment is pretty misleading in its current location:
// DO NOT USE THE CANCELLATION TOKEN THAT IS PASSED INTO THIS METHOD.
// We need a new cancellation token because the SubscriberClient can cancel the token itself. If the
// SubscriberClient cancels, we want to restart the subscription. The caller of ProcessMessages should re-invoke this
// method when it completes.
It looks like you're talking about the cancellation token passed into BuildAsync, but I'm guessing you actually mean the cancellation token passed to the delegate that's passed to subscriber.StartAsync... that's the cancellation token which SubscriberClient can (and should!) cancel, if the subscriber is being shut down. If you really did mean the cancellation token passed into BuildAsync, then the comment is just incorrect. That cancellation token is only used within BuildAsync, and couldn't be cancelled by SubscriberClient (as noted at the bottom of this GitHub comment).
However, since StartAsync doesn't take a cancellation token and we're awaiting, there doesn't seem to be a way to tell the subscriber client to stop.
The task returned by StartAsync will only complete when the subscriber has been asked to stop. It's a bit like WebApplication.RunAsync. It's not clear to me where you're calling StartAsync, but you quite possibly want to just keep hold of the task and await it on application shutdown. You could potentially do it in some kind of loop so that if the task completes in a faulted state due to some error, you restart the subscription - but I'd expect that in most cases such a fault would be permanently fatal and your application should fail at that point anyway.
If you want a kind of StartAsync which observes a cancellation token and stops the subscriber client when it's cancelled, I believe at the moment you'd need to write that yourself. It would be doable, and probably not too hard, but you'd need to bear in mind that stopping the subscriber client then has a separate cancellation token for a "hard stop" in case the "soft stop" request doesn't complete as quickly as you want.
Note that in your original code, when you were passing cts.Token into BuildAsync, nothing in the Pub/Sub code could cancel that token. It's a fundamental principle of .NET cancellation that a CancellationToken can only be used to observe a cancellation request - if you want to request cancellation, you need a CancellationTokenSource.
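As a rough illustration of the "write that yourself" idea, here is a minimal sketch of such a wrapper. RunUntilCancelledAsync is a hypothetical helper, not part of Google.Cloud.PubSub.V1, and it takes plain delegates so that it can be tried without a real subscription. The demo uses a fake subscriber built from a TaskCompletionSource; the names start, stop and hardStopAfter are assumptions made for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Demo with a fake "subscriber": its run task completes only once stop is requested.
var stopped = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
using var cts = new CancellationTokenSource();

Task run = RunUntilCancelledAsync(
    start: () => stopped.Task,
    stop: hardStopToken => { stopped.TrySetResult(); return Task.CompletedTask; },
    stoppingToken: cts.Token,
    hardStopAfter: TimeSpan.FromSeconds(10));

Console.WriteLine(run.IsCompleted);             // False: nothing has asked it to stop yet
cts.Cancel();                                   // soft stop is requested via the registration
await run;
Console.WriteLine(run.IsCompletedSuccessfully); // True

// Hypothetical helper (an assumption for illustration, not a library API).
// start: begins processing and returns the task that completes when processing has stopped.
// stop:  requests a soft stop; the token it receives is the hard-stop deadline.
static async Task RunUntilCancelledAsync(
    Func<Task> start,
    Func<CancellationToken, Task> stop,
    CancellationToken stoppingToken,
    TimeSpan hardStopAfter)
{
    Task runTask = start();

    // When the caller's token is cancelled, request a soft stop and start the hard-stop clock.
    // The task returned by stop is deliberately not awaited here; a real implementation
    // would probably want to observe and log its failures.
    using var registration = stoppingToken.Register(
        () => _ = stop(new CancellationTokenSource(hardStopAfter).Token));

    // Completes (or throws, if the run task faulted) once processing has actually ended.
    await runTask;
}
```

With a real client, the delegates would presumably be `() => subscriber.StartAsync(handler)` and `subscriber.StopAsync`. If the run task faults before cancellation is requested, the await rethrows and the caller decides whether to restart.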
@philvmx1: Separately, regarding this:
one error was logged recently which may (or not) be related, this was after our fix was in place. We don't retain logs long enough to have any from before our fix.
That looks like it could very well be related. That certainly looks like it's a single channel failing - which would then (if I'm reading the code correctly) stop the whole SubscriberClient. So if your fix effectively changed how you were restarting SubscriberClient in the face of failure, then that could explain the previous behavior. (If you weren't restarting appropriately and now are.)
Of course, that just means that the next question is why you're seeing that exception. That's where it may be GKE-specific. I've tried a long-running task in plain Docker (not running on GCP) and it's working fine for me. I'd still be really interested in knowing whether you can reproduce the error at all when not running in GKE. Additionally, if you're collecting metrics about when you're restarting the SubscriberClient, and can compare that with the problems you were having before, that would be good to know about.
@jskeet I realize now that the comment is not correct, since you've cleared up that the cancellation token in BuildAsync is not passed to the built client. That doesn't quite explain why it seems to have solved our problem, though. There must be something else. I'll look closely to see if anything else changed.
What I can't quite understand is how the client doesn't reconnect on its own after the other side hangs up. Or if it can't, why it wouldn't throw, in which case we would catch, log an error (which we never did see in the logs, or this would have been much clearer to us) and StartAsync again - even in the old code.
Or if it can't, why it wouldn't throw
I believe that's what it is doing, effectively, by making the task returned by StartAsync complete in the faulted state. I would expect that's how you got the log entry you've got. (What other behavior would you expect from an async method?)
I would expect errors like this to usually indicate a fatal problem - something we wouldn't want to just retry, or at least not for long. (It's possible that there's already retry going on, although we normally don't retry for streaming calls.) If the application wants to retry the whole subscription, that's up to the app (and it sounds like you're doing that).
It's hard to say anything more until we can reproduce the failure though.
If StartAsync completed in any state, we should expect to see the Warning logged at the end of the following (our log level is >= Warning).
public async Task ProcessMessages<TEventHandler>(CancellationToken cancellationToken)
{
    ... snipped setup and BuildAsync code for brevity ....
    await subscriber.StartAsync(
        async (message, cancelToken) =>
        {
            try
            {
                ... snipped for brevity ...
                return await processor.ProcessMessage(message, handler.Handle, cancelToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, e.Message);
                throw;
            }
        });
    _logger.LogWarning($"PubSub listener for subscription {_config.SubscriptionName} has disconnected");
}
So how did you capture the log you included in https://github.com/googleapis/google-cloud-dotnet/issues/11793#issuecomment-1995807014? I'd assumed that was already from awaiting StartAsync.
Note that the code you've shown above wouldn't log a warning if StartAsync returned a task that was then faulted, because when you await the task that await expression would throw. (So you wouldn't get to the _logger.LogWarning statement. You might have something logging ProcessMessages failing, but you haven't shown that.)
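To make the point about awaiting a faulted task concrete, here is a minimal sketch that doesn't involve Pub/Sub at all. FaultedStartAsync and ProcessMessagesAsync are hypothetical stand-ins for StartAsync and ProcessMessages, and a list stands in for the logger.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

try
{
    await ProcessMessagesAsync();
}
catch (InvalidOperationException e)
{
    // Only an outer handler like this one sees the failure.
    log.Add($"error: {e.Message}");
}

Console.WriteLine(string.Join(", ", log)); // prints "error: stream failed"

// Stand-in for a StartAsync call whose returned task ends up faulted.
static Task FaultedStartAsync() =>
    Task.FromException(new InvalidOperationException("stream failed"));

async Task ProcessMessagesAsync()
{
    await FaultedStartAsync();          // the await rethrows the task's exception here
    log.Add("warning: disconnected");   // never reached when the task is faulted
}
```

The "warning" entry never appears; the failure surfaces only in whichever caller catches the exception.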
Rather than getting snippets of code in various places, I'd really like to go back to a minimal but complete example which actually demonstrates the problem. It's not clear to me what you observed with the code in https://github.com/googleapis/google-cloud-dotnet/issues/11793#issuecomment-1967456165. Did you actually run that code and see the problem, or was it just meant to be sample code to show how you set things up? (It's really important that we end up with shared code which does definitely demonstrate the problem.)
Error should have been logged in the BackgroundWorker in https://github.com/googleapis/google-cloud-dotnet/issues/11793#issuecomment-1995807014 via _logger.LogError(e, e.Message). Then it should loop around after the delay, build a new PubSub client and StartAsync again. The referenced log message is more recent, after removing the cancellationToken from BuildAsync.
Understood about the working sample, I am working on creating something that models the problem with much less abstraction. I do have test environments to run them in. What I don't have is a lot of dedicated time for this since I'm running a project with a fixed deadline. So I have to squeeze in time to try things, build isolated test code, and dig for additional information.
Digging into metrics in DataDog, I found that each gRPC POST lasts 15 minutes and then errors. Interestingly, the number of errors tapers off over time, as does the number of requests.
Errors:
Requests
minimal problem model code I have so far:
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
Host.CreateDefaultBuilder(args)
    .ConfigureServices((b, s) => {
        s.AddLogging(
            x => x.AddSimpleConsole()
        )
        .AddHostedService<MessageHandler>();
    })
    .Build()
    .Run();
public class MessageHandler : BackgroundService
{
    const string PROJECT_NAME = "<your-project-id>";
    const string SUBSCRIPTION_NAME = "<your-sub-name>";
    private readonly ILogger _logger;
    private readonly SubscriberClientBuilder _builder;
    public MessageHandler(IServiceProvider sp)
    {
        _logger = sp.GetRequiredService<ILogger<MessageHandler>>();
        _builder = MakeBuilder();
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ExecuteAsync");
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var subscriber = await _builder.BuildAsync(stoppingToken);
                _logger.LogInformation("subscriber.StartAsync");
                await subscriber.StartAsync(
                    async (message, cancelToken) =>
                    {
                        try
                        {
                            _logger.LogInformation("Message {}", message.MessageId);
                            if (stoppingToken.IsCancellationRequested) _logger.LogWarning("Cancelling stoppingToken");
                            if (cancelToken.IsCancellationRequested) _logger.LogWarning("Cancelling cancelToken");
                            await HandleMessage(message, cancelToken);
                            if (stoppingToken.IsCancellationRequested) _logger.LogWarning("Cancelling stoppingToken");
                            if (cancelToken.IsCancellationRequested) _logger.LogWarning("Cancelling cancelToken");
                            _logger.LogInformation("Acking {}", message.MessageId);
                            return SubscriberClient.Reply.Ack;
                        }
                        catch (Exception e)
                        {
                            _logger.LogError(e, e.Message);
                            if (stoppingToken.IsCancellationRequested) _logger.LogWarning("Cancelling stoppingToken");
                            throw;
                        }
                    });
            }
            catch (Exception e)
            {
                _logger.LogError(e, e.Message);
            }
            await Task.Delay(1000, stoppingToken);
        }
    }
    private async Task HandleMessage(PubsubMessage message, CancellationToken cancelToken)
    {
        int delayMilliseconds = GetDelayFromMessage(message);
        _logger.LogInformation("Delay {} ms", delayMilliseconds);
        await Task.Delay(delayMilliseconds, cancelToken);
    }
    private static int GetDelayFromMessage(PubsubMessage message)
    {
        int delayMilliseconds = TimeSpan.FromMinutes(10).Milliseconds;
        if (message.Attributes.TryGetValue("delay", out string delayValue)
            && int.TryParse(delayValue, out int delayMinutes)
        ) delayMilliseconds = TimeSpan.FromMinutes(delayMinutes).Milliseconds;
        return delayMilliseconds;
    }
    private static SubscriberClientBuilder MakeBuilder()
    {
        var subscriptionName = new SubscriptionName(PROJECT_NAME, SUBSCRIPTION_NAME);
        var settings = new SubscriberClient.Settings
        {
            FlowControlSettings = new FlowControlSettings(
                maxOutstandingElementCount: 10,
                maxOutstandingByteCount: null)
        };
        var subscriberClientBuilder = new SubscriberClientBuilder
        {
            Settings = settings,
            SubscriptionName = subscriptionName
        };
        return subscriberClientBuilder;
    }
}
csproj
<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Google.Cloud.PubSub.V1" Version="3.9.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
</Project>
Correction, our change did not solve the issue for us. With enough time, we are still seeing the downward stair step from Number of open streaming pulls.
Thanks - I'll get back to trying to repro this tomorrow morning.
Thank you! I've got one running on my machine now. 10 open streaming pulls. Had a small error in my sample code, needed TotalMilliseconds rather than Milliseconds.
If I can't repro either, will do some other things next week.
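For anyone copying the sample, the Milliseconds versus TotalMilliseconds difference is easy to see in isolation. This is a small standalone sketch, not taken from the repro project: Milliseconds is only the millisecond component of the TimeSpan (0 to 999), while TotalMilliseconds is the whole duration expressed in milliseconds.

```csharp
using System;

TimeSpan tenMinutes = TimeSpan.FromMinutes(10);

// Component only: a TimeSpan of exactly 10 minutes has no fractional-second part.
Console.WriteLine(tenMinutes.Milliseconds);      // 0

// Whole duration in milliseconds (a double).
Console.WriteLine(tenMinutes.TotalMilliseconds); // 600000

// What a delay calculation like GetDelayFromMessage needs.
int delayMilliseconds = (int)tenMinutes.TotalMilliseconds;
Console.WriteLine(delayMilliseconds);            // 600000
```

With the Milliseconds property, a simulated 10-minute handler completes immediately, so no deadline extension would ever be needed.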
Just looking at the error handling code, we do stop and restart the pull request on errors we deem to be recoverable - on others we abort the whole subscriber (and return a faulted task). I tested this by deleting the subscription that was being used, and it faulted as expected. The error handling code is here.
I note that with your current worker service, you never end up stopping the subscriber. I added this to your code (within the while loop):
// If we're asked to stop, call StopAsync and give a deadline of 10 seconds.
using var registration = stoppingToken.Register(() => subscriber.StopAsync(new CancellationTokenSource(10000).Token));
That shouldn't be causing the failures you're seeing, but I thought it worth mentioning.
After exploratory testing overnight and this morning, here are some interesting things I'm seeing:
The same message with 10 minute processing time being handled multiple times. This is the tail of the logs.
MessageHandler: Information: subscriber.StartAsync
info: MessageHandler[0]
subscriber.StartAsync
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Net.Sockets.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Net.NameResolution.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Security.Cryptography.Primitives.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Collections.NonGeneric.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Security.Cryptography.Encoding.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Formats.Asn1.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Text.Encoding.Extensions.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Threading.Channels.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Runtime.CompilerServices.Unsafe.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Runtime.Intrinsics.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
info: MessageHandler[0]
Message 9420066951064735
MessageHandler: Information: Message 9420066951064735
info: MessageHandler[0]
Delay 600000 ms
MessageHandler: Information: Delay 600000 ms
MessageHandler: Information: Acking 9420066951064735
info: MessageHandler[0]
Acking 9420066951064735
info: MessageHandler[0]
Message 9420066951064735
MessageHandler: Information: Message 9420066951064735
info: MessageHandler[0]
Delay 600000 ms
MessageHandler: Information: Delay 600000 ms
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Diagnostics.StackTrace.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Reflection.Metadata.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
Loaded '/usr/local/share/dotnet/shared/Microsoft.NETCore.App/6.0.11/System.Collections.Immutable.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
info: MessageHandler[0]
Acking 9420066951064735
MessageHandler: Information: Acking 9420066951064735
info: MessageHandler[0]
Message 9420066951064735
MessageHandler: Information: Message 9420066951064735
info: MessageHandler[0]
Delay 600000 ms
MessageHandler: Information: Delay 600000 ms
info: MessageHandler[0]
Acking 9420066951064735
MessageHandler: Information: Acking 9420066951064735
MessageHandler: Information: Message 9420066951064735
info: MessageHandler[0]
Message 9420066951064735
MessageHandler: Information: Delay 600000 ms
info: MessageHandler[0]
Delay 600000 ms
NOTE: I had to restart this morning because the client token expired overnight.
In Google Console, comparing ModifyAckDeadline to ExpiredAckDeadlines...
And the unack'd messages.
At this point, it seems like our issue is more closely related to timing or result of the ModAckDeadline requests, WDYT? To me it seems like ModAckDeadline is not happening, the ACK deadline is timing out, and the message(s) are being pulled over and over again.
Hmm... it does sound like that's feasible.
Can I check whether these were results you got from running your code locally or in GKE? (If you've managed to reproduce it outside GKE, that would be a major step forward.)
We can get more useful log information too:
Enable timestamps (and single line output)
.ConfigureServices((b, s) =>
{
    s.AddLogging(x => x.AddSimpleConsole(opt =>
    {
        opt.TimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.FFF'Z' ";
        opt.SingleLine = true;
    }))
    .AddHostedService<MessageHandler>();
})
Enable client-level logging
Change the constructor to:
public MessageHandler(IServiceProvider sp)
{
    _logger = sp.GetRequiredService<ILogger<MessageHandler>>();
    var clientLogger = sp.GetRequiredService<ILogger<SubscriberClient>>();
    _builder = MakeBuilder(clientLogger);
}
Change the MakeBuilder method to:
private static SubscriberClientBuilder MakeBuilder(ILogger clientLogger)
{
var subscriptionName = new SubscriptionName(PROJECT_NAME, SUBSCRIPTION_NAME);
var settings = new SubscriberClient.Settings
{
FlowControlSettings = new FlowControlSettings(
maxOutstandingElementCount: 10,
maxOutstandingByteCount: null),
};
var subscriberClientBuilder = new SubscriberClientBuilder
{
Settings = settings,
SubscriptionName = subscriptionName,
Logger = clientLogger
};
return subscriberClientBuilder;
}
... then add "Google.Cloud.PubSub.V1.SubscriberClient": "Trace" into appsettings.json.
That way we'll be able to see exactly when ModifyAckDeadline is being called, etc.
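For reference, a minimal sketch of where that entry would sit, assuming the host reads its logging configuration from the standard "Logging" section of appsettings.json (as the default generic host does). The "Default" level shown here is an assumption; keep whatever the project already uses.

```json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Google.Cloud.PubSub.V1.SubscriberClient": "Trace"
    }
  }
}
```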
This is locally not in GKE.
A little ahead of you here: I've already added timestamps and additional info and restarted. After restarting the service a couple of times, it's acking some messages that had been unacked for many hours. Will add the tracing though, that's going to help a lot.
After restarting debug, it ended up acking a couple more that were stuck for a looong time. These were several messages with the same OrderingKey sent around the same time.
2024-03-15 15:24:55Zinfo: MessageHandler[0] Message 9420066951068600 OrderingKey kyc PublishTime "2024-03-15T02:42:09.539Z"
2024-03-15 15:24:55Zinfo: MessageHandler[0] Delay 600000 ms
2024-03-15 15:25:25Zinfo: MessageHandler[0] Acking 9421075702395937
Maybe you'd want to model the expired token in your own debugging session - when I reauthenticated using the gcloud command, it did not pick up the new token. How often do tokens expire in GKE-land? Do they autorefresh every 24 hours or so? I saw error logs when this happened that we aren't seeing in prod, so it's probably a separate issue if anything.
It finally ack'd all messages but took restarting the program. This is essentially what we do in production too. I would say it kind of models the problem fairly well. How to actually make it fail on purpose is still a bit of a mystery though.
Adding Trace now and will give it another go with various OrderingKey and delay attributes. Might even disconnect my machine from the internet and reconnect to see what that does.
Tangentially:
Maybe you'd want to model the expired token in your own debugging session - when I reauthenticated using the gcloud command, it did not pick up the new token. How often do tokens expire in GKE-land? Do they autorefresh every 24 hours or so? I saw error logs when this happened that we aren't seeing in prod, so it's probably a separate issue if anything.
Can you show exactly which credentials you are using in GKE and which credentials you are using locally? In general you shouldn't need to worry about access tokens expiring in either of these environments, as we have credential types that will automatically refresh the token for you.
@amanda-tarafa it's gcloud auth application-default login locally. They probably should expire in that case.
@amanda-tarafa it's gcloud auth application-default login locally. They probably should expire in that case.
In GKE as well? Can you show how you are initializing the clients in both environments, basically what credentials, if any, you are passing to the SubscriberClientBuilder?
@amanda-tarafa We're using service credentials in GKE. 99% sure this was a "locally debugging using dev credentials" issue only and not an issue in GKE. As I mentioned, we didn't see any log entries in prod.
@amanda-tarafa We're using service credentials in GKE. 99% sure this was a "locally debugging using dev credentials" issue only and not an issue in GKE. As I mentioned, we didn't see any log entries in prod.
Ok, my point was then that you don't need to worry about tokens expiring in GKE; those will be refreshed automatically for you if you are using explicit service account credentials or default account credentials.
I'm also surprised that you have to re-authenticate locally when using gcloud auth application-default login (if you are using default application credentials to initialize the subscriber), as the credential generated by the gcloud command should include a refresh token that we use to refresh the access token. Again, if you can share how you are initializing the SubscriberClient, I can then ask follow-up questions to see what's happening there. And any info on why and when you had to re-authenticate locally? Were you actually seeing authentication errors?
@amanda-tarafa I'm creating the client using the code shared in an above comment. Look for MakeBuilder() and _builder.BuildAsync(. I don't think I have bandwidth to split between two topics with the time I have available. In any case, it seems unlikely that it's related to the issues we saw in our prod system at this point. I saw logs that said the token was expired locally but never in prod so I don't think that was the issue.
@philvmx1 I also think the token expiring is not related to your issues, I simply wanted to address your previous comment:
Maybe you'd want to model the expired token in your own debugging session - when I reauthenticated using the gcloud command, it did not pick up the new token. How often do tokens expire in GKE-land? Do they autorefresh every 24 hours or so? I saw error logs when this happened that we aren't seeing in prod, so it's probably a separate issue if anything.
If you are ever ready to look into why you need to re-authenticate locally, feel free to create a separate issue.
I was able to repro this issue from local process to GCP subscription.
I kept the subscriber program running over the weekend.
It logged the following message over and over again, so it looks like it's connecting.
2024-03-18 15:57:17Ztrce: Google.Cloud.PubSub.V1.SubscriberClient[0] Starting duplex streaming API call to StreamingPull.
2024-03-18 15:57:21Ztrce: Google.Cloud.PubSub.V1.SubscriberClient[0] Starting duplex streaming API call to StreamingPull.
2024-03-18 15:57:47Ztrce: Google.Cloud.PubSub.V1.SubscriberClient[0] Starting duplex streaming API call to StreamingPull.
I published a message and waited, but it did not get picked up for processing by the streaming pull. Then I published another, same thing. They're sitting unacked. No open streaming pull requests since March 16.
@philvmx1: Really glad we've been able to reproduce locally.
Do you still have the logs, and are you able to show more of the lines (e.g. about 30 of them)? I'd like to get a sense of the timing. (I'm also setting ClientCount=1 in the builder in my current tests, to try to make the logging simpler to understand.)
Also, just to go back to auth briefly - what auth were you using this time? I've got a sneaking suspicion about this, but I'm going to run some tests locally to confirm.
One thing I do want to do is add more diagnostics to the library - now that we've got ILogger integration, we should use it a lot more. I'm looking into that now...
I've made some potential progress, but I've got an odd situation in terms of credentials now which I don't think is due to the Pub/Sub library itself (I'm seeing invalid JWT grants). However, I have seen a situation where a subscriber appears to start but never makes progress. It's a bit of a mess at the moment, but I'm hopeful that by the end of the day we'll have more progress.
Okay, I know what was odd about the credentials: I had a service account key that had expired. I can now reproduce four different situations:
UseJwtAccessWithScopes is set to false: StreamingPull fails with an RPC status of Internal, which is deemed retriable, and we just retry, forever
My next step will be to try the same code with Compute credentials (which admittedly might be slightly different in GKE than other environments, but it'll be good to at least see).
I'll definitely be working on adding logging into SubscriberClient and PublisherClient. (Admittedly the log levels to use end up being slightly tricky to decide - if the library is going to "silently" retry, should we actually be logging an error, or just debug?) Additionally, we need to consider a retry policy that doesn't keep going forever.
Now, what I don't know yet is whether this is the situation you're facing. Please could you let me know:
I've just tried with a Compute credential which is bogus in some way (it's not entirely clear how, but that's a different matter) - and again, we ended up with an Internal status code which was retried forever. So this could indeed be what you're seeing in GKE.
So token stops refreshing for whatever reason, pull thinks it's working but it's actually failing silently forever. May or may not be the same issue in prod, but who knows. I'll see if there was any change to our credentials between the last time it failed there and when it started seemingly working again...although since the # of open pulls keeps dropping every day nearly at the 24 hour mark (token lifetime is how long again? ;) ), I'm thinking it's fragile at best.
I don't fully understand the question "What credentials..." but I'll give answering a try anyway. Locally, I'm logging in with my own credentials using gcloud auth application-default login. NOTE: I also do gcloud auth login daily.
here we go... IAM metrics for the SA.
fetch consumed_api | metric 'serviceruntime.googleapis.com/api/request_count' | filter (resource.credential_id == 'serviceaccount:redacted' && resource.method == 'google.pubsub.v1.Subscriber.StreamingPull' && resource.service == 'pubsub.googleapis.com') | align rate() | group_by [resource.method, metric.response_code_class, metric.grpc_status_code], [value_request_count_aggregate: aggregate(value.request_count)]
So in prod, are you doing anything to authenticate, or just using the default credentials for the cluster? (I suspect the latter - basically unless you've got a service account JSON file somewhere.) Will talk with Amanda about this. I think we may well be onto something. (It could be a red herring in the end of course.)
In terms of "what kind of credentials" there are three different types I can see at play here:
gcloud auth application-default login
GOOGLE_APPLICATION_CREDENTIALS environment variable
Then there's the matter of whether token exchange is involved or self-signed JWTs, but that's a second-order concern.
To clarify this:
pull thinks it's working but it's actually failing silently forever.
I believe it actually knows it's failing, but just retries with exponential backoff (30s max) - but it retries forever rather than eventually failing, which is probably a mistake.
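To make the distinction concrete, here is a minimal sketch of a retry loop that keeps the capped exponential backoff but gives up after a fixed number of attempts. It is not the library's implementation: RetrySketch, RetryAsync, the isRetriable predicate, and the attempt limit are all hypothetical names and values chosen for illustration; only the 30s cap comes from the discussion above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Google.Cloud.PubSub.V1.
public static class RetrySketch
{
    public static async Task<T> RetryAsync<T>(
        Func<Task<T>> action,
        Func<Exception, bool> isRetriable,
        int maxAttempts,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        CancellationToken cancellationToken)
    {
        var delay = initialDelay;
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await action();
            }
            // On the final attempt the filter is false, so the exception
            // propagates to the caller instead of being retried silently.
            catch (Exception e) when (attempt < maxAttempts && isRetriable(e))
            {
                await Task.Delay(delay, cancellationToken);
                // Double the delay, capped at maxDelay (for example 30 seconds).
                delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
            }
        }
    }
}
```

With maxAttempts set to int.MaxValue this degenerates into the "retry forever" behaviour; a finite limit lets the caller see the underlying failure and decide what to do.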
I've merged some logging - I'll chat with Amanda, but I suspect we'll be able to release a beta (or patch) tomorrow which will clarify things.
If you're able to test that version both locally and in GKE and collect any logs (you'll need to specify a Logger in the builder, of course) that would be really useful. It's far from impossible that it's an auth token refresh issue locally, but something entirely different in GKE - but the logs should tell us.
Try this log file. I started the debug session from zero. I remember something when I was first starting to debug with this test program: I started it, but it did not pull messages. I consulted a colleague. He reminded me to auth. Now I see exactly why it was failing - the lib keeps retrying auth without throwing. To me, it looks like it was running but inside it was silently failing over and over again.
In GKE, we create a K8s service account for each service. Then bind that to an IAM role. I'll see what I can do to get something running with Trace in one of our test envs and post some logs from that as well.
All done via Terraform.
resource "google_service_account_iam_binding"
...
role = "roles/iam.workloadIdentityUser"
Firstly - thanks for jumping on this, that's really useful. So yes, as we somewhat-expected, it's an auth error being surfaced as "internal" (I suspect due to the way gRPC wraps authentication). Internal errors are deemed retriable, so we try again.
It'll be really interesting to see if we get the same sort of pattern on GKE - it's unlikely to be the same auth issue, but if it's an auth issue that surfaces as an internal error, that would explain a lot. (And if something else is going wrong, hopefully the logs will show that instead.)
@Mihier-Roy: You may want to upgrade to 3.10.1 and specify a logger as well, to see if that helps to pin down the issue for you.
@jskeet I'm working on getting something running in a dev env to model the problem. We don't see a decline in open streaming pulls in our dev/test envs today - only in prod, where the worker is busy around the same time every day slogging through files that it pulls down from a bucket.
First pass at modeling this will be a Task.Delay that's controlled by a message attribute. No CPU, Memory pressure, Get object from bucket calls, or API calls between microservices in the GKE cluster, additional logging, etc that we would normally have.
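A rough sketch of what the attribute-controlled delay could look like. The attribute name "delayMs" and the DelayAttribute helper are assumptions made for illustration (the thread does not say which attribute name is used); the value is read as whole milliseconds, matching the "Delay 600000 ms" log lines.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper for the test handler; not part of any library.
public static class DelayAttribute
{
    // Assumed attribute name; use whatever the publisher actually sets.
    public const string Key = "delayMs";

    // Returns the delay requested by the message, or the fallback when the
    // attribute is missing or is not a non-negative whole number.
    public static TimeSpan Parse(IDictionary<string, string> attributes, TimeSpan fallback)
    {
        if (attributes.TryGetValue(Key, out var raw)
            && int.TryParse(raw, NumberStyles.None, CultureInfo.InvariantCulture, out var ms))
        {
            return TimeSpan.FromMilliseconds(ms);
        }
        return fallback;
    }
}
```

Inside the handler this would be used roughly as await Task.Delay(DelayAttribute.Parse(message.Attributes, TimeSpan.Zero), ct); before returning the Ack reply, assuming the message's attributes are exposed as a string-to-string map.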
I'll be out until next week so I won't have findings to post until then. Will ask a colleague to check up and post if anything interesting comes up in the meantime.
@philvmx1: Any updates with more logging?
This is a consistent problem for us and many others. My theory is that the server side hangs up in the middle of a keepalive and RpcException has IsRpcCancellation on the line linked below which shuts down the subscription.
Since we expect StartNew to continue to run and RPC connection issues to be resolved, this causes our workers to stop silently leaving us dead in the water.
https://github.com/googleapis/google-cloud-dotnet/blob/2e2659b8b970ba3e0cf2937f91016f1835fb4c0c/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs#L232
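If the task returned from StartAsync can complete without the application asking it to stop, one possible workaround is to supervise it and start a fresh subscriber when that happens. The sketch below is only an illustration of that idea under stated assumptions: Supervisor and RunSupervisedAsync are hypothetical names, and runOnce stands in for "build a new SubscriberClient and await its StartAsync task".

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical supervisor; not part of Google.Cloud.PubSub.V1.
public static class Supervisor
{
    public static async Task RunSupervisedAsync(
        Func<CancellationToken, Task> runOnce,
        TimeSpan restartDelay,
        Action<string> log,
        CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await runOnce(cancellationToken);
                // Completing without cancellation is the "silent stop" case.
                log("Subscriber run completed unexpectedly; restarting.");
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                break; // Shutdown was requested by the application.
            }
            catch (Exception e)
            {
                log($"Subscriber run faulted: {e.Message}; restarting.");
            }

            try
            {
                // Pause before restarting so a persistent failure does not spin.
                await Task.Delay(restartDelay, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}
```

Logging each unexpected completion at least makes the stop visible, which is the part that is missing when the worker goes quiet.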