staltz / callbag-share

👜 Callbag operator that broadcasts a single source to multiple sinks
MIT License
22 stars, 3 forks

Completed streams issue #5

Andarist closed 6 years ago

Andarist commented 6 years ago

Shouldn't subscribers added after a stream has completed receive the completion? At the moment, the source gets called again in that situation.

If my understanding is correct, I can prepare a PR fixing this. IMHO it would need to be a major version bump of this package, though.
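For illustration, the behavior proposed above could be sketched roughly as follows. This is a minimal sketch, not the code of callbag-share: the name `shareRememberingEnd` and the synchronous test source `once` are hypothetical, and the sketch assumes a listenable source that follows the callbag handshake (0 = start, 1 = data, 2 = end).

```javascript
// Hypothetical variant of share that remembers that the source has ended.
function shareRememberingEnd(source) {
  let sinks = [];
  let sourceTalkback = null;
  let ended = false;
  let endPayload; // undefined for completion, an error otherwise

  return function shared(start, sink) {
    if (start !== 0) return;

    if (ended) {
      // Late subscriber: greet it, then end it right away
      // instead of calling the source again.
      sink(0, () => {});
      sink(2, endPayload);
      return;
    }

    sinks.push(sink);

    const talkback = (t, d) => {
      if (t === 2) {
        sinks = sinks.filter(s => s !== sink);
        // Stop the source once the last sink has left.
        if (sinks.length === 0 && sourceTalkback) {
          const tb = sourceTalkback;
          sourceTalkback = null;
          tb(2);
        }
      } else if (sourceTalkback) {
        sourceTalkback(t, d);
      }
    };

    if (sinks.length === 1) {
      // First sink: start the source and fan its messages out.
      source(0, (t, d) => {
        if (t === 0) {
          sourceTalkback = d;
          sink(0, talkback);
          return;
        }
        const current = sinks;
        if (t === 2) {
          ended = true;
          endPayload = d;
          sinks = [];
          sourceTalkback = null;
        }
        current.forEach(s => s(t, d));
      });
    } else {
      sink(0, talkback);
    }
  };
}

// Hypothetical synchronous source: emits 0 once, then completes.
let starts = 0;
const once = (start, sink) => {
  if (start !== 0) return;
  starts += 1;
  sink(0, () => {});
  sink(1, 0);
  sink(2);
};

const log = [];
const makeSink = label => (t, d) => {
  if (t === 1) log.push(`${label}: val ${d}`);
  if (t === 2) log.push(`${label}: complete`);
};

const stream = shareRememberingEnd(once);
stream(0, makeSink('1'));
stream(0, makeSink('2')); // subscribes after completion

console.log(log, starts);
// log is ['1: val 0', '1: complete', '2: complete'] and starts is 1
```

With this variant the second subscriber only sees the completion, and the source is started a single time.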

staltz commented 6 years ago

I think we should just make a separate package for that. In RxJS, there are multiple ways to implement multicasting. share in RxJS is a shortcut for multicast(new Subject())+refCount. There are so many ways to make variations. I'd like to keep callbag share as just a simple util that does "basic multicasting", but if anything more custom is needed, then people should use more fine-grained operators. You could think of share as a coarse-grained operator in the abstraction stack. The solution is creating more fine-grained operators and composing those to build coarse-grained operators. Ideally, this package here would import callbag-multicast and callbag-refcount and just compose them together. :)
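As a rough illustration of the composition idea described above, the following sketch builds a share-like operator from two smaller pieces. The helpers `multicast` and `refCount` here are hypothetical and written inline; they are not the API of any published package, and the synchronous source `once` exists only to make the example deterministic.

```javascript
// Hypothetical fine-grained piece: fans one source out to many sinks,
// but only starts the source when connect() is called.
function multicast(source) {
  let sinks = [];
  let talkback = null;

  const shared = (start, sink) => {
    if (start !== 0) return;
    sinks.push(sink);
    sink(0, (t, d) => {
      if (t === 2) sinks = sinks.filter(s => s !== sink);
      else if (talkback) talkback(t, d);
    });
  };

  const connect = () => {
    source(0, (t, d) => {
      if (t === 0) { talkback = d; return; }
      const current = sinks;
      if (t === 2) { sinks = []; talkback = null; }
      current.forEach(s => s(t, d));
    });
    // Returns a disconnect function that stops the source if still running.
    return () => {
      if (talkback) { const tb = talkback; talkback = null; tb(2); }
    };
  };

  return { shared, connect };
}

// Hypothetical fine-grained piece: connects on the first sink and
// disconnects when the last sink leaves.
function refCount({ shared, connect }) {
  let count = 0;
  let disconnect = null;
  return (start, sink) => {
    if (start !== 0) return;
    count += 1;
    shared(0, (t, d) => {
      if (t === 0) {
        const tb = d;
        sink(0, (tt, dd) => {
          if (tt === 2) {
            count -= 1;
            tb(2);
            if (count === 0 && disconnect) { disconnect(); disconnect = null; }
            return;
          }
          tb(tt, dd);
        });
        return;
      }
      if (t === 2) count -= 1; // the source ended, so this sink is gone
      sink(t, d);
    });
    if (count === 1) disconnect = connect();
  };
}

// The coarse-grained operator is just the composition of the two.
const share = source => refCount(multicast(source));

// Hypothetical synchronous source: emits 0 once, then completes.
let starts = 0;
const once = (start, sink) => {
  if (start !== 0) return;
  starts += 1;
  sink(0, () => {});
  sink(1, 0);
  sink(2);
};

const log = [];
const makeSink = label => (t, d) => {
  if (t === 1) log.push(`${label}: val ${d}`);
  if (t === 2) log.push(`${label}: complete`);
};

const stream = share(once);
stream(0, makeSink('1'));
stream(0, makeSink('2')); // ref count is back to 0, so the source restarts

console.log(log, starts);
```

In this sketch a subscriber that arrives after completion finds the reference count at zero, so the source is connected again and the subscriber receives the value a second time.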

Andarist commented 6 years ago

I'll certainly give it some thought. I'm not a stream expert, hence these questions. I really like tinkering with these abstractions, but I still have much to learn.

This means, however, that the description of the package should be updated, right? Because it doesn't behave like .publish().refCount().

staltz commented 6 years ago

How does it not behave like publish refCount? As far as I remember, publish is a shortcut for multicast(() => new Subject()) which means a new Subject is created for each execution of the source stream (from start to complete), which is what you mentioned in this issue. Or am I missing a corner case?

Andarist commented 6 years ago

Rx

var Rx = require('rxjs') 
var { take, publish, refCount } = require('rxjs/operators') 

var makeSub = (label = '') => ({
  next: val => console.log(`${label}: val ${val}`),
  complete: () => console.log(`${label}: complete`)
})

var stream$ = Rx.interval(100).pipe(
  take(1),
  publish(),
  refCount()
)

stream$.subscribe(makeSub('1'))

setTimeout(() => {
  stream$.subscribe(makeSub('2'))
}, 300)

/*
  output

  1: val 0
  1: complete
  2: complete
*/

callbag

var pipe = require('callbag-pipe')
var interval = require('callbag-interval')
var share = require('callbag-share')
var take = require('callbag-take')
var subscribe = require('callbag-subscribe')

var makeSub = (label = '') => ({
  next: val => console.log(`${label}: val ${val}`),
  complete: () => console.log(`${label}: complete`)
})

var stream$ = pipe(
  interval(100),
  take(1),
  share
)

pipe(
  stream$,
  subscribe(makeSub('1'))
)

setTimeout(() => {
  pipe(
    stream$,
    subscribe(makeSub('2'))
  )
}, 300)

/*
  output

  1: val 0
  1: complete
  2: val 0
  2: complete
*/

staltz commented 6 years ago

And yet:

var Rx = require('rxjs')
var { take, publish, refCount, share } = require('rxjs/operators')

var makeSub = (label = '') => ({
  next: val => console.log(`${label}: val ${val}`),
  complete: () => console.log(`${label}: complete`)
})

var stream$ = Rx.interval(100).pipe(
  take(1),
  share()
)

stream$.subscribe(makeSub('1'))

setTimeout(() => {
  stream$.subscribe(makeSub('2'))
}, 300)

/*
  output

  1: val 0
  1: complete
  2: val 0
  2: complete
*/