eqasim-org / ile-de-france

An open synthetic population of Île-de-France for agent-based transport simulation
GNU General Public License v2.0

Add dataframe validation before stage execution #197

Open Nitnelav opened 1 year ago

Nitnelav commented 1 year ago

I think it would be a good idea to use Pandera to describe and check the input dataframes of a given stage at runtime.

It has the benefit of :

I don't think it can or should be imposed on every existing stage, but it can be strongly encouraged by the community.

For example:

"""
This stage cleans the Loire Atlantique EDGT.
"""

import numpy as np
import pandas as pd
import pandera as pa

import data.hts.hts as hts

def configure(context):
    context.stage("data.hts.edgt_44.raw")

PURPOSE_MAP = {
    "home": [1, 2],
    "work": [11, 12, 13, 81],
    "education": [21, 22, 23, 24, 25, 26, 27, 28, 29],
    "shop": [30, 31, 32, 33, 34, 35, 82],
    "leisure": [51, 52, 53, 54],
    "other": [41, 42, 43, 44, 45, 61, 62, 63, 64, 71, 72, 73, 74, 91]
}

MODES_MAP = {
    "car": [13, 15, 21, 81],
    "car_passenger": [14, 16, 22, 82],
    "pt": [30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 51, 52, 53, 61, 71, 72, 73, 91, 92, 94, 95],
    "bike": [11, 17, 12, 18, 93, 19],
    "walk": [1, 2] # Actually, 2 is not really explained, but we assume it is walk
}

# expected input formats
HOUSEHOLDS_SCHEMA = pa.DataFrameSchema({
    'MTIR': pa.Column(object),
    'MP2': pa.Column(object),
    'ECH': pa.Column(object),
    'M5': pa.Column(np.int32),
    'M6': pa.Column(np.int32),
    'M7': pa.Column(np.int32),
    'COEM': pa.Column(float)
})
PERSONS_SCHEMA = pa.DataFrameSchema({
    "PTIR": pa.Column(object),
    "PP2": pa.Column(object),
    "ECH": pa.Column(object),
    "PER": pa.Column(np.int32),
    "P1": pa.Column(np.int32),
    "P2": pa.Column(np.int32),
    "P3": pa.Column(np.int32),
    "P4": pa.Column(np.int32),
    "P5": pa.Column(object, nullable=True),
    "P7": pa.Column(object, nullable=True),
    "P9": pa.Column(object, nullable=True),
    "P12": pa.Column(object, nullable=True),
    "COEP": pa.Column(float),
    "COEQ": pa.Column(float)
})
TRIPS_SCHEMA = pa.DataFrameSchema({
    "DTIR": pa.Column(object),
    "DP2": pa.Column(object),
    "ECH": pa.Column(object),
    "PER": pa.Column(np.int32),
    "NDEP": pa.Column(np.int32),
    "D2A": pa.Column(np.int32),
    "D3": pa.Column(object),
    "D4A": pa.Column(np.int32),
    "D4B": pa.Column(np.int32),
    "D5A": pa.Column(np.int32),
    "D7": pa.Column(object),
    "D8A": pa.Column(np.int32),
    "D8B": pa.Column(np.int32),
    "D8C": pa.Column(np.int32),
    "MODP": pa.Column(np.int32),
    "DOIB": pa.Column(np.int32),
    "DIST": pa.Column(np.int32)
})

def execute(context):
    df_households, df_persons, df_trips = context.stage("data.hts.edgt_44.raw")

    # check expected input formats
    df_households = HOUSEHOLDS_SCHEMA.validate(df_households)
    df_persons = PERSONS_SCHEMA.validate(df_persons)
    df_trips = TRIPS_SCHEMA.validate(df_trips)

    ...

    return df_households, df_persons, df_trips

sebhoerl commented 1 year ago

Nitnelav commented 1 year ago

O_o Snakemake looks quite interesting indeed! Joining a broader "pipeline" community would make a lot of sense.

Regarding the 2nd point, I think I would prefer defining everything inside the script, but I see how that might lead to a certain amount of code duplication (if the df_persons structure doesn't change much across many scripts, for example).

Nitnelav commented 1 week ago

FYI, I'm using Pandera right now in another pipeline, and I find it very verbose if you want to validate the whole dataframe at every stage. I'll have a better opinion in a few weeks.