Netflix / iceberg


Share A Single File System Instance In HadoopTableOperations #92

Open mccheah opened 5 years ago

mccheah commented 5 years ago

We shouldn't use Util.getFS every time we want a FileSystem object in HadoopTableOperations. An example of where this breaks down is when file system object caching is disabled (by setting fs.&lt;scheme&gt;.impl.disable.cache to true). When caching is disabled, a long string of calls on HadoopTableOperations in quick succession will create and garbage-collect FileSystem objects very quickly, leading to degraded JVM behavior.
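For context, a minimal sketch of that behavior, using the local file:// scheme as an illustration (the URI is arbitrary):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
// Disable Hadoop's FileSystem cache for the file:// scheme
conf.setBoolean("fs.file.impl.disable.cache", true);

// With caching disabled, each call constructs a fresh FileSystem instance,
// so a rapid sequence of metadata operations churns out short-lived objects.
FileSystem fs1 = FileSystem.get(URI.create("file:///tmp"), conf);
FileSystem fs2 = FileSystem.get(URI.create("file:///tmp"), conf);
// fs1 != fs2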

One case where you would want to disable file system caching is to set up different instances of HadoopTableOperations with FileSystem objects that are configured with different Configuration objects, for example to configure different Hadoop properties on each invocation of the data source, assuming we move forward with https://github.com/Netflix/iceberg/issues/91. Unfortunately, Hadoop caches file system objects by URI, not by Configuration, so if one wants different HadoopTableOperations instances to load differently configured file system objects for the same URI, they will instead receive the same FileSystem object back every time unless they disable FileSystem caching.
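To make the caching pitfall concrete (hdfs://warehouse is a hypothetical URI):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration confA = new Configuration();
confA.set("encryption.key", "keyA");
Configuration confB = new Configuration();
confB.set("encryption.key", "keyB");

// The cache is keyed on scheme, authority, and user, not on the Configuration,
// so both calls return the same instance; it was constructed with confA and
// never sees confB's settings.
FileSystem fsA = FileSystem.get(URI.create("hdfs://warehouse/tables"), confA);
FileSystem fsB = FileSystem.get(URI.create("hdfs://warehouse/tables"), confB);
// fsA == fsB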

rdblue commented 5 years ago

Can you give an example of properties you're trying to set here?

We can cache file systems in HadoopTableOperations, but most of the systems I've worked on use this pattern of getting the right file system for the URI and relying on the FileSystem-level cache.
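For reference, that pattern is roughly:

// Resolve the FileSystem from the path's URI; with caching enabled (the default),
// repeated calls for the same scheme and authority return the same shared instance.
FileSystem fs = path.getFileSystem(conf);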

mccheah commented 5 years ago

We are experimenting with using Iceberg as a temporary representation of tables backed by our internal data warehouse solution. When we do so, we need to put the Iceberg table metadata somewhere. We want to put it on local disk, but then we need to encrypt it with a one-time encryption key that only exists for the lifetime of the Spark dataset being read or written. For example, we're doing something like this:

// Generate a one-time key and encode it, since Key.toString() does not return
// the key material in a usable form.
SecretKey encryptionKey = KeyGenerator.getInstance("AES").generateKey();
String encodedKey = Base64.getEncoder().encodeToString(encryptionKey.getEncoded());
Configuration conf = new Configuration();
conf.set("encryption.key", encodedKey);
HadoopTables tables = new HadoopTables(conf);
// create table and insert all metadata
sparkSession.read()
    .option("iceberg.spark.hadoop.encryption.key", encodedKey)
    .load(tempTablePath);

In such a case, we don't want the same file system instance (probably a local FS instance wrapped with some encryption layer) to be cached, because we want a different encryption key each time we run this code.

rdblue commented 5 years ago

Okay, how about adding the support you're talking about to HadoopTableOperations and opening a PR? That would unblock you because you'd have the caching level you need and we could further evaluate the feature.
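One way to do that, as a minimal sketch (CachingTableOperations is a hypothetical stand-in, not the actual HadoopTableOperations patch):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Cache the FileSystem per table-operations instance so each table can carry
// its own Configuration, instead of relying on Hadoop's process-wide cache.
class CachingTableOperations {
  private final Configuration conf;
  private volatile FileSystem fs;

  CachingTableOperations(Configuration conf) {
    this.conf = conf;
  }

  FileSystem fs(Path path) {
    if (fs == null) {
      synchronized (this) {
        if (fs == null) {
          try {
            fs = path.getFileSystem(conf);  // created once, then reused across calls
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        }
      }
    }
    return fs;
  }
}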

rdblue commented 5 years ago

Also, why do all of the properties include "spark"?

mccheah commented 5 years ago

The properties here are assumed to be injected via sparkSession.read().option(...). If we wanted to include them in the table properties set instead, the prefix should be iceberg.hadoop.
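As a hypothetical sketch of how such prefixed options might be copied into a Hadoop Configuration (applyOptions and the options map are illustrative, not an existing Iceberg API):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

static final String HADOOP_PREFIX = "iceberg.spark.hadoop.";

// Copy "iceberg.spark.hadoop."-prefixed read options into a fresh Configuration,
// e.g. "iceberg.spark.hadoop.encryption.key" becomes "encryption.key".
static Configuration applyOptions(Configuration base, Map<String, String> options) {
  Configuration conf = new Configuration(base);
  for (Map.Entry<String, String> entry : options.entrySet()) {
    if (entry.getKey().startsWith(HADOOP_PREFIX)) {
      conf.set(entry.getKey().substring(HADOOP_PREFIX.length()), entry.getValue());
    }
  }
  return conf;
}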

rdblue commented 5 years ago

Properties set through Spark wouldn't need to be specific to Spark. You might use the same ones as session properties in Presto.