@ian-r-rose: Can I pick your brain on how to use more `dask`? A super meaty rewrite of the workflow. Broadly speaking, I'd say the entire workflow still needs to undergo the following iterations:

- Where can `dask.futures` be used?
- How do the `scheduler`, `delayed`, `persist`, and `futures` concepts fit in?

:wave: Hi @tiffanychu90! I would avoid explicit manipulation of `distributed.Future`s to start. If you are using a distributed cluster (it sounds like you are?) then they will be involved, but I'd suggest starting with higher-level APIs like `dask.dataframe` or `dask.delayed`.
If things are already in a `dask-geopandas` dataframe, then I'd suggest keeping things that way. You can do custom processing of individual partitions using `df.map_partitions(some_custom_clipping_fn)`. If things are not already in a dataframe, and you just want to run some pre-processing functions, then I'd suggest wrapping your functions with `dask.delayed` to execute them on the cluster.
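A minimal sketch of those two patterns on toy data (the function names and the small example frame are illustrative, not from the thread):

```python
import pandas as pd
import dask.dataframe as dd
from dask import compute, delayed

# Pattern 1: custom per-partition processing with map_partitions.
# Each partition arrives as a plain pandas DataFrame.
def some_custom_fn(partition: pd.DataFrame) -> pd.DataFrame:
    # placeholder for e.g. a custom clipping step
    return partition[partition["value"] > 0]

pdf = pd.DataFrame({"value": range(-5, 5)})
ddf = dd.from_pandas(pdf, npartitions=2)
filtered = ddf.map_partitions(some_custom_fn).compute()

# Pattern 2: wrap plain pre-processing functions with dask.delayed
# so each call becomes a task on the cluster.
@delayed
def preprocess(x):
    return x * 2

tasks = [preprocess(i) for i in range(10)]
outputs = compute(*tasks)  # the scheduler decides where each task runs
```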
In general, it's not your responsibility to decide how to distribute work across the workers. You decide what tasks to run, and what dependencies they have, and then the scheduler decides how to execute them. If they are embarrassingly parallel, then it should evenly distribute them. If some tasks depend on some others, it will be a bit more complex, but it still tries to make sure that work is well-distributed.
`persist` certainly can be useful, but mostly if a large number of downstream tasks all depend on some pre-processing step, and you want to make sure that that step isn't cleaned up.
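For instance, a small sketch of that pattern (toy data; it works the same on a distributed cluster):

```python
import pandas as pd
import dask.dataframe as dd

# A shared pre-processing step that several downstream computations reuse.
ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)
cleaned = ddf[ddf["x"] % 2 == 0]

# persist() starts computing and keeps the result in cluster memory,
# so the downstream computations below reuse it instead of re-running
# the filtering step each time.
cleaned = cleaned.persist()

total = cleaned["x"].sum().compute()
count = cleaned.shape[0].compute()
```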
I could probably get a bit more specific if you have some pseudo-code to share.
@ian-r-rose: I'm going to digest this and tackle incorporating `dask.delayed` and `df.map_partitions` and getting it to run successfully on the distributed cluster. Thanks for the very helpful tips!
@ian-r-rose: (feel free to tackle this during the work week!) Do you have feedback on how to use `map_partitions` with loops? I haven't implemented `dask.delayed` yet, but I want to try and figure out how to parallelize some of the loops first. I tried 2 methods in this notebook, one of them with `map_partitions`, but mostly I think I'm getting confused with how to deal with groups within the partition. Since I need to focus on just a subset within the same transit operator to do the spatial join, I'm not sure how to handle it, since I basically just bury the loop within the partition. Is this ideal? I'm not sure I want to set the same number of partitions as there are transit operators, since that would result in 200-ish partitions.
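A sketch of the pattern being described, plus one possible way to keep an operator's rows together without one partition per operator: `set_index` on the operator id. The `calitp_itp_id` column name and the toy frame are assumptions for illustration, and the `set_index` approach is a suggestion, not something settled in the thread:

```python
import pandas as pd
import dask.dataframe as dd

def process_partition(partition: pd.DataFrame) -> pd.DataFrame:
    # The per-operator loop is "buried" inside the partition: each
    # group is handled independently (e.g. a per-operator spatial join).
    results = [
        group.sort_values("x")  # placeholder for the real per-group work
        for _, group in partition.groupby("calitp_itp_id")
    ]
    return pd.concat(results) if results else partition

pdf = pd.DataFrame({"calitp_itp_id": [1, 1, 2, 2, 3], "x": range(5)})
ddf = dd.from_pandas(pdf, npartitions=2)

# set_index shuffles rows so that all of an operator's rows share a
# partition, without needing 200-ish partitions (one per operator).
ddf = ddf.set_index("calitp_itp_id")
out = ddf.map_partitions(
    lambda part: process_partition(part.reset_index())
).compute()
```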
After receiving a research request, use this template to plan and track your work. Be sure to also add the appropriate project-level label to this issue (e.g. gtfs-rt, DLA).
Epic Information - HQTA v3
Summary
- How can we use more `dask`? The rewrite to `dask.dataframes` and `dask_geopandas` is done, but it does take 4-5 hrs to run...too long!
- With the `dask` cluster and `dask.distributed` client connection in place, try to incorporate partitioning, delays, futures, etc. and see which components can be sped up
- Use `persist` rather than `compute` wherever possible

Research required:
- `dask.distributed` cluster
- Caching (`compiled_cached_views/` instead of `cached_views`):
  - `stop_times` aggregation by time-of-day...which is actually before 12pm and after 12pm and finding the max; this can be done at the start (see the sketch after this list)
  - finding the longest `shape_id` by direction, then cutting it into hqta segments; this can be done across all operators
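The time-of-day aggregation above could look something like this in pandas; the frame, the `departure_hour` column, and the am/pm split at hour 12 are all assumptions for illustration:

```python
import pandas as pd

# Hypothetical stop_times-like frame: one row per stop event.
stop_times = pd.DataFrame({
    "stop_id": ["a", "a", "a", "b", "b"],
    "departure_hour": [6, 11, 14, 9, 18],
})

# Tag each stop event as before 12pm or after...
stop_times["period"] = stop_times["departure_hour"].map(
    lambda h: "am" if h < 12 else "pm"
)

# ...count events per stop per period, then keep the max of the two.
counts = (
    stop_times.groupby(["stop_id", "period"])
    .size()
    .unstack(fill_value=0)
)
counts["max_trips"] = counts.max(axis=1)
```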
Notes, misc:
Reviewers [Stakeholders]
1.
Issues
1. Use `compiled_cached_views` parquets. Conceptual change: no more looping across operators. Find longest shapes for route, find symmetric difference, and cut into hqta segments right away. Separately, aggregate `stop_times` to stop-level, then join to hqta segments, tag hq transit corridors. Replace `B1`, `B2` scripts.
2. Use `df.itertuples` to find intersections, instead of moving it between `dg.GeoDataFrame` and `gpd.GeoDataFrame`. Replace `C1`, `C2` scripts, drop `C3` completely, move `C4` up (no substantial changes needed for `C4`, `D1`, `D2`, since those deal with assembling the export-ready files).
3. Use `intersection` for 2 GeoSeries instead of `df.itertuples`...faster, simpler, cleaner (see the first sketch after this list).
4. Create `valid_hqta_operators.json` by looking for cached `stop_times` and not all 4 tables. Move towards using more cached parquets instead of running `views.gtfs_schedule_index_feed_trip_stops`.
5. Use `dask.delayed` objects to download data and run our queries; rework so that `trips`, `routelines`, and `stops` are downloaded simultaneously (see the second sketch after this list). Also test the majority of the latter scripts for data processing to use the `dask.distributed` cluster and make sure parquets and geoparquets save to GCS. Move all intermediate file storage to GCS, no local.
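A hedged sketch of issue 3's element-wise `intersection` on two GeoSeries (toy geometries, not project data):

```python
import geopandas as gpd
from shapely.geometry import box

# Two aligned GeoSeries, standing in for e.g. hqta segments and the
# shapes they may overlap.
left = gpd.GeoSeries([box(0, 0, 2, 2), box(5, 5, 7, 7)])
right = gpd.GeoSeries([box(1, 1, 3, 3), box(6, 6, 8, 8)])

# One vectorized, element-wise call instead of a row-by-row
# df.itertuples loop.
overlaps = left.intersection(right)
```

And a sketch of issue 5's simultaneous downloads via `dask.delayed` (the `download` helper is hypothetical):

```python
from dask import compute, delayed

@delayed
def download(table: str):
    # placeholder for the real query/download of one table
    return table

# Three independent tasks, so the scheduler is free to run the
# downloads simultaneously.
trips, routelines, stops = compute(
    download("trips"), download("routelines"), download("stops")
)
```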
Deliverables