Effect-TS / effect

An ecosystem of tools for building production-grade applications in TypeScript.
https://effect.website
MIT License
7.02k stars 221 forks source link

`Stream.zip` for more than two streams #3411

Open nounder opened 1 month ago

nounder commented 1 month ago

What version of Effect is running?

3.6.0

What steps can reproduce the bug?

import { Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
    Stream.broadcast(3, 10),
    Stream.map(([s1, s2, s3]) => ({
        n1: s1,
        n2: s2.pipe(Stream.map((v) => v * 2)),
        n3: s2.pipe(Stream.map((v) => v * 3)),
    })),
    Stream.flatMap(({ n1, n2, n3 }) =>
        Stream.zip(n1, n2, n3), // TYPESCRIPT REPORTS: Expected 1-2 arguments, but got 3. ts (2554) [11, 22]
    ),
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)

What is the expected behavior?

Passing more than 2 streams to Stream.zip works per documentation :

Zipping is a process of combining two or more streams to create a new stream by pairing elements from the input streams.

What do you see instead?

Expected 1-2 arguments, but got 3. ts (2554) [11, 22]

when passing more than 2 streams to Stream.zip

Additional information

No response

nounder commented 1 month ago

Here's workaround using Effect.all:

import { Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
    Stream.mapEffect((v) =>
        Effect.all(
            {
                n1: Effect.succeed(v),
                n2: Effect.succeed(v * 2),
                n3: Effect.succeed(v * 3),
            },
            { concurrency: "unbounded" },
        ),
    ),
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)

Unfortunately, this makes consuming stream slower because it has to wait for all effects to finalize for each element.

mikearnaldi commented 4 weeks ago

Seems like we are missing a Stream.all