aerospike / aerospike-client-csharp

Aerospike C# Client Library

Potential command loss in AsyncCommandDelayingQueue #64

Closed: thhave closed this issue 2 years ago

thhave commented 2 years ago

Hi folks! We've run into a problem with AsyncClient when using MaxCommandAction.DELAY: sometimes our code hangs when processing a huge number of records asynchronously. We think the issue is in how AsyncCommandDelayingQueue schedules incoming commands.

Let's look at the source code of AsyncCommandDelayingQueue:
https://github.com/aerospike/aerospike-client-csharp/blob/88a10b0f547bfd6515145d3e265c35cbda77c239/Framework/AerospikeClient/Async/AsyncCommandDelayingQueue.cs#L59-L77

There's a place where the passed command can be lost (maybe an exception should be thrown there?):

```csharp
...
if (argsQueue.TryDequeue(out e))
{
    if (delayQueue.IsEmpty)
    {
        command.ExecuteInline(e);
        return;
    }
    else
    {
        // Is the command just ignored?
        argsQueue.Enqueue(e);
    }
}
...
```

This can happen in some cases, and it seems we have hit it in production when there are not enough available connections to process commands.

We use a TaskCompletionSource to wait for all operations to finish:

```csharp
internal abstract class AerospikeOperationCompletionSource<T>
{
    private readonly TaskCompletionSource<T> _tcs;
    private readonly CancellationTokenRegistration _ctr;

    protected AerospikeOperationCompletionSource(CancellationToken cancellationToken)
    {
        _tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

        _ctr = cancellationToken.Register(() => _tcs.TrySetCanceled(), useSynchronizationContext: false);
    }

    public Task<T> Task => _tcs.Task;

    protected void SetResult(T result)
    {
        // TrySetResult: the task may already have been cancelled via the token,
        // in which case SetResult would throw InvalidOperationException.
        _tcs.TrySetResult(result);

        _ctr.Dispose();
    }

    protected void SetException(Exception exception)
    {
        _tcs.TrySetException(exception);

        _ctr.Dispose();
    }
}

private class RecordCompletionSource : AerospikeOperationCompletionSource<OperationResult>, RecordListener
{
    // Number of callbacks still outstanding; reaches 0 only if no command is lost.
    private int _operationCount;

    private readonly ConcurrentBag<KeyValuePair<Key, Record>> _records = new();
    private readonly ConcurrentBag<AerospikeException> _exceptions = new();

    public RecordCompletionSource(int operationCount, CancellationToken cancellationToken) : base(cancellationToken)
    {
        if (operationCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(operationCount));
        }

        _operationCount = operationCount;
    }

    public void OnSuccess(Key key, Record record)
    {
        _records.Add(new KeyValuePair<Key, Record>(key, record));

        OnOperationCompleted();
    }

    public void OnFailure(AerospikeException exception)
    {
        _exceptions.Add(exception);

        OnOperationCompleted();
    }

    private void OnOperationCompleted()
    {
        var leftOperations = Interlocked.Decrement(ref _operationCount);

        if (leftOperations == 0)
        {
            SetResult(new OperationResult(_records, _exceptions));
        }
    }
}
```

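Usage:

```csharp
// The constructor requires a CancellationToken as its second argument.
var completionSource = new RecordCompletionSource(keys.Length, cancellationToken);

foreach (var key in keys)
{
    client.Operate(policy: null, completionSource, key, ops);
}

var result = await completionSource.Task;
```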

If even one command is lost, the completion task is never resolved (it can only be cancelled, and only if we pass a cancellation token).
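This does not fix the race, but on the caller side a hang can at least be turned into an error by bounding the wait. A minimal sketch, assuming C# 8 or later; `CompletionGuard.WithTimeout` is a hypothetical helper, not part of the Aerospike client. On .NET 6 and later, the built-in `Task.WaitAsync(TimeSpan)` does the same job.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: turns a completion that never arrives (e.g. because a
// command was dropped and its listener is never called) into a TimeoutException.
public static class CompletionGuard
{
    public static async Task<T> WithTimeout<T>(Task<T> task, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource();
        Task delay = Task.Delay(timeout, cts.Token);

        Task finished = await Task.WhenAny(task, delay).ConfigureAwait(false);
        if (finished != task)
        {
            throw new TimeoutException($"Operation did not complete within {timeout}.");
        }

        cts.Cancel();                            // stop the pending delay timer
        return await task.ConfigureAwait(false); // propagate the result or the exception
    }
}
```

With the completion source above this would be used as `var result = await CompletionGuard.WithTimeout(completionSource.Task, TimeSpan.FromSeconds(30));`. Passing a `CancellationTokenSource` created with a timeout as the `cancellationToken` achieves a similar effect, surfacing as a cancellation instead of a `TimeoutException`.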

Client policy:

```csharp
new AsyncClientPolicy
{
    asyncMaxCommandsInQueue = 0,
    asyncMaxCommands = 200,
    asyncMaxCommandAction = MaxCommandAction.DELAY,
    writePolicyDefault = new WritePolicy
    {
        maxRetries = 0,
        sendKey = true
    }
}
```

Versions: Aerospike.Client 4.2.4, aerospike-server:5.6.0.9

thyn commented 2 years ago

Same thing here.

BrianNichols commented 2 years ago

I suppose the following race condition is possible.

Thread 1: ScheduleCommand() is called and argsQueue.TryDequeue() succeeds.

Thread 2: ScheduleCommand() is called and argsQueue.TryDequeue() fails, so thread 2's command is added to delayQueue before thread 1 checks whether delayQueue is empty.

Thread 1: the delayQueue.IsEmpty check returns false, so thread 1 puts its args back into argsQueue and its command is silently ignored (neither executed nor queued).

I will investigate a cleaner solution. At a minimum, all unexpected edge cases should result in an exception and not a silent ignoring of the command.
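To make the interleaving concrete, here is a deterministic, single-threaded replay of the three steps above. It is a hypothetical simplification, not the client's code: only the queue names mirror AsyncCommandDelayingQueue, while `RaceReplay` and the use of plain objects and strings for the args and commands are assumptions.

```csharp
using System.Collections.Concurrent;

// Hypothetical replay of the check-then-act interleaving described above.
public static class RaceReplay
{
    // Returns true if thread 1's command was executed or queued, false if it was dropped.
    public static bool Replay(out int freeArgsCount, out int delayedCount)
    {
        var argsQueue = new ConcurrentQueue<object>();  // free per-command resources
        var delayQueue = new ConcurrentQueue<string>(); // commands waiting for a resource
        argsQueue.Enqueue(new object());                // exactly one resource is free

        // Thread 1: TryDequeue succeeds and takes the last free resource.
        bool gotArgs = argsQueue.TryDequeue(out object e);

        // Thread 2: TryDequeue fails, so its command is added to delayQueue.
        if (!argsQueue.TryDequeue(out _))
        {
            delayQueue.Enqueue("command 2");
        }

        // Thread 1 resumes: delayQueue is no longer empty, so it takes the else branch.
        bool handled = false;
        if (gotArgs)
        {
            if (delayQueue.IsEmpty)
            {
                handled = true;       // stands in for command.ExecuteInline(e)
            }
            else
            {
                argsQueue.Enqueue(e); // resource returned; command 1 goes nowhere
            }
        }

        freeArgsCount = argsQueue.Count;
        delayedCount = delayQueue.Count;
        return handled;
    }
}
```

In this model `Replay` returns false with one free resource and one delayed command: command 1 ends up in neither queue, so its listener is never called, which matches the hang in the original report.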

BrianNichols commented 2 years ago

C# client 4.2.6 is released: https://download.aerospike.com/download/client/csharp/notes.html#4.2.6

This should fix the race condition.
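The release notes linked above are the authoritative description of the fix. For readers who want to see one general way to remove this kind of check-then-act race, here is a sketch under stated assumptions: a hypothetical `DelayingScheduler` that makes the "take a free slot or delay" decision under a single lock, so no interleaving can leave a command in neither queue. It is not the client's actual 4.2.6 implementation, and it trades the lock-free queues for a short critical section.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical lock-based scheduler: every command is either run or queued, never dropped.
public sealed class DelayingScheduler<TArgs, TCommand>
{
    private readonly object _gate = new object();
    private readonly Queue<TArgs> _free = new Queue<TArgs>();
    private readonly Queue<TCommand> _delayed = new Queue<TCommand>();
    private readonly Action<TCommand, TArgs> _execute;

    public DelayingScheduler(IEnumerable<TArgs> slots, Action<TCommand, TArgs> execute)
    {
        foreach (var slot in slots) _free.Enqueue(slot);
        _execute = execute;
    }

    public void Schedule(TCommand command)
    {
        TArgs args;
        lock (_gate)
        {
            // The check and the enqueue/dequeue happen atomically, so there is no window
            // in which another thread can change the outcome of the check.
            if (_delayed.Count > 0 || _free.Count == 0)
            {
                _delayed.Enqueue(command);
                return;
            }
            args = _free.Dequeue();
        }
        _execute(command, args); // run outside the lock
    }

    // Called when a command finishes and its slot becomes free.
    public void Release(TArgs args)
    {
        TCommand next;
        lock (_gate)
        {
            if (_delayed.Count == 0)
            {
                _free.Enqueue(args);
                return;
            }
            next = _delayed.Dequeue();
        }
        _execute(next, args); // hand the slot straight to the oldest waiting command
    }

    public int DelayedCount { get { lock (_gate) return _delayed.Count; } }
    public int FreeCount { get { lock (_gate) return _free.Count; } }
}
```

Handing a released slot directly to the oldest delayed command (instead of returning it to the free pool) keeps FIFO order and means a delayed command can never sit waiting while a slot is idle.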

thhave commented 2 years ago

Thanks, it seems to work fine.