datamill-co / target-redshift

A Singer.io Target for Redshift
MIT License

Integrate with Redshift Spectrum #40

Open moeezalisyed opened 4 years ago

moeezalisyed commented 4 years ago

Integrating with Redshift Spectrum will make this target very efficient.

Use Case

If you have large amounts of data that are not accessed frequently, it is more efficient to store them in S3 than to keep them loaded in Redshift. When the data needs to be queried, it can be queried with Redshift Spectrum, which provides really fast querying by leveraging MPP. For this to work, the data needs to be stored in S3 in a structured columnar format; I suggest Parquet. Moreover, these files can be gzipped, leading to more efficient storage without slowing down query times.

Enhancements

Redshift Spectrum is also able to query nested fields in files that are stored in S3 as Parquet (https://docs.aws.amazon.com/redshift/latest/dg/tutorial-nested-data-create-table.html). Looking at the code, it seems that's not something currently provided, so this would be an additional enhancement.

Integration Steps:

  1. Instead of saving the files as CSVs in S3, store them as gzipped Parquet files.
  2. Create an external table in Redshift that references the Parquet files in S3. To define a table we will need a schema; this can be specified by the user in the target config (along with a table name override).

Changes I foresee

  1. We will need to update target_tools to be able to append each batch of the stream into a Parquet file. This is doable using pandas and s3fs, but we can look at other options as well.
  2. After the stream has been processed, we can take the parsed schema and create an external table. Creating the table should just be a matter of executing SQL on the cluster. This is already being done from what I can see, but we will need to update it to create an external table in the Redshift cluster. (https://docs.aws.amazon.com/redshift/latest/dg/c-spectrum-external-tables.html)

    I think this can make target-redshift very scalable. In fact, we may be able to separate the intermediate code and create a target-s3-parquet that simply stores the stream as an S3 Parquet file (similar to the s3-csv tap).
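For the external table step, here is a minimal sketch of how the DDL could be generated from a parsed stream schema. The function name `external_table_ddl`, the `JSON_TO_SPECTRUM` type mapping, and the example names are assumptions for illustration and are not part of target-redshift. Identifiers are not escaped, so treat this as a starting point only.

```python
# Hypothetical mapping from JSON Schema types to Spectrum column types.
JSON_TO_SPECTRUM = {
    "integer": "bigint",
    "number": "double precision",
    "boolean": "boolean",
    "string": "varchar(65535)",
}


def external_table_ddl(external_schema, table, properties, s3_location):
    """Build a CREATE EXTERNAL TABLE statement for Parquet files under s3_location."""
    columns = []
    for name, spec in properties.items():
        json_type = spec["type"]
        # Singer schemas often use ["null", "string"]; take the first non-null type.
        if isinstance(json_type, list):
            json_type = next(t for t in json_type if t != "null")
        columns.append(f"    {name} {JSON_TO_SPECTRUM[json_type]}")
    column_sql = ",\n".join(columns)
    return (
        f"create external table {external_schema}.{table} (\n"
        f"{column_sql}\n"
        ")\n"
        "stored as parquet\n"
        f"location '{s3_location}';"
    )


ddl = external_table_ddl(
    "spectrum",
    "users",
    {"id": {"type": "integer"}, "name": {"type": ["null", "string"]}},
    "s3://my-bucket/spectrum/users/",
)
print(ddl)
```

Nested fields would need struct/array column definitions, which this sketch does not attempt.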

Talked with @AlexanderMann about this in Slack.

AlexanderMann commented 4 years ago

@moeezalisyed thanks for the feature request. This looks to be a really useful set of functionality for Target-Redshift in general.

My inclination with any large feature/rewrite is to make as many small meaningful steps towards the final functionality without breaking things. To this end, I'm pretty excited by the final feature set you detail here as I think there are a LOT of little improvements we can make which will lead to the final solution.

Questions

Before scoping out all of the above, I'm hoping that your experience with Redshift Spectrum can help get me up to speed and I can poke about on some points:

  1. Redshift Spectrum can read from CSV files and non-gzipped files as well as gzipped Parquet, correct?
  2. All of the examples for the S3 data lake referenced seem to point to a subfolder in a bucket. Does Spectrum read all files present in the given subfolder, or do you have to manually tell it about new data placed therein?
  3. The alter table examples for Spectrum all point to the data paths, and whatnot. Am I correct in assuming that things like COMMENTs and column type changes are all still accepted via normal commands?
  4. Most of the headache in dealing with Redshift Spectrum appears to be in getting it set up, making sure IAM permissions are correct, and getting the external database/schemas in place. Given Target-Redshift expects all of this to be done for regular usage, I'd assume we expect this for the Spectrum use case as well.
    • Are there any commands/issues you can see with us expecting users to set everything up themselves?
    • Will it be an issue for us to expect users to manage their S3 resources themselves as well? (i.e., we presently encourage users to set up lifecycle rules to limit their S3 footprint over time, etc.)

moeezalisyed commented 4 years ago

@AlexanderMann I agree - it is always great to build up in steps.

For your questions:

  1. Yes - CSV, TSV, Parquet, ORC, JSON, Amazon ION, Avro, RegExSerDe, Grok, RCFile, and Sequence are included. It is able to read the compression type from the file extension. You can also supply the compression type directly in the create external table statement (this will take priority over the extension). I suggested gzipped Parquet since columnar storage will be way more efficient and gzip is a commonly used compression type with Parquet.

  2. Yes - it is able to read all files in the folder. If you add new files to the folder, you don't need to tell it or update the table (unless you add new columns); it automatically picks up data in new files. If the schema changes, then you would need to update the table.

  3. Similar to Redshift, it seems there is no simple way to update column types or alter columns. However, I imagine this can be dealt with by creating a temp table, dropping, and updating.

  4. I think we will need users to include two more things in the config:

    • An external database
    • An external schema

    I already played around with this and have a command that can be placed in the README to let people know how to set these up. I imagine if users are setting up the Redshift database and schemas already, they should be able to set up the external database and schema as well.

Here is the command:

create external schema <EXTERNAL_SCHEMA_NAME> 
from data catalog 
database '<EXTERNAL_DB_NAME>' 
iam_role 'arn:aws:iam::<AWS_ACCOUNT_ID>:role/<ROLENAME>'
create external database if not exists;
  5. I think we can similarly encourage users to manage their S3 lifecycle themselves, i.e., if they don't want objects staying in S3 after they no longer need them, they will need to take care of it themselves.
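As a rough sketch of how the two extra config entries could feed the command above, the snippet below renders the statement from a config dict. The key names `external_schema`, `external_database`, and `iam_role_arn` are hypothetical, as are the example values; values are interpolated without escaping, so this is illustrative only.

```python
def external_schema_sql(config):
    """Render the CREATE EXTERNAL SCHEMA command from (hypothetical) config keys."""
    return (
        f"create external schema {config['external_schema']}\n"
        "from data catalog\n"
        f"database '{config['external_database']}'\n"
        f"iam_role '{config['iam_role_arn']}'\n"
        "create external database if not exists;"
    )


example_config = {
    "external_schema": "spectrum",
    "external_database": "spectrum_db",
    "iam_role_arn": "arn:aws:iam::123456789012:role/SpectrumRole",
}
sql = external_schema_sql(example_config)
print(sql)
```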
AlexanderMann commented 4 years ago

@moeezalisyed sweet, so I think there are some basic steps to make this work.

❓Next question❓: I've been poking around online trying to find a simple, small-footprint way to write arbitrary nested Python data structures (JSON compatible crap) to Parquet. Seems like the "simplest" method is to install Pandas, NumPy, etc. and then do some jumping between libraries to string this together. Do you know of an easier/better method to perform this action?

aaronsteers commented 4 years ago

I did a small amount of digging and found an "Arrow" parquet library which can be used as follows:

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({'one': [-1, np.nan, 2.5],
                   'two': ['foo', 'bar', 'baz'],
                   'three': [True, False, True]},
                   index=list('abc'))
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example.parquet')

Two desired improvements from this simple sample are: (1) avoid pandas, which is in-memory (this is a must, since large files will simply fail to load into memory); and (2) use a true "streaming" approach (this is a nice-to-have that boosts performance).

I can see getting around at least the first challenge by perhaps using koalas (the big-data version of pandas) or another compatible library.

Hope this helps.

aaronsteers commented 4 years ago

There's apparently also a native koalas to_parquet function, from the koalas docs sample code:

import databricks.koalas as ks
import pandas as pd

# `path` is assumed to be defined already as the output directory.
df = ks.DataFrame(dict(
    date=list(pd.date_range('2012-1-1 12:00:00', periods=3, freq='M')),
    country=['KR', 'US', 'JP'],
    code=[1, 2, 3]), columns=['date', 'country', 'code'])
df.to_parquet(
    '%s/to_parquet/foo.parquet' % path,
    mode='overwrite',
    partition_cols=['date', 'country'])

Ref: https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.to_parquet.html

Again, I'd suggest avoiding pandas at all costs, but koalas might work for this use case.

UPDATE: Upon further research, it appears that koalas may require a working local installation of Spark, which is most likely not feasible.

I also found this fastparquet Python library, but it also requires pandas: https://github.com/dask/fastparquet

It looks like all roads lead to using pandas, unfortunately. And perhaps this is inevitable due to the need to create categories/lookups on source columns during compression. To revise my previous anti-pandas statement, it looks like pandas may be required.

In that case, a mitigation to the scalability limitations of having to load all data into memory might be to support a window-by or partition-by clause which would create a distinct parquet dataset for each day/month/year/region/etc. in order to avoid/resolve failures when the local machine cannot hold an entire recordset in memory at once.
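A minimal sketch of that partition-by idea, using only the standard library: records from one in-memory batch are grouped by a key (here, month), so each group could then be written out as its own Parquet dataset. The names `partition_records` and `month_key` and the `created_at` field are assumptions for illustration.

```python
from collections import defaultdict
from datetime import datetime


def partition_records(records, key_fn):
    """Group records into lists keyed by key_fn(record)."""
    partitions = defaultdict(list)
    for record in records:
        partitions[key_fn(record)].append(record)
    return dict(partitions)


def month_key(record):
    """Partition key of the form YYYY-MM, taken from an ISO 8601 timestamp field."""
    return datetime.fromisoformat(record["created_at"]).strftime("%Y-%m")


records = [
    {"id": 1, "created_at": "2020-01-15T10:00:00"},
    {"id": 2, "created_at": "2020-01-20T08:30:00"},
    {"id": 3, "created_at": "2020-02-01T00:00:00"},
]
by_month = partition_records(records, month_key)
```

Each value in `by_month` is small enough to convert and write on its own, which is the point of the mitigation.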

Hope this helps. 😄

UPDATE (#2): Related discussion here: https://github.com/dask/fastparquet/issues/476

AlexanderMann commented 4 years ago

@aaronsteers I doubt that'll be a problem. We already have all data loaded in memory, and are batching by reasonable sizes, so I think we can handle that.

I also found the above links you're talking about, and had the same 🤦‍♀ moments since it'd be great to have a simple library which writes a data format, but alas...Pandas it is.


So I think the simplest path forward here is a couple of easy PRs, and then some more head-scratchy ones. Bonus: some of these can be done in parallel.

  1. :easy: ⚪️ change the upload/write path for our data loads to S3 from simple uuid keys to structured paths of: s3://<bucket>/<schema>/<table>/<table-version>/<uuid>
  2. :medium: ⚪️ change the upload format to be gzipped
  3. :easy: ⚪️ provide some new keys in the configs etc. for the external database details and other things we'll need.
  4. :medium-hard: ⚪️ probably create a subclassed target which relies on most of the tooling of the base class target-redshift and does all of the minor changes etc. for getting Redshift Spectrum commands generated out of regular Redshift-compatible SQL etc.
  5. :easy: ⚪️ provide some new keys in the configs etc. for writing data as Parquet
  6. :medium: ⚪️ write data as Parquet instead of CSV if a config flag is set
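Steps 1 and 2 could look roughly like the sketch below, which builds the structured path and gzips a CSV payload using only the standard library. The function names `build_s3_uri` and `gzip_csv` and the example bucket are hypothetical; the actual upload call is left out.

```python
import gzip
import uuid


def build_s3_uri(bucket, schema, table, table_version, file_id=None):
    """Return s3://<bucket>/<schema>/<table>/<table-version>/<uuid>."""
    # A random uuid is generated unless the caller supplies an id.
    file_id = file_id or uuid.uuid4().hex
    return f"s3://{bucket}/{schema}/{table}/{table_version}/{file_id}"


def gzip_csv(csv_text):
    """Compress CSV text into gzip bytes ready for upload."""
    return gzip.compress(csv_text.encode("utf-8"))


uri = build_s3_uri("my-bucket", "public", "users", 3, file_id="abc123")
payload = gzip_csv("id,name\n1,foo\n")
```

Keeping every file for a table version under one prefix is what lets an external table point at that prefix as its location.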