pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Group by using lazy mode on a CSV throws a memory allocation error when running on AWS Lambda #17946

Open GBMsejimenez opened 2 months ago

GBMsejimenez commented 2 months ago


Reproducible example

import json
import boto3
import polars as pl

session = boto3.Session()
CREDENTIALS = session.get_credentials()
STORAGE_OPTIONS = {
    "aws_region": "us-east-1",
    "aws_access_key_id": CREDENTIALS.access_key,
    "aws_secret_access_key": CREDENTIALS.secret_key,
}
if CREDENTIALS.token:
    STORAGE_OPTIONS.update({"session_token": CREDENTIALS.token})

print(STORAGE_OPTIONS)

# Define the schema for reading the CSV file
SCHEMA = {
    "user_id": pl.Int32,
    "transaction_date": pl.Datetime,
    "order_id": pl.Int32,
    "price": pl.Float32,
    "quantity": pl.Int16,
    "item_id": pl.Int32,
    "item_desc": pl.Utf8,
}

def read_s3(uri: str) -> pl.LazyFrame:
    """
    Read a CSV file from S3 using Polars.

    :param uri: S3 URI of the CSV file.
    :return: Polars LazyFrame with the CSV data.
    """
    return pl.scan_csv(
        uri,
        schema_overrides=SCHEMA,
        ignore_errors=True,
        truncate_ragged_lines=True,
        storage_options=STORAGE_OPTIONS,
    )

def apply_rfm(df: pl.LazyFrame) -> pl.LazyFrame:
    """
    Calculate RFM scores for each user and segment them.

    :param df: Input dataframe.
    :return: Dataframe with RFM scores and segments.
    """

    df_rfm = df.group_by("user_id").agg(
        recency=pl.col("transaction_date").max(),  # Most recent transaction date
        frequency=pl.col("order_id").n_unique(),  # Number of unique orders
        monetary=pl.col("total_amount_plus_taxes").sum(),  # Total monetary value
    )
    latest_date = df.select(pl.col("transaction_date").max()).collect().item()
    df_rfm = df_rfm.with_columns(
        recency=(
            latest_date - pl.col("recency")
        ).dt.total_days()  # Calculate recency in days
    )

    print("RFM Calculated")
    return df_rfm

def handler(event: dict, context: dict) -> dict:
    try:
        uri = event["Records"][0]["s3"]["uri"]

        df = read_s3(uri)

        df = apply_rfm(df)

        return {
            "statusCode": 200,
            "body": json.dumps("RFM loaded to DataBase"),
        }

    except Exception as e:
        print(f"Error in RFM process: {e}")
        return {"statusCode": 500, "body": json.dumps("Error in RFM process")}

Log output

INIT_REPORT Init Duration: 10008.73 ms  Phase: init Status: timeout
Error in RFM process: failed to allocate 25954093 bytes to download uri = s3://aws-us-east-1-dev-s3-xxx/xxx/dataset_processed302e6eea-f9ed-4df4-8ad5-b7c8eada0658.csv
This error occurred with the following context stack:
[1] 'csv scan' failed
[2] 'filter' input failed to resolve
[3] 'filter' input failed to resolve
[4] 'select' input failed to resolve
END RequestId: 3c9f6613-f850-48e4-8658-1b47af8d8786
REPORT RequestId: 3c9f6613-f850-48e4-8658-1b47af8d8786  Duration: 26514.89 ms   Billed Duration: 26515 ms   Memory Size: 10240 MB   Max Memory Used: 159 MB

Issue description

I'm new to Polars and attempting to implement an RFM analysis using the library. As part of my proposed architecture, I need to run the code in an AWS Lambda function. I've successfully implemented the RFM analysis and uploaded the code to Lambda using a Docker image.

Despite the code running successfully in my local container, I'm encountering a "failed to allocate 25954093 bytes" error when running it in the Lambda function. While troubleshooting I ruled out credential errors, since scan_csv itself doesn't raise any error, and I'm explicitly passing AWS credentials to scan_csv via storage_options.

Attempts to resolve: I've applied the suggestions from issues #7774 and #1777, including the following (see the sketch after this list):

Setting streaming=True on the collect method
Defining my schema columns as pl.Utf8 or integer dtypes
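
For context, this is roughly how the streaming attempt looks when applied to the pipeline above (a sketch only: it reuses the read_s3 and apply_rfm helpers from the reproducible example, and the S3 URI is a placeholder):

# Reuse the helpers from the reproducible example above.
lf = read_s3("s3://my-bucket/dataset.csv")  # placeholder URI
lf_rfm = apply_rfm(lf)

# Suggested in #7774 / #1777: stream the query instead of
# materializing the whole CSV in memory at once.
df_rfm = lf_rfm.collect(streaming=True)
print(df_rfm.head())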

Thanks in advance 🤗

Expected behavior

The Polars code should work seamlessly in the Lambda function, just like it does on the local container, without any memory allocation errors.

Installed versions

```
--------Version info---------
Polars:               1.3.0
Index type:           UInt32
Platform:             Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.34
Python:               3.12.3 (main, Jun 5 2024, 03:37:09) [GCC 11.4.1 20230605 (Red Hat 11.4.1-2)]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:
gevent:
great_tables:
hvplot:
matplotlib:
nest_asyncio:
numpy:                2.0.1
openpyxl:
pandas:               2.2.2
pyarrow:              17.0.0
pydantic:
pyiceberg:
sqlalchemy:
torch:
xlsx2csv:
xlsxwriter:
```
ritchie46 commented 2 months ago

Can you set POLARS_PANIC_ON_ERR=1 and RUST_BACKTRACE=1 and show us the backtrace log?
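
For anyone else reproducing this, a minimal sketch of how those variables can be set from a Python Lambda handler (they can equally be configured as Lambda environment variables or as ENV lines in the Docker image; setting them before polars is imported is the safe option):

import os

# Enable Rust panics and backtraces for Polars errors.
os.environ["POLARS_PANIC_ON_ERR"] = "1"
os.environ["RUST_BACKTRACE"] = "1"

import polars as pl  # imported after the variables are set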

wjglenn3 commented 2 months ago

Hi, I'm not sure if I should open a separate issue for this, but I'm fairly sure I'm hitting the same problem. When running inside an AWS Lambda I can read a CSV and write it to a Parquet file using read_csv and write_parquet, but I have no such luck with scan_csv and sink_parquet. I'm getting the same type of error and have tried the same approaches to solve it as @GBMsejimenez.
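
For comparison, the eager path that does work looks roughly like this (a sketch; the bucket and path are placeholders, and it assumes s3fs/fsspec is available so read_csv can resolve the s3:// URI and write_parquet can be given an s3fs file handle):

import polars as pl
import s3fs

csv_file = 's3://{BUCKET}/{PATH}/test.csv'          # placeholder
parquet_file = 's3://{BUCKET}/{PATH}/test.parquet'  # placeholder

# Eager read: with fsspec/s3fs installed, read_csv fetches the object itself.
df = pl.read_csv(csv_file)

# Write the Parquet file back through an s3fs file handle.
fs = s3fs.S3FileSystem(anon=False)
with fs.open(parquet_file, 'wb') as f:
    df.write_parquet(f)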

I've gotten the code down to the bare minimum needed to reproduce the error (the CSV file being tested consists of only a header and two lines of data, and the bucket and path in the file name have been edited out).

import polars as pl
import s3fs
import json

# Note: intended as environment variables; plain Python assignments like these
# have no effect, so they need to be set in the Lambda/container configuration
# (or via os.environ before polars is imported).
POLARS_PANIC_ON_ERR = 1
RUST_BACKTRACE = 1

# Lambda entry
def lambda_handler(event, context):

    pl.show_versions()

    csv_file = 's3://{BUCKET}/{PATH}/test.csv'
    #parquet_file = 's3://{BUCKET}/{PATH}/test.parquet'

    fs = s3fs.S3FileSystem(anon=False)

    df = pl.scan_csv(csv_file).collect(streaming=True)

    return {
        'statusCode': 200,
        'body': json.dumps("Finished")
    }

This gives me the following error (with {BUCKET} and {PATH} replaced by their actual values):

[ERROR] ComputeError: failed to allocate 1343 bytes to download uri = s3://{BUCKET}/{PATH}/test.csv
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 40, in lambda_handler
    df = pl.scan_csv(csv_file).collect()
  File "/opt/python/polars/lazyframe/frame.py", line 2027, in collect
    return wrap_df(ldf.collect(callback))

My Polars version info, in case it's needed:

```
--------Version info---------
Polars:               1.4.1
Index type:           UInt32
Platform:             Linux-5.10.219-229.866.amzn2.x86_64-x86_64-with-glibc2.26
Python:               3.11.6 (main, Feb 7 2024, 11:27:56) [GCC 7.3.1 20180712 (Red Hat 7.3.1-17)]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:               2024.6.1
gevent:
great_tables:
hvplot:
matplotlib:
nest_asyncio:
numpy:
openpyxl:
pandas:
pyarrow:
pydantic:
pyiceberg:
sqlalchemy:
torch:
xlsx2csv:
xlsxwriter:
```
qmg-tmay commented 2 months ago

@wjglenn3 I'm experiencing the same issue when using a Docker container-based Lambda.

HectorPascual commented 1 month ago

Hey, we are experiencing the same issue with Docker on AWS Lambda; we've tried all of the combinations mentioned above.

I also tried installing s3fs, which is needed for read_csv, but that also fails with the error:

ComputeError : failed to allocate 12345 bytes to download uri = s3://...
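
For reference, that read_csv attempt via s3fs looks roughly like this (a sketch; the URI is elided as above, and the Lambda execution role is assumed to supply the credentials):

import polars as pl
import s3fs

fs = s3fs.S3FileSystem(anon=False)  # credentials come from the Lambda role

# Open the object through s3fs and hand the file handle to read_csv.
with fs.open("s3://.../file.csv", "rb") as f:  # elided/placeholder URI
    df = pl.read_csv(f)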

Here's my minimal example that breaks:

import asyncio

import boto3
import polars as pl
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

session = boto3.session.Session(region_name="us-west-2")
credentials = session.get_credentials().get_frozen_credentials()
storage_options = {
    "aws_access_key_id": credentials.access_key,
    "aws_secret_access_key": credentials.secret_key,
    "aws_session_token": credentials.token,
    "aws_region": session.region_name,
}

async def do():
    df = pl.scan_csv(
        "s3://.../*.csv",  # example path
        storage_options=storage_options,
    ).collect()
    print(df)

def lambda_handler(event, context):
    uvloop.run(do())
    return "OK"

@alexander-beedie could you please take a look at this issue?

Thank you for your efforts!