mccheah closed this issue 5 years ago.
InputFile and OutputFile instances are supplied by a table's TableOperations. That provides a way to supply your own implementations by overriding the data source used by Spark.
We use MetacatTables to connect to our metastore instead of HadoopTables or HiveTables. The default Iceberg source for DSv2 uses HadoopTables (and will be moving to HiveTables). To override that, we have a subclass of IcebergSource called IcebergMetacatSource that overrides `findTable(DataSourceOptions)`. The tables returned by that method are loaded by MetacatTables and use MetacatTableOperations, which controls how metadata is committed and how InputFile and OutputFile instances are created.
I'd recommend using the same approach for your integration. The only thing you need to change is to supply a different ServiceLoader config with your source instead of the default.
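Concretely, a ServiceLoader config is a provider-configuration file under `META-INF/services` on the classpath. Assuming the source registers through Spark's `DataSourceRegister` service interface (an assumption; check which interface the default Iceberg config file actually uses) and with a placeholder class name, the override would look like:

```
# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.example.iceberg.CustomIcebergSource
```

Because your jar's config file names your subclass, Spark's service loading resolves your source in place of the default one.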
Also, this will eventually be cleaner when Spark adds catalog support. You'll point your Spark configuration at a catalog implementation directly and that catalog will allow you to instantiate tables with the right TableOperations.
> InputFile and OutputFile instances are supplied by a table's TableOperations. That provides a way to supply your own implementations by overriding the data source used by Spark.
The current DSv2 reader and writer implementations don't do that right now, though - they primarily use `HadoopOutputFile#fromPath` / `HadoopInputFile#fromPath`. The above proposal was to replace those calls, but it sounds like there are other ways to replace those calls with other APIs. (Additionally, to do this we would have to make `TableOperations` serializable, because we would open the `InputFile` and `OutputFile` instances on the executors - this sounds difficult.)
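To make the serialization concern concrete, here is a hypothetical sketch of one way around it: instead of serializing all of `TableOperations`, factor file creation into a small `Serializable` component that the driver ships inside each Spark task. All names here (`SerializableFileFactory`, the `TaskInputFile` stand-in) are illustrative, not Iceberg APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Minimal stand-in for Iceberg's InputFile; illustration only.
interface TaskInputFile {
    String location();
}

// Hypothetical alternative to serializing all of TableOperations:
// a small Serializable object that the driver ships to executors,
// which then open InputFile instances locally.
class SerializableFileFactory implements Serializable {
    private final String tableLocation;

    SerializableFileFactory(String tableLocation) {
        this.tableLocation = tableLocation;
    }

    TaskInputFile newInputFile(String relativePath) {
        String resolved = tableLocation + "/" + relativePath;
        return () -> resolved;
    }

    // Mimics Spark serializing the factory on the driver and
    // deserializing it on an executor.
    static SerializableFileFactory roundTrip(SerializableFileFactory factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(factory);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableFileFactory) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

The factory carries only strings, so serialization is trivial; the hard-to-serialize metastore machinery stays on the driver.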
Additionally, the `TableOperations` API doesn't clarify whether `newInputFile(path)` is opening a metadata file or a table data file - the `TableOperations` implementation might need to configure the returned `InputFile` instance differently depending on whether it's reading metadata or reading physical data. It would be beneficial to have `TableOperations#readMetadataFile(path)` and `TableOperations#readTableDataFile(DataFile)` to make the distinction explicit. It seems like a core Iceberg concept that the metastore location can be distinct from the physical data location.
> The default Iceberg source for DSv2 uses HadoopTables (and will be moving to HiveTables). To override that, we have a subclass of IcebergSource called IcebergMetacatSource that overrides findTable(DataSourceOptions). The tables returned by that method are loaded by MetacatTables and use MetacatTableOperations, which controls how metadata is committed and how InputFile and OutputFile instances are created.
This would require rewriting much of the Hadoop table operations logic, right? Are we perhaps saying that's the right layer of abstraction to work with? We wanted to treat Iceberg as a bit of a black box (see https://github.com/Netflix/iceberg/issues/92#issuecomment-439499151): Iceberg is just a storage/organizational layer that takes care of the conventions of file paths in the metastore and the backing store, and all we want to change is how the bytes are written to a location that Iceberg has selected.
Finally, I don't see a `TableOperations#newDataFile` method for opening a new table data file, so we would have to add that API as well. We can close this ticket if we'd like and create another one that outlines all of the APIs that are missing from `TableOperations`, and also discusses how to make the right components `Serializable`.
I'm closing this because discussion has moved to the Apache repo: https://github.com/apache/incubator-iceberg/issues/12
It would be useful to allow the custom metadata described in https://github.com/Netflix/iceberg/issues/106 to be consumed by the Spark Data Source. For example, it would be helpful to encrypt the files upon writing them. But different users of the data source will have different ways they would want to use that custom metadata to inform how the data is read or written.
We therefore propose supporting a data source option that service loads an instance of the below interface in the data source reader and writer layer.
Below is an API sketch for such a plugin:
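The original sketch appears to have been lost from this page. A hypothetical reconstruction follows: the `IcebergSparkIO` name comes from this issue, but the method shapes are assumptions, and the XOR "encryption" implementation is purely a toy to show the write/read round trip.

```java
// Hypothetical shape for the proposed plugin: it gets a chance to
// transform bytes as they are written and read, e.g. to encrypt
// data files on write and decrypt them on read.
interface IcebergSparkIO {
    byte[] onWrite(byte[] data);
    byte[] onRead(byte[] data);
}

// Toy "encryption" plugin: XOR with a fixed key. A real plugin would
// use a proper cipher keyed from the custom table metadata.
class XorSparkIO implements IcebergSparkIO {
    private final byte key;

    XorSparkIO(byte key) {
        this.key = key;
    }

    @Override
    public byte[] onWrite(byte[] data) {
        return xor(data);
    }

    @Override
    public byte[] onRead(byte[] data) {
        return xor(data);
    }

    private byte[] xor(byte[] data) {
        byte[] out = new byte[data.length];
        for (int i = 0; i < data.length; i++) {
            out[i] = (byte) (data[i] ^ key);
        }
        return out;
    }
}
```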
It is difficult, however, to make `IcebergSparkIO` `Serializable`. Therefore, if we're not careful, we would have to service load the implementation on every executor, and do so multiple times. We propose instead to service load a provider class that can be passed the data source options, so that the plugin only has to be service loaded once and can be serialized and distributed to the executor nodes. Therefore we also require the below interface: