rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Improve cudf::io::datasource::create() #17110

Open tpn opened 1 day ago

tpn commented 1 day ago

At a prior company that made heavy use of cudf, we ran into serious performance problems with Kvikio/GDS when small reads were serviced by an mmap'd data source because they did not meet the configured device read threshold (e.g. reads of compressed metadata, which were frequently smaller than 128KB).

This was because the underlying parquet files lived on a network file system--WekaFS in our case--and mmap'd I/O against WekaFS resulted in pathological performance problems, especially when lots of little <128KB reads were paged in 4KB at a time over the network in a really inefficient way.

Additionally, we found ourselves needing a data source that did O_DIRECT host reads for everything, so that the Linux page cache could be avoided entirely. This was useful when doing back-to-back benchmarks of different features, where the presence or absence of data in the page cache had a huge impact on runtimes; i.e. it was impossible to tell whether a PR purporting a 5% perf boost was actually delivering that boost when back-to-back runs varied by 50% simply due to cold vs. hot page cache data.

I'm working on reimplementing in cudf the various improvements we introduced in our internal codebase, so that they can be used by others. The approach that seems most viable, assuming you want to keep all datasource implementation innards behind datasource.cpp, is to allow cudf::io::datasource::create() to take additional datasource_kind and datasource_params parameters; see the code sample below.

I'm actively hacking on a branch locally, I'll throw up a draft PR soon.

 diff --git a/cpp/include/cudf/io/datasource.hpp b/cpp/include/cudf/io/datasource.hpp
index dc14802adc..845f34a1db 100644
--- a/cpp/include/cudf/io/datasource.hpp
+++ b/cpp/include/cudf/io/datasource.hpp
@@ -36,6 +36,174 @@ namespace io {
  * @file
  */

+/**
+ * @brief Kind of data source to create when calling
+ * `cudf::io::datasource::create()`.
+ *
+ * @see cudf::io::datasource::create()
+ * @see cudf::io::datasource_params
+ *
+ * N.B. GDS = GPUDirect Storage
+ */
+enum class datasource_kind {
+  /**
+   * @brief Kvikio-based data source (default).
+   *
+   * This data source is the default for cuDF, and should be the most performant
+   * option for most use cases.  It supports GDS where possible, falling back to
+   * multi-threaded host-based reads when GDS is not available.
+   *
+   * It supports asynchronous reads, and will use the provided CUDA stream for
+   * all I/O operations when possible.
+   */
+  KVIKIO = 0,
+  DEFAULT = KVIKIO,
+
+  /**
+   * @brief Kvikio-based data source that does not attempt to use GDS, instead
+   * falling back to multi-threaded host-based reads.
+   *
+   * It supports asynchronous reads, but does not do any stream synchronization,
+   * as the reads are all performed on the host.
+   */
+  KVIKIO_COMPAT,
+
+  /**
+   * @brief Kvikio-based data source that will fail if GDS is not available.
+   * Specifically, `cudf::io::datasource::create()` when called with this kind
+   * of data source will throw a `cudf::runtime_error` if GDS is not available.
+   */
+  KVIKIO_GDS,
+
+  /**
+   * @brief Host-based data source that does not support any device or async
+   * operations.
+   *
+   * All reads are performed via standard POSIX pread() calls.  No
+   * multi-threading or asynchronous operations are supported.
+   *
+   * The primary purpose of this datasource type is to be a base class for the
+   * `O_DIRECT` implementation, which needs to issue pread() calls against a
+   * file descriptor that *hasn't* been opened with `O_DIRECT` if certain
+   * constraints aren't met (specifically: when reading the final bytes of a
+   * file that isn't perfectly aligned to a sector-size boundary).
+   *
+   * The time required to service reads from this data source will be affected
+   * by the presence or absence of the desired data in the Linux page cache.
+   * Thus, back-to-back runs of the same file will have significantly different
+   * performance characteristics, depending on whether the data is in the page
+   * cache or not.
+   *
+   * Generally, this data source should be avoided in favor of the `KVIKIO`
+   * data source, which will be more performant in most cases.  However, it
+   * can serve as a baseline against which improved `KVIKIO` performance can
+   * be empirically measured.
+   */
+  HOST,
+
+  /**
+   * @brief Host-based data source that issues reads against a file descriptor
+   * opened with `O_DIRECT`, where possible, bypassing the Linux page cache.
+   *
+   * This data source will always result in the slowest possible read times,
+   * as all reads are serviced directly from the underlying device.  However,
+   * it will be consistently slow, and that consistency can be critical when
+   * benchmarking or profiling changes purporting to improve performance in
+   * unrelated areas.
+   *
+   * Thus, the primary use case for this data source is for benchmarking and
+   * profiling purposes, where you want to eliminate any runtime variance in
+   * back-to-back runs that would be caused by the presence or absence of data
+   * in the host's page cache.
+   *
+   * A secondary use case for this data source is when you specifically do not
+   * want to pollute the host's page cache with the data being read, either
+   * because it won't be read again soon, or you want to remove the memory
+   * pressure (or small but non-trivial amount of compute overhead) that would
+   * otherwise be introduced by servicing I/O through the page cache.  In some
+   * scenarios, this can yield a net performance improvement, despite a higher
+   * per-read latency.
+   */
+  ODIRECT,
+
+  /**
+   * @brief Host-based data source that uses memory mapped files to satisfy
+   * read requests.
+   *
+   * Note that this can result in pathological performance problems in certain
+   * environments, such as when small reads are done against files residing on
+   * a network file system (including accelerated file systems like WekaFS).
+   */
+  HOST_MMAP,
+};
+
+/**
+ * @brief Parameters for the kvikio data source.
+ */
+struct kvikio_datasource_params {
+  /**
+   * @brief When set, explicitly disables any attempts at using GPUDirect
+   * Storage, resulting in kvikio falling back to its "compat" mode using
+   * multi-threaded host-based reads.
+   *
+   * Defaults to false.
+   *
+   * N.B. Compat mode will still be used if GDS isn't available, regardless
+   *      of the value of this parameter.
+   */
+  bool use_compat_mode{false};
+
+  /**
+   * @brief The threshold at which the data source will switch from using
+   * host-based reads to device-based (i.e. GPUDirect) reads, if GPUDirect is
+   * available.
+   *
+   * This parameter should represent the read size where GDS is faster than
+   * a posix read() plus the overhead of a host-to-device memcpy.
+   *
+   * Defaults to 128KB.
+   */
+  size_t device_read_threshold{128 << 10};
+
+  /**
+   * @brief The number of threads in the kvikio thread pool.
+   *
+   * This parameter only applies when the kvikio data source is operating in
+   * compat mode, either because `use_compat_mode` was set or because GDS is
+   * not available.
+   *
+   * Defaults to 0, which defers the thread pool sizing to kvikio.
+   */
+  uint16_t num_threads{0};
+
+  /**
+   * @brief The size in bytes into which I/O operations will be split.
+   *
+   * Defaults to 1MB.
+   */
+  size_t task_size{1 << 20};
+};
+
+/**
+ * @brief Parameters for the `O_DIRECT` data source.
+ */
+struct odirect_datasource_params {
+  /**
+   * @brief The sector size, in bytes, for the underlying device upon which
+   * `O_DIRECT` reads will be dispatched.
+   *
+   * Defaults to 512.  This is the most common sector size for modern drives,
+   * although 4096 is becoming more prevalent, especially in enterprise gear.
+   *
+   * N.B. On Linux, you can determine the sector size of a device with the
+   *      `blockdev` command, e.g.: `sudo blockdev --getss /dev/sda`.
+   */
+  size_t sector_size{512};
+};
+
+/**
+ * @brief Union of parameters for different data sources.
+ */
+using datasource_params = std::variant<kvikio_datasource_params, odirect_datasource_params>;
+
 /**
  * @brief Interface class for providing input data to the readers.
  */
@@ -94,7 +262,10 @@ class datasource {
    * include padding after the byte range, to include additional data that may be needed for
    * processing.
    *
-   @throws cudf::logic_error if the minimum size estimate is greater than the maximum size estimate
+   * @throws cudf::logic_error if the minimum size estimate is greater than the maximum size estimate
+   *
+   * @throws cudf::runtime_error if `KVIKIO_GDS` is specified as the desired kind of data source,
+   * and GDS is not available for the file.
    *
    * @param[in] filepath Path to the file to use
    * @param[in] offset Starting byte offset from which data will be read (the default is zero)
@@ -102,12 +273,16 @@ class datasource {
    * zero, which means the whole file after `offset`)
    * @param[in] min_size_estimate Lower estimate of the data range that will be read (the default is
    * zero, which means the whole file after `offset`)
+   * @param[in] kind Optionally supplies the kind of data source to create
+   * @param[in] params Optionally supplies parameters for the data source
    * @return Constructed datasource object
    */
   static std::unique_ptr<datasource> create(std::string const& filepath,
                                             size_t offset            = 0,
                                             size_t max_size_estimate = 0,
-                                            size_t min_size_estimate = 0);
+                                            size_t min_size_estimate = 0,
+                                            datasource_kind kind = datasource_kind::DEFAULT,
+                                            std::optional<datasource_params> params = std::nullopt);
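
For clarity, the `datasource_params` plumbing above is ordinary `std::variant` dispatch. Here's a standalone sketch of just that piece (illustrative only, not cudf code; the struct bodies mirror the diff):

```cpp
// Illustrative mirror of the proposed parameter structs; not cudf code.
#include <cstddef>
#include <cstdint>
#include <variant>

struct kvikio_datasource_params {
  bool use_compat_mode{false};                   // force host-based reads
  std::size_t device_read_threshold{128 << 10};  // 128KB
  std::uint16_t num_threads{0};                  // 0 = defer to kvikio
  std::size_t task_size{1 << 20};                // 1MB
};

struct odirect_datasource_params {
  std::size_t sector_size{512};
};

using datasource_params = std::variant<kvikio_datasource_params, odirect_datasource_params>;

// True when `params` carries kvikio parameters; the KVIKIO kinds would reject
// anything else via the "invalid parameters" error path in create().
bool holds_kvikio_params(datasource_params const& params)
{
  return std::get_if<kvikio_datasource_params>(&params) != nullptr;
}
```
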
tpn commented 18 hours ago

WIP/draft PR here: https://github.com/rapidsai/cudf/pull/17115

vuule commented 17 hours ago

Thank you for the detailed proposal! One question to better understand the WekaFS situation - what is the preferred method to perform IO in this case? GDS?

tpn commented 15 hours ago

Anything other than mmap :-) Literally a carrier pigeon would probably have been preferred. And these weren't weak-sauce network links; they were 200Gb NICs. In our case, we had a custom KvikioDataSource that essentially mirrored your file_source, except it never fell back to mmap -- it would use kvikio.pread() for those small metadata calls.

Edit: actually, just to clarify... the WekaFS pathological perf issue was traced to host read calls (because the 128KB device threshold wasn't met), and in your default implementation, those reads ended up being serviced by mmap. (Saying that, though, I'm looking at the code and wondering how that happened--I seem to recall GDS was working properly for reads >128KB, and it was just the <=128KB ones that hit the slow mmap path. But looking at the current cudf code, this would imply that the initial cufile_integration check failed (or even #ifndef CUFILE_FOUND), resulting in mmap for everything.)

So the solution I propose would have a create() method look something like this:

std::unique_ptr<datasource> datasource::create(std::string const& filepath,
                                               size_t offset,
                                               size_t max_size_estimate,
                                               size_t min_size_estimate,
                                               datasource_kind kind,
                                               std::optional<datasource_params> params)
{
  CUDF_EXPECTS(max_size_estimate == 0 or min_size_estimate <= max_size_estimate,
               "Invalid min/max size estimates for datasource creation");

  switch (kind) {
    case datasource_kind::KVIKIO:
    case datasource_kind::KVIKIO_COMPAT:
    case datasource_kind::KVIKIO_GDS: {
      kvikio_datasource_params new_params;
      if (params) {
        if (auto kvikio_params = std::get_if<kvikio_datasource_params>(&params.value())) {
          // Copy the user-provided parameters into our local variable.
          new_params = *kvikio_params;
        } else {
          throw cudf::logic_error("Invalid parameters for KVIKIO-based datasource.");
        }
      }
      if (kind == datasource_kind::KVIKIO_COMPAT) {
        // Forcibly set the compatibility mode to true, regardless of what may
        // already be present in the params.  The `kind` parameter has requested
        // `KVIKIO_COMPAT`, and that takes precedence over the `use_compat_mode`
        // parameter in the `kvikio_datasource_params`.
        new_params.use_compat_mode = true;
      } else if (kind == datasource_kind::KVIKIO_GDS) {
        // GDS is unique in that we are expected to throw a cudf::runtime_error
        // if GDS is not available.  The first chance we have to do this is
        // here, by way of fencing against CUFILE_FOUND.
#ifndef CUFILE_FOUND
        throw cudf::runtime_error("GDS is not available because cuFile is not enabled.");
#endif
        // The next check is done against the `is_gds_enabled()` function in
        // `cufile_integration`.  If GDS is not enabled, we throw a runtime
        // error here as well.
        if (!cufile_integration::is_gds_enabled()) {
          throw cudf::runtime_error("cuFile reports GDS is not available.");
        }
        // Forcibly set the compatibility mode to false, regardless of what may
        // already be present in the params.  The `kind` parameter has requested
        // `KVIKIO_GDS`, and that takes precedence over the `use_compat_mode`
        // parameter in the `kvikio_datasource_params`.
        new_params.use_compat_mode = false;
      } else {
        CUDF_EXPECTS(kind == datasource_kind::KVIKIO,
                     "Invariant check failed: kind != datasource_kind::KVIKIO");
        // We don't need to do any special handling for `KVIKIO` here.
      }
      return std::make_unique<kvikio_source>(filepath.c_str(), new_params);
    }
    case datasource_kind::HOST:
      return std::make_unique<host_source>(filepath.c_str());
    case datasource_kind::ODIRECT: {
      odirect_datasource_params new_params;
      if (params) {
        if (auto odirect_params = std::get_if<odirect_datasource_params>(&params.value())) {
          // Copy the user-provided parameters into our local variable.
          new_params = *odirect_params;
        } else {
          throw cudf::logic_error("Invalid parameters for O_DIRECT-based datasource.");
        }
      }
      return std::make_unique<odirect_source>(filepath.c_str(), new_params);
    }
    case datasource_kind::HOST_MMAP:
      return std::make_unique<memory_mapped_source>(
        filepath.c_str(), offset, max_size_estimate, min_size_estimate);
    default:
      CUDF_FAIL("Unsupported datasource kind");
  }
}

There would be a new host_source that does host-only POSIX pread() calls; no async support, no device support.
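
Its read path would be nothing fancier than this (hypothetical demo function, not the actual class):

```cpp
// Sketch of the host_source read path: one plain pread() against an ordinary
// file descriptor -- no threading, no async, no device involvement.
#include <fcntl.h>
#include <unistd.h>
#include <cstddef>
#include <string>

std::string demo_pread()
{
  char const* path = "/tmp/host_source_demo.bin";  // demo file only

  // Write a small file so the read below has something to hit.
  int wfd = open(path, O_CREAT | O_WRONLY | O_TRUNC, 0644);
  (void)write(wfd, "hello, datasource", 17);
  close(wfd);

  // host_source::host_read() would boil down to exactly this call.
  int fd = open(path, O_RDONLY);
  char buf[16] = {};
  ssize_t n = pread(fd, buf, 10, 7);  // 10 bytes starting at byte offset 7
  close(fd);
  return std::string(buf, n > 0 ? static_cast<std::size_t>(n) : 0);
}
```
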

odirect_source would derive from host_source. It does all the sector-aligned work for host reads against a file descriptor opened with O_DIRECT, but delegates reading the final bytes of the file (unless the file size is a perfect multiple of the sector size) to the underlying host_source's read().
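
Roughly, the alignment split looks like this (hypothetical helper, not the actual implementation; note O_DIRECT additionally requires the destination buffer itself to be sector-aligned, e.g. via posix_memalign):

```cpp
// Split a read request into a sector-aligned body (eligible for O_DIRECT)
// and an unaligned tail (delegated to the plain host read path).
// Hypothetical helper for illustration; not cudf code.
#include <cstddef>

struct odirect_split {
  std::size_t aligned_offset;  // sector-aligned start for the O_DIRECT pread
  std::size_t aligned_size;    // sector-multiple byte count for O_DIRECT
  std::size_t tail_size;       // trailing bytes handled by the host fallback
};

odirect_split split_read(std::size_t offset, std::size_t size, std::size_t sector = 512)
{
  std::size_t const begin       = offset - (offset % sector);  // round down to sector
  std::size_t const end         = offset + size;
  std::size_t const aligned_end = end - (end % sector);        // round down to sector
  // If the whole request lives inside one partial sector, everything is tail.
  std::size_t const body = aligned_end > begin ? aligned_end - begin : 0;
  std::size_t const tail = end - (body ? aligned_end : offset);
  return {begin, body, tail};
}
```
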

memory_mapped_source is what you've already implemented; this proposal would just allow you to explicitly control whether you want mmap via kind = datasource_kind::HOST_MMAP.

The kvikio_source would be new. It would be similar to your current file_source implementation, in that it has a private kvikio::FileHandle to which it dispatches device read and host read calls. But it would also be cognizant of the kvikio_datasource_params, which allow a user to customize the thread pool size, the task size, whether GDS is used at all (use_compat_mode), and the device read threshold. Additionally, it needs to be cognizant of whether the kind is KVIKIO_GDS, and keep the guarantee of raising a runtime error if GDS isn't available at runtime.
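
And the host-vs-device decision it makes per read is basically this (hypothetical helper, not KvikIO or cudf API):

```cpp
// Dispatch rule described above: reads at or below device_read_threshold stay
// on the (cheaper) host path; larger reads go to GDS device reads, unless
// compat mode disables GDS entirely. Hypothetical helper for illustration.
#include <cstddef>

struct kvikio_dispatch_params {
  bool use_compat_mode{false};
  std::size_t device_read_threshold{128 << 10};  // 128KB, as in the proposal
};

bool use_device_read(std::size_t read_size, kvikio_dispatch_params const& p)
{
  return !p.use_compat_mode && read_size > p.device_read_threshold;
}
```
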