pydata / parallel-tutorial

Parallel computing in Python tutorial materials

Add draft of nyc-taxi futures notebook #27

Closed mrocklin closed 7 years ago

mrocklin commented 7 years ago

This adds a notebook that transitions readers from concurrent.futures to dataframes as they interact with distributed data for the first time.

This needs to be modified to use parquet data on GCS.

mrocklin commented 7 years ago

It looks like we also need to add the nyc-taxi data to GCS.

mrocklin commented 7 years ago

cc @martindurant for review. This is new material, and so probably needs a critical eye.

martindurant commented 7 years ago

Looks good at first glance; I don't have much to say. I can still go over it with a fine-tooth comb. There is a reference here to "before we did" (the threaded scheduler), but there is no obvious ordering relative to the rest of the tutorial. Is this only for temporary use as an extra add-on notebook, or is there a plan for how it fits in with the rest of the tutorial?

mrocklin commented 7 years ago

We typically do the first half on the students' own machines, covering concurrent.futures with map (section 1), submit (section 2), and then collections with Spark RDDs or Dask bags (section 3). We do a capstone project straddling the break and then come back with a distributed cluster.
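
For anyone reading along, a minimal sketch of that map-then-submit progression with concurrent.futures (toy function, not the tutorial's actual exercises):

from concurrent.futures import ThreadPoolExecutor

def square(x):                                     # stand-in for the per-item work in the exercises
    return x ** 2

data = list(range(8))

with ThreadPoolExecutor() as e:
    # section 1: map -- apply one function across a collection
    results = list(e.map(square, data))

    # section 2: submit -- launch tasks individually and collect futures by hand
    futures = [e.submit(square, x) for x in data]
    results = [f.result() for f in futures]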

mrocklin commented 7 years ago

I expect this to fit in around notebook 07

mrocklin commented 7 years ago

cc @quasiben @martindurant

Here is a status update on what I have and current issues. We have the following:

  1. A notebook where students read blocks of pandas data from a parquet file on GCS and do a few simple distributed dataframe computations
  2. A notebook where students see those same computations with dask.dataframe and are then asked to do more complex computations using some basic pandas syntax that is explained for them (both patterns are sketched right after this list)
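
In rough outline, the two notebooks look something like this (an untested sketch: the paths, the load_block helper, and the passenger_count sum are placeholders, not the notebooks' actual code):

from dask.distributed import Client
import dask.dataframe as dd
import fastparquet

client = Client()                                  # or Client('scheduler-address:8786') on the cluster

# notebook 1: futures -- load one block per task, then compute with plain pandas
paths = ['block-0.parquet', 'block-1.parquet']     # placeholder block paths

def load_block(path):
    return fastparquet.ParquetFile(path).to_pandas()

blocks = client.map(load_block, paths)
counts = client.map(lambda df: df.passenger_count.sum(), blocks)
total = sum(client.gather(counts))

# notebook 2: the same computation expressed with dask.dataframe
df = dd.read_parquet('2015.parquet')               # placeholder path
total = df.passenger_count.sum().compute()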

Some problems:

  1. I suspect that the parquet data I have made for dask won't work for Spark due to int96 datetimes (Martin knows about this).
  2. I also suspect that the data we have placed on GCS won't be easy to read with Spark. We probably need to write to S3.
  3. I haven't done the second notebook in Spark DataFrames yet (which I think we should do).
  4. I have chosen a large partition size in order to minimize the amount of thrift metadata the students send around in the futures section (the read_row_group function closes over the ParquetFile object, which is large; illustrated right after this list). I'm optimizing for simplicity here over efficiency. @martindurant, if you can think of a faster solution that still feels easy-ish I'd love to hear it. I think the students would have a better experience if we had more and smaller partitions.
  5. I've been using JLab, but widgets and things don't work. There may be some tradeoff here with bloating the docker image. I'm not sure. We can stick to classic notebooks to avoid this if we want to, although I think that JLab is nice.
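
To make the closure issue in item 4 concrete, roughly this is what happens (placeholder path; the notebook's helper reads a single row group rather than calling to_pandas on everything):

import cloudpickle
import fastparquet

pf = fastparquet.ParquetFile('2015.parquet')       # placeholder path; this object holds all of the thrift metadata

def read_block(i):
    # the closure captures pf, so every submitted task ships the metadata along with the function
    return pf.to_pandas()                          # illustrative; the real helper reads only row group i

print(len(cloudpickle.dumps(read_block)))          # approximate per-task overhead; grows with the number of row groups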

I'm going to be out for a bit. If anyone wants to take on anything here that'd be welcome, though I think I can also handle things on Monday.

For reference, I have been creating the parquet data after first sorting by the pickup time. I'm doing this using dask-kubernetes and the convenient notebook 06, which handles this for us nicely. I'm also mapping lambda x: x.sort_index() after calling set_index to make sure that each partition stays sorted.

mrocklin commented 7 years ago

Something like this

import gcsfs                      # needed for the gcs:// paths below
import dask.dataframe as dd

# read the raw 2015 CSVs from GCS; token='cloud' picks up the machine's Google Cloud credentials
df = dd.read_csv('gcs://anaconda-public-data/nyc-taxi/csv/2015/yellow_*.csv',
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                 storage_options={'token': 'cloud'})

# index by pickup time and shrink a few small integer columns
df = df.set_index('tpep_pickup_datetime')
df = df.astype({'VendorID': 'int8', 'passenger_count': 'int8',
                'payment_type': 'int8', 'RateCodeID': 'int8'})

# write ~50 large partitions, re-sorting each block so the index stays ordered
df.repartition(npartitions=50).map_partitions(lambda x: x.sort_index()).to_parquet(
    'gcs://anaconda-public-data/nyc-taxi/2015.parquet')

martindurant commented 7 years ago

Some thoughts.

mrocklin commented 7 years ago

I have no desire to replicate the futures notebook in Spark, just the second dataframe notebook (I pushed this after you last reviewed).

Serialization is an issue for the futures code. My function there closes over pf.

I'll eventually use JLab for other talks and demos at the conference, so it's something that I need to resolve regardless.

I agree that GCS and Spark may not be necessary; I think our plan is to use S3 for the Spark section.
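
For the Spark section I imagine the read looking roughly like this (untested; the bucket path is a placeholder and this assumes the s3a connector and credentials are already configured):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('nyc-taxi').getOrCreate()

df = spark.read.parquet('s3a://<bucket>/nyc-taxi/2015.parquet')   # placeholder bucket
df.groupBy('passenger_count').count().show()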

martindurant commented 7 years ago

My function there closes over pf.

I could conceive of copying the pf object and giving only one row group to each; but could this be a learning point, to scatter pf to the cluster beforehand?
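
Something like the following (an untested sketch; the path is a placeholder and read_block is illustrative, the real helper would read only a single row group):

from dask.distributed import Client
import fastparquet

client = Client()
pf = fastparquet.ParquetFile('2015.parquet')       # placeholder path

# send the metadata to the workers once, up front
[pf_future] = client.scatter([pf], broadcast=True)

def read_block(pf, i):
    # pf now arrives as an argument, so the closure no longer drags the metadata along
    return pf.to_pandas()                          # illustrative; read only row group i here

futures = [client.submit(read_block, pf_future, i)
           for i in range(len(pf.row_groups))]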

For Spark, you mean the nyc-taxi-dask-dataframes.ipynb notebook? I could have a go (on a single partition of the original, not on a cluster), but I bet my Spark is rough. That would be airport work tomorrow.