snowplow-incubator / snowplow-snowflake-loader

Loads Snowplow enriched events from S3 into Snowflake

Transforming via COPY INTO #101

Open dhuang opened 4 years ago

dhuang commented 4 years ago

Snowflake has some capabilities when it comes to transforming during a load. From my very basic understanding of what the transformer does, it seems like much of its logic could be replaced with a transformation on load, which means we could also load data directly from the enriched files in S3.

Example test file in the enriched file format:

my_app  {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.test/foo/jsonschema/1-0-0","data":{"hello":"world"}}}
my_app  {"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.test/bar/jsonschema/1-0-0","data":{"example":123}}}

I was able to load this into Snowflake with each type of unstructured event in the right format.

CREATE TABLE my_table (
    app_id VARCHAR
    , unstruct_event_com_test_foo OBJECT
    , unstruct_event_com_test_bar OBJECT
);

COPY INTO my_table
FROM (
    SELECT 
        t.$1
        , IFF(PARSE_JSON(t.$2):data:schema = 'iglu:com.test/foo/jsonschema/1-0-0', PARSE_JSON(t.$2):data:data, NULL)
        , IFF(PARSE_JSON(t.$2):data:schema = 'iglu:com.test/bar/jsonschema/1-0-0', PARSE_JSON(t.$2):data:data, NULL) 
    FROM @stage/test.tsv t
)
FILE_FORMAT = (
    TYPE = 'CSV' 
    FIELD_DELIMITER = '\t'
);
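
As a quick sanity check (a minimal sketch, assuming the table and column names above), the loaded events can then be queried with Snowflake's semi-structured path syntax:

SELECT
    app_id
    , unstruct_event_com_test_foo:hello::VARCHAR AS foo_hello
    , unstruct_event_com_test_bar:example::NUMBER AS bar_example
FROM my_table;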

Open questions

If this is indeed possible, could there be just a single loader step, without the transformer Spark job at all?
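
One way to picture that single step (a minimal sketch; the bucket path and credentials are placeholders, not anything the current loader uses) would be an external stage over the enriched output in S3, with the COPY INTO above pointed at it instead of @stage:

CREATE STAGE enriched_stage
    URL = 's3://example-enriched-bucket/enriched/good/'
    CREDENTIALS = (AWS_KEY_ID = '<key>' AWS_SECRET_KEY = '<secret>');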

chuwy commented 4 years ago

Hi @dhuang,

That's a really good idea to explore, and I have to admit we've always taken it for granted that all loaders should use the same Analytics SDK transformation. Still, I see several problems with this approach (although I haven't dug into it deeply yet):

  1. The first one is somewhat related to the paragraph above. We'd need a lot of logic inside SQL. Right now it's well-tested, battle-proven Scala code shared across all loaders, which makes it very easy to add extra logic to the Transformer. I believe it would be much harder to do in SQL.
  2. Bad rows. I don't think it will be possible to emit the bad rows we added in 0.5.0 with the SQL approach.
  3. Am I right that this approach can only work with a static set of schemas? I also don't see how we could mutate the table when a new type is discovered (see the sketch below).
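
To illustrate the third point (a hypothetical com.test/baz schema, just to sketch the extra moving parts): every newly discovered type would require both a table mutation and a regenerated COPY INTO projection, something like:

ALTER TABLE my_table ADD COLUMN unstruct_event_com_test_baz OBJECT;

COPY INTO my_table
FROM (
    SELECT
        t.$1
        , IFF(PARSE_JSON(t.$2):data:schema = 'iglu:com.test/foo/jsonschema/1-0-0', PARSE_JSON(t.$2):data:data, NULL)
        , IFF(PARSE_JSON(t.$2):data:schema = 'iglu:com.test/bar/jsonschema/1-0-0', PARSE_JSON(t.$2):data:data, NULL)
        , IFF(PARSE_JSON(t.$2):data:schema = 'iglu:com.test/baz/jsonschema/1-0-0', PARSE_JSON(t.$2):data:data, NULL)
    FROM @stage/test.tsv t
)
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = '\t');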

At the same time, I also see two big benefits:

  1. Reduced costs. That Spark job is far from ideal in terms of optimization.
  2. Reduced delay. Basically, we'd cut the delay by the time it takes to run the transformer.