huggingface / datatrove

Freeing data processing from scripting madness by providing a set of platform-agnostic customizable pipeline processing blocks.
Apache License 2.0

Multi-node parallelism on slurm clusters #290

Open shizhediao opened 3 days ago

shizhediao commented 3 days ago

Hi,

Say I have a Slurm cluster with 100 nodes, each with 100 cores, and 10,000 tasks to run. This is my current code:

    dist_executor = SlurmPipelineExecutor(
        job_name="filter",
        pipeline=[
            JsonlReader(
                args.input_dataset,
                limit=args.data_limit,
                file_progress=True,
                doc_progress=True,
                text_key=args.text_key,
                glob_pattern="*.jsonl.zst",
            ),
            LanguageFilter(
                language_threshold=0.65, 
                languages=[Languages.english],
                exclusion_writer=JsonlWriter(
                    f"{LOCAL_PATH}/removed/1_non_english/",
                    output_filename="data/${rank}.jsonl",
                    compression=None
                    # folder structure: language/dump/file
                ),
            ),
            JsonlWriter(
                output_folder=f"{LOCAL_PATH}/{args.output_name}/intermediate_filtered",
                output_filename="${rank}.jsonl",
                compression=None
            ),
        ],
        tasks=args.n_tasks,
        workers=args.n_workers,
        time="20:00:00",
        partition=args.partition1,
        account=args.account,
        logging_dir=f"{LOCAL_LOGS_PATH}/intermediate_filtered",
        slurm_logs_folder=f"{LOCAL_LOGS_PATH}/intermediate_filtered_slurm_logs",
        randomize_start_duration=180,  # don't hit the bucket all at once with the list requests
        cpus_per_task=12,
        # qos="high",
        mem_per_cpu_gb=3,
    )

It looks like workers is the number of nodes rather than the number of CPU cores. Is my understanding correct? If so, it seems my 10,000 tasks will be executed node by node: for example, node_1 will process task_1, then task_101, then task_201, which does not fully utilize the CPU cores on a single node. I expected all of those tasks (task_1, task_101, task_201, ...) to be assigned to node_1 at the start and executed in parallel.
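
To make the numbers in this question concrete, here is a minimal sketch of the CPU arithmetic. It assumes each task is scheduled independently, requests cpus_per_task cores, and is allowed to share a node with other tasks. It also assumes workers acts as a cap on simultaneously running tasks, which this thread does not confirm. The function name schedule_estimate is hypothetical, and memory limits are ignored.

```python
def schedule_estimate(n_tasks, nodes, cores_per_node, cpus_per_task, workers=-1):
    """Return (tasks per node, concurrent tasks, sequential waves), CPU-only.

    Assumption: tasks may share a node; `workers` > 0 caps concurrency.
    """
    if cpus_per_task > cores_per_node:
        raise ValueError("a single task does not fit on one node")
    per_node = cores_per_node // cpus_per_task
    concurrent = min(n_tasks, nodes * per_node)
    if workers > 0:
        concurrent = min(concurrent, workers)
    waves = -(-n_tasks // concurrent)  # ceiling division
    return per_node, concurrent, waves


# Numbers from the question: 100 nodes x 100 cores, 10,000 tasks.
for cpus in (12, 1):
    per_node, concurrent, waves = schedule_estimate(10_000, 100, 100, cpus)
    print(f"cpus_per_task={cpus}: {per_node} tasks/node, "
          f"{concurrent} concurrent, {waves} wave(s)")
```

Under these assumptions, cpus_per_task=12 leaves room for at most 8 tasks per 100-core node, while cpus_per_task=1 would let all 10,000 tasks run at once if the scheduler packs them.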

If I want to distribute my jobs across all nodes and fully utilize every core, what should I change in my code?

Thanks!

ShayDuane commented 11 hours ago

I am encountering an issue where, even though I specify nodelist=node41,node42 and allocate 2 nodes, the task ranks do not seem to be distributed across the nodes. Instead, each node appears to execute the same rank. Below are the logs:

+ PYTHONUNBUFFERED=TRUE
+ srun -l launch_pickled_pipeline /shared/home/dsq/test_log/executor.pik
0: 2024-09-25 16:54:30.605 | INFO     | vldata.utils.logging:add_task_logger:58 - Launching pipeline for rank=0 in node41
0: 2024-09-25 16:54:30.605 | INFO     | vldata.utils.logging:log_pipeline:90 - 
0: --- 🛠️ PIPELINE 🛠
0: 📖 - READER: 📷 MINT
0: 💽 - WRITER: 📷 MINTWriter
0: 2024-09-25 16:54:30.608 | INFO     | vldata.pipeline.readers.base:read_files_shard:193 - Reading input file part1/arXiv_src_2311_021_wds.tar, 1/1
1: 2024-09-25 16:54:35.178 | INFO     | vldata.utils.logging:add_task_logger:58 - Launching pipeline for rank=0 in node42
1: 2024-09-25 16:54:35.178 | INFO     | vldata.utils.logging:log_pipeline:90 - 
1: --- 🛠️ PIPELINE 🛠
1: 📖 - READER: 📷 MINT
1: 💽 - WRITER: 📷 MINTWriter
1: 2024-09-25 16:54:35.185 | INFO     | vldata.pipeline.readers.base:read_files_shard:193 - Reading input file part1/arXiv_src_2311_021_wds.tar, 1/1
0: 2024-09-25 16:58:20.410 | SUCCESS  | vldata.executor.base:_run_for_rank:98 - Processing done for rank=0
0: 2024-09-25 16:58:20.417 | INFO     | vldata.executor.base:_run_for_rank:104 - 
0: 
0: 📉📉📉 Stats: Task 0 📉📉📉

This results in an inability to achieve true parallel computation across nodes.
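
The duplication described above can be checked mechanically. The following is a minimal sketch that scans srun -l output for the "Launching pipeline for rank=... in ..." lines shown in the logs and reports any rank launched by more than one srun task. The function names are hypothetical, and the regular expression assumes the exact log format quoted in this comment.

```python
import re
from collections import defaultdict

# Matches lines such as:
# "0: <timestamp> | INFO | ... - Launching pipeline for rank=0 in node41"
LAUNCH_RE = re.compile(r"^(\d+): .*Launching pipeline for rank=(\d+) in (\S+)")


def ranks_by_location(log_text):
    """Map each rank to the set of (srun task id, node) pairs that launched it."""
    seen = defaultdict(set)
    for line in log_text.splitlines():
        match = LAUNCH_RE.match(line)
        if match:
            srun_id, rank, node = match.groups()
            seen[int(rank)].add((int(srun_id), node))
    return dict(seen)


def duplicated_ranks(log_text):
    """Return only the ranks that were launched more than once."""
    return {
        rank: sorted(where)
        for rank, where in ranks_by_location(log_text).items()
        if len(where) > 1
    }


# Two launch lines copied from the logs above.
sample = (
    "0: 2024-09-25 16:54:30.605 | INFO     | vldata.utils.logging:add_task_logger:58"
    " - Launching pipeline for rank=0 in node41\n"
    "1: 2024-09-25 16:54:35.178 | INFO     | vldata.utils.logging:add_task_logger:58"
    " - Launching pipeline for rank=0 in node42\n"
)
print(duplicated_ranks(sample))
```

For the two lines above, rank 0 is reported as launched by srun task 0 on node41 and by srun task 1 on node42, which matches the behavior described.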

guipenedo commented 10 hours ago

Hi, do you know if your cluster is configured to always run 1 task per node? Ideally, different tasks should be able to share CPU resources on the same node rather than always taking a node exclusively.
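
One way to look into this is to inspect the partition settings reported by scontrol show partition. The sketch below only parses the OverSubscribe field from text in Slurm's key=value format. The helper names are hypothetical, the sample string is hand-written for illustration and is not output from any real cluster, and other settings (for example, whole-node allocation by the select plugin) can also cause exclusive nodes, so a False result here is not conclusive.

```python
import re


def oversubscribe_setting(scontrol_text):
    """Return the value of the OverSubscribe field, or None if it is absent."""
    match = re.search(r"\bOverSubscribe=(\S+)", scontrol_text)
    return match.group(1) if match else None


def nodes_are_exclusive(scontrol_text):
    """True if the partition text declares OverSubscribe=EXCLUSIVE."""
    value = oversubscribe_setting(scontrol_text)
    return value is not None and value.upper() == "EXCLUSIVE"


# Hand-written illustration of the key=value format (not real cluster output).
sample = "PartitionName=example OverSubscribe=EXCLUSIVE"
print(nodes_are_exclusive(sample))
```

On a login node, the text could come from subprocess.run(["scontrol", "show", "partition", name], capture_output=True, text=True).stdout, where name is the partition passed to the executor.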

guipenedo commented 10 hours ago

Can you show the config you are using?

ShayDuane commented 10 hours ago

The config is here: https://github.com/huggingface/datatrove/issues/292 Thanks!