RxSwiftCommunity / RxGRDB

Reactive extensions for SQLite
MIT License

On which Thread the actual fetching of the data is done ? #52

Closed erdem-inan closed 5 years ago

erdem-inan commented 5 years ago

Hi

First, thank you for this amazing library, it is really good and helps a lot.

I would like to know on which thread the actual fetching of the data is done. In our project we are using GRDB, and I am now working on integrating RxSwift and RxGRDB.

Let's say that I have this:

// On the main thread.
SQLRequest<String>(sql: query)
    .rx
    .fetchAll(in: databasePool)
    .do(onNext: { _ in /* do amazing stuff */ })
    .subscribe()
    .disposed(by: disposableBag)

I understand that the emitted values will be on the main thread, which is good for me, and even if they are not, the observeOn operator can help me change this :).

So I am wondering: is the fetching done on the main thread itself? On the main thread, asynchronously?

I then tried something like this:

// On the main thread
print("BEFORE")
SQLRequest<String>(sql: queryBuilder.query)
    .rx
    .fetchAll(in: databasePool)
    .do(onNext: { _ in print("DONEXT") })
    .subscribe()
    .disposed(by: disposableBag)
print("AFTER")

I got printed: BEFORE AFTER DONEXT

I wanted an answer to be sure about what I am doing and about what is going on under the hood :).

For information, I am using DatabasePool for the DB connection.

Thank you for the help

groue commented 5 years ago

Hello @erdem-inan

I understand that the emitted values will be on the main thread, which is good for me, and even if they are not, the observeOn operator can help me change this :).

Correct!

So I am wondering: is the fetching done on the main thread itself? I then tried something like this:

print("BEFORE")
SQLRequest<String>(sql: queryBuilder.query)
    .rx
    .fetchAll(in: databasePool)
    .do(onNext: { _ in print("DONEXT") })
    .subscribe()
    .disposed(by: disposableBag)
print("AFTER")

I got printed: BEFORE, AFTER, DONEXT

If you get BEFORE, AFTER, DONEXT, in this order, then I conclude that this code does not run from the main queue.

Since it does not run from the main queue, it prints BEFORE, then dispatches DONEXT asynchronously on the main queue, and then prints AFTER. This means DONEXT and AFTER run concurrently. Their order is not well defined.

On the other hand, if you run this code from the main queue, then you are guaranteed to read BEFORE, DONEXT, AFTER, in this order, because the first DONEXT is run synchronously.

(If you witness something else, then you have found a bug, and it will have to be fixed.)

You can read a longer explanation of this behavior in the Value Observables documentation chapter.
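The ordering rule described above can be modeled without RxGRDB at all. The sketch below is only an illustration, not RxGRDB's implementation: `deliverOnMain` is a hypothetical helper that runs its closure immediately when the caller is already on the main thread, and dispatches it asynchronously to the main queue otherwise.

```swift
import Foundation

// Hypothetical helper, not part of RxGRDB: models "immediate delivery when
// already on the main thread, asynchronous delivery otherwise".
func deliverOnMain(_ work: @escaping () -> Void) {
    if Thread.isMainThread {
        // Synchronous: the closure runs before the caller continues.
        work()
    } else {
        // Asynchronous: ordering relative to the caller is not defined.
        DispatchQueue.main.async { work() }
    }
}

var log: [String] = []

// Top-level script code runs on the main thread.
log.append("BEFORE")
deliverOnMain { log.append("DONEXT") }
log.append("AFTER")

print(log)
```

Run from the main thread, the log reads BEFORE, DONEXT, AFTER. Called from any other thread, the same helper takes the asynchronous branch, and DONEXT may land before or after AFTER.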

Back to your question:

which thread the actual fetching of the data is made?

As with all fetches: in a dispatch queue where the database can be accessed:

let values = try databasePool.read { db in
    ... // in such a queue (which is not the main queue)
}

But the queue it runs in is independent from the synchronous or asynchronous behavior you witness when you subscribe to an RxGRDB observable:

RxGRDB, internally, does everything that is necessary for those public and documented rules to hold true. Those rules are what applications need in order to be able to build robust code, not really the implementation details.

At least, I hope so 😅 Please ask more questions if something is still not clear to you!

erdem-inan commented 5 years ago

OK, if I understood well, the fetching itself is done on a queue managed by GRDB, so it is impossible for us to do a fetch on the main thread, right?

So if I have this:

        DispatchQueue.main.async {
            data.databasePool.read { db in
                .....
            }
        }

Even this time the read itself will be done on another queue, which is not the main queue, right?

We are facing some performance issues, and I am refactoring/rewriting the DB layer of the app. I want to be sure of that :)

so this point :

means that our main queue shouldn't be blocked, right?

Thanks for the quick reply!

groue commented 5 years ago

OK, if I understood well, the fetching itself is done on a queue managed by GRDB, so it is impossible for us to do a fetch on the main thread, right?

This topic belongs more to GRDB than RxGRDB. GRDB allows you to fetch from any queue, except database queues themselves:

// On main queue: OK
databasePool.read { db in ... }

// On some background queue: OK
databasePool.read { db in ... }

// on some database queue: WRONG
databasePool.read { db in
    databasePool.read { db in ... } // fatal error: Database methods are not reentrant
}

The database access methods (read, write) are blocking: they return after your db job is performed. The db job itself is performed on some private GRDB queue:

// On some queue Q
var a = 0
databasePool.read { db in
    // On some GRDB queue
    a = 1
}
// Back on some queue Q
assert(a == 1) // always true
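As a rough analogy, the same blocking behavior can be reproduced with plain Dispatch. In the sketch below, `dbQueue` is a stand-in for a private GRDB queue; that name and its label are assumptions made for illustration, and GRDB's actual internals are not shown.

```swift
import Dispatch

// Stand-in for a private database queue (hypothetical, illustration only).
let dbQueue = DispatchQueue(label: "example.db")

var a = 0
dbQueue.sync {
    // The closure executes in the context of dbQueue...
    dispatchPrecondition(condition: .onQueue(dbQueue))
    a = 1
}
// ...and sync returns only after the closure has finished,
// so the caller always observes the update.
assert(a == 1)
print(a)
```

The analogy stops at reentrancy: calling `dbQueue.sync` from inside `dbQueue` would deadlock silently, whereas GRDB reports the fatal error quoted earlier.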

so if I got this:

DispatchQueue.main.async {
    data.databasePool.read { db in
    .....
    }
}

Even this time the read itself will be done on another queue, which is not the main queue, right?

This code just postpones the reading, which happens on some GRDB queue, but blocks the main queue (because databasePool.read is called from the main queue, and blocks until your db job is completed).

So if you are on the main queue, want to fetch some stuff without blocking the main queue, and finally use those fetched values on the main queue, then do use some background queue, that's what they are for:

DispatchQueue.global(qos: .userInitiated).async {
    // Not on the main queue
    do {
        // read can throw, hence the try
        let stuff = try data.databasePool.read { db in
            ...
        }
        DispatchQueue.main.async {
            // Back on the main queue
            use(stuff)
        }
    } catch {
        // Handle the database error
    }
}
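The pattern above can be wrapped in a small helper. This is a sketch under assumptions: `runInBackground` and its parameters are hypothetical names, not GRDB or RxGRDB API, and the blocking work is passed in as a plain closure so that the example stays self-contained.

```swift
import Foundation

// Hypothetical helper: runs blocking `work` on a background queue, then
// reports the outcome on `callbackQueue` (the main queue by default).
func runInBackground<T>(
    callbackQueue: DispatchQueue = .main,
    work: @escaping () throws -> T,
    completion: @escaping (Result<T, Error>) -> Void
) {
    DispatchQueue.global(qos: .userInitiated).async {
        // Capture either the returned value or the thrown error.
        let result: Result<T, Error> = Result { try work() }
        callbackQueue.async { completion(result) }
    }
}

// Usage idea (assumption): pass the blocking database read as `work`,
// for example a closure that calls `try data.databasePool.read { db in ... }`,
// and consume the fetched value in `completion`.
```

Making the callback queue a parameter is a design choice for testability: application code keeps the main-queue default, while tests can supply their own queue and wait on it.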

But this is when you use GRDB.

RxGRDB, on the other hand, does everything it can to avoid blocking your main queue, especially when you use a database pool, which makes it possible to use application threads in a very efficient way.

But RxGRDB will block the main queue if you instruct it to do so. When you subscribe to a Value observable from the main queue, with all default configuration, you do instruct RxGRDB to block your main thread until it notifies the initial value:

// If run from the main queue, then it blocks the main queue until the first element is fetched
request.rx
    .fetchAll(in: databasePool)
    .subscribe(...)

This blocks the main queue because this is the documented behavior.

If this behavior is not the one you want, then something has to change in your code. For example:

// Use MainScheduler.asyncInstance
request.rx
    .fetchAll(in: databasePool, scheduler: MainScheduler.asyncInstance)
    .subscribe(...)

This change will make sure your main queue is never blocked by RxGRDB. EDIT: this was not correct. Here is some sample code which actually never blocks the main thread:

// Subscribe from some background queue:
request.rx
    .fetchAll(in: databasePool)
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .subscribe(...)

In this case the first value will not be emitted immediately:

request.rx
    .fetchAll(in: databasePool)
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .subscribe(onNext: { _ in print("NEXT") })
// <- Here NEXT is not printed yet.

It's up to your application to decide what's best for it.

groue commented 5 years ago

This change will make sure your main queue is never blocked by RxGRDB.

This is the intent, but as I'm double checking what I am saying, I become less sure. I'll figure this out shortly (EDIT: done)

groue commented 5 years ago

Meanwhile, @erdem-inan, could you please confirm my hypothesis, which is that the "performance issue" you talk about is the undesired blocking of the main queue due to RxGRDB? If not, please try to better explain the defect you want to fix.

erdem-inan commented 5 years ago

Thanks for the explanation, it is clear now. I have a better understanding of all of this :)

The performance issue that I was talking about is not due to RxGRDB. For the story, a few weeks ago we observed some performance issues in the app. So I decided to take the time to look at this and started investigating. It brought me to the DB layer of our app, and I started to dive into it. From there I saw that some queries were made from the main thread and were therefore blocking it (as you said in your explanation).

So I decided to refactor that and integrate RxGRDB with it. My questions were mainly for understanding and for going the right way with the refactor.

Now it is clear and I am certain that the issue comes from that.

Thank you very much for the time and explanation!

groue commented 5 years ago

All right, glad things are getting clearer to you 👍

Now you just put a shadow of a doubt in my mind about the best technique to avoid blocking the main queue, and your question may well end up with a bug fix or better documentation :-)