CityofToronto / bdit_traffic_prophet

Suite of algorithms for predicting average daily traffic on Toronto streets
GNU General Public License v3.0

Revise CountMatch ETL to read from PostGIS #15

Closed cczhu closed 4 years ago

cczhu commented 5 years ago

CountMatch currently only reads from text files. In production, it must read its raw data directly from our PostGIS database. We need to revise the ETL pipeline in reader.py to accommodate this new data source.

I'll refrain from trying to create a test suite for now, since it would require spinning up a test PostGIS database.

cczhu commented 4 years ago

Prototype script to read from Postgres:

import pandas as pd
import configparser
import pathlib
import psycopg2

# Read Postgres credentials from the [localpg] section of ~/cf.txt.
config = configparser.RawConfigParser()
config.read(pathlib.Path.home().joinpath('cf.txt').as_posix())
postgres_settings = config['localpg']

pg_command = """WITH step_1 AS (
    SELECT a.*,
           a.count_bin::date count_date
    FROM {dbt} a
    WHERE a.count_bin::date >= '{start_date}' AND a.count_bin::date < '{end_date}'
)
SELECT centreline_id,
       dir_bin direction,
       count_date,
       SUM(volume) daily_count
FROM step_1
GROUP BY centreline_id, dir_bin, count_date
HAVING COUNT(volume) = 96
ORDER BY centreline_id, direction, count_date;
"""

year = 2006
dbt = 'prj_vol.bd_centreline_volumes'

with psycopg2.connect(database='bigdata', **postgres_settings) as db_con:
    all_data = pd.read_sql(pg_command.format(
        dbt=dbt,
        start_date="{y}-01-01".format(y=year),
        end_date="{y}-01-01".format(y=(year + 1))), db_con)

    for key, df_sub in all_data.groupby(['centreline_id', 'direction']):
        centreline_id = key[0]
        direction = key[1]

        data = df_sub[['count_date', 'daily_count']].copy()
        data.columns = ['Date', 'Daily Count']

        # Filename is used to flag for HW401 data in Arman's zip files, so
        # just pass a dummy value here.
        print({'filename': 'frompostgres',
               'centreline_id': int(centreline_id),
               'direction': int(direction),
               'data': data,
               'year': year})
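
The daily aggregation in the query above can be tried without a Postgres server. Below is a minimal sketch that uses an in-memory SQLite database as a stand-in; the `volumes` table, the helper names, and the sample data are made up for illustration. It also passes the date range as bound parameters rather than through `str.format` (psycopg2 uses `%s` placeholders where SQLite uses `?`).

```python
import sqlite3
from datetime import datetime, timedelta

# SQLite stand-in for the Postgres query. Column names follow the prototype;
# the `volumes` table is a hypothetical in-memory substitute.
DAILY_SQL = """
SELECT centreline_id,
       dir_bin AS direction,
       date(count_bin) AS count_date,
       SUM(volume) AS daily_count
FROM volumes
WHERE date(count_bin) >= ? AND date(count_bin) < ?
GROUP BY centreline_id, dir_bin, date(count_bin)
HAVING COUNT(volume) = 96
ORDER BY centreline_id, direction, count_date
"""


def make_bins(day, n_bins):
    """Return n_bins consecutive 15-minute timestamps starting at midnight."""
    return [(day + timedelta(minutes=15 * i)).isoformat(sep=" ")
            for i in range(n_bins)]


def daily_counts(con, year):
    """Daily totals for `year`, keeping only days with all 96 bins."""
    params = ("{0}-01-01".format(year), "{0}-01-01".format(year + 1))
    return con.execute(DAILY_SQL, params).fetchall()


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE volumes (centreline_id INTEGER, dir_bin INTEGER, "
            "count_bin TEXT, volume INTEGER)")
# One complete day (96 bins) and one incomplete day (95 bins).
rows = [(1, 1, ts, 2) for ts in make_bins(datetime(2006, 1, 1), 96)]
rows += [(1, 1, ts, 2) for ts in make_bins(datetime(2006, 1, 2), 95)]
con.executemany("INSERT INTO volumes VALUES (?, ?, ?, ?)", rows)

result = daily_counts(con, 2006)
```

Only the complete day survives the `HAVING COUNT(volume) = 96` filter; the 95-bin day is dropped.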

cczhu commented 4 years ago

Created a new branch to deal with connections to Postgres, which fills out the connection object. To use:

from traffic_prophet import cfg
cfg.postgres['schema_table'] = 'prj_vol.bd_centreline_volumes'
import pathlib
filepath = pathlib.Path.home().joinpath('cf.txt')

from traffic_prophet import connection

conn = connection.Connection(filepath, 'localpg')
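
For reference, reading one section of a credentials file with `configparser` might look like the sketch below. This is not the actual `traffic_prophet.connection` implementation; `read_pg_settings` and the sample file contents are hypothetical.

```python
import configparser
import pathlib
import tempfile


def read_pg_settings(filepath, section):
    """Return the key-value pairs of one section of an INI-style file."""
    config = configparser.RawConfigParser()
    # read() silently skips missing files and returns the list of files it
    # parsed, so an empty list means the file was not found.
    if not config.read(pathlib.Path(filepath).as_posix()):
        raise FileNotFoundError(str(filepath))
    if not config.has_section(section):
        raise KeyError(section)
    return dict(config[section])


# Made-up credentials file, written to a temporary directory for the demo.
with tempfile.TemporaryDirectory() as tmp:
    cf = pathlib.Path(tmp) / "cf.txt"
    cf.write_text("[localpg]\nhost = localhost\nport = 5432\nuser = example\n")
    settings = read_pg_settings(cf, "localpg")
```

All values come back as strings, so a port number would need converting before being used as an integer.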

cczhu commented 4 years ago

Command to create materialized view on Postgres:

CREATE MATERIALIZED VIEW prj_vol.bd_centreline_daily_counts
TABLESPACE pg_default
AS
 WITH step_1 AS (
         SELECT a.volume_id,
            a.centreline_id,
            a.dir_bin,
            a.count_bin,
            a.volume,
            a.couttype,
            a.count_bin::date AS count_date
           FROM prj_vol.bd_centreline_volumes a
          WHERE a.count_bin::date >= '2006-01-01'::date
        )
 SELECT step_1.centreline_id,
    step_1.dir_bin AS direction,
    date_part('year'::text, step_1.count_date) AS count_year,
    step_1.count_date,
    sum(step_1.volume) AS daily_count
   FROM step_1
  GROUP BY step_1.centreline_id, step_1.dir_bin, step_1.count_date
 HAVING count(step_1.volume) = 96
  ORDER BY (date_part('year'::text, step_1.count_date)), step_1.centreline_id, step_1.dir_bin, step_1.count_date
WITH DATA;

ALTER TABLE prj_vol.bd_centreline_daily_counts
    OWNER TO czhubd;

COMMENT ON MATERIALIZED VIEW prj_vol.bd_centreline_daily_counts
    IS 'Aggregation of bd_centreline_volumes data after 2006-01-01 inclusive to daily bins, excluding any days of incomplete data.';

GRANT ALL ON TABLE prj_vol.bd_centreline_daily_counts TO czhubd;

cczhu commented 4 years ago

Realizing there are some count bins that aren't at the quarter hour, revised the above materialized view:

CREATE MATERIALIZED VIEW prj_vol.bd_centreline_daily_counts
TABLESPACE pg_default
AS
    -- Select only counts from after 2006.
    with step_1 AS (
        SELECT *
        FROM prj_vol.bd_centreline_volumes
        WHERE count_bin::date >= '2006-01-01'::date
    -- Round time bin to the nearest 15 minutes.
    -- https://stackoverflow.com/questions/7299342/what-is-the-fastest-way-to-truncate-timestamps-to-5-minutes-in-postgres
    ), step_2 AS (
        SELECT volume_id,
               centreline_id,
               dir_bin,
               count_bin,
               DATE_TRUNC('HOUR', count_bin) + ROUND((DATE_PART('MINUTE', count_bin) / 15.)::numeric, 0) * INTERVAL '15 min' count_bin_rounded,
               volume
        FROM step_1
    -- In the <10 cases where we now have two timestamps at the same time, average the two.
    ), step_3 AS (
        SELECT centreline_id,
               dir_bin,
               count_bin_rounded::date count_date,
               AVG(volume) volume
        FROM step_2
        GROUP BY centreline_id, dir_bin, count_bin_rounded
    )
    -- Determine 24 hour volumes, removing data from incomplete days.
    SELECT centreline_id,
           dir_bin AS direction,
           DATE_PART('YEAR', count_date) AS count_year,
           count_date,
           SUM(volume) AS daily_count
    FROM step_3
    GROUP BY centreline_id, dir_bin, count_date
    HAVING COUNT(volume) = 96
    ORDER BY DATE_PART('YEAR', count_date), centreline_id, dir_bin, count_date
WITH DATA;

ALTER TABLE prj_vol.bd_centreline_daily_counts
    OWNER TO czhubd;

COMMENT ON MATERIALIZED VIEW prj_vol.bd_centreline_daily_counts
    IS 'Aggregation of bd_centreline_volumes data after 2006-01-01 inclusive to daily bins, averaging duplicate bins and excluding any days of incomplete data.';

GRANT ALL ON TABLE prj_vol.bd_centreline_daily_counts TO czhubd;
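
The rounding expression in step_2 can be mirrored in Python to check individual timestamps by hand. A minimal sketch follows; `round_to_quarter_hour` is a name invented here.

```python
from datetime import datetime, timedelta


def round_to_quarter_hour(ts):
    """Truncate to the hour, then add the minute field rounded to the
    nearest multiple of 15. Seconds are ignored, because
    DATE_PART('MINUTE', ...) returns only the minute field."""
    hour_start = ts.replace(minute=0, second=0, microsecond=0)
    # Integer minutes divided by 15 never land exactly on a .5 tie,
    # so adding 0.5 and truncating is enough here.
    quarters = int(ts.minute / 15 + 0.5)
    return hour_start + timedelta(minutes=15 * quarters)


rounded = round_to_quarter_hour(datetime(2006, 1, 1, 23, 53))
```

Note that a bin at 23:53 rounds forward to midnight, so in step_3 it counts toward the following date.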

cczhu commented 4 years ago

Another table hosting centreline midpoint lon-lat locations:

CREATE MATERIALIZED VIEW czhu.btp_centreline_lonlat AS
    SELECT geo_id centreline_id,
           fcode_desc,
           geom,
           ST_X(ST_LineInterpolatePoint(ST_LineMerge(geom), 0.5)) lon,
           ST_Y(ST_LineInterpolatePoint(ST_LineMerge(geom), 0.5)) lat
    FROM gis.centreline
    WHERE NOT fcode_desc IN ('River', 'Major Shoreline', 'Minor Shoreline (Land locked)', 'Ferry Route', 'Major Railway', 'Pending', 'Geostatistical line', 'Other', 'Walkway', 'Trail', 'Minor Railway', 'Hydro Line', 'Creek/Tributary')
WITH DATA;

ALTER TABLE czhu.btp_centreline_lonlat
    OWNER TO czhu;

COMMENT ON MATERIALIZED VIEW czhu.btp_centreline_lonlat
    IS 'Lon-lat centres of centreline segments.';

GRANT ALL ON TABLE czhu.btp_centreline_lonlat TO czhu;
GRANT SELECT ON TABLE czhu.btp_centreline_lonlat TO bdit_humans;
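
To illustrate what interpolating at 0.5 along a line does, here is a plain-Python sketch of planar interpolation along a polyline. It is only an approximation of the idea behind `ST_LineInterpolatePoint`, not PostGIS code, and `interpolate_point` is a hypothetical helper.

```python
import math


def interpolate_point(coords, fraction):
    """Point at `fraction` (0 to 1) of the total planar length of a polyline
    given as a list of (x, y) tuples."""
    if len(coords) < 2:
        raise ValueError("need at least two vertices")
    segments = list(zip(coords, coords[1:]))
    lengths = [math.hypot(x1 - x0, y1 - y0) for (x0, y0), (x1, y1) in segments]
    target = fraction * sum(lengths)
    for ((x0, y0), (x1, y1)), length in zip(segments, lengths):
        if length > 0 and target <= length:
            t = target / length
            return (x0 + t * (x1 - x0), y0 + t * (y1 - y0))
        target -= length
    # Floating-point leftovers: fall back to the final vertex.
    return coords[-1]


midpoint = interpolate_point([(0.0, 0.0), (1.0, 0.0), (1.0, 3.0)], 0.5)
```

The midpoint is measured along the line, so for a bent segment it generally differs from the average of the two endpoints.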

cczhu commented 4 years ago

Encountered the first cross-platform design issue: I want to set a minimum number of counts per day in the raw data for a day to be included in the daily counts. Currently it's set to 24 (1/4 of the 96 15-minute bins in a complete day). The trouble is that the Postgres ETL already aggregates to daily counts, so it must handle this removal separately. Will raise an issue suggesting that we at least document this (#23).
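
A minimal sketch of such a threshold applied to raw records in Python, assuming each record is a `(timestamp, volume)` pair; `daily_totals` and `min_bins` are hypothetical names, not existing Traffic Prophet code.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def daily_totals(records, min_bins=24):
    """Sum volumes per calendar day, dropping days with fewer than
    min_bins records."""
    by_day = defaultdict(list)
    for timestamp, volume in records:
        by_day[timestamp.date()].append(volume)
    return {day: sum(vols) for day, vols in sorted(by_day.items())
            if len(vols) >= min_bins}


def bins(day, n):
    """n consecutive 15-minute records of volume 1 starting at midnight."""
    return [(day + timedelta(minutes=15 * i), 1) for i in range(n)]


# Days with 96, 24 and 23 bins respectively.
records = (bins(datetime(2006, 1, 1), 96) + bins(datetime(2006, 1, 2), 24)
           + bins(datetime(2006, 1, 3), 23))
loose = daily_totals(records, min_bins=24)
strict = daily_totals(records, min_bins=96)
```

With `min_bins=24` the first two days are kept; with `min_bins=96` only the complete day remains, which is roughly what the materialized view's `HAVING COUNT(volume) = 96` enforces.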

cczhu commented 4 years ago

Created a countmatch integration test framework in a Jupyter notebook. Produced views of btp_centreline_daily_counts and btp_centreline_lonlat that only have the counts from SAMPLE_ZIP, and read those in using Traffic Prophet to check if the objects created from Postgres queries are effectively identical to those created by reading the SAMPLE_ZIP files in.

The result can't be run independently of a working Postgres database and the credentials to access it, so it won't be migrated to the test suite (yet). Instead, I'm storing the notebook inside the sandbox (even though the sandbox doesn't currently have the code to run this stuff).
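
The "effectively identical" check could be sketched roughly as below. This is not the notebook's code: `counts_match` is a hypothetical helper, and it assumes both sources have been reduced to dictionaries mapping `(centreline_id, direction)` to `{date string: daily count}`.

```python
import math


def counts_match(from_db, from_zip, rel_tol=1e-9):
    """True when both sources hold the same keys, the same dates per key,
    and daily counts that agree within rel_tol."""
    if from_db.keys() != from_zip.keys():
        return False
    for key, db_days in from_db.items():
        zip_days = from_zip[key]
        if db_days.keys() != zip_days.keys():
            return False
        if not all(math.isclose(db_days[d], zip_days[d], rel_tol=rel_tol)
                   for d in db_days):
            return False
    return True


# Made-up sample values for illustration only.
db = {(1, 1): {"2006-01-01": 192.0}}
zipped = {(1, 1): {"2006-01-01": 192}}
same = counts_match(db, zipped)
```

Comparing with a tolerance rather than strict equality allows for the view returning averaged (non-integer) volumes where the text files hold integers.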

cczhu commented 4 years ago

Not sure why this didn't self-close after #22.