MaterializeInc / materialize

The data warehouse for operational workloads.
https://materialize.com

Multiple source files for a single "source" #3773

Closed brianbruggeman closed 1 year ago

brianbruggeman commented 3 years ago

Our current system architecture is a mix of Spark batch jobs as well as Kafka streams dumping data into S3. In both cases, we use a variety of formats (gzip, json, json+gzip, parquet, avro, etc.). We generally segregate these files by some sort of directory schema:

/version=v.../foo01=bar01/foo02=bar02/foo03=bar03/.../data_file(s)

In some cases, especially for development and testing or debugging, it's far better to download several (test) files to simulate some of the data flow. One of the data streams I'm currently working with generates around 250M - 500M (depending on time of day) new rows per hour. I'd like to do several transformations of that data at scale and in some cases, I want to materialize the transformation (e.g. a topK) and make that transformation available through a public REST interface.

I can come up with some simple extraction code to pull data files onto a local host, but it seems that (unless I really am missing something) there's a 1:1 relationship between a file and a source (CREATE SOURCE ...). This creates an ergonomic issue where I need to do a bit of extra work to aggregate files locally, and for very large file sets, I quickly run into space issues on my small company laptop with its 250 GB hard drive. I'd prefer not to duplicate the data locally, and would rather have Materialize.io ingest from multiple source files for a single "source".

rjnn commented 3 years ago

@brianbruggeman thanks for the issue! A few questions to help us scope the work, and just understand your use-case better:

  1. I'm interpreting your request such that a wildcard-based source ("create source foo from file 'foo*'") would satisfy you. Is this correct?

  2. Are you aware that you can create a source per file, and also create a view that's the union of all those sources, and then treat that view as the "source" for your computations downstream? If you're aware that this is possible, and don't like it, can you say a few words on what's annoying about this? If you were unaware of this, and are now happy, that's a clear documentation todo for us. (Hence the question: which one is it?)

  3. Already on our roadmap is adding support for "S3 sources", including polling the "watch" API so that we can get notifications and incrementally update when new files are added. Would watching a directory under an S3 URL fulfill your need? There's still the question of closing the feature gap on "local files" as well, so I think something like S3 sources as well as doing (1) might be necessary.
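
The per-file sources plus union view approach from point 2 can be scripted rather than typed by hand. The following is a minimal sketch only: the helper names (`source_name`, `union_statements`), the `src_` prefix, the `all_files` view name, and the `FROM FILE ... FORMAT TEXT` statement shape are assumptions for illustration, not confirmed Materialize syntax for any particular version.

```python
import re


def source_name(path, prefix="src"):
    """Derive a SQL-safe identifier from a file path (hypothetical naming scheme)."""
    slug = re.sub(r"[^a-z0-9]+", "_", path.lower()).strip("_")
    return f"{prefix}_{slug}"


def union_statements(paths, view="all_files"):
    """Return one CREATE SOURCE per file, then a view that unions them all."""
    statements = []
    names = []
    for path in sorted(paths):
        name = source_name(path)
        names.append(name)
        escaped = path.replace("'", "''")  # escape single quotes for a SQL literal
        # Assumed statement shape; adjust to the syntax your version accepts.
        statements.append(f"CREATE SOURCE {name} FROM FILE '{escaped}' FORMAT TEXT;")
    selects = " UNION ALL ".join(f"SELECT * FROM {name}" for name in names)
    statements.append(f"CREATE VIEW {view} AS {selects};")
    return statements


if __name__ == "__main__":
    for statement in union_statements(["/data/b.log", "/data/a.log"]):
        print(statement)
```

Note that this naming scheme can collide (for example `a-b` and `a_b` map to the same identifier), so a real script would need a uniqueness check.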

Thank you for your feature request! We really appreciate it, and it's very helpful!

brianbruggeman commented 3 years ago

@rjnn Thanks for the ask. I'll answer each individually.

  1. Wildcard ingestion: I think this would definitely satisfy my immediate issue. I think the wildcard would allow me to be more specific about the file system, but I can also imagine a scenario where I've already pulled all of the files I care about to a local host and I just want to ingest an entire folder (or folder tree in some cases), regardless of the file names:

  2. Union Sources: I had not considered (yet) creating a union source. In my current situation, though, I have several thousand smallish (about 5-8MB compressed) gzip files. In total, they represent as much as 35GB or so per hour. While I certainly wouldn't want to download all of those files locally, I rejected the idea initially of generating a source for each of those small files because of name conflicts/convention and the assumption that the CRUD lifecycle is painful to manage with that many sources. Note that in production, for this specific example, I'd simply use a kafka stream output. In evaluating Materialize as a solution, however, I would rather not load a kafka server.

  3. S3: I suspect that the S3 solution you initially create will be inadequate for what I would need. I mention this because I have already needed to generate a DSL that mapped the directory structure version=.../foo01=.../foo02=... so I can search for specific matching instances (e.g. matching across multiple data sources for ingestion times or "most recent" data which is dependent upon the data source itself). That said, if you've already been thinking about this kind of thing, and you've thought about adding some sort of a SELECT expression to match to that directory structure, then that's fantastic and would probably satisfy my needs.

I really appreciate the documentation you have put together so far. I'm evaluating Materialize as an alternative to KSQLdb and Flink.

quodlibetor commented 3 years ago

The syntax described in #4914 is designed to closely mirror what we use for multi-file sources. Long term, we would like to support both glob patterns and feeding an inventory of files/objects into the source.

Potentially with syntax along the lines of:

CREATE SOURCE foo FROM FILES '*/path/*'
[WITHOUT SNAPSHOT] -- whether or not to include the files that currently exist in the path
[FILES FROM RELATION <view-or-source>]  -- include lines from all files that are inserted into the relation
[TAIL [FILES|DIRECTORY|ALL]]; -- watch individual files for append or for newly created files

quodlibetor commented 3 years ago

@brianbruggeman coming back to this, do you think that supporting globs of the form **/foo0{1,2} would be sufficient for your DSL use case? That would match any key anywhere in an S3 bucket that ends with exactly one of foo01 or foo02.

It seems like it might not, if the keys need to depend on some property of the data, but are there other features that we'd need to support your use case?
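
To make the glob semantics above concrete, here is a small sketch of how a pattern such as `**/foo0{1,2}` could be evaluated against object keys. It is an illustration only, not how Materialize implements globs: `expand_braces` and `key_matches` are hypothetical helpers, only non-nested brace groups are handled, and because Python's `fnmatch` lets `*` cross `/`, a single `*` behaves like `**` in this sketch.

```python
import re
from fnmatch import fnmatchcase


def expand_braces(pattern):
    """Expand {a,b} alternatives into separate patterns (no nesting support)."""
    match = re.search(r"\{([^{}]*)\}", pattern)
    if match is None:
        return [pattern]
    head, tail = pattern[:match.start()], pattern[match.end():]
    expanded = []
    for alternative in match.group(1).split(","):
        expanded.extend(expand_braces(head + alternative + tail))
    return expanded


def key_matches(pattern, key):
    """True if the key matches any brace-expanded alternative of the pattern."""
    for alternative in expand_braces(pattern):
        # fnmatch's "*" already matches "/" characters, so "**" collapses to "*".
        if fnmatchcase(key, alternative.replace("**", "*")):
            return True
    return False


if __name__ == "__main__":
    for key in ["a/b/foo01", "a/b/foo02", "a/b/foo03", "a/foo01/x"]:
        print(key, key_matches("**/foo0{1,2}", key))
```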

brianbruggeman commented 3 years ago

Maybe a better example of foo. I hadn't intended foo to be literal.

path/version=10/synced=20210101/processed=20201229/generated=20201228/...

"foo" could really be anything, and the example above isn't directly from a real example, but Spark tends to create S3 keys with the above format when the data is partitioned by specific fields. And in this case, order really matters...

  • we want version 10
  • we want the sync of the data to be 20210101
  • we may not care when it was processed (e.g. processed=*), but we do care that it was generated with a specific date.

All of that is because the various partitions may indirectly indicate the algorithm used to generate the data and/or the schema needed to process it.

quodlibetor commented 3 years ago

Thanks! The requirements that you described seem like they would be matched by this glob: path/version=10/synced=20210101/processed=*/generated=20201228/... You might need to dynamically generate the patterns when creating sources until we implement FROM TABLE, at which point you could dynamically INSERT globs or full paths into a table. Does that seem likely to be sufficient?
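
As a rough illustration of the partition glob above, the sketch below matches keys one path segment at a time, so that `processed=*` can only stand in for a single partition directory. The helper name `matches_partition_glob` and the sample file name `part-0000.gz` are made up for this example, and the pattern is treated as a prefix because the tail of the real path is elided.

```python
from fnmatch import fnmatchcase


def matches_partition_glob(pattern, key):
    """Match a key against a glob segment by segment, treating the glob as a prefix."""
    pattern_parts = pattern.split("/")
    key_parts = key.split("/")
    if len(key_parts) < len(pattern_parts):
        return False
    # Compare only as many leading segments as the pattern specifies.
    return all(
        fnmatchcase(key_part, pattern_part)
        for pattern_part, key_part in zip(pattern_parts, key_parts)
    )


if __name__ == "__main__":
    glob = "path/version=10/synced=20210101/processed=*/generated=20201228"
    keys = [
        "path/version=10/synced=20210101/processed=20201229/generated=20201228/part-0000.gz",
        "path/version=9/synced=20210101/processed=20201229/generated=20201228/part-0000.gz",
    ]
    for key in keys:
        print(matches_partition_glob(glob, key), key)
```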

brianbruggeman commented 3 years ago

Often, I'm looking for the "most recent", so I actually need to collect the keys, sort them, pick the most recent and then apply a glob pattern. In my case, I care more about presenting some data than aligning it perfectly (i.e. my data doesn't necessarily need to "match" perfectly with dates and transformations lining up...). So picking the most recent is "good enough" and makes my system robust enough to handle outages in the data pipeline.

So I would likely need to dynamically generate the path anyway. That said, a glob, with the option of multiple globs (e.g. version=10/synced=*/processed=*/...), would definitely help.
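
The "collect the keys, sort them, pick the most recent, then apply a glob" workflow described here might look roughly like the sketch below. The function names and sample keys are hypothetical, and it assumes the `synced` values are YYYYMMDD strings, so that plain string comparison orders them chronologically.

```python
def parse_partitions(key):
    """Extract name=value partition segments from a key into a dict."""
    partitions = {}
    for segment in key.split("/"):
        name, separator, value = segment.partition("=")
        if separator:
            partitions[name] = value
    return partitions


def latest_synced_glob(keys, version):
    """Build a glob for the newest 'synced' partition of the given version."""
    synced_values = []
    for key in keys:
        partitions = parse_partitions(key)
        if partitions.get("version") == version and "synced" in partitions:
            synced_values.append(partitions["synced"])
    if not synced_values:
        return None
    newest = max(synced_values)  # assumes YYYYMMDD, which sorts chronologically
    return f"version={version}/synced={newest}/processed=*"


if __name__ == "__main__":
    sample_keys = [
        "path/version=10/synced=20210101/processed=20201229/generated=20201228/a",
        "path/version=10/synced=20210103/processed=20210102/generated=20210101/b",
        "path/version=9/synced=20210105/processed=20210104/generated=20210103/c",
    ]
    print(latest_synced_glob(sample_keys, "10"))
```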

awang commented 3 years ago

Thanks! Btw, @brianbruggeman have you seen any existing alternative (i.e., an application or library) that would have gotten close to allowing you to specify the file matching process you want, and avoid having to write the DSL?

As an example, from what I can tell Snowflake supports regex (https://docs.snowflake.com/en/user-guide/data-load-external-tutorial-copy-into.html), while Flink uses the Factory model: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/sources.html

brianbruggeman commented 3 years ago

@awang I really haven't. In my most recent foray, I ended up building a PEG parser.

benesch commented 2 years ago

Porting a comment here from @orent (in #9793), who would specifically like to use the glob patterns to accept log files:

Instead of just a filename, accept directory path + optional glob or regex pattern.

Some ideas:

  • File ordering configurable by modification time, string sort or string with numbers (equivalent to ordering used by "ls -v"). Also sort by regex capture groups? Support month names?
  • Ensure it supports the rotation and compression practices of common tools, specifically logrotate.
  • Sanity check: maintain list of file names/sizes/timestamps/tail hash/head hash/inode. Identify and report anomalies (changes/appends to files which are not the latest in the set, changes other than append, etc).
  • Sanity check: parse some record timestamp formats and verify monotonic or near-monotonic.
  • Sanity check failures configurable to issue warning or pause source.
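
The "string with numbers" ordering from the first bullet can be approximated with a natural sort key. This is a sketch of the general technique only; it does not reproduce every rule of "ls -v", and `version_key` is a hypothetical helper name.

```python
import re


def version_key(name):
    """Split a name into text and integer chunks so 'log.10' sorts after 'log.2'."""
    chunks = []
    for token in re.split(r"(\d+)", name):
        if token == "":
            continue
        if token.isdigit():
            chunks.append((1, int(token)))  # numeric chunk, compared as an integer
        else:
            chunks.append((0, token))  # text chunk, compared as a string
    return chunks


if __name__ == "__main__":
    names = ["app.log.10.gz", "app.log.2.gz", "app.log.1", "app.log"]
    print(sorted(names, key=version_key))
```

With logrotate's default numbering, higher suffixes are older files, so an oldest-first read would likely walk the numbered files in reverse of this order.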

Supporting file ordering is something we'd likely punt to a downstream view by way of an ORDER BY filename clause or somesuch. Similarly with the sanity checks. I don't think we want to build anything specific into Materialize for handling parsing of log files, but we've talked about introducing a generic CREATE ASSERTION SQL command that would allow you to plug in any sanity check you can describe in SQL.

orent commented 2 years ago

Handling order is, obviously, best handled in a bit of sql.

In true eat-your-own-dogfood fashion, you can implement watching the log directory as a SOURCE that is transformed by a live view into filenames to tail.

How elaborate should sanity checking be? Weird things WILL happen during live tailing and across restarts and users may appreciate well-tested checks that they probably won’t implement themselves.
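
As one possible starting point for such checks, the sketch below compares two inventory snapshots and reports the kinds of anomalies mentioned earlier in the thread. It is an assumption-laden illustration: `FileState`, `find_anomalies`, and the anomaly labels are hypothetical, files are tracked by name only, and a real implementation would also need inode tracking to follow renames made by log rotation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileState:
    size: int       # file size in bytes at snapshot time
    head_hash: str  # hash of the first bytes, used to detect rewrites


def find_anomalies(before, after, latest):
    """Compare two {name: FileState} snapshots and list suspicious changes."""
    problems = []
    for name, old in before.items():
        new = after.get(name)
        if new is None:
            problems.append((name, "missing"))
        elif new.head_hash != old.head_hash:
            problems.append((name, "rewritten"))
        elif new.size < old.size:
            problems.append((name, "truncated"))
        elif new.size > old.size and name != latest:
            problems.append((name, "append to non-latest file"))
    return problems


if __name__ == "__main__":
    before = {"app.log.1": FileState(100, "aa"), "app.log": FileState(50, "bb")}
    after = {"app.log.1": FileState(120, "aa"), "app.log": FileState(80, "bb")}
    print(find_anomalies(before, after, latest="app.log"))
```

Whether a reported anomaly should only warn or should pause the source is left to the caller, matching the configurable behavior suggested above.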

sploiselle commented 1 year ago

Retiring old issue