mlfoundations / dclm

DataComp for Language Models

Ray Actor dies during tokenization process #24

Closed humzaiqbal closed 2 weeks ago

humzaiqbal commented 1 month ago

I'm running tokenization on a bunch of processed files, and I find that the Ray actor dies because all references to it were removed:

  File "/home/ubuntu/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/ray/_private/worker.py", line 2656, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/ubuntu/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/ray/_private/worker.py", line 871, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(UserCodeException): ray::MapBatches(buffer_write)() (pid=345911, ip=10.1.16.209)
  File "/home/ubuntu/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/open_lm/datapreprocess/ray/tokenize_shuffle.py", line 544, in buffer_write
    return {x: [y] for x, y in buffer_writer_pool.get_next().items()}
  File "/home/ubuntu/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/ray/util/actor_pool.py", line 309, in get_next
    return ray.get(future)
ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
        class_name: BufferedShardWriter
        actor_id: f4dc4fd5a9a6d1d8a085c6e201000000
        pid: 1720701
        name: 367e67b756e8a199e62de7392cd8ab67aef847680ad88bdf13490848_buffer_writer_0
        namespace: 56ced544-c253-4038-8abe-89c64eb43193
        ip: 10.1.16.209
        The actor is dead because all references to the actor were removed.

Any thoughts on what may be causing this?

Attaching some more in-depth logs here: ray-data.log
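
For context on what that error message means in general: a non-detached Ray actor is reference-counted, so once the last handle to it goes out of scope Ray can terminate it. Below is a minimal, self-contained sketch of that behavior (the ToyWriter class and actor name are made up for illustration and are not part of tokenize_shuffle.py):

import time
import ray

ray.init()

@ray.remote
class ToyWriter:
    def ping(self):
        return "ok"

def make_writer():
    # The handle only exists inside this function. The actor is not
    # detached, so Ray may terminate it once the last handle is dropped.
    w = ToyWriter.options(name="toy_writer_0", get_if_exists=True).remote()
    print(ray.get(w.ping.remote()))  # "ok" while the handle is alive

make_writer()
time.sleep(5)  # actor teardown is asynchronous; give it a moment

# The only handle is gone, so the named actor has been reclaimed and
# looking it up fails -- the "all references ... were removed" situation.
try:
    ray.get_actor("toy_writer_0")
except ValueError as exc:
    print("actor already gone:", exc)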

humzaiqbal commented 1 month ago

Update: I was able to reproduce the same error running on only a single processed file, as opposed to the 2.8K files I was working with to start.

ch-shin commented 1 month ago

Adding some details

afang-story commented 3 weeks ago

Sorry for the late response.

If this is still an issue, can you give some information about the instance or Ray cluster you are using? Besides allow_imbalanced_write, have you also tried increasing num_writers_per_node?

Could you also provide more details, such as the command used and the input files for the single-file error reproduction?
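
For reference, the most useful thing here would be the full tokenize_shuffle invocation, e.g. something along these lines (the paths are placeholders and the exact flag spellings are assumptions; check them against your local open_lm checkout):

python -m open_lm.datapreprocess.ray.tokenize_shuffle \
    --input s3://my-bucket/processed/ \
    --output s3://my-bucket/tokenized/ \
    --num_writers_per_node 2 \
    --allow_imbalanced_write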

afang-story commented 2 weeks ago

Closing for now. Feel free to reopen if this remains an issue.

ryoungj commented 4 days ago

Same issue here.

I am now testing at a small scale, processing a small number of files (~200M tokens in total) on a single node with 32 CPU cores and 96 GB of memory (a previous run on 5 nodes also failed). Setting allow_imbalanced_write resolves the issue, but increasing num_writers_per_node does not.

Any ideas to fix the issue without setting allow_imbalanced_write?

afang-story commented 4 days ago

Could you try using this?

https://github.com/revbucket/tokshuf-rust

ryoungj commented 3 days ago

I was looking through the code, and the issue seems to be that the buffer writers are no longer referenced after the function exits. Here is a workaround that creates a global reference, which seems to work:

# Module-level dict of actor handles; keeping them referenced here prevents
# Ray from reclaiming the writers when buffer_write returns.
global_buffer_writers = {}

def buffer_write(rows, folder, counter, buffer_size, num_writers_per_node):
    global global_buffer_writers

    node_id = ray.get_runtime_context().get_node_id()

    # Create the buffer writers for this node once and cache their handles.
    if node_id not in global_buffer_writers:
        global_buffer_writers[node_id] = []
        for k in range(num_writers_per_node):
            buffer_writer_name = f"{node_id}_buffer_writer_{k}"
            logger.info(f"Creating {buffer_writer_name}")
            buffer_writer = BufferedShardWriter.options(
                name=buffer_writer_name,
                get_if_exists=True,
                scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
                    node_id=node_id, soft=False
                ),
            ).remote(buffer_size=buffer_size)
            ray.get(counter.add_buffer_writer.remote(buffer_writer_name, buffer_writer))
            global_buffer_writers[node_id].append(buffer_writer)

    # Reuse the cached handles; the ActorPool itself can be rebuilt per call.
    buffer_writer_pool = ray.util.ActorPool(global_buffer_writers[node_id])
    buffer_writer_pool.submit(lambda a, rows: a.write.remote(rows, folder, counter, buffer_size), rows)
    result = buffer_writer_pool.get_next()

    return {x: [y] for x, y in result.items()}

I am less familiar with Ray, so I'm not sure whether this will work robustly in more general cases. It would be great to hear your thoughts on this!

chenweize1998 commented 3 days ago

I think I've identified and resolved the issue. The problem appears to stem from the BufferedShardWriter somehow exiting before completing its writing task. This occurs in the following code:

https://github.com/mlfoundations/open_lm/blob/9bb92ef1689333534b7057942a20d18a46d1fa52/open_lm/datapreprocess/ray/tokenize_shuffle.py#L532-L538

To address this, I added the argument lifetime="detached" to the BufferedShardWriter options. With this modification, the tokenization process now runs normally.

For those encountering this issue, here's a quick fix:

  1. Locate the open_lm/datapreprocess/ray/tokenize_shuffle.py file in your Python site-packages environment.
  2. Modify the BufferedShardWriter initialization (lines 532-538) as follows:
buffer_writer = BufferedShardWriter.options(
    name=buffer_writer_name,
    get_if_exists=True,
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=node_id, soft=False
    ),
    lifetime="detached"
).remote(buffer_size=buffer_size)

This modification should resolve the issue and allow the tokenization process to complete successfully. It might not be the optimal solution, and I don't know whether setting the lifetime to detached incurs other risks, but it does seem to resolve the problem.
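
One general Ray caveat with lifetime="detached" (not specific to this repo): detached actors are not reference-counted, so they outlive the driver and keep holding cluster resources until they are killed explicitly or the cluster is torn down. A minimal cleanup sketch, assuming actors named with the node_id-based pattern used above:

import ray

def kill_detached_writers(actor_names):
    # Detached actors survive after the job exits; remove stale ones by
    # name once a run has finished so they do not linger on the cluster.
    for name in actor_names:
        try:
            handle = ray.get_actor(name)  # look up the named detached actor
            ray.kill(handle)              # terminate it and free its resources
        except ValueError:
            pass  # already gone or never created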