airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Add option to not delete staging data using snowflake/redshift destination #4030

Closed marcosmarxm closed 2 years ago

marcosmarxm commented 3 years ago

Tell us about the problem you're trying to solve

As a user, I want to keep the staging data transferred to S3/GCS for future use (e.g. changing the data warehouse, querying with Athena).

Describe the solution you’d like

Add an option to choose whether to delete or keep the staging data when creating the destination with the S3 staging option.

For this to be useful, the output path persisted by the connector needs to be predictable so users can consume it reliably. For example, it needs to have a pattern like:

s3://<bucketname>/<some_prefix>/<source_data_namespace_name>/<table_name>/*files*

See the S3 destination connector docs for an example of what this path should look like.
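For illustration only, here is a minimal sketch of how such a predictable prefix could be assembled; the class and method names are hypothetical and not part of the connector:

```java
public class StagingPathExample {

  // Builds the predictable prefix described above:
  // s3://<bucketname>/<some_prefix>/<source_data_namespace_name>/<table_name>/
  static String stagingPrefix(String bucketName, String prefix, String namespace, String tableName) {
    return String.format("s3://%s/%s/%s/%s/", bucketName, prefix, namespace, tableName);
  }

  public static void main(String[] args) {
    // e.g. s3://my-bucket/airbyte-staging/public/users/
    System.out.println(stagingPrefix("my-bucket", "airbyte-staging", "public", "users"));
  }
}
```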

Implementation steps

There are roughly two stages to implementing this:

  1. [ ] Instead of re-inventing the wheel with respect to output paths, refactor the Redshift destination to use the S3 CSV writer under the hood (S3CsvWriter.java in the io.airbyte.integrations.destination.s3.csv package) - https://github.com/airbytehq/airbyte/issues/8550
  2. [ ] Add and leverage a boolean flag in the connector configuration that controls whether the staging data is retained after a sync to Redshift. - https://github.com/airbytehq/airbyte/issues/8552

Start by doing this for the Redshift destination only. We can get to Snowflake afterwards.

Please create GH issues for both of these and add them to the upcoming ConnCore milestone for clearer tracking of the components of this issue.

Refactoring the Redshift destination to use the S3 destination code

  1. Rename and @Deprecate the existing S3StreamCopier and S3StreamCopierFactory classes to indicate that this implementation should no longer be used. The reason we're renaming and deprecating instead of refactoring the class itself is that the class is used by other destinations (Snowflake, etc.), so we are trying to limit the scope of the connectors we're changing here.
  2. Create a new class S3StreamCopier which leverages the S3 class S3CsvWriter under the hood; a rough sketch of this delegation pattern appears after this list. See the DatabricksStreamCopier class for an example of what reusing an S3Writer looks like. That example uses Parquet, but it should be very similar to what you will need to do here. Take care to ensure that your implementation is functionally equivalent to the deprecated class in step 1, apart from re-using the S3 writer (and therefore its output paths). Also write unit tests for this class.
  3. Refactor the RedshiftStreamCopier class to use the class you created in step 2, and write unit tests for it as needed. Once again, apart from file paths, it should function the same as the previous implementation.
  4. Test and publish the Redshift copy destination.
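As a rough sketch of step 2, the delegation pattern might look something like the following. The types here are simplified placeholders, not the actual Airbyte S3CsvWriter or StreamCopier APIs, whose signatures differ:

```java
import java.util.UUID;

// Simplified placeholder interface: the real io.airbyte.integrations.destination.s3.csv.S3CsvWriter
// has a different signature; this only illustrates the delegation idea.
interface CsvStagingWriter {
  void write(UUID id, String jsonRecord, long emittedAt) throws Exception;
  void close(boolean hasFailed) throws Exception;
  String getOutputPath();
}

// A new-style stream copier that delegates staging-file creation to the shared CSV writer
// instead of re-implementing S3 uploads itself, so its output paths match the S3 destination.
class S3StreamCopierSketch {

  private final CsvStagingWriter csvWriter;

  S3StreamCopierSketch(CsvStagingWriter csvWriter) {
    this.csvWriter = csvWriter;
  }

  // Hand an incoming record straight to the shared writer.
  void write(UUID id, String jsonRecord, long emittedAt) throws Exception {
    csvWriter.write(id, jsonRecord, emittedAt);
  }

  // Close the writer; the COPY-into-Redshift step would then read from csvWriter.getOutputPath().
  void closeStagingUploader(boolean hasFailed) throws Exception {
    csvWriter.close(hasFailed);
  }
}
```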

By the end of this step, staging files should now be output in a filesystem path equivalent to the one used by the S3 destination. Apart from this, no functionality should be changed.

Congrats! You're (logically speaking) halfway there.

Adding and leveraging the boolean flag

Now we need to expose a user configuration option and leverage it to decide whether the staging data is deleted:

  1. Expose a boolean option in the spec.json allowing the user to configure whether staging data is kept or deleted. Familiarize yourself with the Connector Developer UX handbook to understand how the title and description properties should be set on this option. After Octavia Squidington III, the UX handbook is a connector developer's best friend.
  2. Pass the parameter through the destination, using it appropriately to decide whether or not to delete the staging files (see the sketch after this list). Note that it may be useful to expose a new method Map<String, String> getStreamOutputPaths() in the S3StreamCopier (and S3CsvWriter) class which returns the paths to which the files for each stream are written. This would allow the Redshift destination to be told where the output paths are instead of assuming them, which is dangerous: a refactor of the S3CsvWriter that writes files to a different location would otherwise break the Redshift copy destination.
  3. Write unit tests for all of this functionality.
  4. Create a PR, publish, etc.
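A minimal sketch of steps 1 and 2, assuming a boolean spec option (called purge_staging_data here purely for illustration) and simplified placeholder types rather than the real connector classes:

```java
import com.fasterxml.jackson.databind.JsonNode;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: reads a hypothetical boolean option ("purge_staging_data") from the
// connector config and uses it to decide whether staging files are removed after the sync.
class StagingCleanupSketch {

  // Stream name -> staging path, as returned by the proposed getStreamOutputPaths() method.
  private final Map<String, String> streamOutputPaths = new HashMap<>();

  Map<String, String> getStreamOutputPaths() {
    return streamOutputPaths;
  }

  void cleanUp(JsonNode config) {
    // Default to purging, so the existing behavior is preserved when the option is absent.
    final boolean purge = !config.has("purge_staging_data") || config.get("purge_staging_data").asBoolean();
    if (!purge) {
      return; // the user asked to keep the staging data for downstream use (Athena, data lake, ...)
    }
    for (final String path : streamOutputPaths.values()) {
      deleteStagingObjects(path); // hypothetical helper wrapping the S3 client's delete calls
    }
  }

  private void deleteStagingObjects(String path) {
    // A real connector would call the S3 client here; this is a stand-in.
    System.out.println("Deleting staging objects under " + path);
  }
}
```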

Additional context

Add any other context or screenshots about the feature request here.

Are you willing to submit a PR?

No

bioticinteractions commented 2 years ago

This would be a great feature.

tuttle commented 2 years ago

I'm sure this would be useful for me.

danieldiamond commented 2 years ago

@tuttle please add your 👍 to the issue description. This is a useful signal for the Airbyte team to understand user feature requests and, I imagine, helps with prioritisation.

tuttle commented 2 years ago

@danieldiamond thanks Dan :)

ChristopheDuong commented 2 years ago

I have a question for the users following this feature request...

Assuming you had this option to "persist" the staging data with Airbyte, what do you think of the raw tables (`_airbyte_raw_*`) that are produced by destinations before being normalized into tabular format by the normalization process?

We could see those raw tables as an additional staging option inside the final destination too (it's actually the first one we implemented in Airbyte), but then wouldn't they be a duplicated artifact if you already had the data in blob storage?

Would you want an option to disable the `_airbyte_raw_*` tables then, or is there value in keeping all three: the blob storage files, the raw tables, and the final normalized tables? (Assuming it wouldn't affect how normalization works.)

bioticinteractions commented 2 years ago

For my use case, having the data up in S3 means we're able to distribute it through S3 rather than Snowflake. We're finding it would be faster and easier to go through S3, since that is our data lake, rather than query a relational database.

As far as having the 'raw' data still in the relational database, that would be fine with me. Options would definitely be appreciated: (1) only the relational database, (2) only blob storage, (3) both relational and blob.

If the S3 objects would be the exact 'raw' tables from the relational database, that's fine! It wouldn't be optimal, since I would prefer a 'standard' tabular format, but that's a trivial problem.

One additional feature (largely 'dressing') could be to customize the S3 key: `/yyyymmddss_replication_namespace/....../table_name_namespace_id.parquet`

But again, no worries on the aesthetics of things. If Airbyte just keeps the objects in the namespace for each run, we can deal with that too! Thanks.

ChristopheDuong commented 2 years ago

> For my use case, having the data up in S3 means we're able to distribute it through S3 rather than Snowflake. We're finding it would be faster and easier to go through S3, since that is our data lake, rather than query a relational database.
>
> One additional feature (largely 'dressing') could be to customize the S3 key: `/yyyymmddss_replication_namespace/....../table_name_namespace_id.parquet`

Are you aware of this destination, https://docs.airbyte.io/integrations/destinations/s3? Could you give more details on why using a pure blob storage destination such as that one would not be preferable, rather than using a Snowflake/Redshift destination and keeping its staging data persisted after the sync?

bioticinteractions commented 2 years ago

Yes, I am aware of the S3 destination; that's what we're doing now, and it's very helpful! But we have to maintain the S3 and Snowflake/Redshift 'connections' separately (e.g. changes to tables or replication strategy).

It would be nice if we could do both in one go; it would simplify all of our pipelines. And since Airbyte can stage in S3, I thought it would make sense to have this feature of persisting in S3.


tuttle commented 2 years ago

@ChristopheDuong Thanks for asking and sorry for being a bit late. My opinion:

We're going to use Airbyte as a backend machine, commanded via the API, fulfilling the needs of our clients. We don't know those needs in advance. As such, we would appreciate it if Airbyte could be as universally configurable as possible. (Of course, to stay visually uncluttered, such advanced settings may be collapsed by default.)

Deleting is cheap and can usually be postponed. Destroying data at the right moment is something our Airbyte-controlling system would certainly be able to do itself (provided it knew the locations). Up to that moment, it could also give clients access to the data at multiple stages, which is beneficial.

Each input stage contains a unique state of affairs that could be invaluable for debugging in certain situations. We have all been in such desperate situations at some point. :-)

All of the above applies to the raw (JSON) tables too. I like them sitting there and could delete them myself.

Thanks again.

tuliren commented 2 years ago

This option is already implemented for the Databricks destination:

UI: [screenshot: Screen Shot 2021-12-03 at 00 38 45]

Option: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json#L144

Code: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java#L186

It should be relatively straightforward to do.
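For reference, a schematic sketch of that conditional-cleanup pattern (the class, field, and method names below are placeholders, not the actual Databricks connector code):

```java
// Schematic sketch of the pattern linked above: the copier only removes its
// staging objects when the user left purging enabled in the connector config.
class PurgeAwareCopierSketch {

  private final boolean purgeStagingData;
  private final Runnable deleteStagingFiles; // stand-in for the real staging-cleanup call

  PurgeAwareCopierSketch(boolean purgeStagingData, Runnable deleteStagingFiles) {
    this.purgeStagingData = purgeStagingData;
    this.deleteStagingFiles = deleteStagingFiles;
  }

  void cleanUpStaging() {
    if (purgeStagingData) {
      deleteStagingFiles.run();
    }
    // When purging is disabled, the staging files stay in the bucket for later use.
  }
}
```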

edgao commented 2 years ago

Redshift was completed in https://github.com/airbytehq/airbyte/issues/8552; Snowflake was completed in https://github.com/airbytehq/airbyte/issues/8820.