aspnet / SignalR

[Archived] Incredibly simple real-time web for ASP.NET Core. Project moved to https://github.com/aspnet/AspNetCore
Apache License 2.0

async streaming methods on hub #1708

Closed bugged84 closed 6 years ago

bugged84 commented 6 years ago

I was messing around with a typical ChatHub demo. On my hub class, I've defined a streaming method that returns a ChannelReader and an async non-streaming method that writes the passed argument to the channel. Things are working great.

Now, I had the idea to update the streaming method to return Task<ChannelReader> to make my hub interface more consistent for clients. However, this causes the following error:

Error: The client attempted to invoke the non-streaming 'StreamMessage' method in a streaming fashion.

Does SignalR really not support async streaming methods, or am I overlooking something obvious?

public class ChatHub : Hub
{
    private static Channel<string> s_channel = Channel.CreateUnbounded<string>();

    public async Task<ChannelReader<string>> StreamMessage()
    {
        await s_channel.Writer.WriteAsync("hello subscriber");

        return s_channel.Reader;
    }

    public async Task UpdateMessage(string message)
    {
        await s_channel.Writer.WriteAsync(message);
    }
}

BrennanConroy commented 6 years ago

This was fixed after preview1. If you want to try it out, you can grab the latest dev bits: https://github.com/aspnet/Home#daily-builds

bugged84 commented 6 years ago

Will this be available in the 2.1 stable release?

BrennanConroy commented 6 years ago

Of course!

analogrelay commented 6 years ago

Closing this as the issue has been resolved in the upcoming release.

bugged84 commented 6 years ago

@BrennanConroy, I thought about this some more and I'm not sure returning a Task<ChannelReader<>> makes sense. It seems like the caller might want the reference to the reader immediately in order to start listening for messages rather than wait on that reference while the server waits on data to write to the channel.

davidfowl commented 6 years ago

Not sure what you mean.

analogrelay commented 6 years ago

It's up to you to return Task<ChannelReader<>> or ChannelReader<>. If you don't have any async code leading up to when you create your channel, you can just return ChannelReader<> instead.

bugged84 commented 6 years ago

Consider the following streaming method:

public async Task<ChannelReader<string>> GetChatChannel()
{
    string chats = await GetRecentChatsAsync(); // takes a long time

    // everything below is continued once the task above completes
    var channel = Channel.CreateUnbounded<string>();

    await channel.Writer.WriteAsync($"Recent: {chats}");

    return channel;
}

Does this mean the caller of this method will have only a Task<ChannelReader<string>> for a "long time" until the recent chats are returned? In other words, the caller will not have a ChannelReader<string> to read current chats until after the recent chats are written to the channel?

analogrelay commented 6 years ago

The method you wrote could just as easily return a List<string>; you're not using the ChannelReader for asynchrony at all. You use a ChannelReader when you want to emit items as they arrive.

However, you bring up a good point. Consider this code:

public async Task<ChannelReader<string>> GetChatChannel()
{
    var channel = Channel.CreateUnbounded<string>();
    var message = await GetNextMessage(); // takes a long time
    await channel.Writer.WriteAsync(message);
    message = await GetNextMessage(); // takes a long time
    await channel.Writer.WriteAsync(message);
    message = await GetNextMessage(); // takes a long time
    await channel.Writer.WriteAsync(message);

    return channel;
}

That would result in the client waiting a long time, then getting all three messages immediately after (defeating the purpose of using a Channel). Since you didn't return the channel until the very end, it was filled up with messages before the server got the Reader in order to consume them. You'd want to kick off a background processor. Imagine you had some async work to do in order to set up for streaming, and then you wanted to stream messages as they arrived. Something like this would be closer to what you want:

public async Task<ChannelReader<string>> GetChatChannel()
{
    // Do some async things that must finish before ANY messages can be returned
    var userInfo = await GetUserInformationFromDatabase();

    var channel = Channel.CreateUnbounded<string>();

    // Kick off a background thing, but don't wait for it.
    _ = Task.Run(async () => {
        try {
            while(HasMessagesToStream(userInfo)) {
                await channel.Writer.WriteAsync(await GetNextMessage(userInfo));
            }
        } finally {
            // If you don't do this, the client will never realize the stream has ended!
            channel.Writer.Complete();
        }
    });

    // Return the read end of the channel to SignalR so it can pump items to the client.
    return channel.Reader;
}

(I would avoid using a Task.Run right there but it illustrates the point. In practice I'd suggest using a DI service or other component that can track and log the lifetime of these "streamers")
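A minimal sketch of what such a tracking component might look like. `StreamerTracker` and its members are hypothetical names, not SignalR types; the sketch assumes it would be registered as a singleton and that console logging stands in for a real logger. It starts the producer in the background, counts running streamers, logs failures, and always completes the writer.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical service (not part of SignalR) that owns background "streamers".
public sealed class StreamerTracker
{
    private int _running;

    // Number of producers that have started and not yet finished.
    public int RunningCount => Volatile.Read(ref _running);

    // Starts `produce` in the background and returns the read end right away.
    public ChannelReader<T> Start<T>(Func<ChannelWriter<T>, Task> produce)
    {
        var channel = Channel.CreateUnbounded<T>();
        Interlocked.Increment(ref _running);
        _ = RunAsync(channel.Writer, produce); // tracked via the counter, not awaited
        return channel.Reader;
    }

    private async Task RunAsync<T>(ChannelWriter<T> writer, Func<ChannelWriter<T>, Task> produce)
    {
        Exception error = null;
        try
        {
            await Task.Yield(); // let Start return before the producer does any work
            await produce(writer);
        }
        catch (Exception ex)
        {
            error = ex;
            Console.Error.WriteLine($"Streamer failed: {ex.Message}");
        }
        finally
        {
            Interlocked.Decrement(ref _running);
            // Completing the writer is what tells the reader the stream has ended.
            writer.TryComplete(error);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var tracker = new StreamerTracker();
        ChannelReader<int> reader = tracker.Start<int>(async writer =>
        {
            for (var i = 0; i < 3; i++)
            {
                await Task.Delay(10); // stands in for waiting on the next message
                await writer.WriteAsync(i);
            }
        });

        // Items are printed as they arrive: 0, 1, 2.
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                Console.WriteLine(item);
            }
        }
    }
}
```

In a hub, the streaming method would presumably call `Start` after its async setup and return the resulting reader, which keeps the producer's lifetime and error handling in one place.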

bugged84 commented 6 years ago

Interesting. Once channel.Writer.Complete() executes, how will messages that arrive later be handled? Does the client have to periodically invoke the GetChatChannel() to stay up-to-date with new streams? I was planning to keep the channel writer alive for the lifetime of the service. Are you suggesting in your code example that this would be a bad idea? What would be wrong with never ending the stream?

I've updated my code sample below to illustrate how I imagined it to work. Notice that I'm intentionally never completing the channel writer.

private static Channel<string> s_channel = Channel.CreateUnbounded<string>();

public async Task<ChannelReader<string>> GetChatChannel()
{
    // a client app spins up and asks for a channel with some initial data

    string chats = await GetRecentChatsAsync(); // takes a long time

    // everything below is continued once the task above completes
    await s_channel.Writer.WriteAsync($"Recent: {chats}");

    return s_channel;
}

public async Task SendChat(string chat)
{
    // this method can be invoked by all client apps indefinitely during the
    //  lifetime of the service

    // all client apps get new chats immediately as they arrive
    await s_channel.Writer.WriteAsync(chat);
}

In reality, I am injecting my channels (e.g. a channel per "topic") into the Hub class. The channels are registered as SingleInstance with my Autofac container. I was not planning to invoke channel.Writer.Complete() until the channel was disposed of by the Autofac container. Do you have any guidance to the contrary?

davidfowl commented 6 years ago

> Interesting. Once channel.Writer.Complete() executes, how will messages that arrive later be handled?

They wouldn't be. If you need it to stay alive, then don't complete it.

> Does the client have to periodically invoke the GetChatChannel() to stay up-to-date with new streams?

No, that's not a thing. You could build that into your own protocol, though.

> I was planning to keep the channel writer alive for the lifetime of the service. Are you suggesting in your code example that this would be a bad idea? What would be wrong with never ending the stream?

That's fine, nothing is wrong with that lifetime. I'm wondering if there's any quirk with returning the same channel for everyone (multiple producers) /cc @stephentoub. You may want to use an IObservable here.

bugged84 commented 6 years ago

@davidfowl when you say "multiple producers" are you referring to the GetChatChannel and SendChat methods using the same writer? Or were you thinking that I was returning the channel to the caller and every caller had access to the writer? Note that the return type of the GetChatChannel method is a ChannelReader as there is an implicit conversion happening there.

davidfowl commented 6 years ago

Sorry, I meant multiple consumers, not producers.
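
To illustrate the multiple-consumers concern: a `Channel<T>` hands each written item to exactly one reader, so clients sharing a single reader would compete for messages instead of each seeing every chat. One possible workaround is to give every subscriber its own channel and copy each message into all of them. The sketch below assumes a hypothetical `ChatBroadcaster` helper (not a SignalR API) and is not taken from the thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: one private channel per subscriber, so every
// subscriber receives its own copy of every published message.
public sealed class ChatBroadcaster
{
    private readonly ConcurrentDictionary<Guid, Channel<string>> _subscribers =
        new ConcurrentDictionary<Guid, Channel<string>>();

    // Creates a private channel for one subscriber and returns its read end.
    public ChannelReader<string> Subscribe(out Guid id)
    {
        id = Guid.NewGuid();
        var channel = Channel.CreateUnbounded<string>();
        _subscribers[id] = channel;
        return channel.Reader;
    }

    // Completes and forgets a subscriber's channel (for example, on disconnect).
    public void Unsubscribe(Guid id)
    {
        if (_subscribers.TryRemove(id, out var channel))
        {
            channel.Writer.TryComplete();
        }
    }

    // Copies the message into every subscriber's channel.
    public void Publish(string message)
    {
        foreach (var channel in _subscribers.Values)
        {
            // TryWrite succeeds on an unbounded channel unless it was completed.
            channel.Writer.TryWrite(message);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var broadcaster = new ChatBroadcaster();
        ChannelReader<string> first = broadcaster.Subscribe(out Guid firstId);
        ChannelReader<string> second = broadcaster.Subscribe(out Guid secondId);

        broadcaster.Publish("hello");

        // Each subscriber reads its own copy, so "hello" is printed twice.
        Console.WriteLine(await first.ReadAsync());
        Console.WriteLine(await second.ReadAsync());

        broadcaster.Unsubscribe(firstId);
        broadcaster.Unsubscribe(secondId);
    }
}
```

In a hub, the streaming method could return the reader from `Subscribe`, and the send method could call `Publish`; where to call `Unsubscribe` (for instance when the connection ends) is left open here.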