realm / realm-swift

Realm is a mobile database: a replacement for Core Data & SQLite
https://realm.io
Apache License 2.0

Combine to support Runloop schedulers #7444

Closed timstudt closed 3 years ago

timstudt commented 3 years ago

Problem

Currently, .receive(on: Scheduler) and .subscribe(on: Scheduler) throw a fatal error when the scheduler is not a DispatchQueue:

    // Combine.swift
    guard let queue = scheduler as? DispatchQueue else {
        fatalError("Cannot subscribe on scheduler \(scheduler): only serial dispatch queues are currently implemented.")
    }

Solution

But why not support RunLoop schedulers? It is quite common to use the following in SwiftUI:

    someRealmResult
        .collectionPublisher
        .receive(on: RunLoop.main)
        .assign(to: &$variable)

In fact, I guess supporting .subscribe(on: customRunLoop) would improve Realm, since the autorefresh feature requires a thread with a run loop: https://docs.mongodb.com/realm/sdk/ios/advanced-guides/threading/

more info: https://forums.swift.org/t/runloop-main-or-dispatchqueue-main-when-using-combine-scheduler/26635/27

How important is this improvement for you?

Would be a major improvement

dianaafanador3 commented 3 years ago

Hi @timstudt Thanks for suggesting the feature enhancement, I'll take a look at this and inform you if we are adding it to our backlog of must-do's.

tgoyne commented 3 years ago

Our implementation of receive(on: DispatchQueue.main) actually maps it to receiving on the main runloop, as supporting Realms confined to both the main runloop and the main dispatch queue separately would result in all sorts of wacky and confusing behavior. We could probably support RunLoop.main as well for better discoverability pretty easily. Arbitrary runloops would also be possible, but it's unclear if there's any good reason to receive on a non-main runloop rather than on a dispatch queue. Fully custom schedulers would be difficult.
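For the SwiftUI case from the original post, this suggests passing DispatchQueue.main where RunLoop.main was used. A minimal sketch, assuming a recent RealmSwift with @Persisted and iOS 14 or later for assign(to:); the Dog model and the DogListModel class are hypothetical names for illustration, and replaceError(with:) is added only because assign(to:) requires a publisher that cannot fail:

```swift
import Combine
import Foundation
import RealmSwift

// Hypothetical model, not part of the original issue.
final class Dog: Object {
    @Persisted var name: String
}

// Hypothetical view model backing a SwiftUI list.
final class DogListModel: ObservableObject {
    @Published var names: [String] = []

    init(realm: Realm) {
        realm.objects(Dog.self)
            .collectionPublisher
            // A dispatch queue is accepted here; a run loop scheduler hits the fatalError quoted in the issue.
            .receive(on: DispatchQueue.main)
            // Copy plain values out of the Results before publishing them.
            .map { (dogs: Results<Dog>) -> [String] in dogs.map { $0.name } }
            // assign(to:) requires Failure == Never, so swallow errors in this sketch.
            .replaceError(with: [])
            .assign(to: &$names)
    }
}
```

The published array is filled asynchronously, once the first value has been delivered on the main queue.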

dianaafanador3 commented 3 years ago

@timstudt Is there any particular reason for using a non-main runloop?

timstudt commented 3 years ago

I understand that supporting RunLoop.main is redundant. I was aiming for a subscription on a custom DispatchQueue, but that gave me a fatal error. That's why I thought of using a runloop as a workaround, like this:

    // NOTE: Realm needs subscriptions to be handled on a queue with a run loop, otherwise:
    // NSException "Can only add notification blocks from within runloops."
    return try Realm().objects(objectType) // called on .main or any other queue
        .filter(predicate)
        .collectionPublisher
        .subscribe(on: DispatchQueue(label: "", qos: .utility))
        .freeze()
        .map(\.elements)

But I can no longer reproduce this on the latest Realm.

tgoyne commented 3 years ago

When a Realm is confined to a dispatch queue (either via Realm(queue:) or things like .subscribe(on:) and receive(on:)) it uses the GCD queue instead of the current thread's runloop, so the current thread doesn't need to have a runloop running. Autorefresh and notifications work mostly the same as they would with a runloop.

The Realm(queue:) initializer requires that you currently be on that queue. .subscribe(on:), .receive(on:) and .observe(on:) do not and support any serial queue.
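A minimal sketch of the Realm(queue:) requirement described above: the initializer is called from inside a block that is already running on the target serial queue. The helper name openConfinedRealm, the queue label and the in-memory identifier are assumptions made for illustration:

```swift
import Foundation
import RealmSwift

// Hypothetical helper: opens a Realm confined to `queue`.
// It must be called while already running on `queue`.
func openConfinedRealm(on queue: DispatchQueue) throws -> Realm {
    // In-memory configuration so the sketch leaves no file behind; the identifier is arbitrary.
    let config = Realm.Configuration(inMemoryIdentifier: "queue-confined-sketch")
    return try Realm(configuration: config, queue: queue)
}

let worker = DispatchQueue(label: "realm.worker") // serial by default

// Hop onto the queue first, then open and use the Realm there.
worker.sync {
    do {
        let realm = try openConfinedRealm(on: worker)
        print("opened on \(worker.label), empty:", realm.isEmpty)
    } catch {
        print("failed to open Realm:", error)
    }
}
```

The Realm and any objects read from it should only be touched from blocks dispatched to that same queue.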

timstudt commented 3 years ago

Thanks for clarifying. In this case, support for RunLoop schedulers is not needed.

dianaafanador3 commented 3 years ago

@timstudt I'll go ahead and close the issue, as @tgoyne's comments seem to answer your question.

timstudt commented 3 years ago

@tgoyne Damn, it was just closed. But it was the Combine.Publisher.subscribe(on:) operator that caused the fatal error, not Realm's custom Value.subscribe(on:).

Is that a bug?

see:

    _ = observe(SomeObject.self, predicate: somepredicate)
        .subscribe(on: DispatchQueue(label: "")) // fatal error: 'Realm accessed from incorrect thread.'
        .sink { ... }

    func observe(_ type: Object.Type, predicate: NSPredicate) -> AnyPublisher<[Object], Error> {
        return self.realm().objects(type)
            .filter(predicate)
            .collectionPublisher
            .subscribe(on: DispatchQueue(label: "")) // this works as expected
            .map(\.elements)
    }

tgoyne commented 3 years ago

.subscribe(on:) only makes sense as the first step in a Combine pipeline. Calling it multiple times does strange things which are unlikely to ever be what you want. .receive(on:) is most likely what you actually want for making the .sink() block be called on a specific queue. The Realm-specific bit is that you need to do .threadSafeReference().receive(on:) if the current value of the pipeline is a Realm thread-confined object so that we can do our own logic to pass it to the target scheduler.
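A minimal sketch of that pipeline shape: subscribe once as the first step, then hand the thread-confined value over to the queue where the sink should run, using Combine's receive(on:) after threadSafeReference(). The Dog model, the observeDogs helper and the queue label are hypothetical names used only for illustration:

```swift
import Combine
import Foundation
import RealmSwift

// Hypothetical model, used only for this sketch.
final class Dog: Object {
    @Persisted var name: String
}

// Hypothetical helper: observe on a background serial queue, deliver on the main queue.
func observeDogs(in realm: Realm,
                 onChange: @escaping (Results<Dog>) -> Void) -> AnyCancellable {
    let background = DispatchQueue(label: "dogs.observe") // serial by default
    return realm.objects(Dog.self)
        .collectionPublisher
        .subscribe(on: background)        // first step of the pipeline, called once
        .threadSafeReference()            // lets Realm pass the live Results across queues
        .receive(on: DispatchQueue.main)  // the sink below is called on the main queue
        .sink(receiveCompletion: { _ in }, receiveValue: onChange)
}
```

The caller keeps the returned AnyCancellable alive for as long as updates are wanted. If live objects are not needed downstream, .freeze() followed by a plain receive(on:) is an alternative, as in the earlier snippet in this thread.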

timstudt commented 3 years ago

Ok, thanks. I think I can work with that.