matteobortolazzo / couchdb-net

EF Core-like CouchDB experience for .NET!
MIT License

Changes Feed for pushing new data #58

Closed biapar closed 4 years ago

biapar commented 4 years ago

Is it possible to use this driver to receive new data from _changes?

matteobortolazzo commented 4 years ago

Hi, not yet. What's the best implementation for this?

biapar commented 4 years ago

Polling? SignalR? MQTT?

n9 commented 4 years ago

@matteobortolazzo Not sure what you mean exactly, I would say:

matteobortolazzo commented 4 years ago

Yes, my question was about what would be the most effective way for developers. I was thinking of simple events, but I will check IAsyncEnumerable. I want to work on this feature from this week onward.

matteobortolazzo commented 4 years ago

FYI I am pretty sure I will use IAsyncEnumerable with CancellationToken
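
For readers unfamiliar with the pattern, here is a minimal sketch of an async iterator that honours a CancellationToken. The `CountAsync` function and its delay are hypothetical stand-ins for a feed of changes; this is not the driver's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cancelSource = new CancellationTokenSource();
var received = new List<int>();

try
{
    await foreach (var item in CountAsync(cancelSource.Token))
    {
        received.Add(item);
        if (item == 3)
        {
            cancelSource.Cancel(); // ask the producer to stop after the third item
        }
    }
}
catch (OperationCanceledException)
{
    // Expected: the pending Task.Delay observes the cancelled token and throws.
}

Console.WriteLine(string.Join(", ", received));

// Hypothetical producer: yields 1, 2, 3, ... until the token is cancelled.
static async IAsyncEnumerable<int> CountAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    for (var i = 1; ; i++)
    {
        await Task.Delay(10, cancellationToken);
        yield return i;
    }
}
```

The consumer decides when to stop, and the producer only needs to pass the token to whatever it awaits.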

matteobortolazzo commented 4 years ago

@biapar @n9 I implemented an early version of this in the changes-feed branch. Could you give me some feedback, if you don't mind?

There are 2 new methods for the database:

// Polling
Task<ChangesFeedResponse<TSource>> GetChangesAsync(ChangesFeedOptions options, ChangesFeedFilter filter)

// Continuous feed
IAsyncEnumerable<ChangesFeedResponseResult<TSource>> GetContinuousChangesAsync(ChangesFeedOptions options, ChangesFeedFilter filter, CancellationToken cancellationToken)

To use the continuous feed:

using var cancelSource = new CancellationTokenSource();
await foreach (var feedEvent in rebels.GetContinuousChangesAsync(option, filter, cancelSource.Token))
{
    if (/* Something */)
        cancelSource.Cancel();
}

n9 commented 4 years ago

@matteobortolazzo I have some issues with the cancellation token (it does not seem to work properly). I will investigate more.

n9 commented 4 years ago

It seems to me that the cancellation token is not working because it is not used during reading (i.e. while waiting for a new change): https://github.com/matteobortolazzo/couchdb-net/blob/24b851e5961cd06bd5f544e04d0a49c0df55bd4e/src/CouchDB.Driver/CouchDatabase.cs#L517

You might need to use ReadAsync, as ReadLineAsync is not (yet?) cancellable.
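
The difference can be sketched as follows, using the StreamReader.ReadAsync overload that takes a Memory<char> and a CancellationToken. The loopback TCP connection is only an assumed stand-in for an idle HTTP response stream; none of this is taken from the driver.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

// Open a loopback connection on which the server side never sends anything,
// so a read on the client side has to wait.
var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var port = ((IPEndPoint)listener.LocalEndpoint).Port;

using var client = new TcpClient();
await client.ConnectAsync(IPAddress.Loopback, port);
using var server = await listener.AcceptTcpClientAsync();

using var reader = new StreamReader(client.GetStream());
using var cancelSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));

var buffer = new char[1024];
var cancelled = false;
try
{
    // The token is handed down to the underlying stream read.
    await reader.ReadAsync(buffer.AsMemory(), cancelSource.Token);
}
catch (OperationCanceledException)
{
    cancelled = true;
}
finally
{
    listener.Stop();
}

Console.WriteLine($"Read cancelled: {cancelled}");
```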

By the way, what situation does the following condition handle? https://github.com/matteobortolazzo/couchdb-net/blob/24b851e5961cd06bd5f544e04d0a49c0df55bd4e/src/CouchDB.Driver/CouchDatabase.cs#L512-L515

matteobortolazzo commented 4 years ago

I see what you mean. In my test it worked because I cancel the token after receiving a specific document change. But if it is waiting in the ReadLine call, I suppose it breaks. I will add tests for sure.

About that piece of code, I copied it from somewhere. If it is not needed, I'll remove it.

matteobortolazzo commented 4 years ago

@n9 Can you test again with the latest update please?

matteobortolazzo commented 4 years ago

@biapar @n9 I released an alpha version of v2 if you want to try it out.

It includes a complete rewrite of the internal query logic, the changes feed API, and other things.

https://github.com/matteobortolazzo/couchdb-net/blob/master/V2.md

n9 commented 4 years ago

@matteobortolazzo Great that you removed the blocking calls (via AsyncContext.Run). Cancellation of GetContinuousChangesAsync is still not working. It is blocked on EndOfStream. Do not use it.

More thoughts: Do not allocate a buffer in the loop (i.e. do not use ReadCharAsync). Allocate one buffer in ReadLineAsync instead. (You might use MemoryPool<T>.Shared.Rent() for the allocation.)

More thoughts for v2: I would not recommend throwing an exception in Dispose (in case Logout fails).

matteobortolazzo commented 4 years ago

@n9 Can you send me a test example? Because it's working for me so probably I am not considering all cases.

About logout, do you want a try/catch on logout?

n9 commented 4 years ago

ad example

I am using simple LINQPad code:

await foreach (var doc in db.GetContinuousChangesAsync(new ChangesFeedOptions
{
    IncludeDocs = true
}, null, this.QueryCancelToken))
{
    doc.Dump();         
}

this.QueryCancelToken is the cancellation token. I cancel it after all changes are received.

ad Logout

As calling Logout in Dispose is not critical, I would add a try/catch around the Logout call in the Dispose method. (You might use .NET logging to log the Logout exception in Dispose.)
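
A minimal sketch of that suggestion, assuming a hypothetical `SketchClient` type (not the driver's client class). The `LogoutFailed` flag stands in for a real logger call.

```csharp
using System;

var client = new SketchClient(logout: () => throw new InvalidOperationException("server unreachable"));
client.Dispose(); // does not throw, although the logout failed
Console.WriteLine($"Logout failed during dispose: {client.LogoutFailed}");

// Hypothetical client type; only the Dispose pattern is the point here.
sealed class SketchClient : IDisposable
{
    private readonly Action _logout;
    private bool _disposed;

    public SketchClient(Action logout) => _logout = logout;

    // Stands in for a logger call in a real client.
    public bool LogoutFailed { get; private set; }

    // Explicit logout: failures are reported to the caller as usual.
    public void Logout() => _logout();

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        try
        {
            Logout();
        }
        catch (Exception)
        {
            // Logout is best effort during disposal, so record the failure instead of throwing.
            LogoutFailed = true;
        }
    }
}
```

Only the call made from Dispose is wrapped, so an explicit Logout call still surfaces its exception.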

matteobortolazzo commented 4 years ago

I agree that ReadCharAsync is not the solution. However if I create E2E tests with a real DB the cancellation token works as expected. I never used LINQPad but could it be related to it?

About the logout, I agree. What do you think about a "ThrowExceptionOnLogoutFail" option?

n9 commented 4 years ago

ad example

> However if I create E2E tests with a real DB the cancellation token works as expected.

For now, let's ignore the issue.

But just do not use reader.EndOfStream. It performs a blocking read call. (It is not asynchronous.) Someone might use your lib in a desktop or mobile app; they will not expect an asynchronous function to make a blocking call, and they might run into issues with the main thread being blocked.

And the issue with cancellation will disappear too :)

> I never used LINQPad but could it be related to it?

LINQPad is just an IDE.

The issue appears when the function is cancelled after all existing changes have been received and there are no new changes, because in this situation reader.EndOfStream is waiting. (Internally it calls the blocking reader.Read(), which waits for the next input, but no input is coming; see the comment there.)
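
One way to read lines without touching EndOfStream is to treat a ReadAsync result of 0 as the end of the stream. The sketch below assumes a hypothetical `ReadLinesAsync` helper and uses a MemoryStream in place of the HTTP response; it is not the code from the branch.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;

var payload = Encoding.UTF8.GetBytes("{\"seq\":\"1\"}\n\n{\"seq\":\"2\"}\n");
using var reader = new StreamReader(new MemoryStream(payload));

var lines = new List<string>();
await foreach (var line in ReadLinesAsync(reader, CancellationToken.None))
{
    lines.Add(line);
}
Console.WriteLine(lines.Count);

// Hypothetical helper: yields non-empty lines and never calls reader.EndOfStream.
static async IAsyncEnumerable<string> ReadLinesAsync(
    TextReader reader,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var buffer = new char[4096]; // one buffer for the whole loop
    var pending = new StringBuilder();

    while (true)
    {
        // Asynchronous and cancellable; a result of 0 means the stream has ended.
        var charsRead = await reader.ReadAsync(buffer.AsMemory(), cancellationToken);
        if (charsRead == 0)
        {
            yield break;
        }

        pending.Append(buffer, 0, charsRead);
        var text = pending.ToString();
        var lastNewline = text.LastIndexOf('\n');
        if (lastNewline < 0)
        {
            continue; // no complete line yet
        }

        // Keep the unterminated tail for the next read.
        pending.Clear().Append(text, lastNewline + 1, text.Length - lastNewline - 1);

        foreach (var line in text[..lastNewline].Split('\n', StringSplitOptions.RemoveEmptyEntries))
        {
            yield return line;
        }
    }
}
```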

ad Logout

> What do you think about a "ThrowExceptionOnLogoutFail" option?

What will the option do? Will it suppress all logout errors? (I would suppress only errors during dispose, not in other cases.)

matteobortolazzo commented 4 years ago

@n9 this should work https://github.com/matteobortolazzo/couchdb-net/blob/v2/src/CouchDB.Driver/Extensions/StreamReaderExtensions.cs

n9 commented 4 years ago

I have the following questions:

EndOfStream handling

if (charRead == 0)
{
    continue;
}

Why is this here? How does it solve the EndOfStream problem?

Multibyte characters

var str = Encoding.UTF8.GetString(owner.Memory.Span[..charRead]);

What if the buffer ends in the middle of a multibyte character? Hint: Use Decoder instead.

Line handling

IEnumerable<string> filteredList = lines
    .Where(line => line.Length > 0 && line != currentRemainder);

Let's say the currentRemainder is "abc". You are then going to remove all lines that equal "abc", including complete ones.

matteobortolazzo commented 4 years ago

@n9

EndOfStream: It is true if you cancel the task while ReadAsync is waiting. If ReadAsync is cancelled while reading, isMessageComplete will be false and it will ignore the line.

Multibyte characters: You are right. Sorry, I am a noob with bytes and streams. How should I use the Decoder? GetCharCount and GetChars? But are they going to be multibyte chars? The content of the document is not returned, just rev, id and seq. Also, is it possible that HTTP handles this char thing on its own?

Line handling: How can a remainder be equal to a full row? By definition it is not possible (in CouchDB).

I tested this implementation cancelling the token after receiving X messages and with > 4k bytes to read, and it works as expected.

n9 commented 4 years ago

Context

I was reviewing this function according to its name. Later, you might use it in another context. The spec might change. And you may forget in the future that you hacked it for one particular HTTP response. (I know it is currently in the ChangesFeed namespace.)

EndOfStream

It is true when the stream has ended. What if CouchDB is going to stop (e.g. to update)? The current version might reset the connection; a later version might just finish the stream. In that case, it seems to me that it might start looping.

Multibyte characters

Same as Context. Moreover, are document IDs limited to ASCII? Create the Decoder outside the loop. (The Decoder will keep the first part of a split character.) For GetCharCount and GetChars, check the example from the official docs.
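
To illustrate the point, here is a small sketch with a two-byte UTF-8 character split across two reads. The chunk contents are made up for the example.

```csharp
using System;
using System.Text;

// "\u00E9" (e with acute accent) is encoded in UTF-8 as the bytes 0xC3 0xA9;
// here they arrive in two separate chunks.
byte[] firstChunk = { (byte)'i', (byte)'d', 0xC3 };
byte[] secondChunk = { 0xA9 };

// Decoding each chunk on its own corrupts the split character.
var naive = Encoding.UTF8.GetString(firstChunk) + Encoding.UTF8.GetString(secondChunk);

// A Decoder created once, outside the read loop, keeps the dangling byte between calls.
var decoder = Encoding.UTF8.GetDecoder();
var chars = new char[16];
var decoded = new StringBuilder();
foreach (var chunk in new[] { firstChunk, secondChunk })
{
    var charCount = decoder.GetChars(chunk, 0, chunk.Length, chars, 0, flush: false);
    decoded.Append(chars, 0, charCount);
}

Console.WriteLine(naive == "id\u00E9");              // False
Console.WriteLine(decoded.ToString() == "id\u00E9"); // True
```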

Line handling

Same as Context. Moreover, why do you need to compare all lines? Why not just ignore the last line in the loop (as you do for the first line) if isMessageComplete is false?
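
A small sketch of the difference between filtering by value and skipping by position. The chunk content and the way `isMessageComplete` is computed here are assumptions for the example, not the code under review.

```csharp
using System;
using System.Linq;

// One decoded chunk: two complete lines followed by an unterminated tail
// that happens to equal an earlier complete line.
var chunk = "abc\ndef\nabc";
var isMessageComplete = chunk.EndsWith('\n');
var lines = chunk.Split('\n');
var currentRemainder = lines[^1];

// Filtering by value removes every line equal to the remainder, including the complete "abc".
var byValue = lines.Where(line => line.Length > 0 && line != currentRemainder).ToList();

// Skipping by position removes only the last element when the chunk is incomplete.
var byPosition = lines
    .Take(isMessageComplete ? lines.Length : lines.Length - 1)
    .Where(line => line.Length > 0)
    .ToList();

Console.WriteLine(string.Join(",", byValue));    // def
Console.WriteLine(string.Join(",", byPosition)); // abc,def
```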

matteobortolazzo commented 4 years ago

Implementing a generic ReadLineAsync requires a lot of tests and work.

For multibyte, I agree that the ID may not be ASCII (I just tested). So I will try to implement it with the Decoder.

About line handling, I didn't want to enumerate the list to know the number of elements. But yes, I will change it.

And with the end of stream, what do you suggest? Return string.empty and check for it where I use the readline?

btw thanks for the feedback

n9 commented 4 years ago

> About line handling, I didn't want to enumerate the list to know the number of elements. But yes, I will change it.

> And with the end of stream, what do you suggest? Return string.empty and check for it where I use the readline?

matteobortolazzo commented 4 years ago

@n9 The Decoder is smart! Let me know what you think about the latest changes

n9 commented 4 years ago

Cancellation works for me. I have added minor comments to your commit.

PS: In case you would like to tune performance, you could reduce allocations using StringSegment and StringTokenizer. Or better, simply do not decode bytes to chars: just split the raw bytes and use Utf8JsonReader to parse JSON directly from the byte lines (ReadOnlySequence).

But I think that the current implementation based on pools is not much worse. It is always good to remember that premature optimization is the root of all evil (Donald Knuth).
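
For illustration, the Utf8JsonReader idea might look roughly like this. The payload and the `ReadId` helper are hypothetical, and the sketch works on a plain byte array with spans rather than a ReadOnlySequence.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Two rows as they might arrive on a continuous feed, one JSON object per line.
byte[] payload = Encoding.UTF8.GetBytes(
    "{\"seq\":\"1-a\",\"id\":\"doc1\",\"changes\":[{\"rev\":\"1-x\"}]}\n" +
    "{\"seq\":\"2-b\",\"id\":\"doc2\",\"changes\":[{\"rev\":\"1-y\"}]}\n");

var ids = new List<string>();
var start = 0;
for (var i = 0; i < payload.Length; i++)
{
    if (payload[i] != (byte)'\n') continue;
    if (i > start)
    {
        ids.Add(ReadId(payload, start, i - start)); // parse the raw bytes of one line
    }
    start = i + 1;
}

Console.WriteLine(string.Join(", ", ids));

// Reads the top-level "id" property straight from UTF-8 bytes,
// without first decoding the whole line into a string.
static string ReadId(byte[] buffer, int offset, int count)
{
    var json = new Utf8JsonReader(new ReadOnlySpan<byte>(buffer, offset, count));
    while (json.Read())
    {
        if (json.TokenType == JsonTokenType.PropertyName
            && json.CurrentDepth == 1
            && json.ValueTextEquals("id"))
        {
            json.Read();
            return json.GetString();
        }
    }
    return null; // no top-level "id" in this line
}
```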

matteobortolazzo commented 4 years ago

Thanks for the comments! Let me know how it looks now. About Utf8JsonReader I will check later

n9 commented 4 years ago

LGTM

matteobortolazzo commented 4 years ago

Great, I'll close this for now. I'll optimize it if needed.