waymo-research / waymo-open-dataset

Waymo Open Dataset
https://www.waymo.com/open

Optimize parquet files (sorted columns and row group size) #841

Open nlgranger opened 5 months ago

nlgranger commented 5 months ago

Fetching individual rows identified by sensor and timestamp in the parquet files is slow.

Simply re-encoding the files with better options can significantly improve the access time.

Baseline:

```python
%%timeit
pq.read_table(
    root / split / "lidar" / f"{records[0]}.parquet",
    filters=[("key.frame_timestamp_micros", "=", 1550083485748805)],
)
```

1.65 s ± 81.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Sorted, with a small row group size:

```python
pq.write_table(
    lidar.sort_by([("key.frame_timestamp_micros", "ascending"), ("key.laser_name", "ascending")]),
    "/home/me/lidar.parquet",
    row_group_size=4,
    compression="BROTLI",
    sorting_columns=[pq.SortingColumn(2), pq.SortingColumn(3)],
)
```

```python
%%timeit
pq.read_table(
    "/home/me/lidar.parquet",
    filters=[("key.laser_name", "=", 1), ("key.frame_timestamp_micros", "=", 1550083485748805)],
)
```

46.2 ms ± 1.24 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Note: sorting by timestamp first and then by sensor is about 4 times faster than the opposite order on my computer.
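
If you'd rather not hard-code the sorting_columns indices (the 2 and 3 above depend on the column order of the lidar files), here is a sketch that looks them up from the table schema instead, using only standard pyarrow calls:

```python
import pyarrow.parquet as pq

# same file and variables as in the baseline above
table = pq.read_table(root / split / "lidar" / f"{records[0]}.parquet")
sort_key = [("key.frame_timestamp_micros", "ascending"), ("key.laser_name", "ascending")]

pq.write_table(
    table.sort_by(sort_key),
    "/home/me/lidar.parquet",
    row_group_size=4,
    compression="BROTLI",
    # look the column positions up by name instead of hard-coding 2 and 3
    sorting_columns=[
        pq.SortingColumn(table.schema.names.index(name), descending=(order == "descending"))
        for name, order in sort_key
    ],
)
```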

aryasenna commented 4 months ago

Hi @nlgranger, I'm also facing problems with slow access times (see #856) when I use the Waymo dataset parquet files directly for training.

Sorting helps, but it has to be done on every parquet load, and it goes without saying that the sorting itself takes time.

Short of re-encoding 1.2 TB worth of parquet files, do you perhaps have a better trick?
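
To be clear, the per-load sorting I mean is roughly this (sketch, path is just a placeholder):

```python
import pyarrow.parquet as pq

# sort in memory after every load; works, but the sort is paid again on every pass over the data
table = pq.read_table("/data/waymo/lidar/some_context.parquet")  # placeholder path
table = table.sort_by([
    ("key.frame_timestamp_micros", "ascending"),
    ("key.laser_name", "ascending"),
])
```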

nlgranger commented 4 months ago

Nope, I just re-encoded.

aryasenna commented 4 months ago

> Nope, I just re-encoded.

Ouch, how long did it take you to re-encode a few TB of parquet files?

nlgranger commented 4 months ago

About a day on a 48-CPU server, I think, so it's not that slow. Make sure to enable Brotli compression, otherwise sparse data such as the lidar return maps will take up a huge amount of space.
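
If you want to see the difference for yourself, a quick sketch that writes the same table with a few codecs and prints the resulting file sizes (placeholder path, codec names are the standard pyarrow ones):

```python
import os
import pyarrow.parquet as pq

table = pq.read_table("/data/waymo/lidar/some_context.parquet")  # placeholder path

for codec in ["NONE", "SNAPPY", "BROTLI"]:
    out = f"/tmp/lidar_{codec.lower()}.parquet"
    pq.write_table(table, out, row_group_size=4, compression=codec)
    print(codec, round(os.path.getsize(out) / 2**20), "MiB")
```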

aryasenna commented 4 months ago

> About a day on a 48-CPU server, I think, so it's not that slow. Make sure to enable Brotli compression, otherwise sparse data such as the lidar return maps will take up a huge amount of space.

Hi, I tried your snippet, but it's giving me an OOM error despite loads of free RAM.

Do you mind sharing your re-encoding script? Thank you very much in advance. 🙂

u3Izx9ql7vW4 commented 4 months ago

What's the rationale behind a row_group_size of 4? It seems quite small and possibly inefficient.

nlgranger commented 4 months ago

For random access, you only ever need a single row from each group, so you want the groups to be as small as possible. For point-cloud data, 4 rows already amount to on the order of a few hundred kB, so compression will be close to the maximum ratio it can achieve. Feel free to run your own tests and adjust to your taste.
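
For example, a quick sweep like this sketch (placeholder path; the numbers will depend on your data and disk) is enough to pick a value:

```python
import os
import time
import pyarrow.parquet as pq

table = pq.read_table("/data/waymo/lidar/some_context.parquet")  # placeholder path
ts = table.column("key.frame_timestamp_micros")[0].as_py()
sorted_table = table.sort_by([
    ("key.frame_timestamp_micros", "ascending"),
    ("key.laser_name", "ascending"),
])

# time a single-frame read and report the file size for a few row group sizes
for rows in [4, 16, 64, 256]:
    out = f"/tmp/lidar_rg{rows}.parquet"
    pq.write_table(sorted_table, out, row_group_size=rows, compression="BROTLI")
    start = time.perf_counter()
    pq.read_table(out, filters=[("key.frame_timestamp_micros", "=", ts)])
    print(rows, f"{time.perf_counter() - start:.3f} s", os.path.getsize(out) // 2**20, "MiB")
```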

aryasenna commented 4 months ago

> About a day on a 48-CPU server, I think, so it's not that slow. Make sure to enable Brotli compression, otherwise sparse data such as the lidar return maps will take up a huge amount of space.

> Hi, I tried your snippet, but it's giving me an OOM error despite loads of free RAM.
>
> Do you mind sharing your re-encoding script? Thank you very much in advance. 🙂

Alright, I fixed my script.

With the threaded approach, re-sorting a few hundred GB of parquet files finished in just under 2 hours, so I think it's a reasonable tradeoff; of course, this depends on your I/O and CPU count (I happen to have access to fast disks and a lot of CPU cores).

This does need a lot of memory, though, so I had to limit my thread count (for example, 8 threads work on a cluster with 300 GB of RAM).

In case anyone needs it:

Re-encode Waymo Script

```python
# Re-encode the parquet files in the Waymo dataset to ensure that they are sorted
# by the frame timestamp and sensor name.
# See: https://github.com/waymo-research/waymo-open-dataset/issues/841
# License: MIT
import os

# Suppress annoying TF warnings, and we don't need GPU memory anyway
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

import time
import logging
import pyarrow.parquet as pq
from waymo_open_dataset import v2
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Change to your own dataset directories
dataset_dir = ""
dataset_out_dir = ""  # set the same as dataset_dir if you want to overwrite

row_group_size = 4
thread_count = 8  # too many and you will get an OOM kill

# Add more if you need other modalities
directories = {
    'CameraImage': os.path.join(dataset_dir, v2.TAG_BY_COMPONENT[v2.CameraImageComponent]),
    'LiDAR': os.path.join(dataset_dir, v2.TAG_BY_COMPONENT[v2.LiDARComponent]),
}

# List the context names available for each modality
context_names = {}
for key, dir_path in directories.items():
    context_names[key] = set(
        os.path.splitext(filename)[0]
        for filename in sorted(os.listdir(dir_path))
        if filename.endswith('.parquet')
    )

all_context_names = sorted(set.intersection(*context_names.values()))

if not all_context_names:
    logger.error(f"No matching .parquet files found in {dataset_dir}")
    raise ValueError(f"No matching .parquet files found in {dataset_dir}")

seq_counts = len(all_context_names)
logger.info(
    f"Found {seq_counts} sequences in {dataset_dir} with complete lidar-camera correspondence "
    f"(out of {len(context_names['CameraImage'])})"
)

# Define sorting keys for each type
sorting_keys = {
    'CameraImage': [("key.frame_timestamp_micros", "ascending"), ("key.camera_name", "ascending")],
    'LiDAR': [("key.frame_timestamp_micros", "ascending"), ("key.laser_name", "ascending")],
}


def process_file(context_name):
    """
    Process the given file by reading the parquet table, sorting it based on the
    specified keys, and writing the sorted table to a new directory.

    Args:
        context_name (str): The name of the context file to be processed.

    Returns:
        str: The name of the processed context file.

    Raises:
        Exception: If there is an error during the processing.
    """
    try:
        for key, dir_path in directories.items():
            table = pq.read_table(os.path.join(dir_path, context_name + '.parquet'))
            sort_key = sorting_keys[key]
            table_sorted = table.sort_by(sort_key)
            new_dir_path = os.path.join(
                dataset_out_dir, v2.TAG_BY_COMPONENT[getattr(v2, key + 'Component')]
            )
            os.makedirs(new_dir_path, exist_ok=True)
            pq.write_table(
                table_sorted,
                os.path.join(new_dir_path, context_name + '.parquet'),
                row_group_size=row_group_size,
                compression="BROTLI",
                # SortingColumn's second argument is `descending`
                sorting_columns=[
                    pq.SortingColumn(table.schema.names.index(column[0]),
                                     descending=(column[1] == "descending"))
                    for column in sort_key
                ],
            )
    except Exception as e:
        logger.error(f"Error processing {context_name}: {e}")
    return context_name


if __name__ == "__main__":
    os.makedirs(dataset_out_dir, exist_ok=True)
    start_time = time.time()

    # Run in threads; re-encoding is largely I/O-bound
    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        list(tqdm(
            executor.map(process_file, all_context_names),
            total=len(all_context_names),
            desc="Re-encoding files",
            unit="parquet",
        ))

    end_time = time.time()
    h, rem = divmod(end_time - start_time, 3600)
    m, s = divmod(rem, 60)
    logger.info(f"Processed {seq_counts} parquets in {int(h):02d}h {int(m):02d}m {s:.2f}s")
```