DataTrove is a library to process, filter and deduplicate text data at a very large scale. It provides a set of prebuilt commonly used processing blocks with a framework to easily add custom functionality.
DataTrove processing pipelines are platform-agnostic, running out of the box locally or on a slurm cluster. Its (relatively) low memory usage and multiple step design makes it ideal for large workloads, such as to process an LLM's training data.
Local, remote and other file systems are supported through fsspec.
pip install datatrove[FLAVOUR]
Available flavours (combine them with ,
i.e. [processing,s3]
):
all
installs everything: pip install datatrove[all]
io
dependencies to read warc/arc/wet
files and arrow/parquet formats: pip install datatrove[io]
processing
dependencies for text extraction, filtering and tokenization: pip install datatrove[processing]
s3
s3 support: pip install datatrove[s3]
cli
for command line tools: pip install datatrove[cli]
You can check the following examples:
gpt2
tokenizerpipeline
: a list of processing steps to execute (read data, filter, write to disk, etc)executor
: runs a specific pipeline on a given execution environment (slurm, multi cpu machine, etc)job
: the execution of a pipeline on a given executortask
: a job
is comprised of multiple tasks
, and these are used to parallelize execution, usually by having each task
process a shard
of data. Datatrove keeps track of which tasks have completed and when you relaunch only incomplete tasks will run.file
: an individual input file (.json, .csv, etc).
[!TIP] Note that each file will be processed by a single
task
. Datatrove does not automatically split a file into multiple parts, so to fully parallelize you should have multiple medium sized files rather than a single large file)
shard
: a group of input data (usually a group of file
s), which will be assigned to a specific task
. Each task
will process a different non overlapping shard
of data, from the full list of input filesworker
: compute resource that will execute a single task at a time, e.g., if you have 50 cpu cores you can run a LocalPipelineExecutor with workers=50
, to execute 50 tasks
simultaneously (one per cpu). Once a worker
is done with a task
, it will start processing another waiting task
[!TIP] Your number of
tasks
controls how much you can parallelize and also how much time each individual processing unit will take. If you have a small number of tasks (and they each therefore have to process a large number of files) and they fail, you will have to restart from scratch, whereas if you have a larger number of small tasks each failed task will take way less time to rerun.[!CAUTION] If your
tasks
>files
, some tasks will not process any data, so there usually isn't a point in settingtasks
to a number larger than `files.
Running a job
to process 10000 files
, on a machine with 100 cpu cores (workers
). If we choose to use 1000 tasks, each one will process a shard
of 10 files. workers=100
means that we can process 100 tasks at a time.
Each pipeline block processes data in the datatrove Document
format:
text
the actual text content for each sampleid
a unique id (string) for this samplemetadata
a dictionary where any additional info may be storedEach pipeline block takes a generator of Document
as input and returns another generator of Document
.
Document
Document
to disk/cloud in different formatsDocument
s based on specific rules/criteriaA pipeline is defined as a list of pipeline blocks. As an example, the following pipeline would read data from disk, randomly filter (remove) some documents and write them back to disk:
from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [ CSVReader( data_folder="/my/input/path" ), SamplerFilter(rate=0.5), JsonlWriter( output_folder="/my/output/path" ) ]
## Executors
Pipelines are platform-agnostic, which means that the same pipeline can smoothly run on different execution environments without any changes to its steps. Each environment has its own PipelineExecutor.
Some options common to all executors:
- `pipeline` a list consisting of the pipeline steps that should be run
- `logging_dir` a datafolder where log files, statistics and more should be saved. Do not reuse folders for different pipelines/jobs as this will overwrite your stats, logs and completions.
- `skip_completed` (_bool_, `True` by default) datatrove keeps track of completed tasks so that when you relaunch a job they can be skipped. Set this to `False` to disable this behaviour
- `randomize_start_duration` (_int_, `0` by default) the maximum number of seconds to delay the start of each task to prevent all tasks from starting simultaneously and potentially overloading the system.
Call an executor's `run` method to execute its pipeline.
> [!TIP]
> Datatrove keeps track of which tasks successfully completed by creating a marker (an empty file) in the `${logging_dir}/completions` folder. Once the job finishes, if some of its tasks have failed, you can **simply relaunch the exact same executor** and datatrove will check and only run the tasks that were not previously completed.
> [!CAUTION]
> If you relaunch a pipeline because some tasks failed, **do not change the total number of tasks** as this will affect the distribution of input files/sharding.
### LocalPipelineExecutor
This executor will launch a pipeline on a local machine.
Options:
- `tasks` total number of tasks to run
- `workers` how many tasks to run simultaneously. If `-1`, no limit. Anything `> 1` will use multiprocessing to execute the tasks.
- `start_method` method to use to spawn a multiprocessing Pool. Ignored if `workers` is 1
<details>
<summary>Example executor</summary>
```python
from datatrove.executor import LocalPipelineExecutor
executor = LocalPipelineExecutor(
pipeline=[
...
],
logging_dir="logs/",
tasks=10,
workers=5
)
executor.run()
This executor will launch a pipeline on a slurm cluster, using slurm job arrays to group and manage tasks. Options:
tasks
total number of tasks to run. required
time
slurm time limit string. required
partition
slurm partition. required
workers
how many tasks to run simultaneously. If -1
, no limit. Slurm will run workers
tasks at a time. (default: -1
)
job_name
slurm job name (default: "data_processing)
depends
another SlurmPipelineExecutor instance, which will be a dependency of this pipeline (current pipeline will only start executing after the depended on pipeline successfully completes)
sbatch_args
dictionary with any other arguments you would like to pass to sbatch
slurm_logs_folder
where to save the slurm log files. If using a local path for logging_dir
, they will be saved on logging_dir/slurm_logs
. If not, they will be saved as a subdir of the current directory.
cpus_per_task
how many cpus to give each task (default: 1
)
qos
slurm qos (default: "normal")
mem_per_cpu_gb
memory per cpu, in GB (default: 2)
env_command
custom command to activate a python environment, if needed
condaenv
conda environment to activate
venv_path
path to a python environment to activate
max_array_size
the MaxArraySize value in $ scontrol show config
. If number of tasks exceeds this number, it will split into multiple array jobs (default: 1001)
max_array_launch_parallel
if we need multiple jobs due to max_array_size, whether to launch them all in one go (parallel) or sequentially (default: False
)
stagger_max_array_jobs
when max_array_launch_parallel is True, this determines how many seconds to wait between launching each of the parallel jobs (default: 0
)
run_on_dependency_fail
start executing when a job we depend on finishes even if it has failed (default: False
)
randomize_start
randomize the start of each task in a job in a ~3 min window. Useful when heavily hitting an s3 bucket for example. (default: False
)
from datatrove.executor import SlurmPipelineExecutor
executor1 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job1",
logging_dir="logs/job1",
tasks=500,
workers=100, # omit to run all at once
time="10:00:00", # 10 hours
partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
pipeline=[
...
],
job_name="my_cool_job2",
logging_dir="logs/job2",
tasks=1,
time="5:00:00", # 5 hours
partition="hopper-cpu",
depends=executor1 # this pipeline will only be launched after executor1 successfully completes
)
# executor1.run()
executor2.run() # this will actually launch executor1, as it is a dependency, so no need to launch it explicitly
For a pipeline with logging_dir
mylogspath/exp1, the following folder structure would be created:
Log messages support colorization. By default, colorization will be auto detected for console messages and disabled for log files (logs/task_XXXXX.log). To explicitly enable or disable colorization, you may set the following environment variables:
DATATROVE_COLORIZE_LOGS
"1" to add ANSI colors to console log messages and "0" to disable colorization.DATATROVE_COLORIZE_LOG_FILES
set to "1" to add ANSI colors to log messages saved to logs/task_XXXXX.log.Datatrove supports a wide variety of input/output sources through fsspec.
There are a few ways to provide a path to a datatrove block (for input_folder
, logging_dir
, data_folder
and so on arguments):
str
: the simplest way is to pass a single string. Example: /home/user/mydir
, s3://mybucket/myinputdata
, hf://datasets/allenai/c4/en/
(str, fsspec filesystem instance)
: a string path and a fully initialized filesystem object. Example: ("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))
(str, dict)
: a string path and a dictionary with options to initialize a fs. Example (equivalent to the previous line): ("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})
DataFolder
: you can initialize a DataFolder object directly and pass it as an argument
Under the hood these argument combinations are parsed by get_datafolder
.
Usually, pipelines will start with a Reader block.
Most readers take a data_folder
argument — a path to a folder containing the data to be read.
These files will be distributed across each task. If you have N
tasks, task with rank i
(0-based) will process files i, i+N, i+2N, i+3N,...
.
Internally, each reader reads data and converts it into a dictionary before creating a Document
object.
Some options common to most readers:
text_key
the dictionary key containing the text content for each sample. Default: text
id_key
the dictionary key containing the id for each sample. Default: id
default_metadata
a dictionary for any default metadata values you would like to add (such as their source, for example)recursive
whether to look for files recursively in data_folder
's subdirectoriesglob_pattern
use this field to match specific files. For instance, glob_pattern="*/warc/*.warc.gz"
will match files with a .warc.gz
file extension on the warc/
folder of each of the data_folder
's subdirectoriesadapter
this function takes the raw dictionary obtained from the reader and returns a dictionary with Document
's field names. You may overwrite this function (_default_adapter) if you would like.limit
read only a certain number of samples. Useful for testing/debuggingYou can use extractors to extract text content from raw html. The most commonly used extractor in datatrove is Trafilatura, which uses the trafilatura library.
Filters are some of the most important blocks of any data processing pipeline. Datatrove's filter blocks take a Document
and return a boolean (True
to keep a document, False
to remove it). Removed samples do not continue to the next pipeline stage. You can also save the removed samples to disk by passing a Writer to the exclusion_writer
parameter.
Once you are done processing your data you will probably want to save it somewhere. For this you can use a writer.
Writers require an output_folder
(the path where data should be saved). You can choose the compression
to use (default: gzip
) and the filename to save each file as.
For the output_filename
, a template is applied using the following arguments:
${rank}
replaced with the current task's rank. Note that if this tag isn't present, different tasks may try to write to the same location${id}
replaced with the sample id${tag}
will be replaced with the corresponding document.metadata['tag']
valueAn example to separate samples by language based on their lang
metadata field:
JsonlWriter(
f"{MAIN_OUTPUT_PATH}/non_english/",
output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz", # folder structure: language/dump/file
)
For deduplication check the examples minhash_deduplication.py, sentence_deduplication.py and exact_substrings.py.
For summary statistics on your data you can use the Stats blocks. These blocks provide an easy way to collect data-profiles on your dataset in a distributed manner. It's a two step process in which you first:
1) For each shard iterate over documents and collect stats into of the following groupings summary
(all docs counted to "summary" key), fqdn
(fully qualified domain name grouping), suffix
(the last part of the url path grouping) or histogram
(value based grouping).
2) Merge the stats from different shards into a single file.
See the summary_stats.py for more details.
Each resulting stat is saved in a separate file with following structure: output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json
Each such file is a MetricStatsDict
object, which you can easily load using:
from datatrove.pipeline.stats.summary_stats import MetricStatsDict
import json
stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))
# E.g for total length of nytimes.com docs
stats["nytimes.com"].total
# Or for mean of cnn.com docs
stats["cnn.com"].mean
Following stats are available:
contamination_stats.py
: word_contamination_{words[0]}
: Frequency of words contamination in the document.doc_stats.py
: length
: Length of the document, white_space_ratio
: Ratio of whitespace characters, non_alpha_digit_ratio
: Ratio of non-alphabetic and non-digit characters, digit_ratio
: Ratio of digits, uppercase_ratio
: Ratio of uppercase letters, elipsis_ratio
: Ratio of elipsis characters, punctuation_ratio
: Punctuation ratiolang_stats.py
: fasttext_{language}
: Score of document being written in language
. Score is computed using FastText model.line_stats.py
: n_lines
: Number of lines per doc, avg_line_length
: Average length of line per doc, long_line_ratio_chars_{chars}
: Ratio of lines with more than k chars, short_line_ratio_chars_{chars}
: Ratio of lines with less than k chars, bullet_point_lines_ratio
: Ratio of line starting with bullet point, line_duplicates
: Ratio of lines that are duplicates, line_char_duplicates
: Ratio of chars in duplicated lines to all chars.paragraph_stats.py
: n_paragraphs
: Number of paragraphs, avg_paragraph_length
: Average paragraph length, short_paragraph_ratio_{chars}
: Ratio of short paragraphs (<{chars}
chars), long_paragraph_ratio_{chars}
: Ratio of long paragraphs (>{chars}
chars)perplexity_stats.py
: ccnet_perplexity_{model_dataset}_{language}
: Perplexity of the document using the CCNet model for {model}
on {dataset}
in {language}
sentence_stats.py
: n_sentences
: Number of sentences, avg_sentence_length
: Average sentence length, short_sentence_ratio_{chars}
: Ratio of short sentences (<{chars}
chars), long_sentence_ratio_{chars}
: Ratio of long sentences (>{chars}
chars)token_stats.py
:token_count
: Number of tokens in the documentword_stats.py
: n_words
: Number of words in the document, avg_word_length
: Average length of words in the document, avg_words_per_line
: Average number of words per line in the document, short_word_ratio_{chars}
: Ratio of words shorter than {chars}
characters, stop_word_ratio
: Ratio of stop words, long_word_ratio_{chars}
: Ratio of words longer than {chars}
characters, type_token_ratio
: Number of unique words / Number of tokens, capitalized_word_ratio
: Ratio of capitalized words, uppercase_word_ratio
: Ratio of uppercase wordsYou can pass an iterable of Document
directly as a pipeline block like so:
from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter
pipeline = [
[
Document(text="some data", id="0"),
Document(text="some more data", id="1"),
Document(text="even more data", id="2"),
],
SamplerFilter(rate=0.5),
JsonlWriter(
output_folder="/my/output/path"
)
]
Do note, however, that this iterable will not be sharded (if you launch more than 1 task they will all get the full iterable). This is usually useful for small workloads/testing.
For simple processing you can simply pass in a custom function with the following signature:
from datatrove.data import DocumentsPipeline
def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
"""
`data` is a generator of Document. You must also return a generator of Document (yield)
You can optionally use `rank` and `world_size` for sharding
"""
for document in data:
document.text = document.text.upper()
yield document
pipeline = [
...,
uppercase_everything,
...
]
[!TIP] You might have some pickling issues due to the imports. If this happens, simply move whatever imports you need inside the function body.
You can also define a full block inheriting from PipelineStep
or one of its subclasses:
from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder
class UppercaserBlock(PipelineStep):
def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
super().__init__()
# you can take whatever parameters you need and save them here
self.some_param = some_param
# to load datafolders use get_datafolder()
self.some_folder = get_datafolder(some_folder)
def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
# you could also load data from the `some_folder`:
for filepath in self.some_folder.get_shard(rank, world_size): # it also accepts a glob pattern, among other things
with self.some_folder.open(filepath, "rt") as f:
# do something
...
yield doc
#
# OR process data from previous blocks (`data`)
#
for doc in data:
with self.track_time():
# you can wrap the main processing code in `track_time` to know how much each document took to process
nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
# you can also keep track of stats per document using stat_update
self.stat_update("og_upper_letters", value=nr_uppercase_letters)
doc.text = doc.text.upper()
# make sure you keep the yield outside the track_time block, or it will affect the time calculation
yield doc
#
# OR save data to disk
#
with self.some_folder.open("myoutput", "wt") as f:
for doc in data:
f.write(doc...)
pipeline = [
...,
UppercaserBlock("somepath"),
...
]
You could also inherit from BaseExtractor
, BaseFilter
, BaseReader
/BaseDiskReader
, or DiskWriter
.
git clone git@github.com:huggingface/datatrove.git && cd datatrove
pip install -e ".[dev]"
Install pre-commit code style hooks:
pre-commit install
Run the tests:
pytest -sv ./tests/
@misc{penedo2024datatrove,
author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
title = {DataTrove: large scale data processing},
year = {2024},
publisher = {GitHub},
journal = {GitHub repository},
url = {https://github.com/huggingface/datatrove}
}