Closed mmcdermott closed 1 month ago
[!IMPORTANT]
Review skipped
Auto reviews are disabled on base/target branches other than the default branch.
Please check the settings in the CodeRabbit UI or the
.coderabbit.yaml
file in this repository. To trigger a single review, invoke the@coderabbitai review
command.You can disable this status message by setting the
reviews.review_status
tofalse
in the CodeRabbit configuration file.
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?
Test Failures Detected: Due to failing tests, we cannot provide coverage reports at this time.
Completed 77 tests with 1 failed
, 76 passed and 0 skipped.
want = shape: (12, 13)
┌───────────────────────┬────────────────────┬─────────────────┬──────────────────────┬───────────────...─────┴────────────┴────────────┴─────────────────────────────────┴──────────────────┴─────────────────────────────────┘
got = shape: (12, 13)
┌───────────────────────┬────────────────────┬─────────────────┬──────────────────────┬───────────────...─────┴────────────┴────────────┴─────────────────────────────────┴─────────────────────────────────┴──────────────────┘
msg = 'Expected the dataframe at .../output_cohort/metadata/codes.parquet to be equal to the target.\nScript st...etadata\x1b[0m:\x1b[36mrun_map_reduce\x1b[0m:\x1b[36m721\x1b[0m - \x1b[1mFinished reduction in 0:00:00.035312\x1b[0m\n'
kwargs = {'check_column_order': False, 'check_row_order': True}
def assert_df_equal(want: pl.DataFrame, got: pl.DataFrame, msg: str = None, **kwargs):
try:
> assert_frame_equal(want, got, **kwargs)
tests/utils.py:172:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (shape: (12, 13)
┌───────────────────────┬────────────────────┬─────────────────┬──────────────────────┬──────────────...────┴────────────┴────────────┴─────────────────────────────────┴─────────────────────────────────┴──────────────────┘)
kwargs = {'check_column_order': False, 'check_row_order': True}
@wraps(function)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
_rename_keyword_argument(
old_name, new_name, kwargs, function.__qualname__, version
)
> return function(*args, **kwargs)
E AssertionError: DataFrames are different (dtypes do not match)
E [left]: {'code': String, 'code/n_occurrences': UInt8, 'code/n_patients': UInt8, 'values/n_occurrences': UInt8, 'values/n_patients': UInt8, 'values/sum': Float32, 'values/sum_sqd': Float32, 'values/n_ints': UInt8, 'values/min': Float32, 'values/max': Float32, 'description': String, 'parent_codes': List(String), 'values/quantiles': Struct({'values/quantile/0.25': Float64, 'values/quantile/0.5': Float64, 'values/quantile/0.75': Float64})}
E [right]: {'code': String, 'code/n_occurrences': UInt8, 'code/n_patients': UInt8, 'values/n_occurrences': UInt8, 'values/n_patients': UInt8, 'values/sum': Float32, 'values/sum_sqd': Float32, 'values/n_ints': UInt8, 'values/min': Float32, 'values/max': Float32, 'values/quantiles': Struct({'values/quantile/0.25': Float32, 'values/quantile/0.5': Float32, 'values/quantile/0.75': Float32}), 'description': String, 'parent_codes': List(String)}
.../hostedtoolcache/Python/3.12.4....../x64/lib/python3.12.../polars/_utils/deprecation.py:91: AssertionError
The above exception was the direct cause of the following exception:
def test_aggregate_code_metadata():
> single_stage_transform_tester(
transform_script=AGGREGATE_CODE_METADATA_SCRIPT,
stage_name="aggregate_code_metadata",
transform_stage_kwargs={"aggregations": AGGREGATIONS, "do_summarize_over_all_codes": True},
want_outputs=WANT_OUTPUT_CODE_METADATA_FILE,
code_metadata=MEDS_CODE_METADATA_FILE,
do_use_config_yaml=True,
)
tests/test_aggregate_code_metadata.py:179:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/transform_tester_base.py:391: in single_stage_transform_tester
check_df_output(cohort_metadata_dir / "codes.parquet", want_outputs, stderr, stdout)
tests/transform_tester_base.py:289: in check_df_output
assert_df_equal(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
want = shape: (12, 13)
┌───────────────────────┬────────────────────┬─────────────────┬──────────────────────┬───────────────...─────┴────────────┴────────────┴─────────────────────────────────┴──────────────────┴─────────────────────────────────┘
got = shape: (12, 13)
┌───────────────────────┬────────────────────┬─────────────────┬──────────────────────┬───────────────...─────┴────────────┴────────────┴─────────────────────────────────┴─────────────────────────────────┴──────────────────┘
msg = 'Expected the dataframe at .../output_cohort/metadata/codes.parquet to be equal to the target.\nScript st...etadata\x1b[0m:\x1b[36mrun_map_reduce\x1b[0m:\x1b[36m721\x1b[0m - \x1b[1mFinished reduction in 0:00:00.035312\x1b[0m\n'
kwargs = {'check_column_order': False, 'check_row_order': True}
def assert_df_equal(want: pl.DataFrame, got: pl.DataFrame, msg: str = None, **kwargs):
try:
assert_frame_equal(want, got, **kwargs)
except AssertionError as e:
pl.Config.set_tbl_rows(-1)
print(f"DFs are not equal: {msg}\nwant:")
print(want)
print("got:")
print(got)
> raise AssertionError(f"{msg}\n{e}") from e
E AssertionError: Expected the dataframe at .../output_cohort/metadata/codes.parquet to be equal to the target.
E Script stdout:
E
E Script stderr:
E .../hostedtoolcache/Python/3.12.4....../x64/lib/python3.12.../hydra/_internal/defaults_list.py:251: UserWarning: In 'config': Defaults list is missing `_self_`. See https://hydra.cc/docs/1.2/upgrades/1.0_to_1.1/default_composition_order for more information
E warnings.warn(msg, UserWarning)
E #x1B[32m2024-08-14 14:37:24.412#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.utils#x1B[0m:#x1B[36mstage_init#x1B[0m:#x1B[36m72#x1B[0m - #x1B[1mRunning aggregate_code_metadata with the following configuration:
E input_dir: .../tmp/tmpug3vkmt6/MEDS_cohort
E cohort_dir: .../tmp/tmpug3vkmt6/output_cohort
E _default_description: 'This is a MEDS pipeline ETL. Please set a more detailed description
E at the top of your specific pipeline
E
E configuration file.'
E log_dir: ${stage_cfg.output_dir}/.logs
E do_overwrite: false
E seed: 1
E stages:
E - aggregate_code_metadata
E stage_configs:
E reshard_to_split:
E n_patients_per_shard: 50000
E filter_patients:
E min_events_per_patient: null
E min_measurements_per_patient: null
E add_time_derived_measurements:
E age:
E DOB_code: MEDS_BIRTH
E age_code: AGE
E age_unit: years
E time_of_day:
E time_of_day_code: TIME_OF_DAY
E endpoints:
E - 6
E - 12
E - 18
E - 24
E count_code_occurrences:
E aggregations:
E - code/n_occurrences
E - code/n_patients
E do_summarize_over_all_codes: true
E filter_measurements:
E min_patients_per_code: null
E min_occurrences_per_code: null
E fit_outlier_detection:
E aggregations:
E - values/n_occurrences
E - values/sum
E - values/sum_sqd
E occlude_outliers:
E stddev_cutoff: 4.5
E fit_normalization:
E aggregations:
E - code/n_occurrences
E - code/n_patients
E - values/n_occurrences
E - values/sum
E - values/sum_sqd
E fit_vocabulary_indices:
E is_metadata: true
E ordering_method: lexicographic
E output_dir: ${cohort_dir}
E reorder_measurements:
E ordered_code_patterns: &&&
E aggregate_code_metadata:
E aggregations:
E - code/n_occurrences
E - code/n_patients
E - values/n_occurrences
E - values/n_patients
E - values/sum
E - values/sum_sqd
E - values/n_ints
E - values/min
E - values/max
E - name: values/quantiles
E quantiles:
E - 0.25
E - 0.5
E - 0.75
E do_summarize_over_all_codes: true
E worker: 0
E polling_time: 300
E stage: ${current_script_name:}
E stage_cfg: ${oc.create:${populate_stage:${stage}, ${input_dir}, ${cohort_dir}, ${stages},
E ${stage_configs}}}
E etl_metadata:
E pipeline_name: &&&
E dataset_name: &&&
E dataset_version: &&&
E package_name: ${get_package_name:}
E package_version: ${get_package_version:}
E etl_metadata.pipeline_name: preprocess
E code_modifiers: &&&
E hydra.verbose: true
E #x1B[0m
E #x1B[32m2024-08-14 14:37:24.448#x1B[0m | #x1B[34m#x1B[1mDEBUG #x1B[0m | #x1B[36mMEDS_transforms.utils#x1B[0m:#x1B[36mstage_init#x1B[0m:#x1B[36m96#x1B[0m - #x1B[34m#x1B[1mStage config:
E aggregations:
E - code/n_occurrences
E - code/n_patients
E - values/n_occurrences
E - values/n_patients
E - values/sum
E - values/sum_sqd
E - values/n_ints
E - values/min
E - values/max
E - name: values/quantiles
E quantiles:
E - 0.25
E - 0.5
E - 0.75
E do_summarize_over_all_codes: true
E is_metadata: true
E data_input_dir: ....../tmpug3vkmt6/MEDS_cohort/data
E metadata_input_dir: ....../tmpug3vkmt6/MEDS_cohort/metadata
E output_dir: ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata
E reducer_output_dir: ....../tmpug3vkmt6/output_cohort/metadata
E train_only: true
E
E Paths: (checkbox indicates if it exists)
E - input_dir: ✅ ....../tmpug3vkmt6/MEDS_cohort/data
E - output_dir: ✅ ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata
E - metadata_input_dir: ✅ ....../tmpug3vkmt6/MEDS_cohort/metadata#x1B[0m
E #x1B[32m2024-08-14 14:37:24.483#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mshard_iterator#x1B[0m:#x1B[36m493#x1B[0m - #x1B[1mMapping computation over a maximum of 2 shards#x1B[0m
E #x1B[32m2024-08-14 14:37:24.483#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.mapper#x1B[0m:#x1B[36mmap_over#x1B[0m:#x1B[36m589#x1B[0m - #x1B[1mProcessing train split only via shard prefix. Not filtering with ....../tmpug3vkmt6/MEDS_cohort/metadata/patient_split.parquet.#x1B[0m
E #x1B[32m2024-08-14 14:37:24.500#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.mapper#x1B[0m:#x1B[36mmap_over#x1B[0m:#x1B[36m617#x1B[0m - #x1B[1mProcessing ....../tmpug3vkmt6/MEDS_cohort/data/train/1.parquet into ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/1.parquet#x1B[0m
E #x1B[32m2024-08-14 14:37:24.502#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m217#x1B[0m - #x1B[1mRegistered lock at 2024-08-14 14:37:24.502294. Double checking no earlier locks have been registered.#x1B[0m
E #x1B[32m2024-08-14 14:37:24.508#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m224#x1B[0m - #x1B[1mReading input dataframe from ....../tmpug3vkmt6/MEDS_cohort/data/train/1.parquet#x1B[0m
E #x1B[32m2024-08-14 14:37:24.509#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m226#x1B[0m - #x1B[1mRead dataset#x1B[0m
E #x1B[32m2024-08-14 14:37:24.511#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m238#x1B[0m - #x1B[1mWriting final output to ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/1.parquet#x1B[0m
E #x1B[32m2024-08-14 14:37:24.516#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m240#x1B[0m - #x1B[1mSucceeded in 0:00:00.013849#x1B[0m
E #x1B[32m2024-08-14 14:37:24.516#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m241#x1B[0m - #x1B[1mLeaving cache directory ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/.1.parquet_cache, but clearing lock at ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/.1.parquet_cache/locks/2024-08-14T14:37:24.502294.json#x1B[0m
E #x1B[32m2024-08-14 14:37:24.516#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.mapper#x1B[0m:#x1B[36mmap_over#x1B[0m:#x1B[36m617#x1B[0m - #x1B[1mProcessing ....../tmpug3vkmt6/MEDS_cohort/data/train/0.parquet into ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/0.parquet#x1B[0m
E #x1B[32m2024-08-14 14:37:24.517#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m217#x1B[0m - #x1B[1mRegistered lock at 2024-08-14 14:37:24.517531. Double checking no earlier locks have been registered.#x1B[0m
E #x1B[32m2024-08-14 14:37:24.518#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m224#x1B[0m - #x1B[1mReading input dataframe from ....../tmpug3vkmt6/MEDS_cohort/data/train/0.parquet#x1B[0m
E #x1B[32m2024-08-14 14:37:24.518#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m226#x1B[0m - #x1B[1mRead dataset#x1B[0m
E #x1B[32m2024-08-14 14:37:24.519#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m238#x1B[0m - #x1B[1mWriting final output to ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/0.parquet#x1B[0m
E #x1B[32m2024-08-14 14:37:24.521#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m240#x1B[0m - #x1B[1mSucceeded in 0:00:00.003860#x1B[0m
E #x1B[32m2024-08-14 14:37:24.521#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.utils#x1B[0m:#x1B[36mrwlock_wrap#x1B[0m:#x1B[36m241#x1B[0m - #x1B[1mLeaving cache directory ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/.0.parquet_cache, but clearing lock at ....../tmpug3vkmt6/output_cohort/aggregate_code_metadata/train/.0.parquet_cache/locks/2024-08-14T14:37:24.517531.json#x1B[0m
E #x1B[32m2024-08-14 14:37:24.521#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.mapreduce.mapper#x1B[0m:#x1B[36mmap_over#x1B[0m:#x1B[36m628#x1B[0m - #x1B[1mFinished mapping in 0:00:00.072798#x1B[0m
E #x1B[32m2024-08-14 14:37:24.522#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.aggregate_code_metadata#x1B[0m:#x1B[36mrun_map_reduce#x1B[0m:#x1B[36m695#x1B[0m - #x1B[1mStarting reduction process#x1B[0m
E #x1B[32m2024-08-14 14:37:24.522#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.aggregate_code_metadata#x1B[0m:#x1B[36mrun_map_reduce#x1B[0m:#x1B[36m702#x1B[0m - #x1B[1mAll map shards complete! Starting code metadata reduction computation.#x1B[0m
E .../src/MEDS_transforms/aggregate_code_metadata.py:672: PerformanceWarning: Determining the column names of a LazyFrame requires resolving its schema, which is a potentially expensive operation. Use `LazyFrame.collect_schema().names()` to get the column names without this warning.
E if agg not in df.columns:
E #x1B[32m2024-08-14 14:37:24.551#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.aggregate_code_metadata#x1B[0m:#x1B[36mrun_map_reduce#x1B[0m:#x1B[36m715#x1B[0m - #x1B[1mJoining to existing code metadata at ....../tmpug3vkmt6/MEDS_cohort/metadata/codes.parquet#x1B[0m
E .../src/MEDS_transforms/aggregate_code_metadata.py:717: PerformanceWarning: Determining the column names of a LazyFrame requires resolving its schema, which is a potentially expensive operation. Use `LazyFrame.collect_schema().names()` to get the column names without this warning.
E existing = existing.drop(*[c for c in existing.columns if c in set(reduced.columns) - set(join_cols)])
E #x1B[32m2024-08-14 14:37:24.558#x1B[0m | #x1B[1mINFO #x1B[0m | #x1B[36mMEDS_transforms.aggregate_code_metadata#x1B[0m:#x1B[36mrun_map_reduce#x1B[0m:#x1B[36m721#x1B[0m - #x1B[1mFinished reduction in 0:00:00.035312#x1B[0m
E
E DataFrames are different (dtypes do not match)
E [left]: {'code': String, 'code/n_occurrences': UInt8, 'code/n_patients': UInt8, 'values/n_occurrences': UInt8, 'values/n_patients': UInt8, 'values/sum': Float32, 'values/sum_sqd': Float32, 'values/n_ints': UInt8, 'values/min': Float32, 'values/max': Float32, 'description': String, 'parent_codes': List(String), 'values/quantiles': Struct({'values/quantile/0.25': Float64, 'values/quantile/0.5': Float64, 'values/quantile/0.75': Float64})}
E [right]: {'code': String, 'code/n_occurrences': UInt8, 'code/n_patients': UInt8, 'values/n_occurrences': UInt8, 'values/n_patients': UInt8, 'values/sum': Float32, 'values/sum_sqd': Float32, 'values/n_ints': UInt8, 'values/min': Float32, 'values/max': Float32, 'values/quantiles': Struct({'values/quantile/0.25': Float32, 'values/quantile/0.5': Float32, 'values/quantile/0.75': Float32}), 'description': String, 'parent_codes': List(String)}
tests/utils.py:179: AssertionError
Adds a full integration test to the aggregation and makes the mapper work properly in both
do_summarize_all
and not cases with quantile reduction.Closes #163 and #165