ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

observable<T>::window(...) appends empty result on modulo window numbers #522

Open geiseri opened 4 years ago

geiseri commented 4 years ago

I have the following code:

std::cout << "start" << std::endl;
std::vector<uint8_t> in{....};  // 24 items
auto obs = rxcpp::observable<>::iterate(in);
obs.window(12)
   .map([](rxcpp::observable<uint8_t> w) {
      return w.reduce(
                std::vector<uint8_t>{},
                [](auto v, auto b) { v.push_back(b); return v; },
                [](auto v) { return v; })
         .map([](auto v) {
            std::cout << "a " << v.size() << std::endl;
            return std::vector(v.begin(), std::next(v.begin(), 6));
         })
         .as_dynamic();
   })
   .merge()
   .subscribe(
      [](auto v) { std::cout << "b: " << v.size() << std::endl; },
      [](std::exception_ptr ep) { std::cout << "c: " << rxcpp::rxu::what(ep) << std::endl; },
      []() { std::cout << "d:" << std::endl; });
std::cout << "end" << std::endl;

What I expected to see:

start
a 12
b: 6
a 12
b: 6
d:
end

What I do see is:

start
a 12
b: 6
a 12
b: 6
a 0

followed by a segmentation fault at the std::next(v.begin(), 6) call, where the vector is empty.

With an input of 30 items, though, I get the result I expected:

start
a 12
b: 6
a 12
b: 6
a 6
b: 6
d:
end

kirkshoop commented 4 years ago

Yes, this is expected. A new window is created as soon as the old window closes; the end of the input sequence is then signaled, and the still-empty window is emitted. Adding a filter after the window can be used to drop any windows with fewer than 6 items.
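
To make that sequence of events concrete, here is a minimal sketch in plain C++ with no RxCpp dependency. It is a simplified, eager model of a count-based window, not RxCpp's actual implementation; the function name window_model is hypothetical. It only mirrors the behavior described above: a fresh window is opened as soon as the previous one fills, and whatever window is open at completion is emitted, even if it is empty.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified model of a count-based window (an assumption for illustration,
// not RxCpp's implementation). A window is open from the start; whenever it
// fills up, it is closed and a new, empty one is opened immediately.
std::vector<std::vector<std::uint8_t>> window_model(
    const std::vector<std::uint8_t>& input, std::size_t count) {
    assert(count > 0);
    std::vector<std::vector<std::uint8_t>> closed;
    std::vector<std::uint8_t> open;  // the window that is currently open
    for (std::uint8_t item : input) {
        open.push_back(item);
        if (open.size() == count) {
            closed.push_back(open);  // close the full window...
            open.clear();            // ...and open a new, empty one right away
        }
    }
    closed.push_back(open);  // end of input: emit whatever window is open
    return closed;
}
```

Under this model, 24 items with a window size of 12 give three windows of sizes 12, 12 and 0, while 30 items give 12, 12 and 6, which matches the output reported in the original post.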

geiseri commented 4 years ago

Aha, that was not obvious to me. Is there a sane way to check for the end of the stream before closing the window? Or, if the stream ends with an empty window, to drop it? In my case I do not care if it has fewer than 6 items, I just care that it's not 0. True, a filter can address that, though it just seems a bit odd.
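
As a rough illustration of the "not 0" requirement, the two helpers below are a sketch in plain C++; the names has_items and take_first are hypothetical and not part of RxCpp. The first is a predicate that rejects an empty vector, and the second copies at most n leading items so that the iterator is never advanced past the end.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <vector>

// Hypothetical predicate: true only for windows that collected at least one item.
bool has_items(const std::vector<std::uint8_t>& v) {
    return !v.empty();
}

// Hypothetical helper: copy at most n leading items, never stepping past end().
std::vector<std::uint8_t> take_first(const std::vector<std::uint8_t>& v,
                                     std::size_t n) {
    const std::size_t len = std::min(n, v.size());
    return std::vector<std::uint8_t>(
        v.begin(), std::next(v.begin(), static_cast<std::ptrdiff_t>(len)));
}
```

In the pipeline from the original post, a lambda wrapping has_items could presumably be passed to RxCpp's filter operator between the reduce and the inner map, and take_first(v, 6) could stand in for the std::vector(v.begin(), std::next(v.begin(), 6)) expression. Clamping the length also covers a short final window, for example one with 5 items.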