cal-itp / data-infra

Cal-ITP data infrastructure
https://docs.calitp.org/data-infra
GNU Affero General Public License v3.0

Research: Large queries using dim tables in Jupyter Notebooks #594

Closed: tiffanychu90 closed this issue 2 years ago

tiffanychu90 commented 2 years ago

Question

Larger queries have run into a size limit.

One of the bigger queries investigates a question like: on a given day, how many times did a bus stop at a certain stop_id? I opted for a "long" dataframe because I wanted to be able to calculate the min/max/midpoint of stop_sequence for each trip_id and subset the data, and rather than doing the count directly in the query, I wanted to explore a bit with pandas.

import os
os.environ["CALITP_BQ_MAX_BYTES"] = str(100_000_000_000)

(tbl.views.gtfs_schedule_fact_daily_trips()
 # trips running on the selected service date
 >> filter(_.service_date == SELECTED_DATE)
 # join stop times onto each trip
 >> left_join(_, tbl.views.gtfs_schedule_dim_stop_times(),
             ["calitp_itp_id", "trip_id"])
 >> select(_.itp_id == _.calitp_itp_id, _.trip_id, _.service_date,
           _.stop_id, _.stop_sequence, _.arrival_time)
 # keep daytime arrivals only
 >> filter(_.arrival_time >= "05:00:00",
           _.arrival_time <= "20:00:00")
 >> group_by(_.itp_id, _.trip_id, _.service_date,
             _.stop_id, _.stop_sequence)
 >> count(_.arrival_time)
 >> collect()
)
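
For context, the pandas step I had in mind after collecting is roughly this (just a sketch, assuming the collected result is assigned to a dataframe named df with the columns selected above):

# sketch: min/max/midpoint of stop_sequence for each trip in the collected df
trip_seq = (
    df.groupby(["itp_id", "trip_id", "service_date"])
      .stop_sequence
      .agg(["min", "max"])
      .reset_index()
)
trip_seq["midpoint"] = (trip_seq["min"] + trip_seq["max"]) / 2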

Metrics

Data sources

(Data Services Team to Copy and Fill Out Below)

The QuVR MD template below will be filled out by a member of the data services team. This lets us describe the request in a way that is easy to hand off for analysis. After the research phase, we will sync with the asker to figure out whether the metric and dashboard pieces are needed.

Before starting research:

After reviewing research with the asker:

machow commented 2 years ago

Do you mind trying something like the query below to get stop times for a given day? Happy to pair and go over it. The trick is that your query has a time dimension, which affects how we get the data from dim stop times.

edit: never mind, I see you're querying for a specific date; I put a simpler approach below...

old SQL code:

```SQL
WITH
-- select stop times across a specific date range
feed_trips_stops AS (
    SELECT FTS.*, D.full_date
    FROM `views.gtfs_schedule_index_feed_trip_stops` FTS
    -- Join in any dimensional tables here,
    -- they should only be joined on the *_key column
    JOIN views.gtfs_schedule_dim_feeds USING (feed_key)
    -- Join in the dim date table to expand the data for
    -- a specific date range
    JOIN `views.dim_date` D
        ON FTS.calitp_extracted_at <= D.full_date
        AND FTS.calitp_deleted_at > D.full_date
    WHERE
        -- replace with dates you're interested in
        D.full_date BETWEEN "2021-06-01" AND "2021-06-07"
)

SELECT full_date, COUNT(*) AS n
FROM feed_trips_stops
GROUP BY 1
```
tbl_stop_times = (
    tbl.views.gtfs_schedule_dim_stop_times()
    >> filter(_.calitp_extracted_at <= SELECTED_DATE, _.calitp_deleted_at > SELECTED_DATE)
)

(tbl.views.gtfs_schedule_fact_daily_trips()
 >> filter(_.service_date == SELECTED_DATE)

 # note tbl_stop_times swapped in here ----
 >> left_join(_, tbl_stop_times,

            # also added url number to the join keys ----
             ["calitp_itp_id", "calitp_url_number', "trip_id"])
 >> select(_.itp_id==_.calitp_itp_id, _.trip_id, _.service_date,
           _.stop_id, _.stop_sequence, _.arrival_time )
 >> filter(_.arrival_time >= "05:00:00", 
          _.arrival_time <= "20:00:00")
 >> group_by(_.itp_id, _.trip_id, _.service_date, 
             _.stop_id, _.stop_sequence)
 >> count(_.arrival_time)
 >> collect()
)

tiffanychu90 commented 2 years ago

Still running into an issue with size. I am using os.environ["CALITP_BQ_MAX_BYTES"] = str(100_000_000_000), but the error that comes up is DatabaseError: 500 Query exceeded limit for bytes billed: 5000000000. 6688866304 or higher required. Is setting the max_bytes not working?

machow commented 2 years ago

Ah, sorry--I think you have to set max bytes prior to importing the tables. So like this...


# set bytes prior to import of other things ----
import os
os.environ["CALITP_BQ_MAX_BYTES"] = str(20_000_000_000)

# import tools for analysis ----
from calitp.tables import *
from siuba import *

SELECTED_DATE = "2021-10-01"

tbl_stop_times = (
    tbl.views.gtfs_schedule_dim_stop_times()
    >> filter(_.calitp_extracted_at <= SELECTED_DATE, _.calitp_deleted_at > SELECTED_DATE)
)

(tbl.views.gtfs_schedule_fact_daily_trips()
 >> filter(_.service_date == SELECTED_DATE)

 # note tbl_stop_times swapped in here ----
 >> left_join(_, tbl_stop_times,

            # also added url number to the join keys ----
             ["calitp_itp_id", "calitp_url_number", "trip_id"])
 >> select(_.itp_id==_.calitp_itp_id, _.trip_id, _.service_date,
           _.stop_id, _.stop_sequence, _.arrival_time )

 >> filter(_.arrival_time >= "05:00:00", 
          _.arrival_time <= "20:00:00")
 >> group_by(_.itp_id, _.trip_id, _.service_date, 
             _.stop_id, _.stop_sequence)
 >> count(_.arrival_time)
 >> collect()
)

When I ran it, the query returned 9 million rows, roughly the size of the stop_times table for a given day, so I wasn't able to collect. (9 million rows isn't a crazy number, but because columns like trip_id and stop_id are strings, the result is probably pretty memory intensive.)
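
If the row-level data is really needed locally, one workaround is converting the repeated string columns to pandas categoricals after collecting, which usually cuts memory use substantially. A rough sketch (df being the collected dataframe, not anything specific to calitp):

# sketch: categoricals store each unique string once, plus small integer codes
for col in ["trip_id", "stop_id"]:
    df[col] = df[col].astype("category")

# check the effect on memory
df.info(memory_usage="deep")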