pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs
Other
29.51k stars 1.88k forks source link

scan_parquet from S3 is 5x slower than in Python #11835

Closed jinschoi closed 11 months ago

jinschoi commented 11 months ago

Checks

Reproducible example

use cloud::AmazonS3ConfigKey as Key;
use eyre::Result;
use polars::prelude::*;
use dotenvy::dotenv;

/// Builds a streaming `LazyFrame` that scans the parquet object at `key`.
///
/// The AWS access key id, secret access key and region are read from the
/// `aws_access_key_id`, `aws_secret_access_key` and `aws_region` environment
/// variables and handed to the scan as cloud options.
///
/// # Errors
/// Fails if any of the three environment variables cannot be read, or if
/// `LazyFrame::scan_parquet` returns an error.
fn lazyframe_from_s3_key(key: &str) -> Result<LazyFrame> {
    // Read in the same order as the tuple list: id, secret, region.
    let aws_settings = [
        (Key::AccessKeyId, std::env::var("aws_access_key_id")?),
        (Key::SecretAccessKey, std::env::var("aws_secret_access_key")?),
        (Key::Region, std::env::var("aws_region")?),
    ];
    let scan_args = ScanArgsParquet {
        cloud_options: Some(cloud::CloudOptions::default().with_aws(aws_settings)),
        ..Default::default()
    };

    // Streaming is switched on explicitly for the returned plan.
    Ok(LazyFrame::scan_parquet(key, scan_args)?.with_streaming(true))
}

/// Runs one or more `;`-separated statements and prints how long they took.
///
/// The statements are expanded directly into the caller's scope, so any
/// `let` bindings they introduce remain usable after the macro invocation.
/// The report is printed to stdout as `<msg>: <elapsed>` with two decimals.
macro_rules! time {
    ($label:expr, $($body:stmt);+) => {
        // Hygienic local: invisible to the statements supplied by the caller.
        let started = std::time::Instant::now();
        $($body)*
        println!("{}: {:.2?}", $label, started.elapsed());
    };
}

pub fn dataframe_from_s3_key(key: &str) -> Result<DataFrame> {
    time!("lazyframe scan",
          let lf = lazyframe_from_s3_key(key)?);
    time!("lazyframe collect",
          let df = lf.collect()?);
    Ok(df)
}

/// Entry point of the reproduction: loads the parquet file from S3 and
/// debug-prints the resulting frame.
fn main() -> Result<()> {
    // Called before anything reads the environment; `lazyframe_from_s3_key`
    // later looks up the `aws_*` variables.
    dotenv()?;
    let df = dataframe_from_s3_key("s3://xxx/test.parquet")?;
    // Debug-print the collected frame.
    dbg!(&df);

    Ok(())
}

Log output

lazyframe scan: 244.43ms
RUN STREAMING PIPELINE
parquet -> ordered_sink
RefCell { value: [] }
STREAMING CHUNK SIZE: 50000 rows
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (41125, 338863),(1128323, 320829),(1833539, 324471),(380043, 33593),(772972, 326081),(1478462, 324264),(741404, 31511),(1802782, 30699),(4, 41066),(1099108, 29157),(1449208, 29196),(413693, 327656).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (3205041, 28099),(2856027, 28572),(3582205, 319573),(2506739, 27959),(3901834, 27987),(2158066, 28189),(2884657, 320328),(3233198, 320884),(3929879, 316815),(2186313, 320370),(2534756, 321215),(3554138, 28009).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (5653937, 322647),(6003203, 316009),(5283565, 25907),(5309530, 317071),(4274384, 317989),(4246750, 27576),(4592429, 27991),(5976640, 26505),(4620478, 318120),(4938654, 26404),(4965116, 318393),(5626657, 27222).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (6690815, 318996),(7036264, 318566),(7354886, 27087),(8042121, 26175),(7382031, 316585),(7725453, 316612),(6319268, 26335),(6664571, 26186),(7009867, 26339),(8068354, 317339),(7698672, 26723),(6345661, 318854).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (9755720, 24885),(9411465, 25661),(9069009, 25469),(10098871, 26337),(8411244, 317995),(10125266, 319557),(9094536, 316873),(9780663, 318152),(9437184, 318480),(8385749, 25437),(8729295, 24601),(8753954, 314999).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (10812615, 320553),(10786649, 25908),(10444879, 25254),(11158826, 318164),(11477046, 26691),(10470191, 316402),(11503795, 316536),(11133224, 25544),(11847162, 317072),(11820387, 26717),(12190076, 316627),(12164290, 25728).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (12506759, 26452),(13564717, 317891),(13218083, 319802),(12873146, 319397),(13908534, 317506),(12533269, 314241),(12847566, 25522),(13537941, 26718),(14226096, 26433),(14252587, 314901),(13192599, 25426),(13882664, 25812).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (15251171, 25987),(14913706, 24269),(15593948, 23934),(16269831, 25232),(15277216, 316676),(14593070, 320580),(15617940, 312427),(16295121, 315006),(14938033, 313082),(15930423, 24224),(15954705, 315070),(14567544, 25468).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (17656053, 313844),(17994750, 315752),(17290403, 24887),(18334286, 314099),(18310558, 23670),(16610183, 24557),(16976183, 314164),(17630333, 25662),(16951274, 24851),(17315348, 314929),(16634798, 316420),(17969953, 24739).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (19329591, 25949),(18673120, 316014),(18648441, 24621),(19696993, 317114),(19355598, 314772),(20379588, 312134),(19013742, 315793),(19670426, 26509),(18989190, 24494),(20014163, 24848),(20354427, 25103),(20039069, 315302).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (21740701, 317154),(20716463, 315162),(20691778, 24627),(21716188, 24455),(21395550, 320582),(22399815, 26275),(22426148, 317500),(21055563, 314541),(21031681, 23824),(21370160, 25332),(22057911, 24820),(22082789, 316970).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (23423443, 24906),(24456415, 312069),(23788069, 310686),(24432437, 23920),(23448407, 316466),(24122494, 309887),(23764929, 23082),(24098811, 23625),(22743704, 25284),(23082275, 24647),(22769046, 313173),(23106980, 316407).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (25444819, 24538),(26463237, 24438),(24793560, 314637),(25783866, 24553),(24768540, 24962),(25108253, 24508),(25808477, 315309),(26149017, 314164),(25469415, 314395),(26487733, 314936),(25132819, 311944),(26123842, 25117).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (27509455, 317677),(28190662, 315652),(26827905, 315251),(27851901, 313608),(28165565, 25039),(28531188, 314213),(27484166, 25231),(26802725, 25122),(27827188, 24655),(28506370, 24760),(27143212, 24540),(27167810, 316300).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (29521126, 24786),(29208460, 312610),(29545970, 317430),(30544117, 25470),(29887922, 313587),(30226457, 317604),(28845457, 24030),(29863456, 24408),(29184615, 23787),(30569645, 313622),(28869545, 315014),(30201565, 24834).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (30906947, 313490),(32229146, 25585),(31578814, 312090),(31555134, 23622),(30883323, 23566),(31220493, 23541),(31890960, 24825),(31244092, 310986),(32573577, 23892),(31915843, 313247),(32254789, 318732),(32597527, 315310).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (33250407, 24160),(33591292, 24380),(34608293, 24679),(32936980, 313371),(32912893, 24029),(33615730, 315433),(34296052, 312185),(33274625, 316611),(34270432, 25562),(34633030, 312539),(33931219, 24046),(33955323, 315053).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (35287586, 23936),(35965578, 24240),(35627794, 25443),(36304602, 24269),(34970207, 317323),(34945625, 24524),(36642025, 24510),(35653295, 312227),(35989876, 314670),(36666593, 312946),(36328929, 313040),(35311580, 316158).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (36979595, 24429),(37318791, 23829),(37004082, 314653),(38697010, 315482),(38358695, 313919),(37681933, 311280),(37993269, 24842),(38334623, 24014),(38672670, 24282),(37657051, 24824),(38018169, 316398),(37342678, 314317).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (40028341, 24933),(39351740, 24615),(39376413, 314382),(39715389, 312896),(39690851, 24480),(39037078, 314606),(40729963, 315731),(40367214, 23911),(39012548, 24472),(40053332, 313826),(40391183, 314805),(40706044, 23861).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (41382609, 24532),(41407199, 313595),(41745716, 318896),(42089426, 316319),(42429920, 318012),(42770992, 310721),(41720850, 24808),(41069770, 312783),(42747988, 22946),(42064668, 24700),(41045750, 23962),(42405801, 24061).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (44437728, 23534),(43785744, 311805),(43081769, 24775),(44774808, 24404),(43423633, 25060),(44097605, 25626),(43761724, 23962),(43106602, 316975),(44123289, 314383),(43448751, 312917),(44461320, 313432),(44799270, 311530).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (46465612, 23727),(46799904, 25487),(45135681, 313548),(45110856, 24767),(45473057, 314804),(45811832, 313581),(46151140, 314416),(46825449, 312364),(45449285, 23714),(45787917, 23857),(46125469, 25613),(46489397, 310451).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (47815411, 23584),(47839053, 314279),(48517745, 316195),(48858326, 314444),(47480873, 23097),(48178698, 313766),(47504028, 311327),(47137869, 24739),(47162666, 318151),(48153388, 25252),(48833996, 24272),(48492520, 25167).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (50524271, 24350),(49197250, 314505),(50548679, 317082),(49172826, 24366),(50188268, 23501),(50211827, 312388),(50865817, 23965),(49849214, 24411),(49511811, 23713),(49873683, 314529),(49535582, 313576),(50889840, 318181).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (51547175, 23694),(52556220, 22632),(51232937, 314182),(52889754, 24551),(51208077, 24802),(51907339, 314176),(51570927, 312262),(52246566, 309598),(51883245, 24036),(52221571, 24937),(52914363, 312651),(52578910, 310788).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (53567225, 23227),(54236888, 23663),(53227070, 24288),(53590510, 310067),(54915161, 23657),(53925344, 311488),(53251416, 315753),(53900633, 24653),(54260609, 315625),(54600885, 314220),(54938876, 315227),(54576290, 24537).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (56944413, 25045),(56290867, 313197),(55589947, 25058),(55615063, 314912),(56604120, 24486),(56266860, 23949),(55930031, 23194),(55953283, 313521),(55254159, 23623),(55277840, 312051),(56628664, 315693),(56969516, 310776).
BatchedParquetReader: fetched 6 row_groups for 2 fields, yielding 12 column chunks.
BatchedParquetReader: column chunks start & len: (58965371, 24242),(57951792, 24193),(57280348, 23263),(58311333, 313398),(57303669, 312895),(57976043, 311126),(57616620, 23438),(58989671, 314482),(58287225, 24050),(58624787, 24238),(57640116, 311620),(58649083, 316232).
BatchedParquetReader: fetched 2 row_groups for 2 fields, yielding 4 column chunks.
BatchedParquetReader: column chunks start & len: (59664958, 313092),(59328938, 312091),(59641085, 23815),(59304209, 24671).
lazyframe collect: 27.33s
[src/main.rs:44] &df = shape: (17_605_984, 2)
... dataframe display ...

Issue description

Running the above code on a 60MB parquet file in s3 takes 27s. The equivalent Python code:

import polars as pl
# Scan the same S3 path as the Rust example, then collect the result.
df = pl.scan_parquet("s3://xxx/test.parquet").collect()

takes ~5s. When running with POLARS_VERBOSE=1, the BatchedParquetReader lines above are printed at a rate of about one per second.

Expected behavior

The Rust code should be at least as fast as the Python version.

Installed versions

"parquet", "lazy", "aws", "dtype-categorical", "dtype-u8"
jinschoi commented 11 months ago

Reading the S3 file into a std::io::Cursor and creating a ParquetReader from it also takes only about 5s:

use std::io::{Read, Cursor};

use cloud::AmazonS3ConfigKey as Key;
use dotenvy::dotenv;
use eyre::{Result, Report};
use polars::prelude::*;
use rusoto_core::Region;
use rusoto_s3::{GetObjectRequest, S3Client, S3};
use tokio::task;

async fn cursor_for_s3(key: &str) -> Result<Cursor<Vec<u8>>> {
    dbg!(key);
    let s3_client = S3Client::new(Region::default());
    let req = GetObjectRequest {
        bucket: "s3bucket".to_owned(),
        key: key.to_owned(),
        ..Default::default()
    };

    let mut reader = s3_client
        .get_object(req)
        .await?
        .body
        .unwrap()
        .into_blocking_read();
    let mut buf = vec![];
    let join_handle: task::JoinHandle<Result<Cursor<Vec<u8>>, Report>> = task::spawn_blocking(move || {
        reader.read_to_end(&mut buf)?;
        Ok(Cursor::new(buf))
    });
    join_handle.await?
}

/// Executes the given `;`-separated statements and reports their wall-clock
/// duration on stdout as `<msg>: <elapsed>` (two decimal places).
///
/// The statements expand in the caller's scope, so bindings they create are
/// still available after the macro call.
macro_rules! time {
    ($label:expr, $($body:stmt);+) => {
        // Hygienic local: cannot clash with names used by the caller's statements.
        let started = std::time::Instant::now();
        $($body)*
        println!("{}: {:.2?}", $label, started.elapsed());
    };
}

pub async fn dataframe_from_s3_key_rusoto(key: &str) -> Result<DataFrame> {
    time!("parquet read",
          let mut cursor = cursor_for_s3(key).await?);

    time!("df conversion",
          let df = ParquetReader::new(&mut cursor).finish()?);
    Ok(df)
}

/// Entry point of the rusoto-based comparison: downloads the object, converts
/// it to a frame, and debug-prints the result.
#[tokio::main]
async fn main() -> Result<()> {
    // Called first, before any S3 client is created.
    dotenv()?;
    let df = dataframe_from_s3_key_rusoto("xxx").await?;
    // Debug-print the decoded frame.
    dbg!(&df);

    Ok(())
}
allinux commented 11 months ago

Since the Python and Rust versions of Polars are released separately, you should check whether you are comparing the same underlying version.

I saw no significant performance difference between Python and Rust when testing both against py-0.19.8 (#11645).

In [1]: %%time
   ...: import polars as pl
   ...: storage_options = {
   ...:     "aws_access_key_id": "",
   ...:     "aws_secret_access_key": "",
   ...:     "aws_region": "ap-northeast-2",
   ...: }
   ...: 
   ...: df = pl.scan_parquet("s3://.../DATE_PARTITION=2022022*/*.parquet", storage_options=storage_options)
   ...: print(df.collect())
   ...: 
   ...: 
shape: (288_230, 352)
┌──────────┬────────────────┬─────────────────────┬────────────┬───┬─────────────────┬─────────────────┬──────────┬────────────────┐
│ AIR_DATA ┆ AI_FabricLevel ┆ Anti&&BLANK&&Crease ┆ B1_Error_F ┆ … ┆ supersetVersion ┆ totalCycleCount ┆ wattHour ┆ DATE_PARTITION │
│ ---      ┆ ---            ┆ ---                 ┆ ---        ┆   ┆ ---             ┆ ---             ┆ ---      ┆ ---            │
│ str      ┆ str            ┆ str                 ┆ str        ┆   ┆ str             ┆ str             ┆ str      ┆ i64            │
╞══════════╪════════════════╪═════════════════════╪════════════╪═══╪═════════════════╪═════════════════╪══════════╪════════════════╡
│ []       ┆ 0              ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ []       ┆ 0              ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ …        ┆ …              ┆ …                   ┆ …          ┆ … ┆ …               ┆ …               ┆ …        ┆ …              │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆ 48              ┆ 1223     ┆ 20220228       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆ 1               ┆ 0        ┆ 20220228       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220228       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220228       │
└──────────┴────────────────┴─────────────────────┴────────────┴───┴─────────────────┴─────────────────┴──────────┴────────────────┘
CPU times: user 11 s, sys: 9.07 s, total: 20.1 s
Wall time: 5.46 s

// Rust counterpart of the Python call above: scan the hive-partitioned
// parquet files on S3 and collect them.
let opts = CloudOptions::default().with_aws([
    (AmazonS3ConfigKey::DefaultRegion, "ap-northeast-2".to_string()),
    (AmazonS3ConfigKey::Region, "ap-northeast-2".to_string()),
    (AmazonS3ConfigKey::Bucket, "".to_string()),
    (AmazonS3ConfigKey::AccessKeyId, "".to_string()),
    (AmazonS3ConfigKey::SecretAccessKey, "".to_string()),
]);

let args = ScanArgsParquet {
    hive_partitioning: true,
    cloud_options: Some(opts),
    ..Default::default()
};
// Glob wildcards restored so the path matches the Python call; the rendered
// page appears to have consumed the `*` characters as emphasis markers.
let df = LazyFrame::scan_parquet("s3://.../DATE_PARTITION=2022022*/*.parquet", args)?.collect();

println!("{:?}", df);

[datagov@ec2-an2-prd-datalake-data-coverage-02c release]$ time ./polars_ex01
Ok(shape: (288_230, 352)
┌──────────┬────────────────┬─────────────────────┬────────────┬───┬─────────────────┬─────────────────┬──────────┬────────────────┐
│ AIR_DATA ┆ AI_FabricLevel ┆ Anti&&BLANK&&Crease ┆ B1_Error_F ┆ … ┆ supersetVersion ┆ totalCycleCount ┆ wattHour ┆ DATE_PARTITION │
│ ---      ┆ ---            ┆ ---                 ┆ ---        ┆   ┆ ---             ┆ ---             ┆ ---      ┆ ---            │
│ str      ┆ str            ┆ str                 ┆ str        ┆   ┆ str             ┆ str             ┆ str      ┆ i64            │
╞══════════╪════════════════╪═════════════════════╪════════════╪═══╪═════════════════╪═════════════════╪══════════╪════════════════╡
│ []       ┆ 0              ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ []       ┆ 0              ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220225       │
│ …        ┆ …              ┆ …                   ┆ …          ┆ … ┆ …               ┆ …               ┆ …        ┆ …              │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆ 48              ┆ 1223     ┆ 20220228       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆ 1               ┆ 0        ┆ 20220228       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220228       │
│ []       ┆                ┆                     ┆            ┆ … ┆ 10              ┆                 ┆          ┆ 20220228       │
└──────────┴────────────────┴─────────────────────┴────────────┴───┴─────────────────┴─────────────────┴──────────┴────────────────┘)

real    0m5.917s
user    0m11.247s
sys     0m13.190s

ritchie46 commented 11 months ago

Wait until the next rust release.