kubemq-io / kubemq-CSharp

C# Library for KubeMQ server
MIT License
30 stars 8 forks source link

Transaction AckMessage/RejectMessage is hanging up, don't return any result #21

Closed dust63 closed 1 year ago

dust63 commented 1 year ago

When I try to use AckMessage or RejectMessage of the transaction object the code is hanging and no result was returned:

Here a sample to reproduce the consumer problem

var KubeMQServerAddress = "localhost:50000";
var queue = new KubeMQ.SDK.csharp.Queue.Queue("q1", "Csharp-sdk-cookbook-queues-batch-client", null, 12, KubeMQServerAddress, null);
        var transaction = queue.CreateTransaction();
        Message message = null;
        try
        {
            var resRec = transaction.Receive(1, 12);
            if (resRec.QueueErrors == KubemqQueueErrors.ErrNoNewMessageQueue)
            {
                return;
            }

            if (resRec.IsError)
            {
                throw new IOException($"Message dequeue error: {resRec.Error}");
            }
            message = resRec.Message;

            if (message != null)
                if (message.Attributes.Sequence % 2 == 0)
                {

                    var resAck = transaction.AckMessage(message.Attributes.Sequence);
                    // Hanging, can't pass to the next line
                    Console.WriteLine(resAck.Message);
                }
                else
                {
                    var resReject = transaction.RejectMessage(message.Attributes.Sequence);
                    // Hanging, can't pass to the next line
                    Console.WriteLine(resReject.Message);
                }
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
            if (message != null)
                transaction.RejectMessage(message.Attributes.Sequence);
        }
        finally
        {
            transaction.Close();
        }
dust63 commented 1 year ago

When I debug from code I can see that the call to AckMessage method is hanging on await _stream.RequestStream.WriteAsync(sr); in the Transaction.cs file

      private async Task<StreamQueueMessagesResponse> StreamQueueMessage(StreamQueueMessagesRequest sr, CancellationToken cancellationToken)
        {
            if (_stream == null)
            {
                throw new RpcException(new Status(StatusCode.NotFound, "stream is null"), "Transaction stream is not opened, please Receive new Message");
            }

            // implement bi-di streams 'SendEventStream (stream Event) returns (stream Result)'

            // Send Event via GRPC RequestStream,
            // Hanging here
            await _stream.RequestStream.WriteAsync(sr);
            await _stream.ResponseStream.MoveNext(cancellationToken);

            return _stream.ResponseStream.Current;
        }
kubemq commented 1 year ago

Hi, Please use the QueueStream implementation