CityofToronto / bdit_data-sources

Data sources used by the Big Data Innovation Team
https://github.com/orgs/CityofToronto/teams/bigdatainnovationteam
GNU General Public License v3.0
39 stars 8 forks source link

Miovision Data Refresh #331

Closed radumas closed 2 years ago

radumas commented 4 years ago

For reasons #329 #323 we need to refresh all of the data in miovision_api.volumes. The steps are as follows

  1. Make a copy of the aggregated metrics we are looking at now for performance, we will compare how this change affects numbers before/after in case anyone notices a difference in products we have previously pushed to stakeholders.

  2. Ensure the above issues are complete.

  3. 🛑 refreshing and publishing/sending aggregated Miovision data products

  4. TRUNCATE miovision_api.volumes CASCADE

  5. run pull_intersection_tmc.py for all of time

  6. truncate the following

    TRUNCATE miovision_api.volumes_15min_tmc;
    TRUNCATE miovision_api.volumes_15min;
    TRUNCATE miovision_api.report_dates;
    TRUNCATE miovision_api.unacceptable_gaps;
  7. to re-aggregate the data, miovision_api.gapsize_lookup needs to be refreshed for the dates in question since it is currently refreshed daily to look at the last 60 days. I think the ideal would be to do this in 2 month chunks: a) https://github.com/CityofToronto/bdit_data-sources/issues/334 b) loop over the process_data() Python function in a Python script https://github.com/CityofToronto/bdit_data-sources/blob/a22e407b09534dc01da99e15893725c2917f0248/volumes/miovision/api/intersection_tmc.py#L208-L243

  8. Refresh downstream materialized views

  9. Do a comparison of before/after data change

KatiRG commented 4 years ago

So far I have not worked with the miovision data stored on the RDS. Joven has some scripts that read in bicycle data which I will need later, so I will make sure not to do anything until the refresh is complete.

cczhu commented 4 years ago

Backed up materialized views required by covid webapp, and commented out the refresh Miovision operator in the relevant DAG. Ready to TRUNCATE miovision_api.volumes CASCADE.

aharpalaniTO commented 4 years ago

Thanks @cczhu, that was my only concern. Nothing else to add here - an estimate on duration of the refresh once you have an idea would be helpful.

Thank you!

cczhu commented 4 years ago

Performed TRUNCATE miovision_api.volumes CASCADE;. Now running

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2020-01-01 --end_date 2020-09-23 --pull &> log_refresh_2020.txt
cczhu commented 4 years ago

2020 data refresh complete. Checking for gaps between the last timestamp of the day and 23:59 using Aakash's query (from here):

WITH X AS (
    SELECT intersection_uid, datetime_bin::date, EXTRACT(EPOCH FROM ('23:59'::time without time zone - MAX(datetime_bin::time without time zone)))/60/(60*24)  AS time_missing
    FROM miovision_api.volumes
    WHERE classification_uid = 1
    GROUP BY intersection_uid, datetime_bin::date
)

SELECT date_trunc('month', datetime_bin)::date AS mth, AVG(time_missing) * 100 AS pct_missing
FROM X
GROUP BY date_trunc('month', datetime_bin)
ORDER BY date_trunc('month', datetime_bin)
Month Percent
2020-01 0.25462962962963
2020-02 0.0900725764644894
2020-03 0.142882187938289
2020-04 0.223149492017417
2020-05 0.0152914134330684
2020-06 0.0155976792242887
2020-07 0.065116189596799
2020-08 0.0250311672120929
2020-09 4.38301779433064

Except for September, these numbers are all tiny. In September we only have partial data on 2020-09-23 because the data was refreshed on that day. For every other day, the difference between the last timestamp and 23:59 is within a few seconds of zero.

radumas commented 4 years ago

Is September bumped up only because yesterday's data was pulled using the old method?

cczhu commented 4 years ago

Is September bumped up only because yesterday's data was pulled using the old method?

Yes. Sorry, posted before I finished commenting.

radumas commented 4 years ago

Sweet looks like the problem was solved!

cczhu commented 4 years ago
SELECT datetime_bin::date, COUNT(*)
FROM miovision_api.volumes
WHERE classification_uid = 0
GROUP BY datetime_bin::date;

Returns 1136 entries from 2020-09-22, because the Miovision update pipeline ran last night. Dropping data from 2020-09-22 using

DELETE FROM miovision_api.volumes
WHERE datetime_bin >= '2020-09-22 00:00:00';

and rerunning the puller with

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2020-09-22 --end_date 2020-09-23 --pull &> log_refresh_2020_09_22.txt
cczhu commented 4 years ago

Checking population breakdown by class:

SELECT classification_uid, COUNT(*) n_class
FROM miovision_api.volumes
GROUP BY classification_uid
ORDER BY 1
classification_uid n_class
1 33891301
2 4879887
3 4184999
4 2548066
5 121103
6 22460279
9 161986

Seems like it comes from legitimate data.

SELECT intersection_uid,
       datetime_bin::date date_bin,
       COUNT(DISTINCT datetime_bin) / (60. * 24) frac_complete_data
FROM miovision_api.volumes
WHERE datetime_bin >= '2020-01-01'
GROUP BY intersection_uid, datetime_bin::date
ORDER BY 1, 2

shows most intersections have more than 90% of data per day. We can modify the above to

SELECT intersection_uid,
       datetime_bin::date date_bin,
       COUNT(DISTINCT datetime_bin) / (60. * 24) frac_complete_data
FROM miovision_api.volumes
WHERE datetime_bin >= '2020-01-01'
GROUP BY intersection_uid, datetime_bin::date
HAVING COUNT(DISTINCT datetime_bin) / (60. * 24) < 0.9
ORDER BY 1, 2

which shows there's no regular pattern of missing data consistent across multiple intersections, but there are definitely days for many intersections where more than a third of data is missing (even excluding the first and last days each station were operational). Since cameras go down and whatnot, this is expected.

cczhu commented 4 years ago

Running 2019 update using

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2019-01-01 --end_date 2019-12-31 --pull &> log_refresh_2019.txt
cczhu commented 4 years ago

Checks for 2019 data refresh

WITH X AS (
    SELECT intersection_uid, datetime_bin::date, EXTRACT(EPOCH FROM ('23:59'::time without time zone - MAX(datetime_bin::time without time zone)))/60/(60*24)  AS time_missing
    FROM miovision_api.volumes
    WHERE classification_uid = 1 AND datetime_bin < '2020-01-01 00:00:00'
    GROUP BY intersection_uid, datetime_bin::date
)

SELECT date_trunc('month', datetime_bin)::date AS mth, AVG(time_missing) * 100 AS pct_missing
FROM X
GROUP BY date_trunc('month', datetime_bin)
ORDER BY date_trunc('month', datetime_bin)
Month Percent
2019-01-01 0.42903213042102
2019-02-01 0.207779549884813
2019-03-01 0.750734214390602
2019-04-01 0.00140901771336554
2019-05-01 0.498967586099424
2019-06-01 0.000100644122383253
2019-07-01 0.0847358578775129
2019-08-01 0.206282231598687
2019-09-01 0.68001171352075
2019-10-01 0.134917725643532
2019-11-01 0.00021043771043771
2019-12-01 0.747790404040404

which looks legit - less than 1% data dropout for all of 2019.

SELECT COUNT(*)
FROM miovision_api.volumes
WHERE classification_uid = 0

returns 0 rows (this also checks 2020 and today's refresh).

SELECT classification_uid, COUNT(*) n_class
FROM miovision_api.volumes
WHERE datetime_bin < '2020-01-01'
GROUP BY classification_uid
ORDER BY 1

gives

classification_uid n_class
1 56592454
2 4017079
3 3375005
4 3117807
5 156800
6 40014068
9 2559674

which is a surprising number of generic motorized vehicles, though might make sense if streetcar detections have all been moved there.

```sql
SELECT intersection_uid,
       datetime_bin::date date_bin,
       COUNT(DISTINCT datetime_bin) / (60. * 24) frac_complete_data
FROM miovision_api.volumes
WHERE datetime_bin < '2020-01-01'
GROUP BY intersection_uid, datetime_bin::date
HAVING COUNT(DISTINCT datetime_bin) / (60. * 24) < 0.9
ORDER BY 1, 2

does lead to a bunch of days with less than 90% of all timestamps represented. I don't see any regular pattern of missing data across different intersections and months, though, like we'd expect if we were missing eg. the last hour of each 6 hour block. In a few cases (eg. intersection_uid = 6 for January 2019) I did spot checks to make sure every hour was represented on most days, and this was the case.

cczhu commented 4 years ago

Just to remind myself later, the best practice for truncating derived views is

TRUNCATE miovision_api.volumes_15min, miovision_api.volumes_15min_tmc, miovision_api.volumes_tmc_atr_xover, miovision_api.report_dates, miovision_api.unacceptable_gaps;
cczhu commented 4 years ago

To compare our newly pulled data against historical processed data, we saved the Miovision summary tables from the COVID webapp under covid.miovision_summary_20200922backup. Then, using intersection_tmc.process_data, created 15-minute binned data for the same intersections within 03 - Postprocessing Smoke Test.ipynb. Finally, created a script to aggregate the 15-minute data to hourly data. Some results:

Richmond / Bathurst

image

The blue line is for timestamps where there is data in both covid.miovision_summary_20200922backup and in the new hourly data, and represents the difference (new - old) of the datasets. For timestamps where there's data in the new view, but nothing in 202009022backup, or vice versa, I plot them as points and assume the missing data is 0. This means that all green points are always positive and orange points are always negative (since we use new - old, and there is no new data so new = 0).

On a typical day where we have new and old data, the difference between them peaks in the middle of the day with a peak amplitude of around 50:

image

This indicates either that more rows have been pulled from the database in the new data refresh, or some data was pulled.

On April 3 from 18:00 - 24:00 there's a large drop in new volumes:

image

This doesn't correspond to an HTTP error in the download logs, and an attempt at re-pulling the data using:

import psycopg2
import math
import pytz
import datetime
from requests import Session
import pandas as pd
import pathlib
import configparser

import sys
sys.path.append("bdit_data-sources/volumes/miovision/api/")
import intersection_tmc as itmc

# Grab Postgres and Miovision credentials.
config = configparser.ConfigParser()
config.read(pathlib.Path.home().joinpath('.charlesconfig').as_posix())
postgres_settings = config['POSTGRES']
miov_token = config['MIOVISION']['key']

local_tz = pytz.timezone('US/Eastern')

url = 'https://api.miovision.com/intersections/' 
tmc_endpoint = '/tmc'

with psycopg2.connect(database='bigdata', **postgres_settings) as conn:
    df_intcodes = pd.read_sql("SELECT * FROM miovision_api.intersections", con=conn)

intcode_lookup = dict(zip(df_intcodes['intersection_uid'], df_intcodes['id']))

# Generator for cycling through 6-hour time blocks.
def timerange(start_time, end_time, dt):
    for i in range(math.ceil((end_time - start_time) / dt)):
        c_start = start_time + i * dt
        c_end = min(c_start + dt, end_time)
        yield (c_start, c_end)

# As above, but uses intersection_tmc.py directly.
def get_intersection_tmc_loop(start_time, end_time, dt, intersection_uid):
    tmc_all = []

    intersection_id = intcode_lookup[intersection_uid]

    for (c_start_t, c_end_t) in timerange(start_time, end_time, dt):
        params = {'endTime': c_end_t, 'startTime' : c_start_t} 
        tmc = itmc.get_intersection_tmc(c_start_t, c_end_t, intersection_id,
                                        intersection_uid, miov_token)
        tmc_all += tmc
    return tmc_all

def pull_api(start_time, end_time, dt, intersection_uid):
    raw_tmc = get_intersection_tmc_loop(start_time, end_time, dt, intersection_uid)
    raw_tmc = pd.DataFrame(
        raw_tmc,columns=['intersection_uid', 'datetime_bin', 'classification_uid',
                         'leg', 'movement', 'volume'])
    raw_tmc["classification_uid"] = raw_tmc["classification_uid"].astype(int)
    raw_tmc["movement"] = raw_tmc["movement"].astype(int)
    raw_tmc["volume"] = raw_tmc["volume"].astype(int)
    return raw_tmc

start_time = local_tz.localize(datetime.datetime(2020, 4, 3, 19, 0, 0))
end_time = local_tz.localize(datetime.datetime(2020, 4, 3, 20, 0, 0))
dt = datetime.timedelta(hours=6)
intersection_uid = 26

raw_tmc = pull_api(start_time, end_time, dt, intersection_uid)

leads to a count of 882, vs. 281 if we just sum up the raw volumes using

SELECT SUM(volume) FROM miovision_api.volumes
WHERE intersection_uid = 26
    AND datetime_bin BETWEEN '2020-04-03 19:00' AND '2020-04-03 19:59:59'
    AND classification_uid = 1
cczhu commented 4 years ago

Almost all cases where there's only data in the new dataset come from between 03:00 and 05:00, with one exception at 23:00. Meanwhile, all cases where there's only data in the old dataset are during high-volume hours, sometimes in the middle of the day.

image

A scan of a few of these suggest they're mainly due to the gap finding algorithm. On April 23, there's a gap of data between 11:25 and 16:00, which led to the algorithm masking out 11:00 - 17:15. This can be seen by looking at the volume_15min_tmc_uid column of this query:

SELECT * FROM miovision_api.volumes
WHERE intersection_uid = 26
    AND datetime_bin BETWEEN '2020-04-23 10:00' AND '2020-04-23 17:59:59'
    AND classification_uid = 1
ORDER BY datetime_bin

On April 26 there's a gap between 15-minute data between 12:00 and 14:00, but there's no corresponding gap in the minute-data:

SELECT * FROM miovision_api.volumes
WHERE intersection_uid = 26
    AND datetime_bin BETWEEN '2020-04-26 11:00' AND '2020-04-26 13:59:59'
    AND classification_uid = 1
ORDER BY datetime_bin

King / Yonge

For intersection_uid = 18 we see similar patterns as in Richmond / Bathurst for light autos.

image

Switching to pedestrians, we get some strange results:

image

From March 6 06:00 - 12:00 we see a massive drop in pedestrian volume. Going back to the source data, summing up those six hours from the old data

SELECT SUM(volume_actual) FROM covid.miovision_summary_20200922backup
WHERE intersection_uid = 18 AND class_type = 'Pedestrians'
      AND datetime_bin BETWEEN '2020-03-12 06:00' AND '2020-03-12 11:59'

gives 12147, but

SELECT SUM(volume) FROM miovision_api.volumes
WHERE intersection_uid = 18
    AND datetime_bin BETWEEN '2020-03-12 06:00' AND '2020-03-12 11:59'
    AND classification_uid = 6

gives only 3976. This anomaly seems to be in the new volumes, since summing up volumes from 1 day earlier

SELECT SUM(volume) FROM miovision_api.volumes
WHERE intersection_uid = 18
    AND datetime_bin BETWEEN '2020-03-11 06:00' AND '2020-03-11 11:59'
    AND classification_uid = 6

gives 12638.

Pulling directly from the API:

import psycopg2
import math
import pytz
import datetime
from requests import Session
import pandas as pd
import pathlib
import configparser

import sys
sys.path.append("bdit_data-sources/volumes/miovision/api/")
import intersection_tmc as itmc

# Grab Postgres and Miovision credentials.
config = configparser.ConfigParser()
config.read(pathlib.Path.home().joinpath('.charlesconfig').as_posix())
postgres_settings = config['POSTGRES']
miov_token = config['MIOVISION']['key']

local_tz = pytz.timezone('US/Eastern')

url = 'https://api.miovision.com/intersections/' 
tmc_endpoint = '/tmc'

with psycopg2.connect(database='bigdata', **postgres_settings) as conn:
    df_intcodes = pd.read_sql("SELECT * FROM miovision_api.intersections", con=conn)

intcode_lookup = dict(zip(df_intcodes['intersection_uid'], df_intcodes['id']))

# Generator for cycling through 6-hour time blocks.
def timerange(start_time, end_time, dt):
    for i in range(math.ceil((end_time - start_time) / dt)):
        c_start = start_time + i * dt
        c_end = min(c_start + dt, end_time)
        yield (c_start, c_end)

# As above, but uses intersection_tmc.py directly.
def get_ped_loop(start_time, end_time, dt, intersection_uid):
    tmc_all = []

    intersection_id = intcode_lookup[intersection_uid]

    for (c_start_t, c_end_t) in timerange(start_time, end_time, dt):
        params = {'endTime': c_end_t, 'startTime' : c_start_t} 
        tmc = itmc.get_pedestrian(c_start_t, c_end_t, intersection_id,
                                  intersection_uid, miov_token)
        tmc_all += tmc
    return tmc_all

def pull_api(start_time, end_time, dt, intersection_uid):
    raw_tmc = get_ped_loop(start_time, end_time, dt, intersection_uid)
    raw_tmc = pd.DataFrame(
        raw_tmc,columns=['intersection_uid', 'datetime_bin', 'classification_uid',
                         'leg', 'movement', 'volume'])
    raw_tmc["classification_uid"] = raw_tmc["classification_uid"].astype(int)
    raw_tmc["movement"] = raw_tmc["movement"].astype(int)
    raw_tmc["volume"] = raw_tmc["volume"].astype(int)
    return raw_tmc

start_time = local_tz.localize(datetime.datetime(2020, 3, 12, 6, 0, 0))
end_time = local_tz.localize(datetime.datetime(2020, 3, 12, 12, 0, 0))
dt = datetime.timedelta(hours=6)
intersection_uid = 18

raw_tmc = pull_api(start_time, end_time, dt, intersection_uid)

gives 12147! This is another case where we only got a fraction of the rows we expect during the last pull.

radumas commented 4 years ago

To summarize what I'm reading:

  1. Pulling data in 6 hour chunks is leading to higher Light vehicle volumes in the middle of the day, in more of a bell shape. This is unexpected considering the issue this was solving #323 had volumes stop coming in for evening hours.
  2. In the newly pulled data there are the occasional dramatic drops in volume for certain intersection/hours. Based on an examination of the disaggregate volumes not documented here, certain legs for Richmond were missing for a few hours when the data was pulled for the whole year, but an independent script was able to pull the data. This would appear to be the issue that we have been trying to tackle: that data vanishes from the API when being pulled for sustained periods of time. First it was entire stretches from an evening hour until midnight, then it was the 500error, and now it appears that a few legs of data for an intersection vanish. Please repull that date and intersection to validate that that error cannot be reproduced. In other words, are we sure the issue is with sustained use of the API and not somewhere else in our process? Are there ways that these drops in data can be systematically identified?
  3. Some change in gap finding means that larger periods of time are getting filtered out, hence no new data for certain intersection/times, in particular during heavier volume periods.
  4. For unknown reasons there now appears to be more data in evening hours when comparing the two highly aggregated datasets, again likely due to some change in gap finding.

The priority focus for this analysis should be whether we are confident we have re-pulled one-minute data for the past two years successfully. We may need to revisit gap finding but let's first make sure we have good data.

cczhu commented 4 years ago

Ah, point 1. is really important - for the same intersections (18 and 26) I looked at above, the bell-shaped volume excess only occurs for light vehicles, not trucks and peds.

cczhu commented 4 years ago

Investigating further into how the gap finder is masking out peak hour data, for Richmond / Bathurst, we see in the figure above that 2020-04-02 17:00 - 19:00 is missing hourly binned data in the new aggregation, but not the old. Looking at the minute-binned volumes using:

SELECT * FROM miovision_api.volumes
WHERE intersection_uid = 26
    AND datetime_bin BETWEEN '2020-04-02 16:00' AND '2020-04-02 18:59:59'
    AND classification_uid = 1
ORDER BY datetime_bin

we see no associated volume_15min_tmc_uid between 17:00 and 18:15 even though there's 997 counts available, so this data is being dropped by the aggregation. Looking into miovision_api.unacceptable_gaps using

SELECT * FROM miovision_api.unacceptable_gaps WHERE intersection_uid = 26

there's indeed a gap of 6 minutes that afternoon:

image

If we aggregate 2020-04-02 up to 15 minutes using this (modified from function-aggregate-volumes_15min_tmc.sql):

WITH zero_padding_movements AS (
        /*Cross product of legal movement for cars, bikes, and peds and the bins to aggregate*/
        SELECT m.*, datetime_bin15 
        FROM miovision_api.intersection_movements m
        CROSS JOIN generate_series('2020-04-02'::date - interval '1 hour', '2020-04-03'::date - interval '1 hour 15 minutes', INTERVAL '15 minutes') AS dt(datetime_bin15)
        -- make sure that the intersection is still active
        WHERE intersection_uid = 26
        )
SELECT pad.intersection_uid,
        pad.datetime_bin15 AS datetime_bin,
        pad.classification_uid,
        pad.leg,
        pad.movement_uid,
        CASE WHEN un.accept = FALSE THEN NULL ELSE (COALESCE(SUM(A.volume), 0)) END AS volume
FROM zero_padding_movements pad
--To set unacceptable ones to NULL instead (& only gap fill light vehicles, cyclist and pedestrian)
LEFT JOIN miovision_api.unacceptable_gaps un 
    ON un.intersection_uid = pad.intersection_uid
    AND pad.datetime_bin15 BETWEEN DATE_TRUNC('hour', gap_start) 
    AND DATE_TRUNC('hour', gap_end) + interval '1 hour' -- may get back to this later on for fear of removing too much data
--To get 1min bins
LEFT JOIN miovision_api.volumes A
    ON A.datetime_bin >= '2020-04-02'::date - INTERVAL '1 hour' 
    AND A.datetime_bin < '2020-04-03'::date - INTERVAL '1 hour'
    AND A.datetime_bin >= pad.datetime_bin15 
    AND A.datetime_bin < pad.datetime_bin15 + interval '15 minutes'
    AND A.intersection_uid = pad.intersection_uid 
    AND A.classification_uid = pad.classification_uid
    AND A.leg = pad.leg 
    AND A.movement_uid = pad.movement_uid
GROUP BY pad.intersection_uid, pad.datetime_bin15, pad.classification_uid, pad.leg, pad.movement_uid, un.accept
HAVING pad.classification_uid IN (1,2,6) OR SUM(A.volume) > 0 
ORDER BY 1, 2, 3, 4, 5

The volume is NULL from 17:00 - 18:15. So likely what happened is there is currently an unacceptable gap where before there was none, but a separate issue is that the masked timespan is 1 hour 15 minutes long, leading to two NULL values when aggregating up to hourly bins. We should either switch to DATE_TRUNC('hour', gap_end) + interval '59 minutes' in the LEFT JOIN with miovision_api.unacceptable_gaps, or allow for NULLs in the aggregation up to 1 hour.

cczhu commented 4 years ago

To test whether simply re-running the API script can fill the dramatic drops in volume mentioned above, I will try to fill in Front / Jarvis (intersection_uid = 8) from 2020-03-14 to 2020-03-15.

image

Between 0600 and 1200, the old hourly binned data

SELECT SUM(volume_actual) FROM covid.miovision_summary_20200922backup
WHERE intersection_uid = 8 AND class_type = 'Pedestrians'
      AND datetime_bin BETWEEN '2020-03-14 06:00' AND '2020-03-14 11:59:59'

returns 6143 counts. The new raw data

SELECT SUM(volume) FROM miovision_api.volumes
WHERE intersection_uid = 8
    AND datetime_bin BETWEEN '2020-03-14 06:00' AND '2020-03-14 11:59:59'
    AND classification_uid = 6

returns 2538.

I run

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2020-03-14 --end_date 2020-03-15 --pull &> log_repull_2020_03_14.txt

Now

SELECT SUM(volume) FROM miovision_api.volumes
WHERE intersection_uid = 8
    AND datetime_bin BETWEEN '2020-03-14 06:00' AND '2020-03-14 11:59:59'
    AND classification_uid = 6

returns 2832.

😡

cczhu commented 4 years ago

Oh GOD, tried running this script to read from the API into a Pandas table:

import psycopg2
import math
import pytz
import datetime
from requests import Session
import pandas as pd
import pathlib
import configparser

import sys
sys.path.append("bdit_data-sources/volumes/miovision/api/")
import intersection_tmc as itmc

# Grab Postgres and Miovision credentials.
config = configparser.ConfigParser()
config.read(pathlib.Path.home().joinpath('.charlesconfig').as_posix())
postgres_settings = config['POSTGRES']
miov_token = config['MIOVISION']['key']

local_tz = pytz.timezone('US/Eastern')

url = 'https://api.miovision.com/intersections/' 
tmc_endpoint = '/tmc'

with psycopg2.connect(database='bigdata', **postgres_settings) as conn:
    df_intcodes = pd.read_sql("SELECT * FROM miovision_api.intersections", con=conn)

intcode_lookup = dict(zip(df_intcodes['intersection_uid'], df_intcodes['id']))

# Generator for cycling through 6-hour time blocks.
def timerange(start_time, end_time, dt):
    for i in range(math.ceil((end_time - start_time) / dt)):
        c_start = start_time + i * dt
        c_end = min(c_start + dt, end_time)
        yield (c_start, c_end)

# As above, but uses intersection_tmc.py directly.
def get_ped_loop(start_time, end_time, dt, intersection_uid):
    tmc_all = []

    intersection_id = intcode_lookup[intersection_uid]

    for (c_start_t, c_end_t) in timerange(start_time, end_time, dt):
        params = {'endTime': c_end_t, 'startTime' : c_start_t} 
        tmc = itmc.get_pedestrian(c_start_t, c_end_t, intersection_id,
                                  intersection_uid, miov_token)
        tmc_all += tmc
    return tmc_all

def pull_api(start_time, end_time, dt, intersection_uid):
    raw_tmc = get_ped_loop(start_time, end_time, dt, intersection_uid)
    raw_tmc = pd.DataFrame(
        raw_tmc,columns=['intersection_uid', 'datetime_bin', 'classification_uid',
                         'leg', 'movement', 'volume'])
    raw_tmc["classification_uid"] = raw_tmc["classification_uid"].astype(int)
    raw_tmc["movement"] = raw_tmc["movement"].astype(int)
    raw_tmc["volume"] = raw_tmc["volume"].astype(int)
    return raw_tmc

start_time = local_tz.localize(datetime.datetime(2020, 3, 14, 6, 0, 0))
end_time = local_tz.localize(datetime.datetime(2020, 3, 14, 12, 0, 0))
dt = datetime.timedelta(hours=6)
intersection_uid = 8

raw_tmc = pull_api(start_time, end_time, dt, intersection_uid)

Then ran

raw_tmc['datetime_bin'] = pd.to_datetime(raw_tmc['datetime_bin'])

raw_tmc['hour'] = raw_tmc['datetime_bin'].dt.hour

raw_tmc.groupby('hour')['volume'].sum()

to get hourly counts.

The first time I ran the API, I got:

image

The second time:

image

The second time is consistent with the old hourly counts:

SELECT * FROM covid.miovision_summary_20200922backup
WHERE intersection_uid = 8 AND class_type = 'Pedestrians'
      AND datetime_bin BETWEEN '2020-03-14 06:00' AND '2020-03-14 11:59:59'
ORDER BY datetime_bin

image

Running the Python script for a third, fourth and fifth time, I get the same results as the second time, so am unable to reproduce this problem. Regardless it shows that this issue is with the API and not our code. What's disturbing is the API is unstable enough to return different volumes on two subsequent 1 day pulls.

cczhu commented 4 years ago

Now I'm confused: running the Python script multiple times leads to the same volumes being pulled (the ones that are the same as in the backup version), but running python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2020-03-14 --end_date 2020-03-15 --pull &> log_repull_2020_03_14.txt, then running

SELECT EXTRACT(hour FROM datetime_bin) date_hour, SUM(volume) volume
FROM miovision_api.volumes
WHERE intersection_uid = 8
    AND datetime_bin BETWEEN '2020-03-14 06:00' AND '2020-03-14 11:59:59'
    AND classification_uid = 6
GROUP BY EXTRACT(hour FROM datetime_bin)
ORDER BY 1, 2

keeps leading to

image

which are significantly lower than in the old data. Are we losing volumes when inserting data?

cczhu commented 4 years ago

Some progress! Made this test table:

CREATE TABLE czhu.miovision_volumes_test
(
  volume_uid serial NOT NULL,
  intersection_uid integer,
  datetime_bin timestamp without time zone,
  classification_uid integer,
  leg text,
  movement_uid integer,
  volume integer,
  volume_15min_tmc_uid integer
);
ALTER TABLE czhu.miovision_volumes_test
  OWNER TO czhu;
GRANT ALL ON TABLE czhu.miovision_volumes_test TO miovision_admins;
GRANT ALL ON TABLE czhu.miovision_volumes_test TO miovision_api_bot;
GRANT USAGE, SELECT ON SEQUENCE miovision_volumes_test_volume_uid_seq TO miovision_api_bot;

and created miov_api_puller_test.py, which is identical to intersection_tmc.py except that insert_data looks like:

def insert_data(conn, start_time, end_iteration_time, table, dupes):
    time_period = (start_time, end_iteration_time)
    conn.notices=[]
    with conn:
        with conn.cursor() as cur:
            insert_data = '''INSERT INTO czhu.miovision_volumes_test(intersection_uid, datetime_bin, classification_uid,
                             leg,  movement_uid, volume) VALUES %s'''
            execute_values(cur, insert_data, table)
            if conn.notices != []:
                logger.warning(conn.notices[-1])
                if dupes:
                    sys.exit(2)

    # with conn:
    #     with conn.cursor() as cur:
    #         api_log="SELECT miovision_api.api_log(%s::date, %s::date)"
    #         cur.execute(api_log, time_period)

    # logger.info('Inserted into volumes and updated log')

    # with conn:
    #     with conn.cursor() as cur:
    #         invalid_movements="SELECT miovision_api.find_invalid_movements(%s::date, %s::date)"
    #         cur.execute(invalid_movements, time_period)
    #         logger.info(conn.notices[-1])

which redirects pulled data to czhu.miovision_volumes_test rather than our usual miovision_api.volumes (and then doesn't update the log tables).

Running

python miov_api_puller_test.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2020-03-14 --end_date 2020-03-15 --intersection=8 --pull

and then using

SELECT EXTRACT(hour FROM datetime_bin) date_hour, SUM(volume) volume
FROM czhu.miovision_volumes_test
WHERE intersection_uid = 8
    AND datetime_bin BETWEEN '2020-03-14 06:00' AND '2020-03-14 11:59:59'
    AND classification_uid = 6
GROUP BY EXTRACT(hour FROM datetime_bin)
ORDER BY 1, 2

we get

image

the numbers we're looking for!

So something about the way miovision_api.volumes is set is removing rows that aren't duplicates.

cczhu commented 4 years ago

Finally, we can directly compare miovision_api.volumes with czhu.miovision_volumes_test:

WITH miov_data AS (
    SELECT datetime_bin,
           leg,
           movement_uid,
           volume
    FROM miovision_api.volumes
    WHERE intersection_uid = 8
        AND datetime_bin BETWEEN '2020-03-14 09:00' AND '2020-03-14 11:59:59'
        AND classification_uid = 6
), charles_test_data AS (
    SELECT datetime_bin,
           leg,
           movement_uid,
           volume
    FROM czhu.miovision_volumes_test
    WHERE intersection_uid = 8
        AND datetime_bin BETWEEN '2020-03-14 09:00' AND '2020-03-14 11:59:59'
        AND classification_uid = 6
)
SELECT datetime_bin,
       leg,
       movement_uid,
       a.volume,
       b.volume
FROM miov_data a
FULL OUTER JOIN charles_test_data b USING (datetime_bin, leg, movement_uid)
WHERE a.volume != b.volume

There are no rows where a.volume or b.volume are NULL, so all times are represented in both datasets. However, there are hundreds of rows where the volumes simply disagree:

image

radumas commented 4 years ago

The trigger function doesn't do anything when it hits the unique constraint, that's why when you attempted to repull the data into volumes it didn't update the volumes, because all the rows had the same time stamps.

It is extremely unexpected that the API is returning different volumes for the same 1-minute bins

cczhu commented 4 years ago

To evaluate more broadly the error found above, Raph copied the recently refreshed volumes into a volume_20201007 column. I've added a sleep(60) command at the end of each iteration of the for loop in pull_data. Now running

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2020-01-01 --end_date 2020-10-06 --pull &> log_refresh_2020_withsleep.txt

to re-pull all 2020 data. Will investigate once it's complete how different the volumes are.

radumas commented 4 years ago

Updating the trigger function to update rows on conflict

CREATE OR REPLACE FUNCTION miovision_api.volumes_insert_trigger()
    RETURNS trigger
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE NOT LEAKPROOF
AS $BODY$
BEGIN
    IF new.datetime_bin >= '2018-01-01'::date AND new.datetime_bin < ('2018-01-01'::date + '1 year'::interval) THEN 
        INSERT INTO miovision_api.volumes_2018 (intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume) 
        VALUES (NEW.*);
    ELSIF new.datetime_bin >= '2019-01-01'::date AND new.datetime_bin < ('2019-01-01'::date + '1 year'::interval) THEN 
        INSERT INTO miovision_api.volumes_2019 AS vol (intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume) 
        VALUES (NEW.intersection_uid, NEW.datetime_bin, NEW.classification_uid, NEW.leg, NEW.movement_uid, NEW.volume)
        ON CONFLICT ON CONSTRAINT volumes_2019_intersection_uid_datetime_bin_classification_u_key DO 
            UPDATE SET volume = EXCLUDED.volume
            WHERE vol.volume < EXCLUDED.volume;

    ELSIF new.datetime_bin >= '2020-01-01'::date AND new.datetime_bin < ('2020-01-01'::date + '1 year'::interval) THEN 
        INSERT INTO miovision_api.volumes_2020 AS vol (intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume) 
        VALUES (NEW.intersection_uid, NEW.datetime_bin, NEW.classification_uid, NEW.leg, NEW.movement_uid, NEW.volume)
        ON CONFLICT ON CONSTRAINT volumes_2020_intersection_uid_datetime_bin_classification_u_key DO 
            UPDATE SET volume = EXCLUDED.volume
            WHERE vol.volume < EXCLUDED.volume;
  ELSE 
    RAISE EXCEPTION 'Datetime_bin out of range.  Fix the volumes_insert_trigger() function!';
    END IF;
    RETURN NULL;
EXCEPTION
    WHEN UNIQUE_VIOLATION THEN 
        RAISE WARNING 'You are trying to insert duplicate data!';
    RETURN NULL;
END;
$BODY$;

ALTER FUNCTION miovision_api.volumes_insert_trigger()
    OWNER TO miovision_admins;

Add historic volume column

ALTER TABLE miovision_api.volumes
ADD COLUMN volume_20201007 INTEGER;
COMMENT ON COLUMN miovision_api.volumes.volume_20201007 is 'Old volume as of 2020-10-07';

UPDATE miovision_api.volumes SET volume_20201007 = volume
cczhu commented 4 years ago

Ran

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2019-01-01 --end_date 2020-01-01 --pull &> log_refresh_2019_withsleep.txt

and hit on an issue described in #342. Following Raph switching volume_uid to bigint, running:

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2019-04-18 --end_date 2020-01-01 --pull &>
 log_refresh_2019_withsleep_2.txt
cczhu commented 4 years ago

Now that we have an old API pull, a new API pull and a CSV dump from Miovision, we can check the data integrity of the three. I chose to do this by binning the raw minute-bin data in miovision_csv.volumes_2020 and miovision_api.volumes up to the hour, and joining the two aggregate tables together. This is done in 05 - Compare Miovision API and CSV Data.ipynb.

Missing API data due to auto-retrieval of active intersections

From the get-go, we notice a lot of missing data in miovision_api.volumes. This is because intersection_tmc.py checks that the activation date is before the start of the pull period, and the decommission date is before the current day (that should really be the last timestamp of the pull...). In total, 563,109 hours of data are missing.

Missing CSV data in August/September 2020

There's also data missing in the CSV pull. For almost all intersection_uids the number of missing rows is less than 10. The exceptions are King / Jarvis (UID 20) and Queen / Jarvis (25). These are missing 4281 and 272 hours of data, respectively, in August/September 2020.

cczhu commented 4 years ago

Differences in Data - Adelaide / Bathurst (2)

For timestamps that exist in both CSV and API data, we check for differences in volumes. For light vehicles at Adelaide / Bathurst, here's the difference between CSV and the most recent API pull:

image

and here is the difference between CSV and the older API pull:

image

Upward spikes indicate locations where the CSV has a higher volume, and represent the same phenomenon as these downward spikes we discussed several weeks ago. We see that the new API pull volume difference has fewer spikes than the old one, but spikes still exist.

There are also negative spikes, which indicate locations where the CSV has lower volume than the API pull.

Differences in volume are due to either

Differences in Data - Adelaide / Jarvis (4)

image

image

To investigate further, we look at a negative spike (CSV has less data) on 2020-08-08 10:00-12:00 and a positive spike (CSV has more data) on 2019-06-11 21:00 (2019-11-03 01:00 is due to Daylight Savings Time).

SELECT datetime_bin,
       leg,
       movement_uid,
       volume_csv,
       volume_api
FROM (
    SELECT datetime_bin,
            leg,
            movement_uid,
            volume volume_csv
    FROM miovision_csv.volumes_2020
    WHERE intersection_uid = 4 AND classification_uid = 1
        AND datetime_bin BETWEEN '2020-08-08 10:50:00' AND '2020-08-08 11:05:00'
) a
FULL OUTER JOIN (
    SELECT datetime_bin,
           leg,
           movement_uid,
           volume volume_api
    FROM miovision_api.volumes
    WHERE intersection_uid = 4 AND classification_uid = 1
        AND datetime_bin BETWEEN '2020-08-08 10:50:00' AND '2020-08-08 11:05:00'
) b USING (datetime_bin, leg, movement_uid)
WHERE (volume_csv IS NULL) OR (volume_api IS NULL) OR (volume_api != volume_csv)
ORDER BY datetime_bin, leg, movement_uid

image

Here we see BOTH missing minutes and differences between CSV and API volumes 😡.

For 2019-06-11 21:00:

SELECT datetime_bin,
       leg,
       movement_uid,
       volume_csv,
       volume_api
FROM (
    SELECT datetime_bin,
            leg,
            movement_uid,
            volume volume_csv
    FROM miovision_csv.volumes_2020
    WHERE intersection_uid = 4 AND classification_uid = 1
        AND datetime_bin BETWEEN '2019-06-11 21:15:00' AND '2019-06-11 21:35:00'
) a
FULL OUTER JOIN (
    SELECT datetime_bin,
           leg,
           movement_uid,
           volume volume_api
    FROM miovision_api.volumes
    WHERE intersection_uid = 4 AND classification_uid = 1
        AND datetime_bin BETWEEN '2019-06-11 21:15:00' AND '2019-06-11 21:35:00'
) b USING (datetime_bin, leg, movement_uid)
WHERE (volume_csv IS NULL) OR (volume_api IS NULL) OR (volume_api != volume_csv)
ORDER BY datetime_bin, leg, movement_uid

image

Here we're mainly just missing 15 minutes of data on the API side.

cczhu commented 4 years ago

Differences in Data - Richmond / Bathurst (26)

We look at 2019-11-09 08:00 - 13:00 and 2019-06-11 21:00 (again).

SELECT datetime_bin,
       leg,
       movement_uid,
       volume_csv,
       volume_api
FROM (
    SELECT datetime_bin,
            leg,
            movement_uid,
            volume volume_csv
    FROM miovision_csv.volumes_2020
    WHERE intersection_uid = 26 AND classification_uid = 1
        AND datetime_bin BETWEEN '2019-11-09 08:00:00' AND '2019-11-09 13:00:00'
) a
FULL OUTER JOIN (
    SELECT datetime_bin,
           leg,
           movement_uid,
           volume volume_api
    FROM miovision_api.volumes
    WHERE intersection_uid = 26 AND classification_uid = 1
        AND datetime_bin BETWEEN '2019-11-09 08:00:00' AND '2019-11-09 13:00:00'
) b USING (datetime_bin, leg, movement_uid)
WHERE (volume_csv IS NULL) OR (volume_api IS NULL) OR (volume_api != volume_csv)
ORDER BY datetime_bin, leg, movement_uid

image

SELECT datetime_bin,
       leg,
       movement_uid,
       volume_csv,
       volume_api
FROM (
    SELECT datetime_bin,
            leg,
            movement_uid,
            volume volume_csv
    FROM miovision_csv.volumes_2020
    WHERE intersection_uid = 26 AND classification_uid = 1
        AND datetime_bin BETWEEN '2019-06-11 21:00:00' AND '2019-06-11 21:59:00'
) a
FULL OUTER JOIN (
    SELECT datetime_bin,
           leg,
           movement_uid,
           volume volume_api
    FROM miovision_api.volumes
    WHERE intersection_uid = 26 AND classification_uid = 1
        AND datetime_bin BETWEEN '2019-06-11 21:00:00' AND '2019-06-11 21:59:00'
) b USING (datetime_bin, leg, movement_uid)
WHERE (volume_csv IS NULL) OR (volume_api IS NULL) OR (volume_api != volume_csv)
ORDER BY datetime_bin, leg, movement_uid

image

These results are pretty similar to Adelaide / Jarvis - in the case where the CSV hourly volume is smaller than the API one, we see missing rows of data and some rows that don't agree. In the case where the API hourly volume is smaller, we mainly see missing rows in the API data.

Conclusions

radumas commented 4 years ago

Thanks for this investigation. I think the highest priority check is if these volume drops affect the classifications and intersections @KatiRG is using right now.

In order to send off an email to the vendor: can you recreate the screenshot tables but with the input legs and movements, i.e. the format of miovision_csv.raw_data2020?

KatiRG commented 4 years ago

The activeto intersections we are currently investigating are: 33,34,35,36,37,38,40

cczhu commented 4 years ago

So the way I test volume drops is not by looking at absolute values (where it's very difficult to tell whether a volume change is physical or a data artifact) but by comparing the CSV and API pulls. Of course, the API pull doesn't have any data on intersections 33 - 40 until 2020-09-29 because of the active intersection selection problem discussed in this issue.

The only solution I can think of is to re-rig the script mentioned in that issue to re-pull data from June - Sept inclusive.

cczhu commented 4 years ago

As for producing tables with entry_dir_name and exit_dir_name, I'm not certain where those columns are defined and how to translate movement IDs into them. (miovision_csv.raw_data2020 is empty.)

radumas commented 4 years ago

You can find the query transforming the data https://github.com/CityofToronto/bdit_data-sources/issues/341#issuecomment-705746484

Don't worry about the exit_dir_name, entry and movement are enough to identify the movement.

Also making sure we understand what might be missing from the data used in ActiveTO is a priority.

cczhu commented 4 years ago

Ran a new pull of the Miovision API data using

 python intersection_tmc_junesept2020.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --start_date 2020-06-20 --end_date 2020-10-01 --pull &> log_refresh_2020_stns33_40.txt

This refreshed data from intersections 33 - 40 from their activation to 2020-09-30.

Looking at differences in the raw data between miovision_csv.volumes_2020 and miovision_api.volumes for cyclists (classification_uid = 2), I find an annoying number of drops, none of which severely affect the overall trends of the Miovision data but do cast doubt on the exact hourly volumes. The lines below are all CSV data - API data so negative values indicate data missing from the CSV but found by the most recent API pull. Data is binned to the hour.

33 - Bloor / Avenue

image

Those drops are all 1 hour long, and occur at:

The magnitude of the drop is on order of 20. Note that absolute volumes during those times are ~250, so these constitute 10% drops.

34 - Bloor / Sherbourne

image

Drops are < 20 and

35 - Bloor / Shaw

image

Drops are < 20 and

The largest drop of 35 is on the 30th of August, but the hourly volume there was 408 (according to CSV data).

36 - Brimley / Eglinton

image

Drops are < 3 and at

There's a gain of 2 at 2020-09-13 11:00.

37 - Danforth / Jones

image

Drops are < 15 and at

38 - Danforth / Woodbine

image

Drops are < 20 and at

40 - Dundas / River

image

Drops are at

The largest drops are around 10.

Spikes of ~50 at

Typical volumes are 40-80, so this could represent a ~20% change in the volumes. I wouldn't trust the API data over the CSV, though.

cczhu commented 4 years ago

Executive summary of the above:

cczhu commented 4 years ago

Got really tired of writing the same Postgres code over and over again, so here's a function czhu.miov_api_csv_compare, that extracts all differences between miovision_csv.volumes_2020 and miovision_api.volumes given an intersection_uid, classification_uid, start_time, and end_time.

create function czhu.miov_api_csv_compare (
    intersect_uid integer,
    class_uid integer,
    start_time timestamp,
    end_time timestamp
) 
returns table ( 
    datetime_bin_r timestamp,
    entry_dir_name_r text,
    movement_r text,
    volume_csv_r integer,
    volume_api_r integer
)
language plpgsql
as $$
begin
    return query
        WITH comp_data AS (
            SELECT datetime_bin,
                   leg,
                   movement_uid,
                   volume_csv,
                   volume_api
            FROM (
                SELECT datetime_bin,
                        leg,
                        movement_uid,
                        volume volume_csv
                FROM miovision_csv.volumes_2020
                WHERE intersection_uid = intersect_uid AND classification_uid = class_uid
                    AND datetime_bin BETWEEN start_time AND end_time
            ) a
            FULL OUTER JOIN (
                SELECT datetime_bin,
                       leg,
                       movement_uid,
                       volume volume_api
                FROM miovision_api.volumes
                WHERE intersection_uid = intersect_uid AND classification_uid = class_uid
                    AND datetime_bin BETWEEN start_time AND end_time
            ) b USING (datetime_bin, leg, movement_uid)
            WHERE (volume_csv IS NULL) OR (volume_api IS NULL) OR (volume_api != volume_csv)
        )
        SELECT a.datetime_bin,
               a.leg entry_dir_name,
               b.movement,
               a.volume_csv,
               a.volume_api
        FROM comp_data a
        LEFT JOIN miovision_csv.movements b USING (movement_uid)
        ORDER BY 1, 2, 3;
end;$$
cczhu commented 4 years ago

For example, to replicate the query here for Adelaide / Jarvis (uid = 4) light vehicles (uid = 1) from '2020-08-08 10:50:00' to '2020-08-08 11:05:00':

SELECT * FROM czhu.miov_api_csv_compare(4, 1, '2020-08-08 10:50:00', '2020-08-08 11:05:00');

image

To look at Bloor / Avenue (uid = 33) cyclists from '2020-08-07 15:00:00' to '2020-08-07 15:59:00'):

SELECT * FROM czhu.miov_api_csv_compare(33, 2, '2020-08-07 15:00:00', '2020-08-07 15:59:00');

image

cczhu commented 4 years ago

I'm pretty sure entry_dir_name and leg are the same... The function assumes this is so.

cczhu commented 3 years ago

Following extensive discussion with Brent Rogerson from Miovision, his e-mail on 2020-11-04 provides some details:

When we did our data dump, we compiled the data from the Raw Counts directly (which is where the Cloud API pulls from).
We did this because the API cannot handle 2 years worth data pull at one time. The API is designed around short intervals of data that can be pulled over time from a large database of TMC data.

The API accesses the raw data counts from a backend storage search client that's intended to handle billions and billions of records, and relatively quickly search and aggregate them. But, when you ask a database for a count of records, it needs to load up every one of those records in memory to ensure it's a match, and add it to the count. This tool is geared towards searching and aggregating; Its method for counting applies an algorithm that produces a 95+% accuracy count of records without having to count each one.

We learned through this process that the data dump we provided you with is 100% accurate to the raw counts provided from the smartsense and the API is returning a 95+% accuracy due to this architecture/tool state above.

Thus:

cczhu commented 3 years ago

We're now just going to update both the API (on a daily basis) and CSV (on a perhaps monthly one) until Miovision can resolve the discrepancies between the two. Once pipelines for both are set up we can close this issue and #341.

cczhu commented 3 years ago

Due to the issue described in #355, we're missing six hours of each day's data since 2020-11-03 (inclusive) for intersections 1 - 40, and since 2020-11-09 (inclusive) for intersections 41 - 54:

With raw as (
    SELECT a.intersection_uid,
           b.intersection_name,
           a.datetime_bin::date date_bin,
           MIN(a.datetime_bin) first_timestamp,
           MAX(a.datetime_bin) most_recent_timestamp
    FROM miovision_api.volumes a
    LEFT JOIN miovision_api.intersections b USING (intersection_uid)
    WHERE a.datetime_bin BETWEEN '2020-11-03' AND '2020-11-13 23:59:59'
    GROUP BY a.intersection_uid, b.intersection_name, a.datetime_bin::date
    ORDER BY a.intersection_uid
)
SELECT *
FROM raw

image

To repair this, hacked intersection_tmc.py to produce a script that only downloads the final 6 hours of each day's worth of data. Manually specified intersections, and inserted either 2020-11-03 or 2020-11-09 depending on whether we're updating the old or new intersections.

The script doesn't perform 15-minute aggregation, and we'll have to rig up a separate script to do that (it may require us dump existing aggregated data and re-aggregate all of October to accommodate new stations).

import intersection_tmc as itmc
import click
import configparser
from psycopg2 import connect
import dateutil.parser
import traceback
import sys
import datetime
from requests import exceptions
from time import sleep

CONTEXT_SETTINGS = dict(
    default_map={'run_api': {'flag': 0}}
)

@click.group(context_settings=CONTEXT_SETTINGS)
def cli():
    pass

@cli.command()
@click.option('--path' , default='config_miovision_api_bot.cfg', help='enter the path/directory of the config.cfg file')
def run_api(path):

    CONFIG = configparser.ConfigParser()
    CONFIG.read(path)
    api_key = CONFIG['API']
    key = api_key['key']
    dbset = CONFIG['DBSETTINGS']
    conn = connect(**dbset)
    conn.autocommit = True
    itmc.logger.debug('Connected to DB')

   # CHANGE TO SET START AND END DATES
    start_date = dateutil.parser.parse(str('2020-11-03'))
    end_date = dateutil.parser.parse(str('2020-11-12'))
    start_time = itmc.local_tz.localize(start_date)
    end_time = itmc.local_tz.localize(end_date)
    itmc.logger.info('Pulling from %s to %s' %(start_time,end_time))

    # We normally run without --dupes.
    dupes = False
    # CHANGE TO SET INTERSECTIONS
    intersection = tuple(range(1, 41))

    try:
        pull_data(conn, intersection, start_time, end_time, path, key, dupes)
    except Exception as e:
        itmc.logger.critical(traceback.format_exc())
        sys.exit(1)

def daterange_last6hrsofday(start_time, end_time):
    """Generator for a sequence of the last 6 hours of each day."""
    dt_1day = datetime.timedelta(hours=24)
    dt_6hrs = datetime.timedelta(hours=6)
    dt_18hrs = datetime.timedelta(hours=18)
    for i in range(round((end_time - start_time) / dt_1day)):
        c_start_t = start_time + dt_18hrs + i * dt_1day
        yield (c_start_t, c_start_t + dt_6hrs)

def pull_data(conn, intersection, start_time, end_time, path, key, dupes):

    int_info = itmc.IntersectionInfo(conn, intersection)

    if len(int_info.int_uids) == 0:
        itmc.logger.critical('No intersections found in '
                             'miovision_api.intersections for the specified '
                             'start time.')
        sys.exit(3)

    # So we don't make the comparison thousands of times below.
    user_def_intersection = len(intersection) > 0

    for (c_start_t, c_end_t) in daterange_last6hrsofday(start_time, end_time):

        table = []

        for intersection_uid in int_info.int_uids:
            intersection_id1, intersection_name = (
                int_info.getinfo(intersection_uid))

            if int_info.is_active(intersection_uid, c_start_t):
                itmc.logger.info(intersection_name + '     ' + str(c_start_t))

                for attempt in range(3):
                    try:
                        table_veh = itmc.get_intersection_tmc(
                            c_start_t, c_end_t, intersection_id1,
                            intersection_uid, key)
                        table_ped = itmc.get_pedestrian(
                            c_start_t, c_end_t,
                            intersection_id1, intersection_uid, key)
                        break
                    except (exceptions.ProxyError,
                            exceptions.RequestException,
                            itmc.RetryError) as err:
                        itmc.logger.error(err)
                        itmc.logger.warning('Retrying in 2 minutes '
                                            'if tries remain.')
                        sleep(120)
                    except itmc.BreakingError as err:
                        itmc.logger.error(err)
                        table_veh = []
                        table_ped = []
                        break
                else:
                    itmc.logger.error(
                        'Could not successfully pull '
                        'data for this intersection after 3 tries.')
                    table_veh = []
                    table_ped = []

                table.extend(table_veh)
                table.extend(table_ped)

                # Hack to slow down API hit rate.
                sleep(1)

            elif user_def_intersection:
                itmc.logger.info(intersection_name + ' not active on '
                                 + str(c_start_t))

        itmc.logger.info('Completed data pulling from %s to %s'
                         %(c_start_t, c_end_t))

        try:
            itmc.insert_data(conn, c_start_t, c_end_t, table, dupes)
        except psycopg2.Error as exc:
            itmc.logger.exception(exc)
            sys.exit(1)

    itmc.logger.info('Done')

if __name__ == '__main__':
    cli()

Ran with

python myscript.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg &> pull_november_1800.txt
cczhu commented 3 years ago

Check following data pull:

With raw as (
    SELECT a.intersection_uid,
           b.intersection_name,
           a.datetime_bin::date date_bin,
           MIN(a.datetime_bin) first_timestamp,
           MAX(a.datetime_bin) most_recent_timestamp
    FROM miovision_api.volumes a
    LEFT JOIN miovision_api.intersections b USING (intersection_uid)
    WHERE a.datetime_bin BETWEEN '2020-11-03' AND '2020-11-13 23:59:59'
    GROUP BY a.intersection_uid, b.intersection_name, a.datetime_bin::date
    ORDER BY a.intersection_uid
)
SELECT *
FROM raw
WHERE most_recent_timestamp::time <= '23:59:00'
intersection_uid intersection_name date_bin first_timestamp most_recent_timestamp
1 Adelaide / Bathurst 2020-11-07 2020-11-07 00:00:00 2020-11-07 23:58:00
41 Bloor / Bay 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
42 College / University 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
43 Danforth / Logan 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
44 Dundas / Bloor 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
45 Eglinton / Kingston 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
46 Ellesmere / Morningside 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
47 Greenwood / Danforth 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
48 Huntingwood / Warden 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
49 Lawrence / Brimley 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
50 Morningside / Kingston 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
51 Runnymede / Bloor 2020-11-03 2020-11-03 00:00:00 2020-11-03 23:58:00
51 Runnymede / Bloor 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
52 Sheppard / Wilmington 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
53 St. Clair / Brimley 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00
54 Thorncliffe Park / Overlea 2020-11-08 2020-11-08 00:00:00 2020-11-08 22:59:00

So there's still some missing data on 2020-11-08 23:00:00.

Modified the above script to handle that block of time. Used

def daterange_last1hrsofday(start_time, end_time):
    """Generator for a sequence of the last 6 hours of each day."""
    dt_1day = datetime.timedelta(hours=24)
    dt_1hrs = datetime.timedelta(hours=1)
    dt_23hrs = datetime.timedelta(hours=23)
    for i in range(round((end_time - start_time) / dt_1day)):
        c_start_t = start_time + dt_23hrs + i * dt_1day
        yield (c_start_t, c_start_t + dt_1hrs)
cczhu commented 3 years ago

Quick check for duplicates yields no rows:

WITH raw_data AS (
    SELECT * FROM miovision_api.volumes
    WHERE datetime_bin >= '2020-11-03'
)
SELECT intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume, COUNT(volume) n_rows
FROM raw_data
GROUP BY intersection_uid, datetime_bin, classification_uid, leg, movement_uid, volume
HAVING COUNT(volume) > 1
cczhu commented 3 years ago

Pulling intersections removed after 2020-06-15 into miovision_api.volumes, partly to see if it's possible. All data is missing for them in 2019 as well (since the old script checked if the stations were still active today, not on the last date being pulled), and I'll run that over the weekend.

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --intersection=9 --intersection=11 --intersection=13 --intersection=14 --intersection=16 --intersection=19 --intersection=30 --intersection=32 --start_date 2020-01-01 --end_date 2020-06-15 --pull &> pull_legacy_intersections_2020.txt
cczhu commented 3 years ago

Pulling 2019

python intersection_tmc.py run-api --path /etc/airflow/data_scripts/volumes/miovision/api/config.cfg --intersection=9 --intersection=11 --intersection=13 --intersection=14 --intersection=16 --intersection=19 --intersection=30 --intersection=32 --start_date 2019-01-01 --end_date 2020-01-01 --pull &> pull_legacy_intersections_2019.txt
cczhu commented 3 years ago

The 2019 pull ran successfully for around 6 hours before failing on:

13 Nov 2020 15:04:33            INFO    King / Strachan     2019-08-22 06:00:00-05:00
13 Nov 2020 15:04:33            ERROR    HTTPSConnectionPool(host='api.miovision.com', port=443): Max retries exceeded with url: /intersections/c3834b0e-7f87-458a-9742-0832048f04db/crosswalktmc?endTime=2019-08-22+12%3A00%3A00-05%3A00&startTime=2019-08-22+06%3A00%3A00-05%3A00 (Caused by SSLError(SSLError("bad handshake: Error([('SSL routines', 'tls_process_server_certificate', 'certificate verify failed')],)",),))
13 Nov 2020 15:04:33            WARNING    Retrying in 2 minutes if tries remain.

This same failure has affected our Airflow scripts as well. Not sure if it's because of too much pulling in general or not. If it is, great reason not to try to re-pull all the data after Miovision upgrades its API.

Contacted Miovision to get it resolved.