Effect-TS / effect

An ecosystem of tools to build robust applications in TypeScript
https://effect.website
MIT License
7.63k stars 243 forks source link

Need for `Stream.share` operator #3191

Open dilame opened 4 months ago

dilame commented 4 months ago

What is the problem this feature would solve?

I have a WebSocket connection to a remote server. To manage subscriptions, I need to send messages like subscribe@someTopic to subscribe to a topic and unsubscribe@someTopic to unsubscribe. Each topic should be represented as a stream.

The challenge arises when multiple parts of my code need to subscribe to the same topic. They should share the same WebSocket connection but be able to operate independently. This is difficult with the classic Scope finalization model because if one of the two consumers closes the scope, it sends the unsubscribe@someTopic signal to the server. Consequently, the second consumer stops receiving events without notice.

The Stream.share operator addresses this issue by allowing multiple independent consumers to subscribe to the same topic without interfering with each other. This ensures that each consumer can independently manage its subscription and receive events without being affected by others.

Here's how i would like to achieve this:

const sharedWebSocket$ = createWebSocketConnection().pipe(
  share({
    connector: PubSub.unbounded(),
    resetOnRefCountZero: true
  })
);

const subscribeTopic = (topic: string) => sharedWebSocket$.pipe(
  filter(message => message.topic === topic),
  ensuring(unsubscribeTopic(topic))
  share({
    connector: PubSub.unbounded(),
    resetOnRefCountZero: true
  })
);
const sharedPriceTopic$ = subscribeTopic('price');

This setup allows each topic subscription to be shared among multiple consumers, ensuring independent operation and proper resource management.

Also, Tim Smart has almost finished the implementation of a replayable PubSub https://github.com/Effect-TS/effect/pull/3135 . This implementation is particularly useful for creating replayable shared streams.

Another use case involves a Stream representing a WebSocket connection that emits the current asset price every second. I need to access the latest price as soon as possible to calculate the order price quickly when an event occurs.

The ideal solution would be:

const replayebleSharedPrice$ = sharedPriceTopic$.pipe(
  share({
    connector: PubSub.unbounded({ replay: 1 }),
    resetOnRefCountZero: false // It is important to keep the latest price available
  })
);

With this setup, I can access the latest price in any part of my code within the Scope, ensuring timely calculations and responsiveness.

The Stream.share operator is crucial for managing shared streams in scenarios with multiple independent consumers. It allows for efficient resource management and ensures that consumers can operate without interfering with each other. This operator is essential for real-world applications that rely on shared data streams, such as WebSocket connections for real-time data updates.

What is the proposed solution?

What alternatives have you considered?

Stream.broadcast* family, but it doesn't covers any of these requirements

dilame commented 4 months ago

In trading, there is a concept known as technical analysis. There are various indicators, such as RSI. To calculate RSI, you need the last N candles (klines) (usually 14). Let's say the trading bot is initially doing other things (calculating other market aspects, waiting for an order to be executed, spending time with family), but at the very beginning, it subscribed to the candles of the asset of interest:

const klines = yield* subscribeTopic('kline@USD/EUR').pipe(
  share({
    connector: PubSub.unbounded({replay: 14})
  })
)

So, by the time it needs to calculate the RSI, it will already have either all the last 14 candles or at least some of them.

In any case, the problem of replayable streams is quite general, and I believe there should be a general solution for it.