rocs-org / data-pipelines

Data engineering monorepo with Airflow, dbt, and Postgres
https://ornt.biologie.hu-berlin.de/airflow

Adding a script that computes 7-day rolling incidences #21

Closed · benmaier closed this 3 years ago

benmaier commented 3 years ago

We usually need case numbers (i) raw, (ii) 7d-averaged/summed, or (iii) 7d-averaged/summed per 100k. I would like to have this maintained as an extra table called incidences in our db.

I've written the following script that can be run whenever the case numbers are downloaded and processed. It uses polars v0.8.9 and computes incidences for every location on every level of abstraction that we have (4 levels: Germany, states, Landkreise, and Landkreise plus Berlin districts):

These are the columns with types (for creating the table in the db):

[
    ('location_id', po.UInt16),
    ('location_level', po.UInt8),
    ('date_of_report', po.Date32),
    ('new_cases', po.Int32),
    ('new_cases_last_7d', po.Int32),
    ('incidence_7d_per_100k', po.Float32),
    ('new_deaths', po.Int32),
    ('nuts3', po.Utf8),
    ('population', po.UInt32),
    ('state', po.UInt8),
]
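
For reference, a rough sketch of how this schema could translate into a Postgres table; the polars-to-Postgres type mapping below is my guess, not taken from our migrations, so adapt it to our schema conventions:

import polars as po

# hypothetical dtype -> Postgres type mapping (a guess, not from our migrations)
PG_TYPES = {
    po.UInt16: 'integer',  po.UInt8: 'smallint', po.Date32: 'date',
    po.Int32: 'integer',   po.Float32: 'real',   po.UInt32: 'bigint',
    po.Utf8: 'text',
}

schema = [
    ('location_id', po.UInt16), ('location_level', po.UInt8),
    ('date_of_report', po.Date32), ('new_cases', po.Int32),
    ('new_cases_last_7d', po.Int32), ('incidence_7d_per_100k', po.Float32),
    ('new_deaths', po.Int32), ('nuts3', po.Utf8),
    ('population', po.UInt32), ('state', po.UInt8),
]

# render the CREATE TABLE statement from the (name, dtype) pairs above
ddl = 'CREATE TABLE incidences (\n' + ',\n'.join(
    '    {} {}'.format(name, PG_TYPES[dtype]) for name, dtype in schema
) + '\n);'
print(ddl)
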
jakobkolb commented 3 years ago

@benmaier We can definitely add post-processing steps to the pipelines, this one included. However, to make sure that we do it right, we need a test case (unit test) for this function. Please provide one.

benmaier commented 3 years ago

script

name: compute_incidences.py

import polars as po
from rocsDB import rocsDB

def get_incidence_data(f='Aktuell_Deutschland_SarsCov2_Infektionen.csv'):

    db = rocsDB()

    data = db.submit_query("""
                    SELECT
                        germanid,
                        nuts3,
                        population
                    FROM
                        censusdata.german_counties_info cns
                    WHERE
                        TL_merge_id is not null
                """)

    db.close()
    germanid, nuts3, population = zip(*data)
    census = po.DataFrame({
            'germanid': germanid,
            'nuts3': nuts3,
            'population': population,
        })
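    # census maps each Landkreis id (germanid) to its NUTS3 code and population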

    df = po.read_csv(f)

    keys = ['IdLandkreis','Meldedatum']

    grouped = ( df.groupby(keys)
                  .agg([
                        po.col('AnzahlFall').sum().alias("new_cases"),
                        po.col('AnzahlTodesfall').sum().alias("new_deaths"),
                        ])
                  .sort(by=keys)
              )

    # get landkreis ids
    LK = grouped['IdLandkreis'].unique().sort().to_list()

    # get date range 
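    # (Meldedatum is assumed to come in as Date32, i.e. days since epoch,
    # so min/max are ints and range() yields one entry per day)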
    min_date = df['Meldedatum'].min()
    max_date = df['Meldedatum'].max()
    continuous_dates = list(range(min_date,max_date+1))

    # create full key dataset
    ids = []
    for i in LK:
        ids.extend([i] * len(continuous_dates))
    continuous_dates *= len(LK)

    dates = po.DataFrame({ 'date': continuous_dates, 'id': ids})
    dates['date'] = dates['date'].cast(po.Date32)

    # create non-existing entries and fill cases with zeros
    continuous = (grouped.join(dates,left_on=keys,
                               right_on=['id','date'],how='outer')
                         .sort(by=keys)
                         )

    continuous = continuous.join(census,left_on='IdLandkreis',right_on='germanid',how='left')

    # add zeros
    mask = continuous['new_cases'].is_null()
    continuous[mask,'new_cases'] = 0
    continuous[mask,'new_deaths'] = 0

    new = None

    # add rolling sum for every landkreis and date

    for i in LK:

        this_LK = continuous[continuous['IdLandkreis'] == i]
        roll = this_LK['new_cases'].rolling_sum(7)
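        # rolling_sum(7) yields null for the first six days of each
        # Landkreis (see the empty cells in the expected test output below)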

        this_LK['new_cases_last_7d'] = roll
        if new is None:
            new = this_LK
        else:
            new = new.vstack(this_LK)

    # aggregation expressions that will be reused at every location level
    agg_cols = [
                po.col('new_cases').sum().alias('new_cases'),
                po.col('new_cases_last_7d').sum().alias('new_cases_last_7d'),
                po.col('new_deaths').sum().alias('new_deaths'),
                po.col('population').sum().alias('population'),
                po.col('state').first().alias('state'),
               ]
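    # note: the states level drops the 'state' aggregation (agg_cols[:-1])
    # since 'state' is a group key there, and the Germany level overwrites
    # it with null further below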

    # add state data: the state id is the leading digits of the Landkreis
    # id (e.g. 11001 // 1000 == 11 for Berlin)
    new['state'] = [ i // 1000 for i in new['IdLandkreis'] ]

    # get berlin and add it as a Landkreis
    berlin = (new.filter(po.col('state')==11)
                 .groupby('Meldedatum')
                 .agg(agg_cols)
                 .sort('Meldedatum')
                 )

    berlin['nuts3'] = [ 'DE300' for i in berlin['Meldedatum'] ]
    berlin['IdLandkreis'] = [ 11000 for i in berlin['Meldedatum'] ]
    berlin = berlin[new.columns]

    # construct location_level 4 (landkreise and berliner bezirke)
    landkreise_and_berliner_bezirke = new.clone()
    landkreise_and_berliner_bezirke['id'] = landkreise_and_berliner_bezirke['IdLandkreis']
    landkreise_and_berliner_bezirke['location_level'] = [ 4 for _ in landkreise_and_berliner_bezirke['IdLandkreis'] ]
    landkreise_and_berliner_bezirke = landkreise_and_berliner_bezirke.drop('IdLandkreis')

    # construct location_level 3 (landkreise)
    landkreise = new.filter(po.col('state') != 11)
    landkreise = (landkreise.vstack(berlin)
                            .sort(keys)
                            )
    landkreise['id'] = landkreise['IdLandkreis']
    landkreise = landkreise.drop('IdLandkreis')
    landkreise['location_level'] = [3 for _ in landkreise['id']]

    # construct location_level 1 (states)
    states = (new
                 .groupby(['state','Meldedatum'])
                 .agg(agg_cols[:-1])
                 .sort(['state','Meldedatum'])
             )
    states['id'] = states['state']
    states['location_level'] = [1 for _ in states['id']]
    states = states.lazy().with_columns([ po.lit(None).alias('nuts3')]).collect()

    # construct location_level 0 (germany)
    germany = (new
                 .groupby('Meldedatum')
                 .agg(agg_cols)
                 .sort('Meldedatum')
             )
    germany['id'] = [ 0 for _ in germany['Meldedatum'] ]
    germany['location_level'] = [ 0 for _ in germany['Meldedatum'] ]
    germany = germany.lazy().with_columns([
                    po.lit(None).alias('state'),
                    po.lit(None).alias('nuts3'),
                ]).collect()
    germany['state'] = germany['state'].cast(po.Int64)

    # function to compute 7d incidence
    def add_7d_per_100k(df):
        df['incidence_7d_per_100k'] = df['new_cases_last_7d'] / df['population'] * 1e5
        #df['offset_incidence_7d_per_100k'] = (df['new_cases_last_7d']+1) / df['population'] * 1e5
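        # e.g. 3 cases in the last 7 days with population 90164
        # -> 3 / 90164 * 1e5 ~= 3.327 (cf. row 1001,2020-03-21 in the test data)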

    # add 7d incidence and concatenate
    dfall = None

    for df in [
            landkreise_and_berliner_bezirke,
            landkreise,
            states,
            germany,
            ]:
        add_7d_per_100k(df)
        df['nuts3'] = df['nuts3'].cast(po.Utf8)
        df = df.sort(by=['id','Meldedatum'])
        if dfall is None:
            dfall = df
        else:
            df = df[dfall.columns]
            dfall = dfall.vstack(df)

    dfall['date_of_report'] = dfall['Meldedatum']
    dfall = dfall.drop('Meldedatum')

    dfall['location_id'] = dfall['id']
    dfall = dfall.drop('id')

    dfall['new_cases'] = dfall['new_cases'].cast(po.Int32)
    dfall['new_deaths'] = dfall['new_deaths'].cast(po.Int32)
    dfall['population'] = dfall['population'].cast(po.UInt32)
    dfall['new_cases_last_7d'] = dfall['new_cases_last_7d'].cast(po.Int32)
    dfall['state'] = dfall['state'].cast(po.UInt8)
    dfall['location_id'] = dfall['location_id'].cast(po.UInt16)
    dfall['location_level'] = dfall['location_level'].cast(po.UInt8)
    dfall['incidence_7d_per_100k'] = dfall['incidence_7d_per_100k'].cast(po.Float32)
    #dfall['offset_incidence_7d_per_100k'] = dfall['offset_incidence_7d_per_100k'].cast(po.Float32)

    cols = [
             'location_id',
             'location_level',
             'date_of_report',
             'new_cases',
             'new_cases_last_7d',
             'incidence_7d_per_100k',
             'new_deaths',
             'nuts3',
             'population',
             'state',
           ]

    dfall = dfall[cols]

    return dfall

if __name__ == "__main__":
    dfall = get_incidence_data()

    print(dfall)
    dfall.to_csv('incidences.csv')

test script

name: test_incidences.py

import polars as po
import tempfile
from io import StringIO
from compute_incidences import get_incidence_data

_TEST_DATA = """
IdLandkreis,Altersgruppe,Geschlecht,Meldedatum,Refdatum,IstErkrankungsbeginn,NeuerFall,NeuerTodesfall,NeuGenesen,AnzahlFall,AnzahlTodesfall,AnzahlGenesen
1001,A15-A34,M,2020-03-19,2020-03-13,1,0,-9,0,1,0,1
1001,A15-A34,M,2020-03-21,2020-03-13,1,0,-9,0,1,0,1
1001,A15-A34,W,2020-03-19,2020-03-16,3,0,-9,0,1,0,1
11001,A15-A34,M,2020-03-03,2020-02-28,1,0,-9,0,1,0,1
11001,A35-A59,M,2020-03-03,2020-03-01,1,0,-9,0,1,0,1
11001,A35-A59,M,2020-03-05,2020-03-02,1,0,-9,0,1,1,1
11001,A35-A59,M,2020-03-17,2020-03-03,1,0,-9,0,1,0,1
11002,A15-A34,M,2020-03-03,2020-02-28,1,0,-9,0,1,0,1
11002,A35-A59,M,2020-03-03,2020-03-01,1,0,-9,0,1,0,1
11002,A35-A59,M,2020-03-05,2020-03-02,1,0,-9,0,1,0,1
11002,A35-A59,M,2020-03-17,2020-03-03,1,0,-9,0,1,0,1
"""

_TEST_RESULT_DATA = """location_id,location_level,date_of_report,new_cases,new_cases_last_7d,incidence_7d_per_100k,new_deaths,nuts3,population,state
1001,4,2020-03-03,0,,,0,DEF01,90164,1
1001,4,2020-03-04,0,,,0,DEF01,90164,1
1001,4,2020-03-05,0,,,0,DEF01,90164,1
1001,4,2020-03-06,0,,,0,DEF01,90164,1
1001,4,2020-03-07,0,,,0,DEF01,90164,1
1001,4,2020-03-08,0,,,0,DEF01,90164,1
1001,4,2020-03-09,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-10,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-11,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-12,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-13,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-14,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-15,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-16,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-17,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-18,0,0,0.0,0,DEF01,90164,1
1001,4,2020-03-19,2,2,2.2181802,0,DEF01,90164,1
1001,4,2020-03-20,0,2,2.2181802,0,DEF01,90164,1
1001,4,2020-03-21,1,3,3.3272703,0,DEF01,90164,1
11001,4,2020-03-03,2,,,0,,385748,11
11001,4,2020-03-04,0,,,0,,385748,11
11001,4,2020-03-05,1,,,1,,385748,11
11001,4,2020-03-06,0,,,0,,385748,11
11001,4,2020-03-07,0,,,0,,385748,11
11001,4,2020-03-08,0,,,0,,385748,11
11001,4,2020-03-09,0,3,0.7777098,0,,385748,11
11001,4,2020-03-10,0,1,0.2592366,0,,385748,11
11001,4,2020-03-11,0,1,0.2592366,0,,385748,11
11001,4,2020-03-12,0,0,0.0,0,,385748,11
11001,4,2020-03-13,0,0,0.0,0,,385748,11
11001,4,2020-03-14,0,0,0.0,0,,385748,11
11001,4,2020-03-15,0,0,0.0,0,,385748,11
11001,4,2020-03-16,0,0,0.0,0,,385748,11
11001,4,2020-03-17,1,1,0.2592366,0,,385748,11
11001,4,2020-03-18,0,1,0.2592366,0,,385748,11
11001,4,2020-03-19,0,1,0.2592366,0,,385748,11
11001,4,2020-03-20,0,1,0.2592366,0,,385748,11
11001,4,2020-03-21,0,1,0.2592366,0,,385748,11
11002,4,2020-03-03,2,,,0,,290386,11
11002,4,2020-03-04,0,,,0,,290386,11
11002,4,2020-03-05,1,,,0,,290386,11
11002,4,2020-03-06,0,,,0,,290386,11
11002,4,2020-03-07,0,,,0,,290386,11
11002,4,2020-03-08,0,,,0,,290386,11
11002,4,2020-03-09,0,3,1.0331076,0,,290386,11
11002,4,2020-03-10,0,1,0.34436923,0,,290386,11
11002,4,2020-03-11,0,1,0.34436923,0,,290386,11
11002,4,2020-03-12,0,0,0.0,0,,290386,11
11002,4,2020-03-13,0,0,0.0,0,,290386,11
11002,4,2020-03-14,0,0,0.0,0,,290386,11
11002,4,2020-03-15,0,0,0.0,0,,290386,11
11002,4,2020-03-16,0,0,0.0,0,,290386,11
11002,4,2020-03-17,1,1,0.34436923,0,,290386,11
11002,4,2020-03-18,0,1,0.34436923,0,,290386,11
11002,4,2020-03-19,0,1,0.34436923,0,,290386,11
11002,4,2020-03-20,0,1,0.34436923,0,,290386,11
11002,4,2020-03-21,0,1,0.34436923,0,,290386,11
1001,3,2020-03-03,0,,,0,DEF01,90164,1
1001,3,2020-03-04,0,,,0,DEF01,90164,1
1001,3,2020-03-05,0,,,0,DEF01,90164,1
1001,3,2020-03-06,0,,,0,DEF01,90164,1
1001,3,2020-03-07,0,,,0,DEF01,90164,1
1001,3,2020-03-08,0,,,0,DEF01,90164,1
1001,3,2020-03-09,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-10,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-11,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-12,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-13,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-14,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-15,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-16,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-17,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-18,0,0,0.0,0,DEF01,90164,1
1001,3,2020-03-19,2,2,2.2181802,0,DEF01,90164,1
1001,3,2020-03-20,0,2,2.2181802,0,DEF01,90164,1
1001,3,2020-03-21,1,3,3.3272703,0,DEF01,90164,1
11000,3,2020-03-03,4,,,0,DE300,676134,11
11000,3,2020-03-04,0,,,0,DE300,676134,11
11000,3,2020-03-05,2,,,1,DE300,676134,11
11000,3,2020-03-06,0,,,0,DE300,676134,11
11000,3,2020-03-07,0,,,0,DE300,676134,11
11000,3,2020-03-08,0,,,0,DE300,676134,11
11000,3,2020-03-09,0,6,0.88739806,0,DE300,676134,11
11000,3,2020-03-10,0,2,0.29579934,0,DE300,676134,11
11000,3,2020-03-11,0,2,0.29579934,0,DE300,676134,11
11000,3,2020-03-12,0,,,0,DE300,676134,11
11000,3,2020-03-13,0,,,0,DE300,676134,11
11000,3,2020-03-14,0,,,0,DE300,676134,11
11000,3,2020-03-15,0,,,0,DE300,676134,11
11000,3,2020-03-16,0,,,0,DE300,676134,11
11000,3,2020-03-17,2,2,0.29579934,0,DE300,676134,11
11000,3,2020-03-18,0,2,0.29579934,0,DE300,676134,11
11000,3,2020-03-19,0,2,0.29579934,0,DE300,676134,11
11000,3,2020-03-20,0,2,0.29579934,0,DE300,676134,11
11000,3,2020-03-21,0,2,0.29579934,0,DE300,676134,11
1,1,2020-03-03,0,,,0,,90164,1
1,1,2020-03-04,0,,,0,,90164,1
1,1,2020-03-05,0,,,0,,90164,1
1,1,2020-03-06,0,,,0,,90164,1
1,1,2020-03-07,0,,,0,,90164,1
1,1,2020-03-08,0,,,0,,90164,1
1,1,2020-03-09,0,0,0.0,0,,90164,1
1,1,2020-03-10,0,0,0.0,0,,90164,1
1,1,2020-03-11,0,0,0.0,0,,90164,1
1,1,2020-03-12,0,0,0.0,0,,90164,1
1,1,2020-03-13,0,0,0.0,0,,90164,1
1,1,2020-03-14,0,0,0.0,0,,90164,1
1,1,2020-03-15,0,0,0.0,0,,90164,1
1,1,2020-03-16,0,0,0.0,0,,90164,1
1,1,2020-03-17,0,0,0.0,0,,90164,1
1,1,2020-03-18,0,0,0.0,0,,90164,1
1,1,2020-03-19,2,2,2.2181802,0,,90164,1
1,1,2020-03-20,0,2,2.2181802,0,,90164,1
1,1,2020-03-21,1,3,3.3272703,0,,90164,1
11,1,2020-03-03,4,,,0,,676134,11
11,1,2020-03-04,0,,,0,,676134,11
11,1,2020-03-05,2,,,1,,676134,11
11,1,2020-03-06,0,,,0,,676134,11
11,1,2020-03-07,0,,,0,,676134,11
11,1,2020-03-08,0,,,0,,676134,11
11,1,2020-03-09,0,6,0.88739806,0,,676134,11
11,1,2020-03-10,0,2,0.29579934,0,,676134,11
11,1,2020-03-11,0,2,0.29579934,0,,676134,11
11,1,2020-03-12,0,,,0,,676134,11
11,1,2020-03-13,0,,,0,,676134,11
11,1,2020-03-14,0,,,0,,676134,11
11,1,2020-03-15,0,,,0,,676134,11
11,1,2020-03-16,0,,,0,,676134,11
11,1,2020-03-17,2,2,0.29579934,0,,676134,11
11,1,2020-03-18,0,2,0.29579934,0,,676134,11
11,1,2020-03-19,0,2,0.29579934,0,,676134,11
11,1,2020-03-20,0,2,0.29579934,0,,676134,11
11,1,2020-03-21,0,2,0.29579934,0,,676134,11
0,0,2020-03-03,4,,,0,,766298,
0,0,2020-03-04,0,,,0,,766298,
0,0,2020-03-05,2,,,1,,766298,
0,0,2020-03-06,0,,,0,,766298,
0,0,2020-03-07,0,,,0,,766298,
0,0,2020-03-08,0,,,0,,766298,
0,0,2020-03-09,0,6,0.7829852,0,,766298,
0,0,2020-03-10,0,2,0.26099506,0,,766298,
0,0,2020-03-11,0,2,0.26099506,0,,766298,
0,0,2020-03-12,0,,,0,,766298,
0,0,2020-03-13,0,,,0,,766298,
0,0,2020-03-14,0,,,0,,766298,
0,0,2020-03-15,0,,,0,,766298,
0,0,2020-03-16,0,,,0,,766298,
0,0,2020-03-17,2,2,0.26099506,0,,766298,
0,0,2020-03-18,0,2,0.26099506,0,,766298,
0,0,2020-03-19,2,4,0.5219901,0,,766298,
0,0,2020-03-20,0,4,0.5219901,0,,766298,
0,0,2020-03-21,1,5,0.6524877,0,,766298,
"""

# note: this test will fail when the census population data changes (e.g. in 2022).
# if that happens, just regenerate _TEST_RESULT_DATA with the new population data
def test_incidences():

    # make file-like object to be read by polars
    with StringIO(_TEST_DATA) as f:
        df = get_incidence_data(f)

    # let polars write the result to a temporary file so we can compare
    # strings (unfortunately, polars does not write into file-like
    # objects, it only opens files by name)
    with tempfile.NamedTemporaryFile(mode='r',suffix='.csv') as f2:
        df.to_csv(f2.name)
        result_data = f2.read()

        assert result_data == _TEST_RESULT_DATA

if __name__ == "__main__":

    test_incidences()
benmaier commented 3 years ago

Can we make sure that this runs ASAP? The modeling group needs regular access to this data!

jakobkolb commented 3 years ago

I can start implementing this tomorrow. One question: what is the data source in production? Is there a link to download the data, or is it already in our DB? If so, where?

benmaier commented 3 years ago

Cool, thanks! Somewhere there must be a process running that downloads the case-number CSV file from this repo: https://github.com/robert-koch-institut/SARS-CoV-2_Infektionen_in_Deutschland and uploads its content to the database. I suspect @davhin knows how and where this is running.

The script above uses the same CSV file to produce the 7d-incidence dataframe.
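
For reference, a minimal sketch of what that download step might look like; the raw-file URL is my guess from the repo layout, not confirmed against the production job:

import urllib.request

# hypothetical raw-file URL, inferred from the repo linked above
URL = ('https://raw.githubusercontent.com/robert-koch-institut/'
       'SARS-CoV-2_Infektionen_in_Deutschland/main/'
       'Aktuell_Deutschland_SarsCov2_Infektionen.csv')

# fetch the CSV to the filename that compute_incidences.py expects by default
urllib.request.urlretrieve(URL, 'Aktuell_Deutschland_SarsCov2_Infektionen.csv')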

davhin commented 3 years ago

as a hotfix, I suggest adding it to https://github.com/rocs-org/rocs-scripts tomorrow, which contains data scripts I run ~every day

davhin commented 3 years ago

We are currently updating the cases from https://prod-hub-indexer.s3.amazonaws.com/files/dd4580c810204019a7b8eb3e0b329dd6/0/full/4326/dd4580c810204019a7b8eb3e0b329dd6_0_full_4326.csv, which has a couple more columns (they seem superfluous) compared to the new source @benmaier provided, so I will

thoughts @benmaier ?