GoogleCloudDataproc / hadoop-connectors

Libraries and tools for interoperability between Hadoop-related open-source software and Google Cloud Platform.
Apache License 2.0

Read channel is always closed when using PositionedReadable methods #107

Closed sidseth closed 6 years ago

sidseth commented 6 years ago

readFully and the other methods defined in PositionedReadable are contracted not to change the stream position on invocation; as a result, the default implementation in FSInputStream performs a seek back to the original position after the read.
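For reference, that default implementation in FSInputStream looks roughly like this (paraphrased from the Hadoop source, trimmed for brevity):

```java
// Paraphrase of FSInputStream's default positioned read: it honours the
// PositionedReadable contract by seeking back to the caller's original
// position, and that restoring seek is the backwards seek that makes the
// GCS connector close and reopen its read channel.
@Override
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  synchronized (this) {
    long oldPos = getPos();
    int nread = -1;
    try {
      seek(position);                       // move to the requested offset
      nread = read(buffer, offset, length); // ordinary stream read
    } finally {
      seek(oldPos);                         // restore position: a backwards seek
    }
    return nread;
  }
}
```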

GoogleStorageReadChannelImpl ends up closing the stream after every such read as a result of the backwards seek. The same is true for GoogleHadoopFSInputStream, since it relies on the position of the underlying channel.

Each readFully operation, no matter how large, seems to take between 100-250ms when running on a GCP us-west instance (8 cores) against regional us-west Google Cloud Storage. This can add up quite fast for storage formats like ORC, which make extensive use of the PositionedReadable calls. The 100-250ms latency is also something I'd like to investigate, independent of this issue.

steveloughran commented 6 years ago

Can I join in and note that seek() performance is critical for columnar datasets like ORC and Parquet. Improving seek performance by eliminating the need to abort and re-initiate HTTP connections through various means (lazy seek, adaptive read range, ...) is the key way to improve input performance on the primary storage formats in use today.
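For reference, "lazy seek" means deferring the repositioning until the next read, so repeated seeks cost nothing until data is actually requested. A minimal sketch, with all class and member names made up for illustration:

```java
import java.io.IOException;
import java.io.InputStream;

// Illustrative lazy-seek wrapper, assuming a reopen(pos) primitive that
// issues a new ranged GET. seek() only records the target offset; the
// expensive HTTP work happens on the next read, so seek/seek/read costs
// a single request. Not the GCS or S3A connector's actual code.
abstract class LazySeekInputStream extends InputStream {
  private long nextReadPos; // offset most recently requested via seek()
  private long channelPos;  // actual offset of the underlying connection
  private InputStream in;   // current HTTP response body, if open

  /** Open (or reopen) the underlying object stream at the given offset. */
  protected abstract InputStream reopen(long pos) throws IOException;

  public void seek(long pos) {
    nextReadPos = pos; // cheap: no I/O until the next read
  }

  @Override
  public int read() throws IOException {
    if (in == null || nextReadPos != channelPos) {
      if (in != null) {
        in.close();               // abandon the stale connection
      }
      in = reopen(nextReadPos);   // single ranged GET at the target offset
      channelPos = nextReadPos;
    }
    int b = in.read();
    if (b >= 0) {
      nextReadPos++;
      channelPos++;
    }
    return b;
  }
}
```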

medb commented 6 years ago

@steveloughran The GCS connector already uses lazy seek (it has some bugs that I'm going to fix in the coming days, though), but could you elaborate on the "adaptive range read" technique? What serves as a signal for it, i.e. based on what are range reads "adapted"?

steveloughran commented 6 years ago

S3A did its lazy seek in HADOOP-12444; HADOOP-13203 added the ability to declare that a filesystem instance was for random data, by setting fs.s3a.experimental.fadvise to one of "normal", "sequential" and "random". Normal and sequential assume you want to read the entire file, so they do a GET from the current position to EOF. Random IO only does a GET of max(bytes-requested, readahead-range), which delivers the best performance for ORC/Parquet data.
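A minimal sketch of setting that policy (the full property key in shipped releases is fs.s3a.experimental.input.fadvise; the bucket, object, and offset below are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AFadviseExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Declare that streams opened through this filesystem instance do
    // random IO, so S3A issues bounded ranged GETs rather than a GET to EOF.
    conf.set("fs.s3a.experimental.input.fadvise", "random");

    Path path = new Path("s3a://some-bucket/data.orc"); // placeholder object
    FileSystem fs = path.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(path)) {
      byte[] buf = new byte[16];
      // Positioned read: under "random" this becomes a small ranged GET.
      in.readFully(1_000_000L, buf);
    }
  }
}
```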

Azure wasb optimised for random IO in HADOOP-14535: assume sequential IO until the first backward seek(), at which point assume random IO. S3A picked up that idea in HADOOP-14965.

This leaves the two stores' input policies similar: as soon as there's a backwards seek(), abort the stream and switch into random IO mode. This matches the base IO pattern of SequenceFile, ORC and Parquet IO, where a footer near the EOF is read, then you go back to the first stripe in the file with columns you care about, read the summary there, maybe go backwards to read some actual column values, or go on to the next stripe...
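A sketch of that switch in isolation (illustrative names, not either connector's actual code):

```java
// Sequential-until-first-backward-seek policy, as described above.
// Starts by assuming a whole-file scan; the first backwards seek flips it
// into random-IO mode, after which GET ranges are bounded.
class AdaptiveReadPolicy {
  private boolean randomIO = false; // assume sequential until proven otherwise
  private long lastPos = 0;

  /** Record a seek; the first backwards move switches to random IO. */
  void onSeek(long targetPos) {
    if (targetPos < lastPos) {
      randomIO = true; // columnar access pattern detected
    }
    lastPos = targetPos;
  }

  /** How many bytes the next GET should request. */
  long requestLength(long bytesWanted, long readaheadRange, long remainingInFile) {
    return randomIO
        ? Math.max(bytesWanted, readaheadRange) // bounded ranged GET
        : remainingInFile;                      // GET to EOF
  }
}
```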

medb commented 6 years ago

Thank you Steve, this is a very helpful explanation!

I have tested the adaptive range reads approach, and for Spark SQL reading Parquet files it works only if the random-access state for an object is preserved between channel instances. It looks like Spark uses two different channels: one to read the footer, and only after that one to read the column data. If the first channel (the one reading the footer) detects that this is a random-access file (based on backward seeks), it should pass this information on to the channel that will read the column data. I achieved this by storing all object paths that were detected to have random access in an LRU cache, and checking that cache during new channel creation.
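A minimal sketch of such a cache, using an access-ordered LinkedHashMap as the LRU (all names are illustrative, not the connector's code):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Carries the "random access" verdict across channel instances: object
// paths that exhibited backward seeks go into a bounded LRU map, and new
// channels for those paths can start directly in random-IO mode.
class RandomAccessPathCache {
  private static final int MAX_ENTRIES = 1000; // arbitrary bound for the sketch

  // access-ordered LinkedHashMap, evicting the least-recently-used path
  private final Map<String, Boolean> paths = Collections.synchronizedMap(
      new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
          return size() > MAX_ENTRIES;
        }
      });

  /** Called by a channel that has just detected a backward seek. */
  void markRandomAccess(String objectPath) {
    paths.put(objectPath, Boolean.TRUE);
  }

  /** Checked during new channel creation for the same object. */
  boolean isRandomAccess(String objectPath) {
    return paths.get(objectPath) != null; // get() refreshes the LRU order
  }
}
```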

Another hint that I use: when the footer size (the last 8 bytes of a file) is read, it also means that this is a random-access file, but I guess it's a less universal signal than backward seeks.
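That heuristic reduces to a small position check (a sketch; the 8-byte tail matches Parquet's footer-length-plus-magic suffix, and the names are illustrative):

```java
// Footer-read heuristic: a read that starts inside the last few bytes of
// the object suggests a columnar footer lookup, hence random IO ahead.
class FooterReadHeuristic {
  static final int FOOTER_TAIL_BYTES = 8; // Parquet: 4-byte length + "PAR1"

  static boolean looksLikeFooterRead(long readPos, long fileLength) {
    return fileLength >= FOOTER_TAIL_BYTES
        && readPos >= fileLength - FOOTER_TAIL_BYTES;
  }
}
```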

steveloughran commented 6 years ago

In HADOOP-15229 I've proposed adding a builder API to FileSystem.open(); fadvise options would be one of the features available through it.
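For context, a sketch of how such a builder might be used, modelled on the FileSystem.openFile() API that later shipped in Hadoop 3.3; treating the fadvise key as a per-open option is an assumption about later releases, not part of the original proposal:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenFileBuilderExample {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://some-bucket/data.parquet"); // placeholder
    FileSystem fs = path.getFileSystem(new Configuration());

    // Per-stream input policy, instead of a filesystem-wide setting.
    // build() returns a CompletableFuture<FSDataInputStream>.
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.s3a.experimental.input.fadvise", "random")
        .build()
        .get()) {
      byte[] buf = new byte[8];
      in.readFully(0L, buf); // positioned read on the configured stream
    }
  }
}
```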