Closed iwheelbuy closed 4 years ago
Hi – thanks for the report. Yes, looks like there was an issue with the TestScheduler that meant it failed to schedule any repeating actions – which is required by Combine's CollectByTime publisher. I've posted a fix on a branch called test-scheduler-intervals attached to PR #26. Let me know if that resolves your issue.
It works, thanks a lot!
Or maybe it doesn't...
I decided to test Debounce
let upstream: TestablePublisher<String, TestError> = scheduler.createRelativeTestablePublisher([
(1, .input("a")),
(2, .input("b")),
(7, .input("c")),
(99, .completion(.finished))
])
let publisher = Publishers.Debounce(
upstream: upstream,
dueTime: 5,
scheduler: scheduler,
options: nil
)
let subscriber = scheduler.start(configuration: configuration, create: { publisher })
And got ["c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c", "c"]
events.
Please don't waste your time if it's not an easy feature to implement.
Not at all. Good to get the issues ironed out. Try the latest commit to PR #26
There is still something wrong with it...
interval = 12
count = 5
values = [1, 5, 7, 11, 13, 27, 35, 43, 51, 53, 55, 61, 67, 69, 73, 77, 79, 83, 87, 89, 91, 93, 95]
With test scheduler I got:
[[1, 5, 7, 11, 13], [27, 35, 43, 51, 53], [55, 61], [67, 69, 73, 77, 79]]
As you can see [27, 35, 43, 51, 53]
is not correct because 53 - 27 > 12
. Also some values are lost.
I've written the function which produces the expected answer and the result should be:
[[1, 5, 7, 11], [13], [27, 35], [43], [51, 53, 55], [61, 67, 69], [73, 77, 79, 83], [87, 89, 91, 93, 95]]
I've tested the "expected answer" function and CollectByTime
publisher with ordinary DispatchQueue.global(qos: .userInteractive)
and got the result which matches with "expected answer" function result.
P.S. I think there is something wrong with the approach to repeating actions. I'm going to continue without test scheduler for CollectByTime
publisher for now. Maybe will come back later.
P.S. Debounce
publisher works correct in master
P.S. I have created a test-scheduler-intervals
branch in my test repository in case you want to try those tests yourself.
Hi @iwheelbuy, thanks for writing this up – I really appreciate it! However, when I run your example numbers, it matches the control.
As you can see [27, 35, 43, 51, 53] is not correct because 53 - 27 > 12. Also some values are lost.
I'm not sure our understanding CollectByTime
matches up.
My understanding of CollectByTime
is that: when using the 'time' strategy, it will collect all elements from the point the publisher started to the point the interval has elapsed. So if using an interval of 5, it will group all elements between 0..<5, 5..<10, 10..<15 and so on. (With 'zero' being the time at which the publisher was subscribed to.)
When also specifying count as part of the strategy, the behaviour is the same, except that if the count is reached within a time interval (0..<5 or 5..<10), the publisher will immediately output the elements it has already collected for that time interval, but otherwise continue as normal.
As an example, if we schedule [0,1,2,3,4,5,6,7,8,9] to be published with an interval of 5, and a count of 5, we will receive [[0,1,2,3,4],[5,6,7,8,9]] and if we ask for an interval of 5 and count of 2 we will receive [[0,1],[2,3],[4],[5,6],[7,8],[9]].
As the interval is determined by the subscribe time, if we offset everything by 3 ([3,4,5,6,7,8,9,10,11,12]), with an interval of 5 and a count of 5, we will receive [[3,4],[5,6,7,8,9],[10,11,12]] and if we ask for an interval of 5 and count of 2 we will receive [[3,4],[5,6],[7,8],[9],[10,11],[12]].
Using your own example, we can test without Entwine using a control by:
func testControlCollectByTimeSequence() {
let interval = 12
let count = 5
let expected = [
[1, 5, 7, 11],
[13],
[27, 35],
[43],
[51, 53, 55],
[61, 67, 69],
[73, 77, 79, 83],
[87, 89, 91, 93, 95],
]
let sut = DispatchQueue(label: "serial-queue")
let subject = PassthroughSubject<Int, Never>()
let strategy = Publishers.TimeGroupingStrategy<DispatchQueue>
.byTimeOrCount(sut, .init(.milliseconds(interval)), count)
let publisher = Publishers.CollectByTime(upstream: subject, strategy: strategy, options: nil)
var cancellables = Set<AnyCancellable>()
var results = [[Int]]()
publisher.sink(receiveValue: { results.append($0) }).store(in: &cancellables)
let testIntervals = expected.flatMap { $0 }
let now = DispatchTime.now()
let group = DispatchGroup()
group.enter()
sut.schedule(after: .init(now + .milliseconds(1000))) { group.leave() }
for interval in testIntervals {
sut.schedule(after: .init(now + .milliseconds(interval))) { subject.send(interval) }
}
_ = group.wait(timeout: now + .milliseconds(2000))
XCTAssertEqual(expected, results)
}
And with Entwine, we have:
func testCollectByTimeSequence() {
let interval = 12
let count = 5
let expected = [
[1, 5, 7, 11],
[13],
[27, 35],
[43],
[51, 53, 55],
[61, 67, 69],
[73, 77, 79, 83],
[87, 89, 91, 93, 95],
]
let sut = TestScheduler(initialClock: 0, maxClock: 2000)
let strategy = Publishers.TimeGroupingStrategy<TestScheduler>
.byTimeOrCount(sut, .init(interval), count)
let subject = PassthroughSubject<Int, Never>()
let publisher = Publishers.CollectByTime(upstream: subject, strategy: strategy, options: nil)
var cancellables = Set<AnyCancellable>()
var results = [[Int]]()
publisher.sink(receiveValue: { results.append($0) }).store(in: &cancellables)
let testIntervals = expected.flatMap { $0 }
for interval in testIntervals {
sut.schedule(after: .init(interval)) { subject.send(interval) }
}
sut.resume()
XCTAssertEqual(expected, results)
}
Were you expecting something different?
Our understanding matches up! It was me who made a mistake. Values1, 5, 7, 11, 13..
. represent not just values, but also a time when to publish the value. I have accidentally replaced time of value with number of index in array...
I'll check again (^
As I've mentioned 1, 5, 7, 11, 13...
represents a value and a virtual time when to publish. It means I publish Int(13)
on VirtualTime(13)
. So I've corrected the time each value is published. The new result I got is:
[[1, 5, 7, 11], [13, 27, 35, 43, 51], [53, 55, 61, 67, 69]]
The first emitted array [1, 5, 7, 11]
is okay because it matches the 0 ..< 12
time interval and array size of 5
not reached.
The second array we expect to be in 12 ..< 24
time interval. And there is only one item from values which matches the range, it is 13
. So I expect that the second emitted array should be [13]
. But it is not...
Thanks for checking! If you run this test:
func testCollectByTimeSequence() {
let interval = 12
let count = 5
let expected = [
[1, 5, 7, 11],
[13],
[27, 35],
[43],
[51, 53, 55],
[61, 67, 69],
[73, 77, 79, 83],
[87, 89, 91, 93, 95],
]
let sut = TestScheduler(initialClock: 0, maxClock: 2000)
let strategy = Publishers.TimeGroupingStrategy<TestScheduler>
.byTimeOrCount(sut, .init(interval), count)
let subject = PassthroughSubject<Int, Never>()
let publisher = Publishers.CollectByTime(upstream: subject, strategy: strategy, options: nil)
var cancellables = Set<AnyCancellable>()
var results = [[Int]]()
publisher.sink(receiveValue: { results.append($0) }).store(in: &cancellables)
let testIntervals = expected.flatMap { $0 }
for interval in testIntervals {
sut.schedule(after: .init(interval)) { subject.send(interval) }
}
sut.resume()
XCTAssertEqual(expected, results)
}
What do you get?
package(url: "git@github.com:iwheelbuy/Entwine.git", .branch("test-scheduler-intervals"))
XCTAssertEqual failed: ("[[1, 5, 7, 11], [13], [27, 35], [43], [51, 53, 55], [61, 67, 69], [73, 77, 79, 83], [87, 89, 91, 93, 95]]") is not equal to ("[[1, 5, 7, 11], [13, 27, 35, 43, 51], [53, 55, 61, 67, 69], [73, 77, 79, 83, 87]]")
Curious – that passes for me. Are you testing against the latest test-scheduler-intervals
commit?
Checked also on .package(url: "git@github.com:tcldr/Entwine.git", from: "0.9.0")
, same result
XCTAssertEqual failed: ("[[1, 5, 7, 11], [13], [27, 35], [43], [51, 53, 55], [61, 67, 69], [73, 77, 79, 83], [87, 89, 91, 93, 95]]") is not equal to ("[[1, 5, 7, 11], [13, 27, 35, 43, 51], [53, 55, 61, 67, 69], [73, 77, 79, 83, 87]]")
Yeah, it needs to be the branch of the PR for this issue: package(url: "git@github.com: tcldr/Entwine.git", .branch("test-scheduler-intervals"))
After switching between PR branch, 0.9.0 tag and back - It started to pass successfully..... I wonder if it was a SPM bug/cache
I'll do some more research, thank you for your reply.
No problem, let me know if it works out and I'll merge this into master. Thanks!
Everything works!
Great news! Glad it worked out.
I've started to test various Combine Publishers and got stuck with
Publishers.CollectByTime
. I'm usingTestScheduler
andVirtualTimeInterval
to test it. I might got it all wrong, but I expect the publisher to buffer and periodically publish arrays of signals. The first array is published correctly and it respects theVirtualTimeInterval
. But then ALL other signals are published in the second array andVirtualTime
of each signal is ignored.With
VirtualTimeInterval = 2
I publishI expect
Publishers.CollectByTime
to group signals this way:["ab", "cd", "ef"]
. But I get["ab", "cdef"]
. Can you please tell me what is wrong with my code or maybe with my understanding theCollectByTime
publisher?