thbar / kiba

Data processing & ETL framework for Ruby
https://www.kiba-etl.org

Add support for aggregating transforms #57

Closed by thbar 6 years ago

thbar commented 6 years ago

See #53.

As seen in this StackOverflow question and other similar situations, it can be helpful to be able to aggregate rows together.

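This PR introduces an optional #close method on transforms, called once the transform has received all rows, which can either yield any rows it is still holding (such as a partially filled buffer) or yield nothing.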

Because of the yielding support, you must use the new StreamingRunner for this to work:

extend Kiba::DSLExtensions::Config
config :kiba, runner: Kiba::StreamingRunner

The bundled AggregateTransform provides an example of use:

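class AggregateTransform
  def initialize(aggregate_size:)
    @aggregate_size = aggregate_size
    # Initialize here so #close is safe even if #process was never called
    @buffer = []
  end

  # Buffer each row; yield the whole batch once it reaches aggregate_size
  def process(row)
    @buffer << row
    if @buffer.size == @aggregate_size
      yield @buffer
      @buffer = []
    end
    # Return nil so the individual row is not passed downstream
    nil
  end

  # Flush any partial batch once the source is exhausted
  def close
    yield @buffer unless @buffer.empty?
  end
end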
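
To make the process/close contract above easier to follow, here is a minimal pure-Ruby sketch of how a runner could drive such a transform. It is not Kiba's actual StreamingRunner: `BatchingTransform` and `drive_transform` are hypothetical names made up for this illustration, and the sketch assumes only what the example shows, namely that `process` and `close` both receive a block to yield rows to, and that a `nil` return from `process` means "emit nothing for this row".

```ruby
# Hypothetical stand-in for an aggregating transform (not part of Kiba).
class BatchingTransform
  def initialize(batch_size:)
    @batch_size = batch_size
    @buffer = []
  end

  # Buffer the row and yield a full batch when one is ready.
  def process(row)
    @buffer << row
    if @buffer.size == @batch_size
      yield @buffer
      @buffer = []
    end
    nil
  end

  # Yield whatever is left once no more rows are coming.
  def close
    yield @buffer unless @buffer.empty?
  end
end

# Hypothetical helper: feeds every row to the transform, collects both
# yielded and returned rows, then gives the transform a chance to flush.
def drive_transform(rows, transform)
  output = []
  rows.each do |row|
    returned = transform.process(row) { |yielded| output << yielded }
    output << returned if returned
  end
  if transform.respond_to?(:close)
    transform.close { |yielded| output << yielded }
  end
  output
end

batches = drive_transform((1..7).to_a, BatchingTransform.new(batch_size: 3))
p batches # prints [[1, 2, 3], [4, 5, 6], [7]]
```

Without the final `close` call, the trailing `[7]` would stay in the buffer and never reach the rest of the pipeline, which is the case the optional #close method is meant to cover.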

ttilberg commented 5 years ago

Hey @thbar, I've been seeing some recent activity with you mentioning Kiba 3 plans. Do you still intend to officially release Kiba 2.5 with this feature? Or will it be waiting for the full 3.0 release? I've been looking forward to using this, but don't want to target unofficial releases. Cheers!

thbar commented 5 years ago

Hi @ttilberg! Thanks for asking!

I'll release aggregating transforms as part of Kiba 2.5.

I'll make an official release shortly!

thbar commented 5 years ago

@ttilberg it's ready - Kiba v2.5.0 is out. Happy coding!