This repository contains a set of functions and scripts for extracting raw data into the MEDS format and for transforming/pre-processing MEDS-formatted data.
Completed functions include scripts and utilities for extraction of various forms of raw data into the MEDS format in a scalable, parallelizable manner, as well as general configuration management utilities for complex pipelines over MEDS data. In-progress functions include more model-specific pre-processing steps for MEDS data.
Examples of these capabilities in action can be seen in the MIMIC-IV_Example directory, which contains a
working, end-to-end example to extract MEDS-formatted data from MIMIC-IV v2.2. A working example for eICU v2.0
is also present, though it needs to be adapted to recent interface improvements. These directories also have
README.md files with more detailed information on how to run the scripts in those directories.
pip install MEDS-transforms
pip install . (from the repository root)
pip install MEDS-transforms[examples]
pip install MEDS-transforms[local_parallelism]
pip install MEDS-transforms[slurm_parallelism]
pip install .[dev,tests]
pip install MEDS-transforms[examples,local_parallelism]
The fundamental design philosophy of this repository can be summarized as follows:
subject_id, time, code, numeric_value columns.
See the MIMIC-IV_Example and eICU_Example directories for examples of how and where per-dataset code can be
leveraged in concert with the configurable aspects of the standardized MEDS extraction pipeline.
This pipeline is intended to be used both as a total or partial standalone ETL pipeline for converting raw EHR data into the MEDS format (this operation is often much more standardized than model-specific pre-processing needs) and as a template for model-specific pre-processing pipelines.
The MEDS ETL and pre-processing pipelines are designed to be run in a modular, stage-based manner, with each
stage of the pipeline being run as a separate script. For a single pipeline, all scripts take the same
arguments by leveraging the same Hydra configuration file. To run multiple workers on a single stage in
parallel, the user can launch the same script multiple times without changing the arguments or configuration
file, or (to facilitate multirun capabilities) while changing only the worker configuration value, which is
not used by anything except log file names. The scripts automatically handle the parallelism and avoid
duplicative work. This permits significant flexibility in how these pipelines can be run.
submitit launcher to automate the creation of appropriate scheduler worker jobs. Note this will typically
require a distributed file system to work correctly, as these scripts use manually created file locks to
avoid duplicative work.
joblib launcher. This can result in a significant speedup depending on the machine configuration, as sharding
ensures that parallelism can be used with minimal file read contention.
Two of these methods of parallelism, in particular local-machine parallelism and slurm-based cluster
parallelism, are supported explicitly by this package through the use of the joblib and submitit Hydra
plugins and Hydra's multirun capabilities, which will be discussed in more detail below.
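As a rough illustration of how lock files let identical workers share a stage without duplicating work, the following is a minimal sketch. It is not this package's implementation: the function names (try_claim, run_worker), the .lock/.out file naming, and the use of an exclusive-create lock are all assumptions made for the example.

```python
import os
import tempfile
from pathlib import Path


def try_claim(lock_path: Path) -> bool:
    """Atomically create a lock file; return False if it already exists."""
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        f.write(str(os.getpid()))  # record the owner for debugging
    return True


def run_worker(shards: list[str], out_dir: Path, worker: int) -> list[str]:
    """Process every shard that is neither finished nor claimed by another worker."""
    done = []
    for shard in shards:
        out_fp = out_dir / f"{shard}.out"    # hypothetical output naming
        lock_fp = out_dir / f"{shard}.lock"  # hypothetical lock naming
        if out_fp.exists() or not try_claim(lock_fp):
            continue  # already finished, or in progress elsewhere
        out_fp.write_text(f"processed {shard} (worker {worker})")
        lock_fp.unlink()  # release the lock once the output is on disk
        done.append(shard)
    return done


with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp)
    shards = ["shard_0", "shard_1", "shard_2"]
    # Both workers receive identical arguments; only the worker id differs.
    first = run_worker(shards, out_dir, worker=0)
    second = run_worker(shards, out_dir, worker=1)
```

Here the second worker finds every output already written and does nothing, which is the behavior that makes re-launching the same script safe.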
By following this design convention, each individual stage of the pipeline can be kept extremely simple (often each stage corresponds to a single short "dataframe" function), can be rigorously tested, and can be cached after completion to permit easy resuming or re-running of the pipeline. It also permits extremely flexible and efficient (through parallelization) use of the pipeline in a variety of environments, all without imposing significant complexity, overhead, or computational dependencies on the user.
To see each of the scripts for the various pipelines, examine the scripts directory. Each script will, when
run with the --help flag, provide a detailed description of the script's purpose, arguments, and usage.
E.g.,
❯ MEDS_extract-shard_events --help
== MEDS/shard_events ==
MEDS/shard_events is a command line tool that provides an interface for running MEDS pipelines.
**Pipeline description:**
This pipeline extracts raw MEDS events in longitudinal, sparse form from an input dataset meeting select
criteria and converts them to the flattened, MEDS format. It can be run in its entirety, with controllable
levels of parallelism, or in stages. Arguments:
- `event_conversion_config_fp`: The path to the event conversion configuration file. This file defines
the events to extract from the various rows of the various input files encountered in the global input
directory.
- `input_dir`: The path to the directory containing the raw input files.
- `cohort_dir`: The path to the directory where the output cohort will be written. It will be written in
various subfolders of this dir depending on the stage, as intermediate stages cache their output during
computation for efficiency of re-running and distributing.
**Stage description:**
This stage shards the raw input events into smaller files for easier processing. Arguments:
- `row_chunksize`: The number of rows to read in at a time.
- `infer_schema_length`: The number of rows to read in to infer the schema (only used if the source
files are csvs)
Note that these stage scripts can be used for either a full pipeline or just as a component of a larger, user-defined process -- it is up to the user to decide how to leverage these scripts in their own work.
To use this repository as an importable library, the user should follow these steps:
MEDS_transforms utilities such as MEDS_transforms.mapreduce.mapper.map_over to easily apply your transform
over a sharded MEDS dataset.
MEDS_transforms configuration schema to enable easy configuration of your pipeline, by importing the MEDS
transforms configs via your hydra search path and using them as a base for your own configuration files,
enabling you to intermix your new stage configuration with the existing MEDS transform stages.
To use this repository as a template, the user should follow these steps:
scripts/preprocessing/collect_code_metadata.py script for an example.
configs/preprocessing.yaml configuration file to assemble a configuration file for the necessary stages of
your pipeline. Identify in advance what dataset-specific information the user will need to specify to run
your pipeline (e.g., will they need links between dataset codes and external ontologies? Will they need to
specify select key-event concepts to identify in the data? etc.). Proper pipeline design should enable
running the pipeline across multiple datasets with minimal dataset-specific information required, with that
information specified as simply as possible. Examples of how to do this are forthcoming.
Assumptions:
subject_id in the same type.
code_metadata file that describes the codes in your data as necessary. This file is not used in the provided
pre-processing pipeline in this package, but is necessary for other uses of the MEDS data.
Computational Resource Requirements:
The provided ETL consists of the following steps, which can be performed as needed by the user with whatever degree of parallelism is desired per step.
The ETL scripts all use Hydra for configuration management, leveraging the shared configs/extraction.yaml
file for configuration. The user can override any of these settings in the normal way for Hydra
configurations.
If desired, appropriate scripts can be written and run at a per-subject shard level to convert between the flat format and any of the other valid nested MEDS formats, but for now we leave that up to the user.
Input event extraction configurations are defined through a simple configuration file language, stored in YAML form on disk, which specifies, for a collection of events, how the individual rows from the various input dataframes should be parsed into different event formats. The YAML file stores a simple dictionary with the following structure:
subject_id: $GLOBAL_SUBJECT_ID_OVERWRITE # Optional, if you want to overwrite the subject ID column name for
                                         # all inputs. If not specified, defaults to "subject_id".
$INPUT_FILE_STEM:
    subject_id: $INPUT_FILE_SUBJECT_ID # Optional, if you want to overwrite the subject ID column name for
                                       # this input. If not specified, defaults to the global subject ID.
    $EVENT_NAME:
        code:
          - $CODE_PART_1
          - $CODE_PART_2
          ... # These will be combined with "//" to form the final code.
        time: $TIME
        $MEDS_COLUMN_NAME: $RAW_COLUMN_NAME
        ...
    ...
...
In this structure, $INPUT_FILE_STEM is the stem of the input file name, and $EVENT_NAME is the name of a
particular kind of event that can be extracted from the input file. $CODE is the code for the event, given
either as a constant string or, with the syntax "col($COLUMN)", as the name of the column in the raw data to
be read to get the code. $TIME is the time for the event, given either as null to indicate the event has a
null time (e.g., a static measurement) or with the "col($COLUMN)" syntax referenced above. All subsequent
key-value pairs are mappings from the MEDS column name to the raw column name in the input data. Here, these
mappings can only point to columns in the input data, not constant values, and the input data columns must be
either string or categorical types (in which case they will be converted to categorical) or numeric types.
You can see this extraction logic in the scripts/extraction/convert_to_sharded_events.py file, in the
extract_event function.
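To make the configuration semantics above concrete, here is a small pure-Python sketch that applies one input file's event configuration to raw rows. It is an illustration only and not the package's extract_event function: the helper names (resolve, extract_events), the list-of-dicts row representation, and the example column names (MRN, charttime, itemid, valuenum) are all assumptions.

```python
import re

COL_RE = re.compile(r"^col\((.+)\)$")


def resolve(spec, row):
    """Return the raw column value for "col(NAME)" specs, else the constant itself."""
    if isinstance(spec, str) and (m := COL_RE.match(spec)):
        return row[m.group(1)]
    return spec


def extract_events(rows, file_cfg, global_subject_id="subject_id"):
    """Turn raw rows into flat event dicts for one input file's configuration."""
    file_cfg = dict(file_cfg)
    # A per-file "subject_id" key overrides the global subject ID column name.
    subject_col = file_cfg.pop("subject_id", global_subject_id)
    events = []
    for row in rows:
        for event_cfg in file_cfg.values():
            code_parts = event_cfg["code"]
            if not isinstance(code_parts, list):
                code_parts = [code_parts]
            event = {
                "subject_id": row[subject_col],
                "time": resolve(event_cfg["time"], row),
                # Code parts are combined with "//" to form the final code.
                "code": "//".join(str(resolve(p, row)) for p in code_parts),
            }
            # Remaining keys map a MEDS column name to a raw column name.
            for meds_col, raw_col in event_cfg.items():
                if meds_col not in ("code", "time"):
                    event[meds_col] = row[raw_col]
            events.append(event)
    return events


rows = [{"MRN": 1, "charttime": "2020-01-01T08:00", "itemid": "HR", "valuenum": 72.0}]
cfg = {
    "subject_id": "MRN",
    "vitals": {"code": ["VITAL", "col(itemid)"], "time": "col(charttime)", "numeric_value": "valuenum"},
    "registration": {"code": "REGISTERED", "time": None},
}
events = extract_events(rows, cfg)
```

In this example the single raw row yields two events: a timed VITAL//HR measurement carrying a numeric_value, and a REGISTERED event with a null time.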
See tests/test_extraction.py for an example of the end-to-end ETL pipeline being run on synthetic data. This
script is a functional test that is also run with pytest to verify correctness of the algorithm.
scripts/extraction/shard_events.py shards the input data into smaller, event-level shards by splitting raw
files into chunks of a configurable number of rows. Files are split sequentially, with no regard for data
content or subject boundaries. The resulting files are stored in the subsharded_events subdirectory of the
output directory.
scripts/extraction/split_and_shard_subjects.py splits the subject population into ML splits and shards these
splits into subject-level shards. The result of this process is only a simple JSON file containing the
subject IDs belonging to individual splits and shards. This file is stored in the
output_directory/splits.json file.
scripts/extraction/convert_to_sharded_events.py converts the input, event-level shards into the MEDS event
format and splits them into subject-level sub-shards. So, the resulting files are sharded into subject-level,
then event-level groups and are not merged into full subject-level shards or appropriately sorted for
downstream use.
scripts/extraction/merge_to_MEDS_cohort.py merges the subject-level, event-level shards into full
subject-level shards and sorts them appropriately for downstream use. The resulting files are stored in the
output_directory/final_cohort directory.
Once the MEDS dataset is created, it needs to be effectively pre-processed for downstream use. This package contains a variety of pre-processing transformations and scripts that can be applied on diverse MEDS datasets in various ways to prepare them for downstream modeling. Broadly speaking, the pre-processing pipeline can be broken down into the following steps:
Filtering the dataset by criteria that do not require cross-subject analyses.
Adding any extra events to the records that are necessary for downstream modeling.
Iteratively (a) grouping the dataset by code and associated code modifier columns and collecting statistics
on the numeric and categorical values for each code, then (b) filtering the dataset down to remove outliers
or other undesired codes or values.
Transforming the code space to appropriately include or exclude any additional measurement columns that
should be included during code grouping and modeling operations. The goal of this step is to ensure that
the only columns that need be processed going into the pre-processing, tokenization, and tensorization
stage are expressible in the code and numeric_value columns of the dataset, which helps standardize further
downstream use.
Normalizing the data to convert codes to indices and numeric values to the desired form (either categorical indices or normalized numeric values).
Tokenizing the data in time to create a pre-tensorized dataset with clear delineations between subjects, subject sequence elements, and measurements per sequence element (note that various of these delineations may be fully flat/trivial for unnested formats).
Tensorizing the data to permit efficient retrieval from disk of subject data for deep-learning modeling via PyTorch.
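The "collect statistics per code, then filter" step in the list above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the function names (collect_code_stats, filter_outliers), the list-of-dicts row format, and the standard-deviation cutoff are hypothetical and are not taken from this package's scripts.

```python
from collections import defaultdict
from statistics import mean, pstdev


def collect_code_stats(rows):
    """Group numeric values by code and compute per-code count, mean, and std."""
    values = defaultdict(list)
    for row in rows:
        if row["numeric_value"] is not None:
            values[row["code"]].append(row["numeric_value"])
    return {c: {"n": len(v), "mean": mean(v), "std": pstdev(v)} for c, v in values.items()}


def filter_outliers(rows, stats, max_stds=2.0):
    """Drop rows whose value lies more than max_stds standard deviations from the code mean."""
    kept = []
    for row in rows:
        v, s = row["numeric_value"], stats.get(row["code"])
        if v is not None and s and s["std"] > 0 and abs(v - s["mean"]) > max_stds * s["std"]:
            continue  # outlier for this code
        kept.append(row)
    return kept


rows = [{"code": "HR", "numeric_value": v} for v in (70, 72, 71, 69, 73, 300)]
rows.append({"code": "ADMISSION", "numeric_value": None})  # codes without values pass through

stats = collect_code_stats(rows)
kept = filter_outliers(rows, stats, max_stds=2.0)
```

In a sharded setting, the statistics step would be computed per shard and then reduced across shards before filtering; the sketch collapses that into a single in-memory pass.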
Much like how the entire MEDS ETL pipeline is controlled by a single configuration file, the pre-processing
pipeline is also controlled by a single configuration file, stored in configs/preprocessing.yaml. Scripts
leverage this file once again through the Hydra configuration management system. Similar to the ETL, this
pipeline is designed to enable seamless parallelism and efficient use of resources simply by running multiple
copies of the same script on independent workers to process the data in parallel. "Reduction" steps again
need to happen in a single-threaded manner, but these steps are generally very fast and should not be a
bottleneck.
Tokenization is the process of producing dataframes that are arranged into the sequences that will eventually be processed by deep-learning methods. Generally, these dataframes will be arranged such that each row corresponds to a unique subject, with nested list-type columns corresponding either to events (unique timepoints), themselves with nested, list-type measurements, or to measurements (unique measurements within a timepoint) directly. Importantly, tokenized files are generally not ideally suited to direct ingestion by PyTorch datasets. Instead, they should undergo a tensorization process to be converted into a format that permits fast, efficient, scalable retrieval for deep-learning training.
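As a minimal sketch of the event-nested layout described above, the following groups flat, already-normalized rows into one record per subject, with one inner list per unique timepoint. The function name tokenize, the integer times, and the list-of-dicts representation are assumptions made for the example; the package itself operates on dataframes.

```python
from itertools import groupby


def tokenize(rows):
    """Nest flat rows into one record per subject with per-event lists of measurements."""
    rows = sorted(rows, key=lambda r: (r["subject_id"], r["time"]))
    out = []
    for subject_id, subj_rows in groupby(rows, key=lambda r: r["subject_id"]):
        times, codes, values = [], [], []
        # Each unique timepoint for a subject becomes one event.
        for time, event_rows in groupby(subj_rows, key=lambda r: r["time"]):
            event_rows = list(event_rows)
            times.append(time)
            codes.append([r["code"] for r in event_rows])
            values.append([r["numeric_value"] for r in event_rows])
        out.append({"subject_id": subject_id, "time": times, "code": codes, "numeric_value": values})
    return out


flat = [
    {"subject_id": 2, "time": 5, "code": 3, "numeric_value": None},
    {"subject_id": 1, "time": 0, "code": 1, "numeric_value": 0.5},
    {"subject_id": 1, "time": 0, "code": 2, "numeric_value": None},
    {"subject_id": 1, "time": 7, "code": 1, "numeric_value": -1.0},
]
tokens = tokenize(flat)
```

The resulting records are ragged (subject 1 has two events of different sizes, subject 2 has one), which is why a separate tensorization step is still needed before training.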
Tensorization is the process of producing files of the tokenized, normalized sequences that permit efficient,
scalable deep-learning. Here, by efficiency, we mean that the file structure and arrangement should permit
the deep learning process to (1) begin smoothly after startup, without a long, data-ingestion phase, (2) be
organized such that individual items (e.g., in a __getitem__ call) can be retrieved quickly in a manner that
does not inhibit rapid training, and (3) be organized such that CPU and GPU resources are used efficiently
during training. Similarly, by scalability, we mean that the three desiderata above should hold true even as
the dataset size grows much larger. While total training time can increase, the time to begin training, the
time to process the data per item, and the CPU/GPU resources required should remain constant or grow only
negligibly (e.g., through the cost of maintaining a larger index of subject IDs to file offsets or paths),
though disk space will of course increase.
Depending on one's performance needs and dataset sizes, there are 3 modes of deep learning training that can be used that warrant different styles of tensorization:
This mode of training does not scale to large datasets, and given the parallelizability of the data-loading phase, may or may not actually be significantly faster than other modes. It is not currently supported in this repository. TODO describe in more detail.
This mode of training has the data needed for any given PyTorch Dataset __getitem__ call retrieved from disk
on an as-needed basis. This mode is extremely scalable, because the entire dataset never need be
loaded or stored in memory in its entirety. When done properly, retrieving data from disk can be done in a
manner that is independent of the total dataset size as well, thereby rendering the load time similarly
unconstrained by total dataset size. This mode is also extremely flexible, because different cohorts can be
loaded from the same base dataset simply by changing which subjects and what offsets within subject data are
read on any given cohort, all without changing the base files or underlying code. However, this mode does
require ragged dataset collation which can be more resource intensive than pre-batched iteration, so it is
slower than the "Fixed-batch retrieval" approach. This mode is what is currently supported by this repository.
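The idea of retrieving one item per __getitem__ call in time independent of dataset size can be sketched with a flat binary file plus an index of subject IDs to byte offsets. This is a standard-library illustration only, not the on-disk format used by this repository: the class name OnDiskSequences, the helpers write_subjects and collate, and the little-endian int32 layout are all assumptions.

```python
import struct
import tempfile
from pathlib import Path


def write_subjects(fp, subjects):
    """Write each subject's code sequence contiguously; return subject -> (byte offset, length)."""
    index = {}
    with open(fp, "wb") as f:
        for subject_id, codes in subjects.items():
            index[subject_id] = (f.tell(), len(codes))
            f.write(struct.pack(f"<{len(codes)}i", *codes))
    return index


class OnDiskSequences:
    """Dataset-like object that reads one subject's sequence from disk per __getitem__ call."""

    def __init__(self, fp, index):
        self.fp = fp
        self.index = index
        self.subject_ids = sorted(index)

    def __len__(self):
        return len(self.subject_ids)

    def __getitem__(self, i):
        offset, n = self.index[self.subject_ids[i]]
        with open(self.fp, "rb") as f:
            f.seek(offset)  # cost does not depend on total file size
            return list(struct.unpack(f"<{n}i", f.read(4 * n)))


def collate(items, pad=0):
    """Pad ragged sequences to a common length (the ragged collation cost noted above)."""
    max_len = max(len(x) for x in items)
    return [x + [pad] * (max_len - len(x)) for x in items]


with tempfile.TemporaryDirectory() as tmp:
    fp = Path(tmp) / "codes.bin"
    index = write_subjects(fp, {10: [1, 2, 3], 11: [4], 12: [5, 6]})
    ds = OnDiskSequences(fp, index)
    n_items = len(ds)
    item = ds[2]
    batch = collate([ds[0], ds[1]])
```

Selecting a different cohort in this sketch only requires passing a different index (a subset of subjects, or different offsets) over the same file.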
In this mode of training, batches are selected once (potentially over many epochs), the items making up those batches are selected, then their contents are frozen and written to disk in a fully tensorized, padded format. This enables one to merely load batched data from disk directly onto the GPU during training, which is the fastest possible way to train a model. However, this mode is less flexible than the other modes, as the batches are frozen during training and cannot be changed without re-tensorizing the dataset, meaning that every new cohort for training requires a new tensorization step. This mode is not currently supported by this repository.
The pipeline configuration files for both the provided extraction and pre-processing pipelines are structured
to permit ease of understanding, flexibility for user-derived modifications, and ease of use in the simple,
file-in/file-out scripts that this repository promotes. Each pipeline (extraction and pre-processing) defines
one global configuration file, which is used as the Hydra specification for all scripts in that pipeline.
This file leverages some generic pipeline configuration options, specified in pipeline.yaml and imported via
the Hydra defaults: list, but also defines a list of stages with stage-specific configurations.
The user can specify the stage in question on the command line either manually (e.g., stage=stage_name) or
allow the stage name to be inferred automatically from the script name. Each script receives both the global
configuration file and a sub-configuration (within the stage_cfg node in the received global configuration),
which is pre-populated with the stage-specific configuration for the stage in question and with automatically
inferred input and output file paths (if not overwritten in the config file) based on the stage name and its
position in the overall pipeline. This makes it easy to leverage transformations and scripts defined here in
new configuration pipelines, simply by placing them as a stage in a broader pipeline in a different
configuration or order relative to other stages.
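One way to picture how a stage's input and output paths could be inferred from its position in the pipeline is the sketch below. It is purely hypothetical: the function resolve_stage_cfg, the key names data_input_dir and output_dir, and the "one subdirectory of cohort_dir per stage" layout are assumptions for illustration, and the package's actual resolution logic may differ.

```python
def resolve_stage_cfg(stage, stages, stage_configs, input_dir, cohort_dir):
    """Build a stage's sub-configuration, inferring its input/output directories."""
    idx = stages.index(stage)
    inferred = {
        # The first stage reads the raw input; later stages read the previous stage's output.
        "data_input_dir": f"{cohort_dir}/{stages[idx - 1]}" if idx > 0 else input_dir,
        "output_dir": f"{cohort_dir}/{stage}",
    }
    # Explicit stage-specific settings take precedence over inferred defaults.
    return {**inferred, **stage_configs.get(stage, {})}


stages = ["normalization", "tensorization"]
stage_configs = {"normalization": {"output_dir": "bar/normalized"}}

first_cfg = resolve_stage_cfg("normalization", stages, stage_configs, "foo", "bar")
second_cfg = resolve_stage_cfg("tensorization", stages, stage_configs, "foo", "bar")
```

Under these assumptions, reordering or subsetting the stages list changes each stage's inferred paths without any change to the stage's own code.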
We support two (optional) Hydra multirun job launchers for parallelizing ETL and pre-processing pipeline
steps: joblib (for local parallelism) and submitit (to launch jobs with slurm for cluster parallelism).
To use either of these, you need to install additional optional dependencies:
pip install -e .[local_parallelism] for joblib local parallelism support, or
pip install -e .[slurm_parallelism] for submitit cluster parallelism support.
You can overwrite the stages parameter on the command line to run a dynamic pipeline with just a subset of
options (the --cfg job --resolve is just to make Hydra show the induced, resolved config instead of trying
to run anything):
❯ MEDS_transform-normalization input_dir=foo cohort_dir=bar 'stages=["normalization", "tensorization"]' --cfg job --resolve