mgravell / Pipelines.Sockets.Unofficial

.NET managed sockets wrapper using the new "Pipelines" API

Pump a pair of PipeReader/PipeWriter #62

Open Giorgi opened 3 years ago

Giorgi commented 3 years ago

I'm working on adding TLS support to the RSocket.NET library. Currently, the library runs two tasks: one reads from the PipeReader and writes to the socket, and the other reads from the socket and writes to the PipeWriter:

while (true)
{
    var result = await Back.Input.ReadAsync();
    var buffer = result.Buffer;
    var consumed = buffer.Start;        //RSOCKET Framing

    try
    {
        if (result.IsCanceled) { break; }
        if (!buffer.IsEmpty)
        {
            try
            {
                consumed = await socket.SendAsync(buffer, buffer.Start, SocketFlags.None);     //RSOCKET Framing
            }
            catch (Exception)
            {
                if (!Aborted) { /*Log.ErrorWritingFrame(_logger, ex);*/ }
                break;
            }
        }
        else if (result.IsCompleted) { break; }
    }
    finally
    {
        Back.Input.AdvanceTo(consumed, buffer.End);     //RSOCKET Framing
    }
}

while (!token.IsCancellationRequested)
{
    var memory = Back.Output.GetMemory(out var memoryframe, haslength: true);    //RSOCKET Framing
    var isArray = MemoryMarshal.TryGetArray<byte>(memory, out var arraySegment); Debug.Assert(isArray);
    var received = await socket.ReceiveAsync(arraySegment, SocketFlags.None);   //TODO Cancellation?

    Back.Output.Advance(received);
    var flushResult = await Back.Output.FlushAsync();
    if (flushResult.IsCanceled || flushResult.IsCompleted) { break; }
}

I added support for secure connections by wrapping the socket in an SslStream and passing it to StreamConnection.GetDuplex from this project, as advised in one of the issues.
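
Roughly, the wrapping looks like the sketch below. This is a simplified illustration, not the actual RSocket.NET code: the TlsPipeSketch class, the ConnectTlsAsync helper, and the host/port parameters are placeholders, and certificate validation is left at the SslStream defaults.

```csharp
using System.IO.Pipelines;
using System.Net.Security;
using System.Net.Sockets;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;

static class TlsPipeSketch
{
    // Hypothetical helper: connect, negotiate TLS, then expose the
    // encrypted stream as an IDuplexPipe.
    public static async Task<IDuplexPipe> ConnectTlsAsync(string host, int port)
    {
        var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
        await socket.ConnectAsync(host, port);

        // The NetworkStream owns the socket, so disposing the stream closes it.
        var network = new NetworkStream(socket, ownsSocket: true);

        // SslStream performs the TLS handshake on top of the raw stream.
        var ssl = new SslStream(network, leaveInnerStreamOpen: false);
        await ssl.AuthenticateAsClientAsync(host);

        // StreamConnection.GetDuplex (from Pipelines.Sockets.Unofficial)
        // returns an IDuplexPipe backed by the stream.
        return StreamConnection.GetDuplex(ssl);
    }
}
```

The resulting pipe's Input and Output then take the place of the raw socket in the two loops above.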

Is it possible to go further and get rid of the "manual pumping" between the PipeReader/PipeWriter?

The blog post "Pipe Dreams, part 2" mentions that a PipeScheduler can be used to "start these two pumps", but it doesn't give a concrete example. Does this project provide any such mechanism?