MaterializeInc / materialize

The data warehouse for operational workloads.
https://materialize.com
Other
5.67k stars 458 forks source link

S3 Sources Epic #4914

Closed quodlibetor closed 1 year ago

quodlibetor commented 3 years ago

Overview

There are two primary use cases for S3 sources, one for customers and one for internal use:

These use cases have implications for how to prioritize work and features.

For both ETL and Persistence:

However, there are some features that have exactly inverse requirements for ETL'd data vs Source Caching

For ETL'd data:

For Source Caching:

Checklist of all features

S3 is overall a huge API surface, and in some sense it will probably always be an evolving target.

Version 1.0

Features for v1, shipped as separate work, in order:

Future Work/open issues

Open questions

Out of Scope for now

V1 Design

Syntax

Since we need to support a large number of objects and listening to bucket updates the creation syntax should be designed for that OOTB -- there's not much point in having separate syntaxes for single-object vs multi-object APIs. Additionally, this should align with the syntax we eventually provide for multi-file sources, and the syntax that we use for other AWS credentials (e.g. kinesis).

The parts of this syntax will be implemented over time, not all at once:

CREATE [MATERIALIZED] SOURCE <name>
FROM S3
OBJECTS FROM (SCAN BUCKET '<bucket_name>'| SQS '<queue_name>' | TABLE '<relation_name>')...
[MATCHING '<pattern>']
[ORDER BY OBJECT KEY]
[WITH ([<aws_credentials_options>])]
FORMAT <specifier>;

With two interesting things to call out:

See "Feature Details" below for various examples.

Semantics

AWS-imposed constraints

There are some interesting limitations in what we are able to do, based on what APIs AWS provides.

Feature: downloading objects

I'll describe what happens for various source declarations here, to describe how materialized behavior changes as more features are demanded. This is the base case.

The simplest case is just requesting a list of files. There are several distinct possible cases here, but we will handle them all the same:

CREATE SOURCE datalake FROM S3 OBJECTS FROM SCAN BUCKET 'bucket' [MATCHING '<glob>']
FORMAT TEXT;

The cases cases for <glob> are:

  1. Omit MATCHING entirely, or specify all objects as: **
  2. single object: financials.csv
  3. prefix of objects: financials/**
  4. glob pattern: financials/*.csv
  5. prefix glob pattern: financials/**/*.csv

For each of these we will follow the same procedure:

Unknown: Is it possible for ListObjectsv2 to repeat an object in the same series of calls, i.e. using the opaque continuation token provided by the ListObjectsV2 response. Hopefully the new "strongly consistent" ListObjects announced today means that this is not a concern.

Feature: ordered reads

Syntactically this is specified by requesting an ORDER BY OBJECT KEY

CREATE SOURCE datalake FROM S3
OBJECTS FROM SCAN BUCKET 'example'
ORDER BY OBJECT KEY
FORMAT TEXT;

The default is to read objects in an undefined order. Without an ORDER BY clause this does not even guarantee that the objects will be read in the order that they are returned by S3's ListObject.

If an ORDER BY OBJECT KEY clause is present then all rows are guaranteed to be sent into the dataflow layer in both object and row order. ListObjectsV2 is guaranteed to return objects "in UTF8 order", but we may parallelize downloads without this specified.

The space for optimization/parallelization to help improve that is complex, and we probably shouldn't promise a huge amount there.

The initial strategy will be to collect everything from the initial ListObject calls from the queue mentioned above into a sorted collection, and then to send those into the download pool. Potentially the first iteration will have only a single download task to ensure order, but using a more complex struct that includes ordering and object information in it may allow parallel downloads.

Feature: ingesting new objects after initial import

Adding a OBJECTS FROM SQS '<queue-name>' will cause us to subscribe to "queue-name" for updates to the bucket. That SQS queue can populated either directly or via an SNS configuration. See the S3 Event Notifcations Guide for details.

We will only act on s3:ObjectCreated:* events until we have a story around deleted and updated objects.

CREATE SOURCE datalake FROM S3 OBJECTS FROM SCAN BUCKET 'example', SQS 'sqs-queue'
FORMAT TEXT;

This will cause us to perform a ListObjects operation, and once all of the objects have been ingested we will start reading object IDs from the provided notifications SQS queue.

They will feed into the same "download objects" queue internally, so that deduplication can piggy back on any state that may need to be maintained for in-order traversal, and so that the object glob filtering will still be able to be applied at the same place.

Aside:

This syntax is designed to be extended to include other sources of object names, most generally:

OBJECTS FROM RELATION <table>

Although we can imagine something intermediate like sqs (not S3 notifications), kafka, or a file, e.g.:

OBJECTS FROM KAFKA <spec>

There are some finicky parts to this:

Detailed discussion in #5502

Future Work

Supporting an explicit catalog of objects

It would be better for S3 and related (e.g. filesystem) APIs to have as their underlying abstract a table-valued function that can convert object identifiers into a collection of rows. This is generally covered by the OBJECTS FROM RELATION syntax described above.

Prefix-sharding optimization

Potentially long term using Prefix + Delimiter will allow us to shard multi-thread our ListObject calls.

https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#AmazonS3-ListObjectsV2-response-Delimiter https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_Example_7

benesch commented 3 years ago

Ooh, nice! One remaining question I have is how we plan to listen for new objects. IME the ListObjects request gets expensive if you poll at a high frequency—it's also just pretty slow for buckets with a lot of objects—but could maybe work as a first implementation. I assume the notification_sqs_queue is part of the long-term answer here, but is that sufficiently consistent? (In other words, can you "line up" the ListObjects request with the SQS subscription?)

And what happens if an object is deleted from the bucket?

Also the connector/format/envelope setup is continuing to show its brittleness here. The syntax you've proposed doesn't support customizing the delimiter, nor does it permit Avro OCF files. Don't really have any good suggestions for fixing that, I'm afraid. :/

umanwizard commented 3 years ago

@awang - you should take a look.

umanwizard commented 3 years ago

@benesch We are simply using the wrong abstraction right now for finding file delimiters. We need to make it some backflow from the decoding/envelope layers, rather than hardcoding it to be some string pattern like newlines. Your ideas around user-defined sources are extremely interesting in this respect.

For now, we don't need solving that problem to be in-scope for the S3 works, since the same problem exists for files. Thus, we are not making the unorthogonality any worse with this work.

I would like to treat S3 sources as largely the same as files, and support the same API for both. That way, we can solve both problems at the same time.

benesch commented 3 years ago

I would like to treat S3 sources as largely the same as files, and support the same API for both.

Are you proposing adding a DELIMITER '\n' option, like we have for file sources?

umanwizard commented 3 years ago

AFAIK we don't have such a thing now, and I wouldn't propose adding it, because I don't think it's the correct abstraction.

quodlibetor commented 3 years ago

One remaining question I have is how we plan to listen for new objects. IME the ListObjects request gets expensive ... [is] notification_sqs_queue sufficiently consistent?

Yes you're correct that listobjects is way too expensive for us to use it as a way of tracking new objects. @elindsey pointed out that even using listobjects to ... list objects ... can be slow enough for large buckets that it becomes a major bottleneck.

And what happens if an object is deleted from the bucket?

We don't support deletion of records by way of deletion of objects (adding that to the main description). Otherwise I think that this counts as part of error handling?

Also the connector/format/envelope setup is continuing to show its brittleness here.

Strong agree. I also agree with Brennan that we should be trying to treat these as similarly as possible to files.

awang commented 3 years ago

1) re: polling frequency -- sqs/sns/kinesis is probably the proper answer (especially for large data volumes), but if we decide to go with polling, i propose making it user-customizable, with a default of 1hour (making that number up). the data volumes of our initial beta customers will probably drive this -- although, using ListObjects sounds handy for test/dev scenarios, where a user can just point us at the S3 bucket without needing to configure yet another thing

2) re: if an object is deleted from the bucket -- for simplicity, i propose ignoring that event

3) re: the DELIMITER '\n' option: what's the standard way of doing it in Postgres? Surely this must be a solved problem, either natively within Postgres or via a 3rd-party tool

andrioni commented 3 years ago

One possible suggestion for defining multi-object sources is to use manifest files, like Redshift does for COPY.

quodlibetor commented 3 years ago

I've updated the issue description:

Also AWS's announcement of S3 strong consistency is.. extremely well-timed, for me.

@awang

if we decide to go with polling, i propose making it user-customizable

I agree, there has been a bit of discussion of whether it should be an explicit REFRESH command, or built-in interval polling.

re: if an object is deleted

I've made this explicit in the top comment

re: DELIMITER '\n'

I believe that this is in reference to how we handle multiple JSON objects in the same S3 object, which we currently require to be newline-delimited. I'm not certain of how postgres handles this, but my understanding is that it accepts a much smaller set of input formats than we do (mostly CSV and formats that postgres/psql itself writes).

awang commented 3 years ago

@quodlibetor what's the status of this? is there an mvp we can try out?

quodlibetor commented 3 years ago

@awang the attached PR has the first step (a single file) I hope to have reading an entire bucket up in a couple days. Getting testing for this in CI is a little complex, I might get full-bucket reading working first before I get good test coverage.

quodlibetor commented 3 years ago

Actually @awang #5202 supports downloading all objects in a bucket that match a pattern (i.e. it implements the OBJECTS 'path/prefix/**/2020/*' syntax, but it needs more testing.

quodlibetor commented 3 years ago

I've updated the syntax and commentary in the description to reflect new syntax that @benesch and I came up with:

CREATE [MATERIALIZED] SOURCE <name>
FROM S3 BUCKET '<bucket>'
OBJECTS FROM (SCAN | SQS '<queue_name>' | TABLE '<relation_name>')...
[MATCHING '<pattern>']
[ORDER BY OBJECT KEY]
[WITH ([<aws_credentials_options>])]
FORMAT <specifier>;

For reference, the old syntax looked like:

CREATE [MATERIALIZED] SOURCE <name>
FROM S3 BUCKET '<bucket>' OBJECTS '<pattern>'
[ORDER BY OBJECT KEY]
[OBJECTS FROM SQS '<queue_name>']
[WITH ([SNAPSHOT], [<aws_credentials_options>])]
[WITHOUT (SNAPSHOT)]
FORMAT <specifier>;

The main advantage of this new syntax is that it makes it more clear that the objects are listed and then filtered, and also makes the snapshot behavior more obvious SCAN is a required part of the syntax, but can be ommitted instead of adding an explicit WITHOUT SNAPSHOT clause.

For example, this is now how you'd write "scan the bucket for json files and also listen for new json files in a queue named 'notifications'":

CREATE SOURCE example FROM S3 BUCKET 'example'
OBJECTS FROM SCAN, SQS 'notifications' MATCHING '**/*.json'
awang commented 3 years ago

@andrioni (and anyone else following along this thread) -- it is now possible to try out s3 sources via experimental mode

nchammas commented 3 years ago

There are two primary use cases for S3 sources, one for customers and one for internal use:

  • For customers, we would like to be able to ingest ETL'd data so that they can join their datalakes with data from their live DBs

I'd like to also propose another customer use case for the (distant?) future: I want Materialize to be my batch ETL engine. e.g. I have a stuffy batch-updated data lake on S3 and just want a better way to manage it. This will require S3 sink support. Dunno how this fits into the vision for Materialize (do you want it to address the same space as Apache Spark?) but there it is.

CREATE [MATERIALIZED] SOURCE <name>
FROM S3 BUCKET '<bucket>'
OBJECTS FROM (SCAN | SQS '<queue_name>' | TABLE '<relation_name>')...
[MATCHING '<pattern>']
[ORDER BY OBJECT KEY]
[WITH ([<aws_credentials_options>])]
FORMAT <specifier>;

Side question about <aws_credentials_options>: I'm not familiar with Rust or the Rust AWS SDK you may be using, but will this support the default AWS credential chain? Ideally, users should not have to specify this section and still automatically get the full walk through of the credential chain, like with the Python and Java AWS SDKs. This would include things like the environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY), the shared credentials file (~/.aws/credentials), and the EC2 metadata service.

Supporting an explicit catalog of objects

It would be better for S3 and related (e.g. filesystem) APIs to have as their underlying abstract a table-valued function that can convert object identifiers into a collection of rows. This is generally covered by the OBJECTS FROM RELATION syntax described above.

Another forward-looking comment: Depending on how close to the Hadoop ecosystem you want to take Materialize, it could potentially be a great integration point to have Materialize know how to talk to the Hive Metastore. So many businesses already have their data lakes registered in the Metastore (or a wrapper thereof, like AWS Glue), which gives them that mapping from logical name (e.g. product.sales) to a list of partitions on HDFS/S3.

quodlibetor commented 3 years ago

Ideally, users should not have to specify this section and still automatically get the full walk through of the credential chain

Yup! This is currently how it works: region is required, but if other credentials are omitted then we walk the credentials chain.

it could potentially be a great integration point to have Materialize know how to talk to the Hive Metastore

Interesting! I think that this can be paraphrased as "it would be great to be able to use the hive metastore as a manifest source", right? I'm not familiar with the metastore, I suppose it must know bucket names as well as object names.

If that's a reasonable point of comparison, then I think it might make sense for us to be able to have a separate "metastore" source, which would enable:

> CREATE SOURCE hive_metastore AS ...;
> CREATE MATERIALIZED VIEW interesting_tables AS SELECT object_key FROM hive_metastore hm WHERE hm.table_name = 'product.sales';
> CREATE S3 SOURCE product_sales OBJECTS FROM VIEW interesting_tables ...;

If so, that suggests we should flip some syntax around and for consistency's sake make the bucket an argument to SCAN:

CREATE S3 SOURCE <name> OBJECTS FROM [SCAN '<bucket_name>'... | VIEW 'view_name' | ...]...

Where the OBJECTS FROM target can be specified multiple times, so this would be valid:

CREATE S3 SOURCE unified OBJECTS FROM SCAN 'datalake', SCAN 'one-off-bucket', VIEW interesting_tables;
quodlibetor commented 3 years ago

Also AFAIK S3 sink support is very much on the horizon, it's still got some design work, but it should tie in nicely with our internal S3 persistence work.

nchammas commented 3 years ago

Interesting! I think that this can be paraphrased as "it would be great to be able to use the hive metastore as a manifest source", right? I'm not familiar with the metastore, I suppose it must know bucket names as well as object names.

I don't want to derail this thread, because I think this perhaps merits its own discussion. But just to provide a perspective as a heavy Apache Spark user:

Instead of "manifest source" I think of "pluggable catalog". Perhaps they are the same thing, but I would describe the interface a bit differently than you did earlier:

It would be better for S3 and related (e.g. filesystem) APIs to have as their underlying abstract a table-valued function that can convert object identifiers into a collection of rows.

A catalog API would convert object identifiers (like product.sales) into a detailed description of the data source: the schema, the storage format, and paths to the data. Materialize would use that information to understand how to locate and query the table.

In other words, the catalog API would provide Materialize with the same kind of information it gets from a manually user-crafted CREATE SOURCE statement.

I'll express the idea using some pseudo-SQL:

> SELECT * FROM product.sales LIMIT 100;
ERROR: No such table: product.sales
> CONNECT CATALOG hive <hive_connection_parameters> AS lake;
> SELECT * FROM product.sales LIMIT 100;
ERROR: No such table: product.sales
> -- I need to look up this table in the correct namespace, which is "lake".
> SELECT * FROM lake.product.sales LIMIT 100;
+----+-----+                                                                    
|item|price|
+----+-----+
...
> -- I can now query any table or view registered in the Hive catalog I just connected to.

Does this make sense for Materialize? I don't know. But if you're interested in exploring this idea further, you might want to look at how Spark and Trino (formerly Presto SQL) support multiple external catalogs.

quodlibetor commented 3 years ago

Ah that's super neat, and way more sophisticated than I was imagining. I can 100% imagine this fitting into Materialized, the ergonomics could be fantastic. I don't think that we have any prioritization around external catalogs yet, but it definitely makes sense as something to design for with all the work we're doing on our internal catalogs in general.

quodlibetor commented 3 years ago

Relabelling this as not a milestone blocker (because the part that was a blocker has been released) and unassigning myself because all work is in sub-issues.

benesch commented 1 year ago

Closing as stale, as S3 sources were removed with the transition to platform last year.