thbar / kiba

Data processing & ETL framework for Ruby
https://www.kiba-etl.org

If extractor returns an instance of Enumerable instead of yielding items, the ETL pipeline stops #92

Closed. vfonic closed this issue 4 years ago.

vfonic commented 4 years ago

(I'm not sure whether this should go on Stack Overflow or here.)

Is there a way to allow this:

def each
  items
end

instead of this?

def each
  items.each do |item|
    yield item
  end
end

Would that make sense for the gem to support?

thbar commented 4 years ago

Hello @vfonic!

No, there is no way to allow exactly what you wrote. The contract for sources is that each must yield every item, and the gem will not change that behaviour short term!

One thing you can do to get roughly what I believe you are looking for, though, is:

def each
  yield items
end

Then use kiba-common's EnumerableExploder (https://github.com/thbar/kiba-common#kibacommontransformsenumerableexploder) to extract elements from your items:

require 'kiba-common/transforms/enumerable_exploder'

transform Kiba::Common::Transforms::EnumerableExploder

In some cases, you may want to use the Enumerable source (https://github.com/thbar/kiba-common#kibacommonsourcesenumerable) instead, depending on what is most convenient!
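
For instance, a minimal sketch of that source (here taking the rows from a lambda returning an enumerable):

require 'kiba-common/sources/enumerable'

# yields each element returned by the callable as an individual row
source Kiba::Common::Sources::Enumerable, -> { (1..10) }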

Let me know if this is good enough for your situation!

vfonic commented 4 years ago

Thanks for such a quick reply!

I guess I'll just stick with

def each
  yield items
end

I've had a couple of times when I needed to chain multiple extractors. I understand it might be an anti-pattern to have that functionality, but it would solve my issues. :D

Thanks again!

thbar commented 4 years ago

> I've had a couple of times when I needed to chain multiple extractors. I understand it might be an anti-pattern to have that functionality, but it would solve my issues. :D

Just some feedback on that: it is not necessarily an anti-pattern, actually. A future version of Kiba may merge the concepts of source/transform/destination into a single entity taking input and output streams (with an empty input for sources, and an empty output for destinations).

This is a bit far away in the future, but I still wanted to let you know you are not necessarily off-track :smile:

vfonic commented 4 years ago

Heads up, @thbar: it seems like

def each
  yield items
end

doesn't work. At least not in a class transform.

This still works:

def each
  items.each do |item|
    yield item
  end
end
thbar commented 4 years ago

@vfonic I'm a bit confused: I thought you were referring to sources. Sources can implement each.

Transforms, on the other hand, can only implement process (documentation).
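
For reference, a minimal sketch of such a class-based transform (the field name is a made-up example):

class MyTransform
  def process(row)
    row[:name] = row[:name].to_s.strip # adjust the row as needed
    row # return the row to keep it, or nil to discard it
  end
end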

Can you clarify whether you are using a source or a transform?

thbar commented 4 years ago

Also, it is worth noting that if you want to use a source component (implementing each) as a transform, you can do that by leveraging this adapter:

https://github.com/thbar/kiba-common#kibacommontransformssourcetransformadapter

vfonic commented 4 years ago

@thbar ah, you're right. I'm using a source class, not a transform class.

Still, when using a source class, yield items doesn't work.

Let me know if it works for you and I'll try to recreate a small reproducible example.

thbar commented 4 years ago

I made a little repro, but it works in my case at least:

class YieldingSource
  def initialize(items:)
    @items = items
  end

  def each
    yield @items
  end
end

Then used as:

source YieldingSource, items: (1..10)

transform do |r|
  puts r
  r
end

(in that case r will contain the whole (1..10) range as a single object, but you can use the EnumerableExploder if you need to split the range into individual items).
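
For completeness, here is a sketch of the same repro with the exploder plugged in (reusing YieldingSource from above):

require 'kiba-common/transforms/enumerable_exploder'

source YieldingSource, items: (1..10)

# splits the single (1..10) row into one row per element
transform Kiba::Common::Transforms::EnumerableExploder

transform do |r|
  puts r # now prints 1 through 10, one row at a time
  r
end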

So please do post a repro; maybe something will stand out, as it should work in general!

vfonic commented 4 years ago

Ah! That's what I was missing: EnumerableExploder. I can't modify the Kiba.parse/Kiba.run part of the script, as I have many ETL pipelines using the exact same code, so it would break too many things. I'll just stick to doing items.each { |item| yield item } for now.

thbar commented 4 years ago

Understood! Yes, in that case that will be better! Good luck with your implementation!

ttilberg commented 4 years ago

@vfonic:

> I've had a couple of times when I needed to chain multiple extractors.

No problem! You can define multiple sources in the job:

source CsvSource, filename: 'one.csv'
source CsvSource, filename: 'two.csv'

# transforms, etc.

Or more dynamically:

Dir.glob('incoming/**/*.csv').each do |file|
  # I prefer using do/end even if it's only one line, since it squeezes `source()` to the beginning of a line
  source CsvSource, filename: file
end

Or a real example:

required_scrapes = HTTParty.get 'https://internalapi.example.com/clients/4163/scrapes.json'

required_scrapes.each do |scrape|
  query = PricingDataQuery.new(scrape).build
  source ETL::SqlSource, dataset: DB::PricingData[query]
end

After creating these examples, I noticed your last post about

> I can't modify the Kiba.parse/Kiba.run part of the script

Perhaps you were in fact only wondering if Kiba could be tweaked to support def each; yield things; end (I've often wondered the same). But if you do still have a problem to solve, and can't change the job definition as above, you could change the source class to be more amenable (though I think you've already got what you need). The classes you write can be very flexible, using recursion or referencing other sources (i.e. building aggregate sources from more specific ones), as sketched below.
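
For illustration, a hypothetical sketch of such an aggregate source (the class name and argument format are made up):

# Delegates to a list of more specific sources, one after another
class AggregateSource
  def initialize(sources:)
    @sources = sources # e.g. [[CsvSource, { filename: 'one.csv' }], ...]
  end

  def each(&block)
    @sources.each do |klass, args|
      klass.new(**args).each(&block)
    end
  end
end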

edit: Well that didn't age well! This conversation wrapped up while I was click clacking away!

vfonic commented 4 years ago

:joy: Thanks for the detailed reply nevertheless!

Here's what my ETL setup looks like (heavily stripped down from the full example):

# frozen_string_literal: true

class CrawlerEtlJob < ApplicationJob
  def perform(record_type) # rubocop:disable Metrics/AbcSize, Metrics/MethodLength
    extractor, transformers, loader = Etl::Factory.get_etl_for(record_type)

    Kiba.run(
      Kiba.parse do
        extend Kiba::DSLExtensions::Config

        source(extractor, crawler: crawler)
        transformers.each { |transformer| transform(transformer, crawler: crawler) }
        destination(loader, crawler: crawler)
      end
    )
  rescue StandardError => e
    Rollbar.error(e)
  end
end

And here's how I solved chaining the sources:

# frozen_string_literal: true

module Etl
  module Extractors
    class EventsMergerExtractor
      def initialize
        @original_extractor = OriginalExtractor.new
      end

      def each
        # NOTE: relies on OriginalExtractor#each returning an Enumerator when
        # called without a block (the same to_enum guard as used below)
        items = @original_extractor.each.to_a

        items = do_some_black_magic_to_merge_events_as_needed(items)

        return items.to_enum unless block_given?

        items.each do |item|
          yield item
        end
      end
    end
  end
end

Luckily, I know that the total number of records is about 5k and they are relatively small, so they will (hopefully) always fit in memory. Otherwise, I'd have to think of something else.

Basically, what I'm doing here is detecting duplicate entries based on some fields, and then merging the rest of the fields, for example a timestamp or similar.
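
For illustration, that merging step could look roughly like this (a hypothetical sketch; the identity fields and the timestamp rule are stand-ins for the real logic):

def do_some_black_magic_to_merge_events_as_needed(items)
  # group rows sharing the same identity fields (hypothetical names)
  items.group_by { |item| item.values_at(:event_id, :venue) }
       .map do |_key, duplicates|
    # fold each group into a single record, keeping e.g. the latest timestamp
    duplicates.reduce do |merged, item|
      merged.merge(timestamp: [merged[:timestamp], item[:timestamp]].max)
    end
  end
end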