huggingface / datatrove

Freeing data processing from scripting madness by providing a set of platform-agnostic customizable pipeline processing blocks.
Apache License 2.0

Error when running exact_substrings #76

Closed jordane95 closed 8 months ago

jordane95 commented 8 months ago

I followed the instructions in the code and used the script in this repo to build the suffix array and generate the byte ranges, but I get the following error when running step 3.

(/home/user/env/datatrove) dev-dialogue-gpu-8k# python exact_substrings_test.py 
2024-02-01 11:50:52.260 | INFO     | datatrove.utils.logging:add_task_logger:24 - Launching pipeline for rank=0
2024-02-01 11:50:52.261 | INFO     | datatrove.utils.logging:log_pipeline:37 - 
--- 🛠️ PIPELINE 🛠
🫂 - DEDUP: 🪞 - exact-substrings stage 3
2024-02-01 11:50:52.262 | INFO     | datatrove.pipeline.dedup.exact_substrings:get_sequence_bytes_offset:182 - self.rank=0, -> self.sequence_bytes_offset[self.rank]=0
2024-02-01 11:50:52.387 | INFO     | datatrove.pipeline.readers.base:read_files_shard:95 - Reading input file part-00000-sample.jsonl
part-00000-sample.jsonl
2024-02-01 11:50:52.385 | ERROR    | datatrove.executor.base:_run_for_rank:74 - One or more duplicate ranges have not been used
Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 273, in main
    code = _serve_one(child_r, fds,
           │          │        └ [14, 15, 16, 19, 20, 21]
           │          └ 9
           └ <function _serve_one at 0x7f7cdf776e60>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 312, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
           │     │     │        └ 5
           │     │     └ 9
           │     └ <function _main at 0x7f7cdf776170>
           └ <module 'multiprocess.spawn' from '/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/...
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 5
           │    └ <function BaseProcess._bootstrap at 0x7f7cdfa89cf0>
           └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 314, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7f7cdfa89360>
    └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
    │    │        │    └ (<multiprocess.queues.SimpleQueue object at 0x7f7cdf4ec1f0>, <multiprocess.queues.SimpleQueue object at 0x7f7c3cd22170>, None...
    │    │        └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
    │    └ <function worker at 0x7f7c3cd1b490>
    └ <ForkServerProcess name='ForkServerPoolWorker-5' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (1,)
                    └ functools.partial(<bound method LocalPipelineExecutor._launch_run_for_rank of <datatrove.executor.local.LocalPipelineExecutor...

  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
           │    │             │     └ 1
           │    │             └ 1
           │    └ <function PipelineExecutor._run_for_rank at 0x7f7cde9b84c0>
           └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f7c3cd22350>

> File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
    │     └ <generator object DedupReader.run at 0x7f7c3cd6c970>
    └ <class 'collections.deque'>

  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
           │    └ False
           └ 🫂 - DEDUP: 🪞 - exact-substrings stage 3

AssertionError: One or more duplicate ranges have not been used
2024-02-01 11:50:52.441 | ERROR    | datatrove.executor.base:_run_for_rank:74 - One or more duplicate ranges have not been used
Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 273, in main
    code = _serve_one(child_r, fds,
           │          │        └ [13, 14, 15, 16, 19, 20]
           │          └ 9
           └ <function _serve_one at 0x7f7cdf776e60>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 312, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
           │     │     │        └ 5
           │     │     └ 9
           │     └ <function _main at 0x7f7cdf776170>
           └ <module 'multiprocess.spawn' from '/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/...
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 5
           │    └ <function BaseProcess._bootstrap at 0x7f7cdfa89cf0>
           └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 314, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7f7cdfa89360>
    └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
    │    │        │    └ (<multiprocess.queues.SimpleQueue object at 0x7f7cdf4ec1f0>, <multiprocess.queues.SimpleQueue object at 0x7f7c3cd22170>, None...
    │    │        └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
    │    └ <function worker at 0x7f7c3cd1b490>
    └ <ForkServerProcess name='ForkServerPoolWorker-4' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (2,)
                    └ functools.partial(<bound method LocalPipelineExecutor._launch_run_for_rank of <datatrove.executor.local.LocalPipelineExecutor...

  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
           │    │             │     └ 2
           │    │             └ 2
           │    └ <function PipelineExecutor._run_for_rank at 0x7f7cde9b84c0>
           └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f7c3cd22350>

> File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
    │     └ <generator object DedupReader.run at 0x7f7c3cd6c970>
    └ <class 'collections.deque'>

  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
           │    └ False
           └ 🫂 - DEDUP: 🪞 - exact-substrings stage 3

AssertionError: One or more duplicate ranges have not been used
2024-02-01 11:50:52.464 | ERROR    | datatrove.executor.base:_run_for_rank:74 - One or more duplicate ranges have not been used
Traceback (most recent call last):

  File "<string>", line 1, in <module>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 273, in main
    code = _serve_one(child_r, fds,
           │          │        └ [12, 13, 14, 15, 16, 19]
           │          └ 9
           └ <function _serve_one at 0x7f7cdf776e60>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/forkserver.py", line 312, in _serve_one
    code = spawn._main(child_r, parent_sentinel)
           │     │     │        └ 5
           │     │     └ 9
           │     └ <function _main at 0x7f7cdf776170>
           └ <module 'multiprocess.spawn' from '/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/...
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/spawn.py", line 129, in _main
    return self._bootstrap(parent_sentinel)
           │    │          └ 5
           │    └ <function BaseProcess._bootstrap at 0x7f7cdfa89cf0>
           └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 314, in _bootstrap
    self.run()
    │    └ <function BaseProcess.run at 0x7f7cdfa89360>
    └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    │    │        │    │        │    └ {}
    │    │        │    │        └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
    │    │        │    └ (<multiprocess.queues.SimpleQueue object at 0x7f7cdf4ec1f0>, <multiprocess.queues.SimpleQueue object at 0x7f7c3cd22170>, None...
    │    │        └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
    │    └ <function worker at 0x7f7c3cd1b490>
    └ <ForkServerProcess name='ForkServerPoolWorker-3' parent=1625393 started daemon>
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    │     │       └ {}
                    │     └ (3,)
                    └ functools.partial(<bound method LocalPipelineExecutor._launch_run_for_rank of <datatrove.executor.local.LocalPipelineExecutor...

  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
           │    │             │     └ 3
           │    │             └ 3
           │    └ <function PipelineExecutor._run_for_rank at 0x7f7cde9b84c0>
           └ <datatrove.executor.local.LocalPipelineExecutor object at 0x7f7c3cd22350>

> File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
    │     └ <generator object DedupReader.run at 0x7f7c3cd6c970>
    └ <class 'collections.deque'>

  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
           │    └ False
           └ 🫂 - DEDUP: 🪞 - exact-substrings stage 3

AssertionError: One or more duplicate ranges have not been used
2024-02-01 11:50:52.495 | INFO     | datatrove.executor.local:_launch_run_for_rank:51 - 1/4 tasks completed.
multiprocess.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 46, in _launch_run_for_rank
    return self._run_for_rank(rank, local_rank)
  File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 75, in _run_for_rank
    raise e
  File "/home/user/code/datatrove/src/datatrove/executor/base.py", line 62, in _run_for_rank
    deque(pipelined_data, maxlen=0)
  File "/home/user/code/datatrove/src/datatrove/pipeline/dedup/exact_substrings.py", line 344, in run
    assert self.exhausted_ranges, "One or more duplicate ranges have not been used"
AssertionError: One or more duplicate ranges have not been used
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/code/datatrove/examples/exact_substrings_test.py", line 96, in <module>
    run_step_3()
  File "/home/user/code/datatrove/examples/exact_substrings_test.py", line 91, in run_step_3
    print(executor_3.run())
  File "/home/user/code/datatrove/src/datatrove/executor/local.py", line 80, in run
    stats = list(
  File "/home/user/env/datatrove/lib/python3.10/site-packages/multiprocess/pool.py", line 873, in next
    raise value
AssertionError: One or more duplicate ranges have not been used

jordane95 commented 8 months ago

Again, I think this bug comes from the corner case where a worker is idle: it does nothing in the preceding for loop, so the exhausted_ranges status is never changed and the assertion fails...
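
To make the suspected corner case concrete, here is a minimal sketch. It is not datatrove's code: the class `RangeConsumer` and its methods are hypothetical stand-ins, and only the flag name `exhausted_ranges` and the assertion message come from the traceback above. It assumes the flag is updated only inside the loop over documents, so a worker that receives no documents never changes it. Whether the linked PR handles the case this way is not something the sketch claims.

```python
from typing import Iterable, Iterator, List, Tuple


class RangeConsumer:
    """Hypothetical stand-in for a stage that consumes duplicate byte ranges."""

    def __init__(self, ranges: List[Tuple[int, int]]):
        self.pending = list(ranges)
        self.exhausted_ranges = False  # only updated inside the loop below

    def _process(self, docs: Iterable[str]) -> Iterator[str]:
        for doc in docs:
            if self.pending:
                self.pending.pop(0)  # pretend this document used one range
            self.exhausted_ranges = not self.pending
            yield doc

    def run_buggy(self, docs: Iterable[str]) -> Iterator[str]:
        yield from self._process(docs)
        # An idle worker (no docs, no ranges) never entered the loop,
        # so the flag is still False here.
        assert self.exhausted_ranges, "One or more duplicate ranges have not been used"

    def run_guarded(self, docs: Iterable[str]) -> Iterator[str]:
        # Assumed workaround: derive the flag before the loop as well,
        # so "nothing to consume" counts as exhausted.
        self.exhausted_ranges = not self.pending
        yield from self._process(docs)
        assert self.exhausted_ranges, "One or more duplicate ranges have not been used"


if __name__ == "__main__":
    try:
        list(RangeConsumer([]).run_buggy([]))
    except AssertionError as err:
        print("idle worker, buggy variant:", err)
    print("idle worker, guarded variant:", list(RangeConsumer([]).run_guarded([])))
```

In this sketch the guarded variant still raises when ranges are left over but no documents arrive, so genuinely unused ranges are not silently ignored.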

jordane95 commented 8 months ago

Fixed by PR https://github.com/huggingface/datatrove/pull/73