transferwise / pipelinewise-target-s3-csv

Singer.io Target for CSV on S3 - PipelineWise compatible
https://transferwise.github.io/pipelinewise/

Data Lake features #14

Open aaronsteers opened 4 years ago

aaronsteers commented 4 years ago

Hello! I'd like to ask if one or more of the following features might be accepted in a PR. These features would enable a "data lake" type of target, and would address some potential scalability limits in the present implementation.

  1. Dynamic file naming schemes.

    • For example, an s3_key_naming_scheme setting for a 'salesforce' data pipeline might accept values like data/raw/salesforce/{stream}/v1/* - in which case the {stream} text would be replaced with the name of the stream (e.g. "Account", "Opportunities", etc.).
    • By convention, the string would be required to end in * - which would be replaced by the text which is presently used in the target output.
    • I don't recall if targets have access to the name of the upstream tap, but if so, a more advanced version would be: data/raw/{tap}/{stream}/v1/* in which {tap} is replaced with the text salesforce.
  2. Date partitioning.

    • Following a similar example as above, except that the naming scheme would accept values for {yyyy}, {mm}, {dd} - as in: data/raw/salesforce/{stream}/v1/{yyyy}/{mm}/{dd}/*.
    • The date itself would be determined by whichever field is the bookmark/state key.
    • When incoming data crosses over into a different partition (day, month, and/or year), a new file would be started with the contents applicable to that date window.
    • The code would have to throw an error if the bookmark/state key is not a date or datetime type.
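
A minimal sketch of how the two proposals above could fit together, assuming the tokens are expanded with plain string formatting. The function name `render_s3_key` and the `default_name` argument (standing in for whatever file name the target produces today) are hypothetical and not part of this target:

```python
from datetime import datetime
from typing import Optional


def render_s3_key(scheme: str, stream: str, default_name: str,
                  record_date: Optional[datetime] = None) -> str:
    """Expand {stream}, {yyyy}, {mm}, {dd} and the trailing '*' in a naming scheme.

    Hypothetical helper: `default_name` stands in for the file name the
    target already generates.
    """
    if not scheme.endswith("*"):
        raise ValueError("naming scheme must end in '*'")

    tokens = {"stream": stream}
    if record_date is not None:
        tokens.update(
            yyyy=f"{record_date.year:04d}",
            mm=f"{record_date.month:02d}",
            dd=f"{record_date.day:02d}",
        )

    # str.format raises KeyError for a token that has no value,
    # e.g. {yyyy} when no date was supplied, or an unsupported {tap}.
    prefix = scheme[:-1].format(**tokens)
    return prefix + default_name
```

For example, `render_s3_key("data/raw/salesforce/{stream}/v1/{yyyy}/{mm}/{dd}/*", "Account", "part-0001.csv", datetime(2020, 3, 5))` yields a key under `data/raw/salesforce/Account/v1/2020/03/05/`.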

The above has recently become relevant to my use cases, as I'm starting new projects which have a long backfill requirement - between 2 and 7 years. The current behavior seems to be that these initial backfills would grow very large, with one single initial file potentially containing multiple years of data.
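
To illustrate the date partitioning idea, here is a rough sketch that groups records by the calendar day of their bookmark value, so that each day could be written to its own file instead of one multi-year file. The function name `partition_by_day` is hypothetical, and the sketch assumes records arrive as plain dicts with ISO 8601 strings or date objects in the bookmark field:

```python
from collections import defaultdict
from datetime import date, datetime


def partition_by_day(records, bookmark_key):
    """Group records by the calendar day of their bookmark value.

    Raises ValueError if the bookmark value is not a date or datetime,
    as the proposal requires.
    """
    partitions = defaultdict(list)
    for record in records:
        raw = record.get(bookmark_key)
        value = raw
        if isinstance(raw, str):
            # Note: before Python 3.11, fromisoformat rejects a trailing "Z".
            try:
                value = datetime.fromisoformat(raw)
            except ValueError:
                value = None
        # datetime is a subclass of date, so it must be checked first.
        if isinstance(value, datetime):
            day = value.date()
        elif isinstance(value, date):
            day = value
        else:
            raise ValueError(
                f"{bookmark_key!r} is not a date or datetime: {raw!r}"
            )
        partitions[day].append(record)
    return dict(partitions)
```

A real implementation would more likely stream records and roll over to a new file when the day changes, rather than hold every partition in memory; the grouping here only shows the partition boundaries.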

aroder commented 4 years ago

@aaronsteers do you still have these needs? I have very similar needs. Did you fork and implement?

aaronsteers commented 4 years ago

Hi @aroder - I do still have the same requirements but our ultimate destination was Snowflake, and since pipelinewise-target-snowflake also stored intermediate files in S3, it made more sense for my use case to expand on that target instead of this one. What I've implemented in a pending PR (https://github.com/transferwise/pipelinewise-target-snowflake/pull/77) for target-snowflake is a file-naming-scheme which accepts the name of the table as an argument:

Essentially the flow is Source -> S3 -> Snowflake, and I retain the S3 files so it's similar enough output to having used this target directly, while also getting my downstream database populated. (A similar approach should also be doable for Redshift since both platforms recommend ingesting data from S3.)

Another challenge I found as a downside to using the target-s3-csv directly is that there doesn't appear to be a clear way of passing on a catalog to downstream consumers. So, for instance, if I'm pulling from salesforce into S3-CSV, tap-salesforce can auto-detect which keys are primary keys, incremental replication keys, etc. - but once it is landed in S3, my subsequent extract/load from S3->Snowflake would not have the same rich metadata - and I will start from scratch to redefine at least the primary key and bookmark (incremental refresh) keys. I would love to see this target eventually have the ability to retain this upstream metadata and make it easier to chain pipelines downstream.
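
One possible way to retain that metadata, sketched under the assumption that the target would write a small JSON "sidecar" document next to the CSV files. This is not an existing feature of this target, and `build_sidecar_metadata` is a hypothetical name. The field names `stream`, `schema`, `key_properties` and `bookmark_properties` are the ones carried by a Singer SCHEMA message:

```python
import json


def build_sidecar_metadata(schema_message: dict) -> str:
    """Serialize the key and bookmark info from a Singer SCHEMA message.

    Hypothetical helper: the returned JSON could be uploaded alongside the
    CSV output so that a downstream pipeline does not have to redefine it.
    """
    if schema_message.get("type") != "SCHEMA":
        raise ValueError("expected a Singer SCHEMA message")

    metadata = {
        "stream": schema_message["stream"],
        "key_properties": schema_message.get("key_properties", []),
        # bookmark_properties is optional in a SCHEMA message.
        "bookmark_properties": schema_message.get("bookmark_properties", []),
        "schema": schema_message["schema"],
    }
    return json.dumps(metadata, indent=2, sort_keys=True)
```

A downstream S3 -> Snowflake load could then read this document to recover the primary key and incremental replication key instead of starting from scratch.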

aaronsteers commented 4 years ago

I have not yet implemented any kind of date partitioning but that's something I would like to eventually revisit.

aroder commented 4 years ago

We use Redshift on top of S3, and we use the Spectrum feature. This allows querying the data in S3 as if it were stored in tables in Redshift. Saves a lot of space and removes the need to make another copy of the data within Redshift.

Controlling the naming scheme of the key is necessary to make this work. Date partitioning is also a feature of Spectrum, although not required.

aroder commented 4 years ago

@aaronsteers good point on losing the metadata. The target-redshift target maintains some of this metadata on the tables it generates in Redshift, but it has some downsides too. For example, it can only create tables based on the schema definition, because the properties selected/not selected in the catalog are not available to it.

So if I have a source table with 200 columns but only need a handful, all 200 columns are created, and space is allocated to them, even though most are null.

My testing so far using this target with a data lake setup results in less data and far fewer columns. So I'm leaning toward this setup over using target-redshift.

aroder commented 4 years ago

I'd like to follow your lead in https://github.com/transferwise/pipelinewise-target-snowflake/pull/77. Did you leave out the * concept? It is not as intuitive to me as the {token} concept.

koszti commented 4 years ago

Just wondering, how do you guys feel about CSV in general? Should we consider supporting Parquet and/or Avro at some stage, or can you live with CSV, so that adding the above improvements would solve your problem?

aaronsteers commented 4 years ago

@koszti - I do think CSV (especially when gzipped) has a lot of value. It's perhaps the most universally accepted cloud data format, supported natively by basically every cloud data platform (Snowflake, Redshift, Spark, and even Pandas).

That said, I think parquet would be a great addition and - perhaps combined with avro schemas - would potentially resolve some of the issues around not retaining robust primary key and data type info when the target needs to be consumed again by downstream pipelines. I don't know much about avro standalone - except that it's row-based instead of column-based and that avro schemas are often used to describe parquet datasets (as well as avro ones).

aaronsteers commented 4 years ago

@aroder - Yes, the spec up above where I proposed a value like data/raw/salesforce/{stream}/v1/* was a pretty raw spec meant more for discussion. Rather than support the "*" wildcard, the target already had a timecode I could use to ensure uniqueness of the files.

What I found when working on the snowflake target was that to fit into the existing paradigm, the best solution was a combination of an s3_path_prefix (required) and an s3_file_name_scheme (optional). Internally, we will concatenate them, but this allows existing users to keep getting the same behavior while allowing the extra customization for those who want it.

I did not implement a {token} option which would (theoretically?) be guaranteed to be random, and I did not inspect the catalog or emitted events to parse out a {tap} variable (like salesforce). Instead, it's expected that the files are already landing in a prefix designated for that source, and then we are just providing additional capability to parameterize the subfolder/file naming based on stream/table name.
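
The prefix-plus-scheme approach described above might look roughly like the sketch below. The setting names s3_path_prefix and s3_file_name_scheme come from the description above; the function name `build_s3_key`, the `{stream}` and `{timecode}` token names, and the default file name pattern are assumptions for illustration, not the code from the PR:

```python
from typing import Optional


def build_s3_key(s3_path_prefix: str, stream: str, timecode: str,
                 s3_file_name_scheme: Optional[str] = None) -> str:
    """Concatenate a required prefix with an optional file naming scheme."""
    if s3_file_name_scheme is None:
        # Assumed default: existing users keep a stream-plus-timecode name.
        file_name = f"{stream}-{timecode}.csv"
    else:
        # Hypothetical tokens; the timecode keeps file names unique.
        file_name = s3_file_name_scheme.format(stream=stream, timecode=timecode)
    return s3_path_prefix + file_name
```

Leaving the scheme unset keeps the old key layout, while a scheme such as `{stream}/v1/{stream}-{timecode}.csv` adds a per-stream subfolder under the same prefix.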

aroder commented 4 years ago

I submitted #19 if you want to review.

I took a similar approach to where you landed @aaronsteers, of keeping backwards compatibility.

@koszti I like the idea of other formats, especially if the target-s3 is doing the heavy lifting. I like what Aaron was talking about with maintaining the metadata. I think finding an optimal format (Parquet + Avro sounds nice) and making that an option would be a great feature. But CSV is very workable too.

Our data lake configuration does not assume that (because I never considered it). We took the approach of just dumping the data into a raw bucket, then doing some cleanup and putting it in a prepped bucket. Redshift queries the prepped bucket data directly using Spectrum, without actually copying the data into Redshift.

aroder commented 4 years ago

Some reference material for the parquet format: https://github.com/mirelagrigoras/target-parquet

koszti commented 4 years ago

@aroder, PR #19 has been merged and now you can use it from master. 🙇

Do you want to address some more items from this thread, or is it fine for now, so that we can release a new version to PyPI and link it to the main PPW repo? Which option do you prefer?