Azure / amqpnetlite

AMQP 1.0 .NET Library
Apache License 2.0

Question on flow control #522

Closed paveldayneko closed 2 years ago

paveldayneko commented 2 years ago

Hi, I'd like to implement flow control on my server, which is acting as a SourceLinkEndpoint. I see that SourceLinkEndpoint benefits from internal methods and properties that let it get the current credit from ListenerLink, but ListenerLink has no public properties exposing the current credit, even though it calculates it quite carefully. My question is: is this done on purpose, so that I should track the current credit on my own, or can this info be exposed from ListenerLink? Thanks

xinchen10 commented 2 years ago

It could be exposed but unless your code is executing inside the link's lock (which is internal), there is no guarantee that the value does not change after you read it. It might be fine for some use cases but problematic for others. What kind of flow control do you want to implement using the link's credit?

paveldayneko commented 2 years ago

@xinchen10 my use case is pretty simple so far. I'm implementing a LinkEndpoint acting as a pure message source and want to count the receiver's credit; once the receiver's credit is 0, I want to stop sending messages. I've written the counting logic myself, but I'm just wondering whether it's possible to use the existing one.

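public sealed override void OnFlow(FlowContext flowContext)
{
    // Add the credit reported by this flow to the running total.
    Interlocked.Add(ref _credit, flowContext.Messages);

    lock (_lock)
    {
        // CompareExchange(ref x, 0, 0) serves as an atomic read of _credit.
        // Do nothing if a send loop is already running or there is no credit.
        if (_producing || Interlocked.CompareExchange(ref _credit, 0, 0) == 0)
        {
            return;
        }

        _producing = true;
    }

    // Start the send loop outside the lock.
    Task.Run(SendMessagesAsync, _processingCts.Token);
}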

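private async Task SendMessagesAsync()
{
    // Keep sending while the link has not started detaching.
    while (_link.LinkState < LinkState.DetachPipe)
    {
        try
        {
            // Consume one credit before fetching the next message.
            Interlocked.Decrement(ref _credit);
            var message = await OnNextMessageAsync(_processingCts.Token);

            _link.SendMessage(message);

            lock (_lock)
            {
                // Stop producing once the credit is exhausted; OnFlow restarts the loop.
                if (Interlocked.CompareExchange(ref _credit, 0, 0) == 0)
                {
                    _producing = false;
                    return;
                }
            }
        }
        catch
        {
            // Give the credit back if fetching or sending the message failed.
            Interlocked.Increment(ref _credit);
        }
    }
}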

This is pretty similar to what SourceLinkEndpoint is doing, but it benefits from internal access to the credit counted by ListenerLink, like here

xinchen10 commented 2 years ago

We can certainly expose the credit property of ListenerLink, but as I mentioned, without access to the internal synchronization object it could cause unexpected race conditions. Also, is that the only thing you need for your implementation? Your LinkEndpoint is almost identical to SourceLinkEndpoint. Can you use that instead (or derive your class from it)? What else do you need that cannot be done by SourceLinkEndpoint?

paveldayneko commented 1 year ago

Unfortunately, I still can't understand why the internal lock is a problem: as long as the linkEndpoint field in ListenerLink is initialized correctly, everything should be OK. I'd like to have my own analogue of SourceLinkEndpoint, not only with flow control but with some additional features like auth and global error handling, and maybe without injecting and implementing IMessageSource. This way I will get a much more flexible solution and will benefit from the already calculated credit. Should I maybe create a PR for that, so we can continue the discussion there?