Open thenameless314159 opened 1 year ago
I'm wondering about ConcurrentPipeWriter.cs from the Kestrel sources: wouldn't it be "better" to rely on it instead of a SemaphoreSlim? And if so, in which ways?
I'm actually not sure whether the ProtocolWriter should no-op on write complete/cancel, because that may allow logic to run that shouldn't have, for example after a failed send. What are your thoughts, @davidfowl?
I think it should throw
Alright, done @davidfowl! This change let me greatly improve the clarity of the fast sync path. It's a lot cleaner now, but it still needs some review. Here is what the fast path looks like now (with .NET 7):
public ValueTask WriteAsync<TMessage>(IMessageWriter<TMessage> writer, in TMessage message, CancellationToken cancellationToken = default)
{
    // This will always finish synchronously so we do not need to bother with cancel
    if (!TryWaitForSingleWriter(cancellationToken: CancellationToken.None))
        return WriteAsyncSlow(writer, message, cancellationToken);

    bool release = true, hasWritten = false;
    try
    {
        if (_disposed) throw new ObjectDisposedException(nameof(ProtocolWriter));

        var task = WriteCore(writer, in message, cancellationToken);
        if (task.IsCompletedSuccessfully)
        {
            // If it's an IValueTaskSource-backed ValueTask,
            // inform it that its result has been read so it can reset
            var result = task.GetAwaiter().GetResult();
            if (result.IsCanceled)
                throw new OperationCanceledException(cancellationToken);

            hasWritten = !result.IsCompleted;
            return hasWritten
                ? ValueTask.CompletedTask
                // REVIEW : could DisposeAsyncCore(false) here if we add a !_dispose check between disposing && !TryWaitForSingleWriter()
                // but it would require implementing a ThrowAfter extension method for ValueTask
                : throw new ObjectDisposedException(nameof(ProtocolWriter));
        }
        else
        {
            release = false; // do not release if we need to go async to complete the write
            return new ValueTask(CompleteWriteAsync(task, messagesWritten: 1));
        }
    }
    finally { if (release) ReleaseSingleWriter(hasWritten ? 1 : 0); }
}
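For readers following along: the fast path only works if TryWaitForSingleWriter returns immediately instead of blocking. That helper isn't shown in this thread, so the following is just a minimal sketch of how such a gate could look on top of SemaphoreSlim, assuming a zero-timeout Wait for the fast path and WaitAsync for the slow path. SingleWriterGate, TryEnter, EnterAsync and Exit are hypothetical names for illustration, not the library's actual code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the writer's single-writer lock (illustrative names only).
public sealed class SingleWriterGate : IDisposable
{
    // One permit: at most one writer may hold the gate at a time.
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

    // Fast path: a zero timeout never blocks, so this returns false right away
    // when another writer already holds the gate.
    public bool TryEnter() => _semaphore.Wait(0);

    // Slow path: waits asynchronously for the gate and honors cancellation.
    public Task EnterAsync(CancellationToken cancellationToken = default)
        => _semaphore.WaitAsync(cancellationToken);

    // Must be called exactly once per successful TryEnter/EnterAsync.
    public void Exit() => _semaphore.Release();

    public void Dispose() => _semaphore.Dispose();
}
```

With a gate like this, a caller would try TryEnter first and only fall back to awaiting EnterAsync when it returns false, which mirrors the WriteAsync / WriteAsyncSlow split above.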
And this is the ThrowAfter extension method that would be required in order to call DisposeAsyncCore instead of throwing an ObjectDisposedException:
public static ValueTask ThrowAfter(this in ValueTask valueTask, Exception exception)
{
    if (valueTask.IsCompletedSuccessfully)
    {
        // Signal consumption to the IValueTaskSource
        valueTask.GetAwaiter().GetResult();
        throw exception;
    }
    else return AwaitAndThrow(valueTask.AsTask(), exception);

    static async ValueTask AwaitAndThrow(Task task, Exception exception)
    {
        await task.ConfigureAwait(false);
        throw exception;
    }
}
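One thing worth noting about ThrowAfter is that it surfaces the exception in two different ways depending on whether the input has already completed. Here is a small sketch that exercises both paths. ThrowAfter is repeated (without the redundant else) so the snippet compiles on its own; ThrowAfterDemo and its two methods are hypothetical helpers added only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class ValueTaskExtensions
{
    // Same shape as the ThrowAfter method from the thread, repeated for self-containment.
    public static ValueTask ThrowAfter(this in ValueTask valueTask, Exception exception)
    {
        if (valueTask.IsCompletedSuccessfully)
        {
            valueTask.GetAwaiter().GetResult(); // signal consumption to the source
            throw exception;
        }
        return AwaitAndThrow(valueTask.AsTask(), exception);

        static async ValueTask AwaitAndThrow(Task task, Exception exception)
        {
            await task.ConfigureAwait(false);
            throw exception;
        }
    }
}

// Hypothetical helpers that only observe where the exception shows up.
public static class ThrowAfterDemo
{
    // True when the exception escapes the ThrowAfter call itself (input already completed).
    public static bool ThrowsOnCall(ValueTask input)
    {
        try
        {
            input.ThrowAfter(new ObjectDisposedException("ProtocolWriter"));
            return false;
        }
        catch (ObjectDisposedException) { return true; }
    }

    // True when the exception only surfaces once the returned ValueTask is awaited.
    // Intended for a still-pending input; a completed input throws before the try block.
    public static async Task<bool> ThrowsOnAwaitAsync(ValueTask input)
    {
        ValueTask wrapped = input.ThrowAfter(new ObjectDisposedException("ProtocolWriter"));
        try
        {
            await wrapped;
            return false;
        }
        catch (ObjectDisposedException) { return true; }
    }
}
```

So a caller that returns the result of ThrowAfter without awaiting it has to be prepared for both a synchronous throw and a faulted ValueTask, which is the same split the fast path already has with its `throw new ObjectDisposedException` expression.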