reconbot / streaming-iterables

A Swiss army knife for async iterables. Designed to replace your streams.
https://www.npmjs.com/package/streaming-iterables
MIT License
79 stars 8 forks source link

Suggest a different `pipeline` implementation in line with other libraries #285

Open wellcaffeinated opened 2 years ago

wellcaffeinated commented 2 years ago

Currently pipeline does not operate asynchronously and it returns the last iterator.

Other libraries like RXJS have a different implementation that returns a function that can be called to initiate the pipeline.

I'd suggest a new function (let's call it pipe() for now) that operates like this:

const fetchDetailsByType = pipe(
  async type => fetchPokemonByType(type),
  map(async name => fetchDetails(name))
)
for await (const details of fetchDetailsByType('water')){
  //...
}

This allows for pipeline chaining. Eg:

const saveLowHP = pipe(
  filter(details => details.baseStats.hp < 100),
  saveToDb
)

// save low hp water pokemon to disk
pipe(
  fetchDetailsByType,
  saveLowHP
)('water')
reconbot commented 2 years ago

That does seem useful

wellcaffeinated commented 2 years ago

@reconbot I've actually been coding my own iterables library (calling it "cousin-it"). I have slightly different use cases in mind, but there's a lot of overlap. I'd love to join forces if you're interested!

reconbot commented 2 years ago

I'm interested, tell me more!

wellcaffeinated commented 2 years ago

I'm specifically interested in iterables for data analysis and animation (an alternative to d3.js to some extent).

I actually have an animation library called "intween" https://intween.wellcaffeinated.net/ Currently it uses observables (rxjs style) to handle things... but in retrospect it may be better to use async iterables.

My hope with "cousin-it" is to provide some more basic tools for creating and manipulating iterables. Here are some examples:

Generators:

range(0, 10)
interval(1000) // every 1 second.
animationFrames() // request animation frame, and emits timing information
animationFrames(1000) // same with a specific duration

Utilities:

keyed iterators and enter/exit example:

const update = pipe(
  keyBy((data) => data.name),
  enter(([data, key]) => {
    const element = renderPokemonView(data)
    container.appendChild(element)
    return element
  }),
  exit(async ([x, div]) => {
    // fade out and remove
    div.style.opacity = '0'
    await delay(500)
    container.removeChild(div)
  }),
  consume
)

await update( fetchMorePokemon() )

Also the idea of channels for parallel piping.

const result = pipe(
  channels(
    pipe(
      map(n => n * n),
    ),
    pipe(
      map(n => -n)
    ),
  ),
  merge,
  collect
)( range(0, 10) )

// result -> [0, 0, 1, -1, 4, -2, ...] 

I also have ideas with regards to channels that could be like:

send('channel-name', iterable)

for await (const data of receive('channel-name')){
   // data from "iterable"
}
reconbot commented 2 years ago

I've primarily used this for ETL and bulk processing. But also the standard libraries for streams is quite lacking but they all can become async iterators. Channels is a pretty cool concept. I'd love to see it work across worker thread boundaries.

wellcaffeinated commented 2 years ago

yeah i'm taking queues from signal processing (like audio processing). Being able to split data, send data to a different receiver, transform data, schedule, and aggregate data are all useful things

wellcaffeinated commented 2 years ago

If you're interested maybe I can share some of the groundwork I've done so far, and perhaps have sister libraries, or something we both contribute to