lautis / piperator

Composable pipelines for Enumerators.
MIT License
206 stars, 8 forks

Feature suggestion: built-in transformations #4

Closed: denisdefreyne closed this issue 7 years ago

denisdefreyne commented 7 years ago

Hi!

I wrote D★Stream, which is very similar to Piperator. Unlike Piperator, D★Stream comes with a handful of built-in pipeline transformations: map, zip, scan, with_next, buffer, … Would it be useful to integrate these into Piperator as well?

D★Stream’s transformers are almost directly usable with Piperator. For instance, here’s the example from D★Stream’s README using Piperator instead of pure D★Stream:

require 'piperator'
require 'd-stream'

events =
  [
    { id: 40562348, at: Time.now - 400, status: 'new' },
    { id: 40564682, at: Time.now - 300, assignee_id: 2 },
    { id: 40565795, at: Time.now - 250, priority: 'high' },
    { id: 40569932, at: Time.now - 100, status: 'solved' },
  ]

# Monkey-patch DStream transformer to behave like a pipe
class DStream::Abstract
  def call(*s)
    apply(*s)
  end
end

S = DStream

indices = (1..Float::INFINITY)

history_builder =
  Piperator.
    # calculate new state
    pipe(S.scan({}, &:merge)).

    # add `version`
    pipe(S.zip(indices)).
    pipe(S.map { |(e, i)| e.merge(version: i) }).

    # remove `id`
    pipe(S.map { |e| e.reject { |k, _v| k == :id } }).

    # add `valid_to` and `valid_from`, and remove `at`
    pipe(S.with_next).
    pipe(S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) }).
    pipe(S.map { |e| e.merge(valid_from: e.fetch(:at)) }).
    pipe(S.map { |e| e.reject { |k, _v| k == :at } }).

    # add `row_is_current`
    pipe(S.with_next).
    pipe(S.map { |(a, b)| a.merge(row_is_current: b.nil?) })

history = history_builder.call(events)
history.each { |e| p e }
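
For readers without either gem installed, the contract the monkey-patch relies on (a pipe is any object whose `call` takes an enumerable and returns one) can be approximated with plain lambdas. This is a minimal sketch using only the standard library, not D★Stream's or Piperator's actual implementation; `scan`, `with_next`, and `compose` are hypothetical helpers whose exact semantics are assumed here for illustration.

```ruby
# Hypothetical stand-ins for transformers. Each helper returns a callable
# that takes an enumerable and returns a lazy enumerator.

# Emits the running accumulator after each element.
def scan(init, &block)
  lambda do |enum|
    Enumerator.new do |y|
      acc = init
      enum.each { |e| y << (acc = block.call(acc, e)) }
    end.lazy
  end
end

# Pairs each element with its successor; the last element is paired with nil.
def with_next
  lambda do |enum|
    Enumerator.new do |y|
      prev = nil
      seen = false
      enum.each do |e|
        y << [prev, e] if seen
        prev = e
        seen = true
      end
      y << [prev, nil] if seen
    end.lazy
  end
end

# Chains callables left to right, feeding each one the previous output.
def compose(*pipes)
  ->(enum) { pipes.reduce(enum) { |acc, pipe| pipe.call(acc) } }
end

pipeline = compose(scan(0) { |sum, n| sum + n }, with_next)
result = pipeline.call([1, 2, 3]).to_a
# result is [[1, 3], [3, 6], [6, nil]]
```

Because every step only needs to respond to `call`, lambdas, transformer objects, and whole pipelines are interchangeable as steps, which is presumably why the one-method patch above is enough.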

Questions:

denisdefreyne commented 7 years ago

Also worth stating explicitly: I don’t think it makes sense for there to be two gems as similar as Piperator and D★Stream, and I’d prefer to combine efforts into Piperator, since it seems to be more mature.

lautis commented 7 years ago

Some transforms could be included in Piperator itself. The transforms in the Enumerable module would be good candidates.

I've toyed with the idea of using the same syntax as Enumerable: Piperator::Pipeline.new.map { |i| i + 1 }. Ideally, this implementation could even delegate automatically to the Enumerable module. Not sure if that would work in practice.
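
One way the delegation idea might look, as a rough sketch only: a hypothetical `ChainablePipeline` class (not part of Piperator) records each Enumerable-style call via `method_missing` and replays it on a lazy enumerator when the pipeline is called. It assumes that forwarding to `Enumerator::Lazy` is acceptable and does nothing special for terminal methods such as `first`, which is one reason this may not hold up in practice.

```ruby
# Hypothetical sketch: records Enumerable-style calls and replays them lazily.
class ChainablePipeline
  def initialize(steps = [])
    @steps = steps
  end

  # Replays the recorded steps against the given enumerable.
  def call(enumerable)
    @steps.reduce(enumerable.lazy) do |enum, (name, args, block)|
      enum.public_send(name, *args, &block)
    end
  end

  def respond_to_missing?(name, include_private = false)
    Enumerator::Lazy.public_method_defined?(name) || super
  end

  # Any method that Enumerator::Lazy understands becomes a new pipeline step.
  def method_missing(name, *args, &block)
    return super unless Enumerator::Lazy.public_method_defined?(name)

    ChainablePipeline.new(@steps + [[name, args, block]])
  end
end

pipeline = ChainablePipeline.new.map { |i| i + 1 }.reject(&:odd?)
result = pipeline.call(1..Float::INFINITY).first(3)
# result is [2, 4, 6]
```

Each chained call returns a new pipeline rather than mutating the receiver, so a partially built pipeline can be reused as the base for several variants.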

There are benefits to using combinators the way you've used them with D★Stream. Looks very much like Rambda :) An external gem like piperator-transforms would likely get going faster. Feel free to release a gem with the piperator- prefix.

denisdefreyne commented 7 years ago

I haven’t been able to continue much with this, but I did release ddbuffer today, which buffers enumerables/enumerators and is useful for slow sources/sinks:

# Read articles (an Enumerator)
articles = my_web_service.each_article

# Buffer 100 articles at a time
articles = DDBuffer.new(100).call(articles)

# Write buffered articles
articles.each_slice(50) do |slice|
  my_slow_db.insert_articles(slice)
end

Should work smoothly with Piperator out of the box.
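
To illustrate why an object with this shape slots into a pipeline, here is a minimal read-ahead buffer built on the standard library's `SizedQueue`. It is a sketch under stated assumptions, not ddbuffer's actual implementation: `ReadAheadBuffer` is a hypothetical name, and the sketch does not clean up its producer thread if the consumer stops early.

```ruby
# Hypothetical buffering pipe: a producer thread reads ahead from the
# source, up to `size` items, while the consumer drains at its own pace.
class ReadAheadBuffer
  STOP = Object.new # sentinel marking the end of the source

  def initialize(size)
    @size = size
  end

  def call(enum)
    Enumerator.new do |y|
      queue = SizedQueue.new(@size)
      producer = Thread.new do
        begin
          enum.each { |e| queue << e } # blocks while the queue is full
        ensure
          queue << STOP # always signal the end, even after an error
        end
      end

      loop do
        item = queue.pop # blocks while the queue is empty
        break if STOP.equal?(item)

        y << item
      end
      producer.join # re-raises any error from the producer thread
    end
  end
end

buffered = ReadAheadBuffer.new(2).call(1..5)
slices = buffered.each_slice(2).to_a
# slices is [[1, 2], [3, 4], [5]]
```

Since the buffer exposes `call(enumerable)` and returns an enumerator, it has the same shape as the `DDBuffer.new(100).call(articles)` usage above and could be passed to a pipeline as a step.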

lautis commented 7 years ago

Cool! I added ddbuffer (and D★Stream) to the readme.

denisdefreyne commented 7 years ago

@lautis Do you want me to keep this issue open? It might not be directly actionable, so 👍 if you’d prefer to have it closed.

lautis commented 7 years ago

I guess it could be closed. The interoperability with D★Stream is pretty neat.