Describe the bug
Our ML pipelines run on large instances in a k8s cluster, so a large number of CPUs (96/128) is often available. It looks like the new swordfish runtime has trouble handling and scheduling that many parallel tasks and workers. Currently the number of workers for the runtime is set to NUM_CPUS, which is also used as the max concurrency of other tasks and operations:
https://github.com/Eventual-Inc/Daft/blob/3394a6646eb2b16002bccf2df64e30006d7776e8/src/daft-local-execution/src/sources/scan_task.rs#L70
https://github.com/Eventual-Inc/Daft/blob/3394a6646eb2b16002bccf2df64e30006d7776e8/src/common/runtime/src/lib.rs#L20-L25
https://github.com/Eventual-Inc/Daft/blob/3394a6646eb2b16002bccf2df64e30006d7776e8/src/common/runtime/src/lib.rs#L136-L140
The result is slow run speeds and excessive memory consumption.
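As a rough sanity check of the scale involved (plain Python, not Daft internals), this prints how many CPUs the process sees on one of these pods; on our instances it is 96 or 128, which is the value the runtime sizes its worker pool to:
import os

# Total CPUs reported by the OS on the node; 96 or 128 on our instances.
print("os.cpu_count():", os.cpu_count())
# CPUs this process is actually allowed to run on (can be lower in k8s
# if CPU affinity is restricted for the pod).
print("affinity:", len(os.sched_getaffinity(0)))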
To Reproduce
I have a script that randomly generates data with a structure similar to ours: each parquet row has 24 image columns stored as s3 links to jpegs and 36 label columns stored as npz bytes, which are read later with a UDF.
Only one npz and one jpeg are generated and duplicated across all parquet rows and columns, which is why the parquet files are pretty small (our parquets have a similar structure but are much heavier).
Please replace the bucket and prefix with your own:
import os
import io

import numpy as np
from PIL import Image
from randimage import ColoredPath, EPWTPath, GaussianBlobMask
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

JPEG_PATH = "image.jpeg"
S3_BUCKET = "<YOUR-BUCKET>"            # replace with your bucket
S3_JPEG_PATH_PREFIX = "<YOUR-PREFIX>"  # replace with your prefix (key for the uploaded jpeg)


def save_img(img, path):
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    img.save(path)


def get_random_colored_image(
    img_size,
):  # Tried to make it geometrically representative of road paths
    mask = GaussianBlobMask(img_size).get_mask()
    path = EPWTPath(mask).get_path()
    img = ColoredPath(path, img_size).get_colored_path()
    return img


def get_random_grayscale_image(img_size=(480, 640), save_path=None) -> Image.Image:
    img = get_random_colored_image(img_size)
    img = (img * 255).round().astype(np.uint8)
    gray_img = Image.fromarray(img).convert("L")
    if save_path:
        save_img(gray_img, save_path)
    return gray_img


def from_grayscale_img(path: str) -> bytes:
    # Build a sparse float32 "label" from the image and return it as compressed npz bytes
    img = np.array(Image.open(path))
    threshold = np.percentile(img.reshape(-1), 99)
    mask = (img > threshold)
    noise = np.random.rand(*img.shape)
    y = (img + noise) * mask
    buffer = io.BytesIO()
    np.savez_compressed(buffer, arr=y.astype(np.float32))
    return buffer.getvalue()


def upload_jpeg_to_s3():
    s3 = boto3.client('s3')
    s3.upload_file(JPEG_PATH, S3_BUCKET, S3_JPEG_PATH_PREFIX)


def generate_image_bytes_and_label():
    get_random_grayscale_image(save_path=JPEG_PATH)
    label_npz_bytes = from_grayscale_img(JPEG_PATH)
    upload_jpeg_to_s3()
    s3_uri = f"s3://{S3_BUCKET}/{S3_JPEG_PATH_PREFIX}"
    return s3_uri, label_npz_bytes


if __name__ == "__main__":
    image_s3_path, label_npz_bytes = generate_image_bytes_and_label()
    # Duplicate the images and labels to make 7000 rows
    num_rows = 7000
    images = [image_s3_path for _ in range(num_rows)]
    labels = [label_npz_bytes for _ in range(num_rows)]
    # It is approximately the number of our images and labels
    image_columns = {f'image_{i+1}': images for i in range(24)}
    label_columns = {f'label_{i+1}': labels for i in range(36)}
    df = pd.DataFrame({**image_columns, **label_columns})
    schema = pa.schema([
        (f'image_{i+1}', pa.string()) for i in range(24)
    ] + [
        (f'label_{i+1}', pa.binary()) for i in range(36)
    ])
    table = pa.Table.from_pandas(df, schema=schema)
    row_group_size = 20 * 1024 * 1024  # pyarrow's row_group_size is a max row count, not bytes
    pq.write_table(table, 'images_labels_1.parquet', row_group_size=row_group_size)
    pq.write_table(table, 'images_labels_2.parquet', row_group_size=row_group_size)
    print("Parquet files 'images_labels_1.parquet' and 'images_labels_2.parquet' have been created successfully.")
Run script:
Replace the s3 parquet path, and if needed the endpoint_url and profile_name (I'm using on-prem storage):
import io
import os
import time

import numpy as np

import daft
from daft.daft import S3Config, IOConfig, io_glob

train_path = "<REPLACE WITH YOUR OWN S3 PATH>"
io_config = IOConfig(s3=S3Config(endpoint_url="<IF NEEDED>", profile_name="<IF NEEDED>"))  # drop these kwargs if not needed

daft.set_execution_config(enable_native_executor=True,
                          default_morsel_size=8,
                          parquet_inflation_factor=100)
os.environ["DAFT_ANALYTICS_ENABLED"] = "0"

column_names = [f"image_{i}" for i in range(1, 25)] + [f"label_{i}" for i in range(1, 37)]

parquet_filestats = io_glob(f"{train_path}**/*.parquet", io_config=io_config)
parquet_paths = [file['path'] for file in parquet_filestats]
df = daft.read_parquet(parquet_paths, io_config=io_config)


def load_npz(compressed_data):
    # Decompress the npz label bytes back into an ndarray
    buf = io.BytesIO(compressed_data)
    data = np.load(buf)
    nd_array = data['arr']
    return nd_array


print(column_names)
# Download and decode the 24 image columns from their s3 links
for col_name in column_names[:24]:
    df = df.with_column(col_name, df[col_name].url.download(on_error="null", io_config=io_config).image.decode())
# Decode the 36 npz label columns with a UDF
for col_name in column_names[24:]:
    df = df.with_column(col_name, df[col_name].apply(load_npz, return_dtype=daft.DataType.python()))

start_time = time.time()
counter = 0
for sample in df.iter_rows(8):
    counter += 1
    if counter % 100 == 0:
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"########## {counter} ##########")
        print(f"Benchmark: Processed {counter} samples in {elapsed_time} seconds")
        start_time = end_time
    if counter >= 1000:
        break
Python 3.10.15, Ubuntu 20.04, Daft built locally from the following commit: https://github.com/Eventual-Inc/Daft/tree/274f300fb86413dfea11d38fba1fe93e7db2f02b
I worked on an instance with 128 CPUs and 2TB RAM in a k8s cluster.
Expected behavior
I played with Daft, setting NUM_CPUS to a smaller hardcoded value of 4 (8/16/32 also worked fine) in:
https://github.com/Eventual-Inc/Daft/blob/3394a6646eb2b16002bccf2df64e30006d7776e8/src/common/runtime/src/lib.rs#L21-L21
and
https://github.com/Eventual-Inc/Daft/blob/3394a6646eb2b16002bccf2df64e30006d7776e8/src/daft-local-execution/src/lib.rs#L24-L26
It helped to reduce duration and memory consumption.
Before editing: 273 threads spawned; memray topped out at 317GB of allocations, with a duration of 351 secs.
After compiling with NUM_CPUS=4: fewer threads spawned (145), much lower memory consumption (5.1GB), a duration of ~50 secs, and some memory allocations are also getting freed.
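The thread counts above were read off the running process, and the allocation totals came from memray. To check the same numbers on your own run, you can drop something like this into the benchmark loop of the run script, or point psutil.Process at its PID from another shell (psutil is an assumed extra dependency, not used by the scripts above):
import psutil

proc = psutil.Process()
# OS threads currently alive in the process (includes the runtime's worker threads)
print("threads:", proc.num_threads())
# Resident memory of the process, in GB
print("rss_gb:", proc.memory_info().rss / 1024**3)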
Component(s)
Python Runner, Other
Additional context
We started experiencing this in our ML pipelines with getdaft>0.3.9, probably 0.3.10/11. I provided a possible lead on what may cause this behavior, but I'm not 100 percent sure; feel free to add your thoughts. 😁