Medical-Event-Data-Standard / meds_etl

A collection of ETLs from common data formats to Medical Event Data Standard
Apache License 2.0

Polars streaming is unreliable for the OMOP ETL #29

Closed. EthanSteinberg closed this issue 2 weeks ago

EthanSteinberg commented 1 month ago

I have not been able to reproduce it, but I have received two reports that the streaming engine in Polars fails when writing Parquet files in the OMOP ETL.

The precise error message is the following:

  File "/home/shreya99/miniconda3/envs/ehrshot_env/lib/python3.10/site-packages/meds_etl/omop.py", line 320, in write_event_data
    raise e
  File "/home/shreya99/miniconda3/envs/ehrshot_env/lib/python3.10/site-packages/meds_etl/omop.py", line 315, in write_event_data
    event_data.sink_parquet(fname, compression="zstd", compression_level=1, maintain_order=False)
  File "/home/shreya99/.local/lib/python3.10/site-packages/polars/_utils/unstable.py", line 59, in wrapper
    return function(*args, **kwargs)
  File "/home/shreya99/.local/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 2070, in sink_parquet
    return lf.sink_parquet(
polars.exceptions.InvalidOperationError: sink_Parquet(ParquetWriteOptions { compression: Zstd(Some(ZstdLevel(1))), statistics: true, row_group_size: None, data_pagesize_limit: None, maintain_order: false }) not yet supported in standard engine. Use 'collect().write_parquet()'

This needs to be fixed, if only by switching to the workaround collect().write_parquet().
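A minimal sketch of that workaround, assuming the lazy frame fits in memory once collected. The helper name `write_event_data_eager` is hypothetical and is not the function in `meds_etl/omop.py`; the compression settings are copied from the `sink_parquet` call in the traceback, and `maintain_order` is dropped because it is a sink-only option.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only needed for the type hint below
    import polars as pl


def write_event_data_eager(event_data: "pl.LazyFrame", fname: str) -> None:
    """Hypothetical helper: bypass the streaming sink entirely.

    collect() materializes the full result in memory, so this trades the
    streaming engine's low memory footprint for reliability.
    """
    # Same compression settings as the failing sink_parquet call.
    event_data.collect().write_parquet(
        fname, compression="zstd", compression_level=1
    )
```

With Polars installed, it could be called as `write_event_data_eager(pl.scan_parquet("input.parquet"), "output.parquet")`, where both file names are placeholders.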

egillax commented 1 month ago

I just ran into this. For me the offending operation was here:

  backup_value = (
      pl.when((source_concept_id == 0) & (concept_id_value != 0))
      .then(
          # Source concept 0 indicates we need a backup value since it's not captured by the source
          "SOURCE_CODE/"
          + pl.col(concept_id_field.replace("_concept_id", "_source_value"))
      )
      .otherwise(
          # Should be captured by the source concept id, so just map the value to a string.
          concept_id_value.map_dict(concept_name_map)
      )
  )

Removing the otherwise branch let me sink to parquet, as did changing the otherwise to contain only pl.lit(1).

Collecting and then writing to parquet works fine.
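For readers less familiar with Polars expressions, here is a per-row, plain-Python reading of what the `when/then/otherwise` expression above computes. It is only an illustrative sketch: the function signature is hypothetical, and it assumes that nulls make the `when` condition false and that `map_dict` yields null for unmapped keys (its default behavior, as far as I know).

```python
from typing import Optional


def backup_value(
    source_concept_id: Optional[int],
    concept_id_value: Optional[int],
    source_value: Optional[str],
    concept_name_map: dict,
) -> Optional[str]:
    """Row-wise equivalent of the Polars expression (illustrative only)."""
    # A null operand makes the Polars condition null, which is treated as
    # false; the explicit None check on concept_id_value mirrors that.
    if source_concept_id == 0 and concept_id_value not in (None, 0):
        # No source concept recorded: fall back to the raw source value.
        if source_value is None:
            return None  # string + null is null in Polars
        return "SOURCE_CODE/" + source_value
    # Otherwise look the concept id up in the name map (None if unmapped).
    return concept_name_map.get(concept_id_value)
```

The `otherwise` branch is the only part that involves the dictionary lookup, which is the branch whose removal made `sink_parquet` succeed.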

EthanSteinberg commented 2 weeks ago

Fixed