apache / pulsar-dotpulsar

The official .NET client library for Apache Pulsar
https://pulsar.apache.org/
Apache License 2.0
234 stars 60 forks source link

Add support for HasMessageAvailable #207

Closed smbecker closed 5 months ago

smbecker commented 6 months ago

Fixes #206

Description

Add IReceive.HasMessageAvailable to allow a consumer to check if any messages could be received or not.

blankensteiner commented 5 months ago

Hi @smbecker and thank you for the PR :-) In regards to "bool HasReachedEndOfTopic();", this is a state and could be checked that way. In regards to "ValueTask HasMessageAvailable(CancellationToken cancellationToken = default);" I'm not sure why that is needed and why it is async? If you want to call "HasMessageAvailable" to make sure that Receive is not blocked, then that can not be guaranteed, because HasMessageAvailable might return true, but before you get to call Receive the Reader/Consumer can have lost the connection or faulted. Are that you really need something like this: bool TryReceive(out Message message)?

smbecker commented 5 months ago

In regards to "bool HasReachedEndOfTopic();", this is a state and could be checked that way.

It currently is not possible to check current state, which was the purpose of this change. Also, this allows a consistent method for checking that covers both readers and consumers, which seems to line up with the purpose of the IReceive interface. Should I move the changes to IStateChanged to a separate PR? Should I remove this method in favor of just allowing checking the current state?

In regards to "ValueTask HasMessageAvailable(CancellationToken cancellationToken = default);" I'm not sure why that is needed and why it is async? If you want to call "HasMessageAvailable" to make sure that Receive is not blocked, then that can not be guaranteed, because HasMessageAvailable might return true, but before you get to call Receive the Reader/Consumer can have lost the connection or faulted. Are that you really need something like this: bool TryReceive(out Message message)?

I was actually looking for something more similar to TryReceive but saw hasMessageAvailableAsync in the Java Client. However, it isn't simply just "Is there any messages that are already downloaded?"; It is also answering the question of "Is there any messages that could be downloaded from the broker?". The Java Client actually makes a call to GetLastMessageId to determine if the server has any additional messages that could be downloaded. Should I add TryReceive that only attempts to receive from what has already been downloaded? Should I update the implementation to be more inline with the Java Client in using GetLastMessageId?

Please advise...

smbecker commented 5 months ago

In regards to "ValueTask HasMessageAvailable(CancellationToken cancellationToken = default);" I'm not sure why that is needed and why it is async?

My original intent was to replicate something similar to EndOfStreamAction from the pulsar-client-reactive project

blankensteiner commented 5 months ago

It currently is not possible to check current state, which was the purpose of this change. Also, this allows a consistent method for checking that covers both readers and consumers, which seems to line up with the purpose of the IReceive interface. Should I move the changes to IStateChanged to a separate PR? Should I remove this method in favor of just allowing checking the current state?

Exposing a CurrentState property is doable, but I am a bit worried that people will use it instead of monitoring the state, which is what they really should be doing. When monitoring the state, changes are pushed to the user, instead of the user having to poll them by calling CurrentState. I have seen people wait for the state to change to "Connected" before using the producer (calling Send) or consumer/reader (calling Receive), but that makes no sense. As mentioned above, checking the state or asking if there are messages doesn't ensure that the following usage will succeed, because much can chance in the milliseconds before you get around to using the producer/consumer/reader. So the user should just call Send/Receive and if blocking is an issue, just set a timer on the cancellation token.

The Java Client actually makes a call to GetLastMessageId to determine if the server has any additional messages that could be downloaded.

Regarding "asking the broker if there are new messages", I have no idea why the other clients would do that (if that is the case) since messages are pushed from the broker to DotPulsar as soon as the broker gets them. So if there are any messages, DotPulsar already have them buffered. Creating TryReceive as an extension method is also doable and could make sense, would that float your boat? :-)

smbecker commented 5 months ago

@blankensteiner would something like this be more appropriate?

blankensteiner commented 5 months ago

TryReceive can be written as an extension method. I was think something like:

public static bool TryReceive<TMessage>(this IReceive<IMessage<TMessage>> receiver, out IMessage<TMessage>? message)
{
    message = null;
    var cts = new CancellationTokenSource();
    var messageTask = receiver.Receive(cts.Token);
    cts.Cancel();
    if (messageTask.IsCompletedSuccessfully)
    {
        message = messageTask.Result;
        return true;
    }

    return false;
}

I haven't tested this, but something like that would do the trick.

blankensteiner commented 5 months ago

Hi @smbecker, does this extension method float your boat? :-)

blankensteiner commented 5 months ago

Hi @smbecker An implementation of TryReceive is in master.