Written by @oerling
Introduction
Modern clouds and data center designs are predominantly based on disaggregated storage. This enables decoupling of storage and compute and provides flexibility in provisioning and scaling but comes at a cost in performance over tightly coupled storage/compute solutions.
If anything, this further emphasizes the common wisdom on distributed computing: Have maximum asynchronicity and maximum operations in flight at all times so as to absorb latency and have a buffer against fluctuations in response times.
This document is based on initial discussions between Facebook and Amazon Athena. More participants are invited to join.
Use Case
We seek to increase CPU utilization by interleaving IO with processing. This takes place within a table scan and involves adaptive prefetching over a span of row groups, stripes and splits.
In almost all cases, Presto knows about multiple splits' worth of stripes before it needs them.
In this discussion, we speak in terms of the Aria scan operator, which does filter reordering and has explicit control over column reading order. The same principles apply to LazyBlock based schemes with minimal modifications.
A scan might involve very selective or location-correlated filters which are false for full stripes. In these cases, some columns may be read very sparsely. Such reading should not be speculatively scheduled.
Therefore the first row groups should be scanned without prefetch or maybe only prefetching the filtered columns. Filter order adaptation will cause the most selective filter to be run first, which in some cases may lead to all other columns becoming sparsely accessed, effectively limiting read ahead to the first filter. If such a thing happens, read-ahead should be discontinued for the affected columns.
In the case of wide tables, a single split may hold only a few row groups. For these cases, it is important to carry adaptation related information between splits. This is equally true for filter order as for speculative reading.
In all cases, at least the file footer must be read. Thus the simplest case consists of just pre-reading the footers for a large set of files before starting on the files themselves. Additionally, files have stripe footers whose locations are known only from the file footer. Latency may be absorbed by asynchronously issuing each dependent read as soon as the read it depends on returns. In this way we get to read the metadata of N files in the latency of just one.
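The chaining of dependent reads described above can be sketched with CompletableFuture. This is only an illustration under assumptions: MetadataPrefetch and the two reader functions are hypothetical names, not Presto APIs, and the actual reads are supplied by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: none of these names exist in Presto.
final class MetadataPrefetch
{
    private MetadataPrefetch() {}

    // Starts the file footer read for every file up front. Each stripe footer
    // read is chained so that it is issued as soon as its own file footer
    // arrives, without waiting for the footers of other files.
    static <F, S> List<CompletableFuture<S>> prefetch(
            List<String> files,
            Function<String, CompletableFuture<F>> readFileFooter,
            Function<F, CompletableFuture<S>> readStripeFooters)
    {
        List<CompletableFuture<S>> results = new ArrayList<>();
        for (String file : files) {
            results.add(readFileFooter.apply(file).thenCompose(readStripeFooters));
        }
        return results;
    }
}
```

Because all footer reads are in flight at once, the total wait approaches the latency of one footer read plus one stripe footer read, provided the storage system can serve the requests in parallel.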
Furthermore, if we know that either all columns are read or that columns in particular positions are read, we may combine reading the columns themselves along with the stripe metadata. The read will be for a speculative amount of data and more will be read if more turns out to be needed.
Some systems may benefit disproportionately when all requests falling within a large block are issued together. These requests might not cover the entire block but would still be submitted as a single request. For example, Facebook's internal distributed file system benefits from 72 MB block sizes, especially if data needs to be reconstructed. S3 scheduling benefits from having multiple read ranges in the same request.
Storage System Capabilities
In the discussion, we will assume that a storage system has the following metadata and interface capabilities:
byte[], offset, size triplets. These cover the number of bytes in each range. The multirange request does not need to notify about completion of individual ranges. On successful completion, the buffers will have been filled with the data.
These are trivial to implement on a local file system and expose a common set of features for various cloud and cluster systems.
Buffer Pool
Presto interfaces with storage via a buffer pool. The loaders, e.g. AbstractOrcDataSource.ChunkedSliceLoader, presently have a pattern:
This becomes
and at close of the OrcDataSource or when the data is no longer needed:
The CachedData structure holds a pin on one or more pages of a buffer pool. The pins are released as the stream goes through the buffers that correspond to the byte range in the file.
pool.get checks if the byte range is in memory. If it is, this blocks until a possible background read finishes. If the range is not in memory, it is read synchronously.
The data is usually fed to a decompressor or skipped over. Hence exposing it as fixed-size arrays is no inconvenience and allows the reading to be done within a bounded buffer pool of preallocated fixed-size elements. In uncompressed cases the data is accessed by a StreamReader. In that case too, an interface that iterates over byte[], offset, length triples is appropriate.
An upper bound on memory used for asynchronous reading can be easily enforced. If buffers are not available, a read ahead can simply not be issued and the read of the data reverts to synchronous mode. Another possibility is blocking the query until there are enough buffers for read ahead. This mode would enforce a cap on outstanding IO per JVM.
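A minimal sketch of the first policy, where a read ahead is skipped when buffers are unavailable, might look like the following. BoundedBufferPool and its methods are hypothetical names chosen for illustration, and pinning, keying and background IO are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a bounded pool of preallocated fixed-size buffers.
final class BoundedBufferPool
{
    private final Deque<byte[]> free = new ArrayDeque<>();

    BoundedBufferPool(int bufferCount, int bufferSize)
    {
        for (int i = 0; i < bufferCount; i++) {
            free.push(new byte[bufferSize]);
        }
    }

    // Returns the requested buffers, or null if the pool cannot supply all of
    // them. On null the caller does not issue the read ahead; the data is
    // then read synchronously when it is actually needed.
    synchronized byte[][] tryAcquire(int count)
    {
        if (count > free.size()) {
            return null;
        }
        byte[][] buffers = new byte[count][];
        for (int i = 0; i < count; i++) {
            buffers[i] = free.pop();
        }
        return buffers;
    }

    // Returns buffers to the pool once the data has been consumed.
    synchronized void release(byte[][] buffers)
    {
        for (byte[] buffer : buffers) {
            free.push(buffer);
        }
    }

    synchronized int available()
    {
        return free.size();
    }
}
```

The blocking alternative mentioned above would replace the null return with a wait until enough buffers are released, which is what caps outstanding IO per JVM.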
The buffers keep their association to a file and byte range until these are reused. In this way frequently read data may actually stay in memory, although we do not expect this in most use cases.
The buffer pool is keyed by file and quantized offset. File names are interned and compared by identity, that is, a hash table of weak references keeps a unique instance of each filename.
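One way to realize this keying is sketched below, assuming a WeakHashMap of weak references as the intern table and offsets rounded down to a multiple of the buffer size. FileNameInterner and BufferKey are hypothetical names, not existing Presto classes.

```java
import java.lang.ref.WeakReference;
import java.util.WeakHashMap;

// Hypothetical sketch: keeps one canonical String instance per file name.
// Entries disappear once no caller holds the canonical instance any more.
final class FileNameInterner
{
    private final WeakHashMap<String, WeakReference<String>> canonical = new WeakHashMap<>();

    synchronized String intern(String name)
    {
        WeakReference<String> ref = canonical.get(name);
        String existing = (ref == null) ? null : ref.get();
        if (existing != null) {
            return existing;
        }
        canonical.put(name, new WeakReference<>(name));
        return name;
    }
}

// Hypothetical pool key: interned file name plus quantized offset.
final class BufferKey
{
    final String file;  // interned, so identity comparison is sufficient
    final long offset;  // rounded down to a multiple of the buffer size

    BufferKey(String internedFile, long rawOffset, int bufferSize)
    {
        this.file = internedFile;
        this.offset = rawOffset - (rawOffset % bufferSize);
    }

    @Override
    public boolean equals(Object other)
    {
        if (!(other instanceof BufferKey)) {
            return false;
        }
        BufferKey that = (BufferKey) other;
        return file == that.file && offset == that.offset;
    }

    @Override
    public int hashCode()
    {
        return 31 * System.identityHashCode(file) + Long.hashCode(offset);
    }
}
```

Comparing by identity avoids a string comparison on every pool lookup, at the cost of requiring every caller to intern the file name first.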
We do not expect complex buffer replacement logic. We keep a time of last use and a use count for each buffer. A clock algorithm that decrements use counts will do. When looking for a buffer to reuse, one can examine the next 10 unpinned buffers and pick the one with the lowest ratio of use count to time since last use. If use counts are nonzero, we decrement them on the way.
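The replacement policy above could be sketched as follows. The class name, the parallel-array layout and the choice to advance the clock hand past everything examined are assumptions made for the example.

```java
// Hypothetical sketch of the clock-style victim selection described above.
final class ClockReplacer
{
    static final int WINDOW = 10; // number of unpinned buffers examined per pick

    final int[] useCount;
    final long[] lastUse;
    final boolean[] pinned;
    private int hand;

    ClockReplacer(int size)
    {
        useCount = new int[size];
        lastUse = new long[size];
        pinned = new boolean[size];
    }

    // Returns the index of the buffer to reuse, or -1 if every buffer is pinned.
    int pickVictim(long now)
    {
        int n = useCount.length;
        if (n == 0) {
            return -1;
        }
        int best = -1;
        double bestScore = Double.POSITIVE_INFINITY;
        int examined = 0;
        int steps = 0;
        while (steps < n && examined < WINDOW) {
            int i = (hand + steps) % n;
            steps++;
            if (pinned[i]) {
                continue;
            }
            examined++;
            long age = Math.max(1, now - lastUse[i]);
            // Lower score means rarely used and not used for a long time.
            double score = (double) useCount[i] / age;
            if (score < bestScore) {
                bestScore = score;
                best = i;
            }
            // Decay the use count of every buffer the hand passes over.
            if (useCount[i] > 0) {
                useCount[i]--;
            }
        }
        hand = (hand + steps) % n;
        return best;
    }
}
```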
Triggering Read Ahead
The coordinator delivers splits to workers in batches. As soon as splits are received, we can trigger reading of metadata. This involves a read of the footer for each file plus reads of stripe data that depend on the footer's content.
Creation of a PageSource for a split just needs to wait until the background metadata read finishes. At this point the locations of columns will be known.
In the warm up stage, reads are done synchronously. This identifies which columns are actually read. After a number of row groups have been processed, we move to read ahead for the columns that have a dense read pattern.
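The text does not define what counts as a dense read pattern. The sketch below assumes one possible rule: a column qualifies for read ahead once the warm up is over and it has been read in at least a given fraction of the row groups seen so far. ReadPatternTracker, warmupRowGroups and denseFraction are hypothetical names and parameters.

```java
// Hypothetical sketch: tracks per-column access during the synchronous warm up.
final class ReadPatternTracker
{
    private final int warmupRowGroups;  // assumed length of the warm up stage
    private final double denseFraction; // assumed threshold for "dense"
    private final int[] rowGroupsRead;
    private int rowGroupsSeen;

    ReadPatternTracker(int columns, int warmupRowGroups, double denseFraction)
    {
        this.warmupRowGroups = warmupRowGroups;
        this.denseFraction = denseFraction;
        this.rowGroupsRead = new int[columns];
    }

    // Called once per processed row group with a flag per column.
    void recordRowGroup(boolean[] columnWasRead)
    {
        rowGroupsSeen++;
        for (int c = 0; c < columnWasRead.length; c++) {
            if (columnWasRead[c]) {
                rowGroupsRead[c]++;
            }
        }
    }

    // True once the warm up is over and the column is read densely enough.
    boolean shouldPrefetch(int column)
    {
        if (rowGroupsSeen < warmupRowGroups) {
            return false;
        }
        return rowGroupsRead[column] >= denseFraction * rowGroupsSeen;
    }
}
```

Since the same counters keep updating after the warm up, a column that becomes sparsely read later drops below the threshold and stops being prefetched. A tracker like this is also the kind of state that would be carried between splits.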
We identify byte ranges for the columns we expect to read and schedule these for asynchronous read. The byte ranges are given buffers from a pool. These stay pinned until the data is accessed or the stripe is closed.
Depending on the number of columns and stripe size, the read ahead may cover multiple stripes or splits.
If a system benefits from bundling requests to nearby ranges in a single RPC, then this bundling can be done after identifying the ranges to prefetch.
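As a rough illustration of such bundling, the sketch below groups prefetch ranges by the storage block in which they start, so that each group can be submitted as one multirange request. RequestBundler is a hypothetical name, ranges are represented as {offset, length} pairs, and assigning a range to the block of its first byte is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: bundles ranges that fall in the same storage block.
final class RequestBundler
{
    private RequestBundler() {}

    // Each range is {offset, length}. The result maps a block index to the
    // ranges that start inside that block, in ascending block order.
    static Map<Long, List<long[]>> groupByBlock(List<long[]> ranges, long blockSize)
    {
        Map<Long, List<long[]>> requests = new TreeMap<>();
        for (long[] range : ranges) {
            long block = range[0] / blockSize;
            requests.computeIfAbsent(block, ignored -> new ArrayList<>()).add(range);
        }
        return requests;
    }
}
```

Each map entry then corresponds to one RPC. The ranges in an entry need not cover the whole block.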
The first read ahead can start after the first few row groups. It will cover data for all columns read so far for as many splits as there is space for in the read ahead quota. Disk ranges for which there is no space in the quota will simply be serviced synchronously when needed.
The scan may issue the next read ahead when, for example, half the buffers of the previous read ahead have been unpinned.
Interface Changes
PageSource
Get and set data access statistics. The statistics are connector-dependent and contain data for read ahead, filter order etc.
ConnectorSplit
Initiate and wait for background metadata read.
Skip over column statistics for columns that are not being read. This is not an externally visible change but will save time and memory for sparsely read wide tables.
Filter Statistics
Since the time of arrival of data being read is now tied to other data, we need to subtract time spent waiting for data from the execution time of a filter over a column. If reads are synchronous and take place inside the call to a filtering column reader, the wait time can be included. If the wait time is shared between many other columns, it should not be included in the filter time. The filter time is in turn used for filter ordering.
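The accounting rule can be sketched as below. FilterTimer and its parameters are hypothetical. The sketch assumes the caller measures both the elapsed time of the filter call and the part of it spent blocked on IO, and knows whether that IO was shared with other columns.

```java
// Hypothetical sketch of per-filter time accounting used for filter ordering.
final class FilterTimer
{
    private long filterNanos;

    // elapsedNanos: wall time of the filter call, including any wait.
    // waitNanos: part of elapsedNanos spent blocked waiting for data.
    // waitIsShared: true if the awaited read also served other columns.
    void record(long elapsedNanos, long waitNanos, boolean waitIsShared)
    {
        if (waitIsShared) {
            // A shared read ahead should not be charged to this filter alone.
            filterNanos += elapsedNanos - waitNanos;
        }
        else {
            // A synchronous read inside this column reader is this filter's cost.
            filterNanos += elapsedNanos;
        }
    }

    long filterNanos()
    {
        return filterNanos;
    }
}
```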
Other
The buffer pool needs a few configuration parameters and performance counters, for example the number of buffer reuses and the number of times read aheads could not be scheduled for lack of space. The buffer pool is a process-wide structure but does not have an interface outside of the reader.
The same buffer pool could be used for temporary memory for hash joins, for example. In this way anything that was not used for hash joins/group bys could be used for IO acceleration or even caching hot data in memory.