Medical-Event-Data-Standard / meds_etl

A collection of ETLs from common data formats to Medical Event Data Standard

Tests fail #10

Closed rvandewater closed 4 months ago

rvandewater commented 5 months ago

For two tests (test_roundtrip, test_roundtrip_no_metadata), the multiprocessing context does not get initialized:

__________________________________________________________________________________ test_roundtrip __________________________________________________________________________________
tmp_path = PosixPath('/tmp/pytest-of-root/pytest-1/test_roundtrip0')

    def test_roundtrip(tmp_path: pathlib.Path):
        meds_dataset = tmp_path / "meds"
        create_dataset(meds_dataset)
        patients = pq.read_table(meds_dataset / "data" / "patients.parquet").to_pylist()
        patients.sort(key=lambda a: a["patient_id"])

        roundtrip_helper(tmp_path, patients, "csv", 1)
        roundtrip_helper(tmp_path, patients, "compressed_csv", 1)
        roundtrip_helper(tmp_path, patients, "parquet", 1)

>       roundtrip_helper(tmp_path, patients, "csv", 4)

tests/test_flat.py:139:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
tests/test_flat.py:115: in roundtrip_helper
    meds_etl.flat.convert_meds_to_flat(str(meds_dataset), str(meds_flat_dataset), num_proc=num_proc, format=format)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

source_meds_path = '/tmp/pytest-of-root/pytest-1/test_roundtrip0/meds', target_flat_path = '/tmp/pytest-of-root/pytest-1/test_roundtrip0/meds_flat_csv_4', num_proc = 4
format = 'csv', time_format = '%Y-%m-%d %H:%M:%S%.f'

    def convert_meds_to_flat(
        source_meds_path: str,
        target_flat_path: str,
        num_proc: int = 1,
        format: Format = "compressed_csv",
        time_format: str = "%Y-%m-%d %H:%M:%S%.f",
    ) -> None:
        """Convert an entire MEDS dataset to MEDS Flat"""
        os.mkdir(target_flat_path)

        source_meds_data_path = os.path.join(source_meds_path, "data")
        source_files = os.listdir(source_meds_data_path)
        tasks = [os.path.join(source_meds_data_path, source_file) for source_file in source_files]

        if os.path.exists(os.path.join(source_meds_path, "metadata.json")):
            shutil.copyfile(
                os.path.join(source_meds_path, "metadata.json"), os.path.join(target_flat_path, "metadata.json")
            )

        target_flat_data_path = os.path.join(target_flat_path, "flat_data")
        os.mkdir(target_flat_data_path)

        converter = functools.partial(
            convert_file_to_flat,
            target_flat_data_path=target_flat_data_path,
            format=format,
            time_format=time_format,
        )

        if num_proc != 1:
>           with mp.Pool(processes=num_proc) as pool:
E           AttributeError: 'NoneType' object has no attribute 'Pool'

/root/miniconda3/envs/meds_etl/lib/python3.12/site-packages/meds_etl/flat.py:92: AttributeError
tompollard commented 5 months ago

Odd, I assume this is the issue at: https://github.com/Medical-Event-Data-Standard/meds_etl/actions/runs/7673982626/job/20917734572.

Not had a chance to look into this yet, but I'm unclear whether something has changed or whether the tests never passed to begin with.

tompollard commented 5 months ago

These tests were definitely passing in the past: https://github.com/Medical-Event-Data-Standard/meds_etl/actions/runs/7576283870/job/20634733112

Tests are passing for me locally:

tests/test_etl.py::test_hello_world PASSED                                                                                                                   [ 11%]
tests/test_etl.py::TestMimicETL::test_destination_contains_files PASSED                                                                                      [ 22%]
tests/test_etl.py::TestMimicETL::test_load_dataset PASSED                                                                                                    [ 33%]
tests/test_etl.py::TestMimicETL::test_number_of_patients PASSED                                                                                              [ 44%]
tests/test_etl.py::TestMimicETL::test_expected_features_exist PASSED                                                                                         [ 55%]
tests/test_etl.py::TestMimicETL::test_expected_data_types PASSED                                                                                             [ 66%]
tests/test_flat.py::test_roundtrip PASSED                                                                                                                    [ 77%]
tests/test_flat.py::test_roundtrip_no_metadata PASSED                                                                                                        [ 88%]
tests/test_flat.py::test_shuffle PASSED                                                                                                                      [100%]

My guess is that it's a versioning issue, perhaps related to the deprecation warning below:

tests/test_flat.py::test_shuffle
  /opt/hostedtoolcache/Python/3.10.13/x64/lib/python3.10/site-packages/meds_etl/flat.py:230: DeprecationWarning: `partition_by(..., as_dict=True)` will change to always return tuples as dictionary keys. Pass `by` as a list to silence this warning, e.g. `partition_by(['shard'], as_dict=True)`.
    table.select(
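
For reference, here is a minimal sketch of the polars behavior that warning describes (an illustration with an assumed `shard` column, not the meds_etl code): passing `by` as a string versus a list changes the keys returned by `partition_by(..., as_dict=True)`, and a tuple key formatted straight into a path produces a directory component like `(9,)`.

```python
import polars as pl

df = pl.DataFrame({"shard": [9, 9, 3], "value": [1, 2, 3]})

# String `by`: emits the DeprecationWarning; keys used to be plain scalars (e.g. 9).
parts_old = df.partition_by("shard", as_dict=True)

# List `by`, as the warning suggests: keys are always tuples (e.g. (9,)).
parts_new = df.partition_by(["shard"], as_dict=True)

for key in parts_new:
    # Interpolating a tuple key into a path yields a "(9,)" directory name.
    print(f"temp/{key}/patients_1.parquet")
```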
rvandewater commented 5 months ago

I am not quite sure if this issue is related, as I am getting the same error on your fixed branch; I am looking into it. That said, on Python 3.10 I am getting a FileNotFoundError, which seems to be the same failure as in the CI (output below). With 3.12 I get the error above, due to mp not being initialized.

Running pytest with args: ['-p', 'vscode_pytest', '--rootdir', '/mnt/c/Users/Robin/Documents/Git/meds_etl', '/mnt/c/Users/Robin/Documents/Git/meds_etl/tests/test_flat.py::test_roundtrip']
============================= test session starts ==============================
platform linux -- Python 3.10.13, pytest-8.0.0, pluggy-1.4.0
rootdir: /mnt/c/Users/Robin/Documents/Git/meds_etl
collected 1 item

tests/test_flat.py F                                                     [100%]

=================================== FAILURES ===================================
________________________________ test_roundtrip ________________________________

tmp_path = PosixPath('/tmp/pytest-of-root/pytest-31/test_roundtrip0')

    def test_roundtrip(tmp_path: pathlib.Path):
        meds_dataset = tmp_path / "meds"
        create_dataset(meds_dataset)
        patients = pq.read_table(meds_dataset / "data" / "patients.parquet").to_pylist()
        patients.sort(key=lambda a: a["patient_id"])

>       roundtrip_helper(tmp_path, patients, "csv", 1)

tests/test_flat.py:136: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/test_flat.py:118: in roundtrip_helper
    meds_etl.flat.convert_flat_to_meds(str(meds_flat_dataset), str(meds_dataset2), num_proc=num_proc, num_shards=10)
/root/miniconda3/envs/meds_etl310/lib/python3.10/site-packages/meds_etl/flat.py:401: in convert_flat_to_meds
    csv_processor(task)
/root/miniconda3/envs/meds_etl310/lib/python3.10/site-packages/meds_etl/flat.py:295: in process_csv_file
    create_and_write_shards_from_table(batch, temp_dir, num_shards, time_formats, metadata_columns, fname)
/root/miniconda3/envs/meds_etl310/lib/python3.10/site-packages/meds_etl/flat.py:248: in create_and_write_shards_from_table
    shard.write_parquet(fname, compression="uncompressed")
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = shape: (120, 8)
┌────────────┬────────────┬────────────┬────────────┬────────────┬────────────┬────────────┬───────┐
│...m"}      ┆       │
└────────────┴────────────┴────────────┴────────────┴────────────┴────────────┴────────────┴───────┘
file = '/tmp/pytest-of-root/pytest-31/test_roundtrip0/meds2_csv_1/temp/(9,)/patients_1.parquet'

    def write_parquet(
        self,
        file: str | Path | BytesIO,
        *,
        compression: ParquetCompression = "zstd",
        compression_level: int | None = None,
        statistics: bool = False,
        row_group_size: int | None = None,
        data_page_size: int | None = None,
        use_pyarrow: bool = False,
        pyarrow_options: dict[str, Any] | None = None,
    ) -> None:
        """
        Write to Apache Parquet file.

        Parameters
        ----------
        file
            File path or writeable file-like object to which the result will be written.
        compression : {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'}
            Choose "zstd" for good compression performance.
            Choose "lz4" for fast compression/decompression.
            Choose "snappy" for more backwards compatibility guarantees
            when you deal with older parquet readers.
        compression_level
            The level of compression to use. Higher compression means smaller files on
            disk.

            - "gzip" : min-level: 0, max-level: 10.
            - "brotli" : min-level: 0, max-level: 11.
            - "zstd" : min-level: 1, max-level: 22.

        statistics
            Write statistics to the parquet headers. This requires extra compute.
        row_group_size
            Size of the row groups in number of rows. Defaults to 512^2 rows.
        data_page_size
            Size of the data page in bytes. Defaults to 1024^2 bytes.
        use_pyarrow
            Use C++ parquet implementation vs Rust parquet implementation.
            At the moment C++ supports more features.
        pyarrow_options
            Arguments passed to `pyarrow.parquet.write_table`.

            If you pass `partition_cols` here, the dataset will be written
            using `pyarrow.parquet.write_to_dataset`.
            The `partition_cols` parameter leads to write the dataset to a directory.
            Similar to Spark's partitioned datasets.

        Examples
        --------
        >>> import pathlib
        >>>
        >>> df = pl.DataFrame(
        ...     {
        ...         "foo": [1, 2, 3, 4, 5],
        ...         "bar": [6, 7, 8, 9, 10],
        ...         "ham": ["a", "b", "c", "d", "e"],
        ...     }
        ... )
        >>> path: pathlib.Path = dirpath / "new_file.parquet"
        >>> df.write_parquet(path)

        We can use pyarrow with use_pyarrow_write_to_dataset=True
        to write partitioned datasets. The following example will
        write the first row to ../watermark=1/*.parquet and the
        other rows to ../watermark=2/*.parquet.

        >>> df = pl.DataFrame({"a": [1, 2, 3], "watermark": [1, 2, 2]})
        >>> path: pathlib.Path = dirpath / "partitioned_object"
        >>> df.write_parquet(
        ...     path,
        ...     use_pyarrow=True,
        ...     pyarrow_options={"partition_cols": ["watermark"]},
        ... )
        """
        if compression is None:
            compression = "uncompressed"
        if isinstance(file, (str, Path)):
            if pyarrow_options is not None and pyarrow_options.get("partition_cols"):
                file = normalize_filepath(file, check_not_directory=False)
            else:
                file = normalize_filepath(file)

        if use_pyarrow:
            tbl = self.to_arrow()
            data = {}

            for i, column in enumerate(tbl):
                # extract the name before casting
                name = f"column_{i}" if column._name is None else column._name

                data[name] = column

            tbl = pa.table(data)

            # do not remove this import!
            # needed below
            import pyarrow.parquet  # noqa: F401

            if pyarrow_options is None:
                pyarrow_options = {}
            pyarrow_options["compression"] = (
                None if compression == "uncompressed" else compression
            )
            pyarrow_options["compression_level"] = compression_level
            pyarrow_options["write_statistics"] = statistics
            pyarrow_options["row_group_size"] = row_group_size
            pyarrow_options["data_page_size"] = data_page_size

            if pyarrow_options.get("partition_cols"):
                pa.parquet.write_to_dataset(
                    table=tbl,
                    root_path=file,
                    **(pyarrow_options or {}),
                )
            else:
                pa.parquet.write_table(
                    table=tbl,
                    where=file,
                    **(pyarrow_options or {}),
                )

        else:
>           self._df.write_parquet(
                file,
                compression,
                compression_level,
                statistics,
                row_group_size,
                data_page_size,
            )
E           FileNotFoundError: No such file or directory (os error 2)

/root/miniconda3/envs/meds_etl310/lib/python3.10/site-packages/polars/dataframe/frame.py:3540: FileNotFoundError
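
One possibly useful observation (a sketch, not the meds_etl code): polars raises exactly this `FileNotFoundError: No such file or directory (os error 2)` from `write_parquet` when the parent directory of the target path does not exist, which would fit the `temp/(9,)/...` path above if that directory was never created. The path below is illustrative only.

```python
import os
import polars as pl

df = pl.DataFrame({"patient_id": [1, 2], "value": [0.5, 1.5]})

fname = "/tmp/demo_meds_flat/temp/(9,)/patients_1.parquet"

# Without the parent directory, this raises:
#   FileNotFoundError: No such file or directory (os error 2)
# df.write_parquet(fname, compression="uncompressed")

# Creating the parent directory first makes the same write succeed.
os.makedirs(os.path.dirname(fname), exist_ok=True)
df.write_parquet(fname, compression="uncompressed")
```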
EthanSteinberg commented 4 months ago

Sorry, just saw this. I'll look into it and fix this.

tompollard commented 4 months ago

Sorry, I'd also been meaning to look at this!

EthanSteinbergPrealize commented 4 months ago

Ok, I believe this might be fixed now? I added a fallback so that multiprocessing uses the default context if forkserver isn't available. Going to close this. Feel free to reopen @rvandewater if it's still not working.
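
For anyone who lands here via the `AttributeError: 'NoneType' object has no attribute 'Pool'` above, the fallback described can be sketched roughly as follows (an illustration of the approach, not the exact meds_etl change):

```python
import multiprocessing

# Prefer the forkserver start method, but fall back to the platform default
# when forkserver is unavailable, so the context is never left as None.
try:
    mp = multiprocessing.get_context("forkserver")
except ValueError:
    mp = multiprocessing.get_context()


def _square(x):
    return x * x


if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        print(pool.map(_square, range(8)))
```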