Closed mhartwig22 closed 6 months ago
I honestly don't know, I haven't seen that behavior before. There's a small delay on the inbox because completed messages are kept briefly for idempotency checks, but outgoing messages should be deleted as soon as they are sent out.
Are you seeing any error messages? Are you by chance doing anything like running under a debugger and shutting the app down through the debugger, so that it isn't shutting down gracefully?
@mhartwig22 The pic up there of the inbox table is normal operation; see how the status says "Handled"? So that's fine. The outgoing behavior is a problem, though. So back to my question: how are you shutting down your app? You might try running in "Solo" mode in development:
https://wolverine.netlify.app/guide/durability/leadership-and-troubleshooting.html#solo-mode
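For reference, "Solo" mode is a single durability option, shown here as a minimal sketch based on the linked docs page and the configuration posted later in this thread (the surrounding `UseWolverine` bootstrapping is the standard one, not specific to this issue):

```csharp
builder.Host.UseWolverine((context, opts) =>
{
    // In development, run the durability agents in "Solo" mode so this
    // single node immediately reclaims its own inbox/outbox work on
    // restart instead of waiting for node leadership to be resolved
    if (context.HostingEnvironment.IsDevelopment())
    {
        opts.Durability.Mode = DurabilityMode.Solo;
    }
});
```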
Hi Jeremy,
Thanks for getting back on this.
So back to my question, how are you shutting down your app?
While running these tests under the debugger, I'm always trying to make sure I'm shutting down the application gracefully.
I just reran a test with the WolverineFx database recreated and a brand-new Kafka topic; basically everything was reset. Starting the app and emitting messages, I see the messages delivered to Kafka fine (and consumed back by the app as well), and they are also in the outgoing table in SQL Server. There is nothing notable in the logs. All messages seem to remain in the outgoing table. I waited a few minutes and shut down the app gracefully; this is what appeared in the logs:
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: Wolverine.SqlServer.Persistence.SqlServerMessageStore[0]
Reassigned 10000 incoming messages from 1 and endpoint at kafka://topic/events to any node in the durable inbox
info: Wolverine.Transports.ListeningAgent[0]
Stopped message listener at kafka://topic/events
info: Wolverine.Transports.ListeningAgent[0]
Stopped message listener at dbcontrol://f4f4c149-deb7-445a-817d-7900502f16f3/
...\net8.0\EventDrivenArchExample.exe (process 34932) exited with code 0.
Upon restarting the app, the messages get redelivered to both Kafka and the application. There are several log entries like:
info: Wolverine.RDBMS.DurabilityAgent[0]
Found recoverable outgoing messages in the outbox for kafka://topic/events
info: Wolverine.RDBMS.DurabilityAgent[0]
Recovered 100 messages from outbox for destination kafka://topic/events while discarding 0 expired messages
This leads to all messages being replayed, and when that finishes, the messages again remain in the outgoing table. Waiting a few minutes and then shutting down the app produces this log output:
Application is shutting down...
info: Wolverine.SqlServer.Persistence.SqlServerMessageStore[0]
Reassigned 3700 incoming messages from 2 and endpoint at kafka://topic/events to any node in the durable inbox
info: Wolverine.Transports.ListeningAgent[0]
Stopped message listener at kafka://topic/events
info: Wolverine.Transports.ListeningAgent[0]
Stopped message listener at dbcontrol://30042aa7-9d5c-4270-b2da-b39d0ca3ffc4/
This time it's about incoming messages.
Starting the app yet again results in all outgoing messages being reprocessed once more:
Wolverine.Runtime.Agents.NodeAgentController: Information: Successfully started agent wolverinedb://default/
Wolverine.Runtime.WolverineRuntime: Information: Successfully started agent wolverinedb://default/ on node 0be879b5-631a-4772-893a-dec386085e5a
Wolverine.Runtime.WolverineRuntime: Information: Releasing node ownership in the inbox/outbox from dormant node 2
...
...
...
Wolverine.RDBMS.DurabilityAgent: Information: Found recoverable outgoing messages in the outbox for kafka://topic/events
Wolverine.RDBMS.DurabilityAgent: Information: Recovered 100 messages from outbox for destination kafka://topic/events while discarding 0 expired messages
There are no errors to be seen anywhere in the log. Like I said, it all works perfectly fine, apart from leaving the messages behind and replaying them.
I don't know if there is an issue with some housekeeping code not being executed. It's hard to tell, as I don't know the internals of WolverineFx. We're so close to using WolverineFx in production and I'm keen, as I love the library. If there is anything I can do to assist with testing and getting to the bottom of this, let me know, Jeremy.
I have the exact same situation: the outbox does not get cleared, and the messages in the outbox keep getting pushed to the queue.
The configuration I am trying is very similar to the OP's.
Program.cs:
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddResourceSetupOnStartup();

var connectionString = builder.Configuration.GetConnectionString("SqlServer")!;

builder.Host.UseWolverine((context, opts) =>
{
    opts.PersistMessagesWithSqlServer(connectionString);
    opts.UseEntityFrameworkCoreTransactions();

    opts.ConfigureKafka("localhost:19092")
        .ConfigureClient(config =>
        {
            config.BootstrapServers = "localhost:19092";
            config.BrokerAddressFamily = BrokerAddressFamily.V4;
        });

    opts.PublishMessage<OrderCreated>().ToKafkaTopic(nameof(OrderCreated))
        .TelemetryEnabled(true);

    opts.ListenToKafkaTopic("OrderInventoryAllocated")
        .UseDurableInbox();

    opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
    opts.Policies.AutoApplyTransactions();

    if (context.HostingEnvironment.IsDevelopment())
    {
        opts.Durability.Mode = DurabilityMode.Solo;
    }
});

builder.Host.ApplyOaktonExtensions();
builder.Services.AddDbContextWithWolverineIntegration<OrderDbContext>(x => x.UseSqlServer(connectionString));

var app = builder.Build();

await app.Services.GetRequiredService<OrderDbContext>().Database.EnsureCreatedAsync();

app.MapWolverineEndpoints();
app.UseSwagger();
app.UseSwaggerUI();

await app.RunOaktonCommands(args);
builder.Host.ApplyOaktonExtensions();
builder.Services.AddDbContextWithWolverineIntegration<OrderDbContext>(x => x.UseSqlServer(connectionString));
var app = builder.Build();
await app.Services.GetRequiredService<OrderDbContext>().Database.EnsureCreatedAsync();
app.MapWolverineEndpoints();
app.UseSwagger();
app.UseSwaggerUI();
await app.RunOaktonCommands(args);
Handler:
public class CreateOrderRequestHandler
{
    [WolverinePost("/order")]
    [Transactional]
    public async Task<OrderCreated> Handle(CreateOrder command, OrderDbContext dbContext, IMessageBus messageBus)
    {
        var items = command.Items
            .Select(item => new OrderLineItem(item.ProductId, item.Amount))
            .ToList();

        var order = new Order(DateTimeOffset.UtcNow, items);
        dbContext.Add(order);

        var orderCreated = new OrderCreated(order.Id);
        await messageBus.PublishAsync(orderCreated);

        return orderCreated;
    }
}
Sending one request to my handler results in infinite reprocessing of the outbox message:
info: Wolverine.RDBMS.DurabilityAgent[0]
Found recoverable outgoing messages in the outbox for kafka://topic/OrderCreated
info: Wolverine.RDBMS.DurabilityAgent[0]
Recovered 1 messages from outbox for destination kafka://topic/OrderCreated while discarding 0 expired messages
info: Wolverine.Runtime.WolverineRuntime[0]
Releasing node ownership in the inbox/outbox from dormant node 1
info: Wolverine.RDBMS.DurabilityAgent[0]
Found recoverable outgoing messages in the outbox for kafka://topic/OrderCreated
info: Wolverine.RDBMS.DurabilityAgent[0]
Recovered 1 messages from outbox for destination kafka://topic/OrderCreated while discarding 0 expired messages
Environment:
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.1" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0"/>
<PackageReference Include="WolverineFx" Version="2.3.0"/>
<PackageReference Include="WolverineFx.EntityFrameworkCore" Version="2.3.0"/>
<PackageReference Include="WolverineFx.Http" Version="2.3.0"/>
<PackageReference Include="WolverineFx.SqlServer" Version="2.3.0"/>
<PackageReference Include="WolverineFx.Kafka" Version="2.3.0"/>
I ran into the same problem with the outbox and spent some time troubleshooting. The cause is the KafkaSenderProtocol class: its SendBatchAsync method takes an ISenderCallback callback, but it never notifies the caller about a successful send.
If the service is restarted, Wolverine therefore begins to resend all of the stuck messages.
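In other words, a sender protocol presumably has to report the batch outcome back through the callback so the durability agent can delete the rows. A rough sketch of what that could look like (this is not Wolverine's actual internal implementation; the `_producer` field, the `TopicName`/`Data` envelope properties, and the class name are assumptions, while `MarkSuccessfulAsync` is the callback method named in this thread):

```csharp
// Sketch only: a Kafka sender that reports success back to the
// durability agent. The real class lives inside WolverineFx.Kafka.
public class CallbackAwareKafkaSender
{
    private readonly IProducer<string, byte[]> _producer; // hypothetical field

    public CallbackAwareKafkaSender(IProducer<string, byte[]> producer)
    {
        _producer = producer;
    }

    public async Task SendBatchAsync(ISenderCallback callback, OutgoingMessageBatch batch)
    {
        try
        {
            foreach (var envelope in batch.Messages)
            {
                // Map each envelope to a Kafka message and produce it
                // (property names here are illustrative)
                await _producer.ProduceAsync(envelope.TopicName, new Message<string, byte[]>
                {
                    Key = envelope.Id.ToString(),
                    Value = envelope.Data
                });
            }

            // The missing piece per this report: tell Wolverine the batch
            // succeeded so the rows can be removed from the outbox table
            await callback.MarkSuccessfulAsync(batch);
        }
        catch (Exception)
        {
            // On failure one would report back through the callback's
            // failure method so the batch is retried; only
            // MarkSuccessfulAsync is named in this thread, so the exact
            // failure API is omitted here
            throw;
        }
    }
}
```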
Unfortunately, Wolverine doesn't provide any option to replace KafkaSenderProtocol with a custom implementation (via DI, for example), so I tried a dirty hack to replace the method via Harmony:
using System.Reflection;
using System.Runtime.CompilerServices;
using HarmonyLib;
// plus the Wolverine namespaces for KafkaSenderProtocol,
// ISenderCallback and OutgoingMessageBatch

[HarmonyPatch(typeof(KafkaSenderProtocol), nameof(KafkaSenderProtocol.SendBatchAsync))]
#pragma warning disable
public class Patch
{
    // Postfix runs after the original SendBatchAsync and chains the
    // missing success notification onto the returned task
    public static void Postfix(ref Task __result, ISenderCallback callback, OutgoingMessageBatch batch)
    {
        __result.ContinueWith(x => callback.MarkSuccessfulAsync(batch));
    }
}

public static class Patcher
{
    // Applies all Harmony patches in this assembly as soon as the
    // module is loaded
    [ModuleInitializer]
    public static void Patch()
    {
        var harmony = new Harmony("wolverine-outbox-fix");
        var assembly = Assembly.GetExecutingAssembly();
        harmony.PatchAll(assembly);
    }
}
I'm going to create a merge request with the fix.
Pfft, @ilya-edrets got it. I'm pushing that change in Wolverine 2.6. Running the tests right now. Feeling more than a little bit embarrassed over this one...
Can we get this fix in the 1.20.x version too, please? I have to use .NET 6 on my project, and unfortunately I can't use the proposed Harmony fix because it makes the application throw InvalidProgramExceptions when published to production.
Hi,
Describe the bug I'm currently testing Wolverine 1.13.2 in conjunction with Kafka and the SQL Server/EF Core integration. The message transport is working as expected so far: messages arrive in the corresponding Kafka topic and also get consumed from the topic correctly. When I turn on the durable in- and outbox, it would seem that messages which have been published successfully get persisted in the wolverine_outgoing_envelopes table in SQL Server and are never deleted.
To Reproduce Wolverine startup configuration:
Message publishing code (outside of a handler, MVC model)
This is what's stored in the SQL Server wolverine_outgoing_envelopes table:
Equally, when receiving messages with the durable inbox enabled (and handling them successfully), they seem to remain in the wolverine_incoming_envelopes table:
When I then stop the application gracefully, I get the below log entry:
On app startup, these messages are all picked up again, reassigned, and replayed (as duplicates). The replay (reassignment) doesn't happen every time and is harder to reproduce, but my main question is why these messages remain persisted in the first place (which they always seem to do).
Expected behavior Messages are persisted in the in- and outbox, but deleted once successfully sent to the subscriber (or successfully received and handled). Is there a time limit on when these messages get deleted? I have waited 12 hours now and the messages are still there; I'm concerned about replays and the database filling up in this scenario.
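For anyone watching whether the envelope tables actually drain after a successful send, a quick periodic row count is an easy check. This is a diagnostic sketch only: the table names are taken from this report, Microsoft.Data.SqlClient is assumed as the provider, and the connection string is a placeholder.

```csharp
using Microsoft.Data.SqlClient;

var connectionString = "<your SQL Server connection string>"; // placeholder

await using var conn = new SqlConnection(connectionString);
await conn.OpenAsync();

// Count the rows left behind in Wolverine's durable inbox/outbox tables;
// adjust names if you configured a non-default schema
foreach (var table in new[] { "wolverine_outgoing_envelopes", "wolverine_incoming_envelopes" })
{
    await using var cmd = new SqlCommand($"select count(*) from {table}", conn);
    var count = (int)(await cmd.ExecuteScalarAsync())!;
    Console.WriteLine($"{table}: {count} rows");
}
```

If the outgoing count stays flat after messages have demonstrably reached Kafka, that points at the send-confirmation path rather than at housekeeping timing.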
Environment Local Development under VStudio (Win 10). Local SQL Server 15.0.2104. Single node/app.