Closed (jwoots closed this issue 1 month ago)
Have you tried 8.2.0-b2?
If it works for you, I'll publish 8.2.0 right away!
(sorry for being so slow to get back to you! I've been bogged down with work the last couple of weeks, and somehow I did not notice this issue until now 😅 )
After your response, I tried the package and found that the problem was in my code. I use a TransactionScope to save my data and publish events (DDD style), with Rebus as the implementation behind my event publisher abstraction. I'm not inside a Rebus handler, and it's difficult for me to use UseOutbox.
I tried a simple solution based on an outgoing step:
public Task Process(OutgoingStepContext context, Func<Task> next)
{
    var transactionContext = context.Load<ITransactionContext>();

    // Inside a Rebus handler, the outbox is initialized by the incoming step.
    // Outside a Rebus handler, an outbox transaction created explicitly (UseOutbox) has already initialized it.
    if (transactionContext.Items.ContainsKey(OutboxExtensions.CurrentOutboxConnectionKey))
    {
        // nothing more to do
        return next();
    }

    // If an ambient transaction exists, create an outbox connection without its own transaction,
    // so that the connection enlists in the current ambient transaction.
    if (Transaction.Current != null)
    {
        var outboxConnection = _outboxConnectionProvider.GetDbConnectionWithoutTransaction();
        transactionContext.Items[OutboxExtensions.CurrentOutboxConnectionKey] = outboxConnection;
        transactionContext.OnDisposed(_ =>
        {
            outboxConnection.Connection.Dispose();
        });
    }

    return next();
}
What do you think?
Does it work?
Yes, I added two unit tests and they pass:
[TestCase(true, true)]
[TestCase(false, false)]
public async Task CanUseOutboxOutsideOfRebusHandler_AmbiantTransaction_Send(bool commitTransaction, bool expectMessageToBeReceived)
{
    var settings = new FlakySenderTransportDecoratorSettings();

    using var messageWasReceived = new ManualResetEvent(initialState: false);
    using var server = CreateConsumer("server", a => a.Handle<SomeMessage>(async _ => messageWasReceived.Set()));
    using var client = CreateOneWayClient(r => r.TypeBased().Map<SomeMessage>("server"), settings);

    // set the success rate to zero, so we're sure that it's currently not possible to use the
    // real transport - this is a job for the outbox!
    settings.SuccessRate = 0;

    // pretending we're in a web app with an ambient transaction
    using (var tx = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        await client.Send(new SomeMessage());

        if (commitTransaction)
        {
            // this is what we were all waiting for!
            tx.Complete();
        }
    }

    // we would not have gotten this far without the outbox - now let's pretend that the transport has recovered
    settings.SuccessRate = 1;

    // wait for the server to receive the message
    Assert.That(messageWasReceived.WaitOne(TimeSpan.FromSeconds(15)), Is.EqualTo(expectMessageToBeReceived),
        $"When commitTransaction={commitTransaction} we {(expectMessageToBeReceived ? "expected the message to be sent and thus received" : "did NOT expect the message to be sent and therefore also not received")}");
}
Great! So you would say it's OK if I go and publish 8.2.0 now?
Hmm, I think a pull request is needed. I cannot write an OutboxOutgoingStep outside of the package because some classes are internal. I also have some other questions:
I'll continue with some tests.
> The cleanup does not seem to be implemented
Ok, weird! Don't remember who ported the code over from Rebus.SqlServer, but somehow it must have been missed during the port.
> Why use a "sent" column with a purge task? Why not delete directly from the table using PostgreSQL's RETURNING keyword?
Actually don't know what the difference would be. E.g. would one of them be faster?
> Why not use the existing (and configurable) backoff strategy?
Where?
> Actually don't know what the difference would be. E.g. would one of them be faster?
It's not necessarily about performance, but about complexity: sending a SELECT, then an UPDATE, and finally a DELETE (from a job) versus a single statement. Example:
DELETE FROM {_options.TableName}
WHERE id =
(
    SELECT id
    FROM {_options.TableName}
    ORDER BY id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id, payload
Or maybe it's optimized for vacuum considerations? I don't know... My question is just whether there is a particular reason for this implementation.
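For comparison, here is a rough sketch of the three-statement flow I'm describing, as I understand it. The table name `outbox` and the columns `id`, `payload` and `sent` are only assumptions for illustration, not the actual Rebus.Postgres schema, and `$1` stands for the id returned by the first statement:

```sql
-- 1. Forwarder: pick the oldest unsent message, skipping rows locked by other workers
SELECT id, payload
FROM outbox
WHERE sent = false
ORDER BY id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- 2. Forwarder: once the message has been handed to the transport, mark it as sent
UPDATE outbox
SET sent = true
WHERE id = $1;

-- 3. Purge job: periodically remove the rows that have been sent
DELETE FROM outbox
WHERE sent = true;
```

With the single DELETE ... RETURNING statement, steps 2 and 3 disappear, and the row is removed in the same transaction that reads it.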
> Where?
The SendRetrier of the OutboxForwarder class.
> Why not use the existing (and configurable) backoff strategy?
Sorry, I did not understand the retrier well. You can forget my question.
Can I push my branch with transaction scope support and create a pull request?
> Can I push my branch with transaction scope support and create a pull request?
Definitely! 🙂
Thanks for the merge.
Now I'm trying to port the cleanup from Rebus.SqlServer, but it does not seem to be implemented there either... So, rather than implementing the cleanup, I propose implementing the storage with a DELETE as described above. This technique also generates fewer dead tuples in PostgreSQL.
I'll open a pull request for this; if you are not OK with it, I will implement the cleanup as expected.
Hi, when do you plan to release the stable version of Rebus.Postgres 8.2.0?
Thanks in advance.