pangeo-data / pangeo

Pangeo website + discussion of general issues related to the project.
http://pangeo.io
693 stars 187 forks source link

Creating a US electricity system themed Pangeo deployment #489

Closed zaneselvans closed 5 years ago

zaneselvans commented 5 years ago

I'm working as part of @catalyst-cooperative to compile useful datasets relevant to the US electricity system for use by academic researchers, climate advocates, data journalists, and anyone else working in the public interest to address climate change and improve our energy system.

While I'm sure our data is nowhere near as big as what the geoscience community is working with, it has started getting hard to work with on a normal laptop, and annoying to move around. We've integrated one 100GB data set, and there are another two data sets that are each more than 100GB which we want to bring in, and moving toward a shared platform with storage and computation resources seems like the right way to go. We've got all the environment files set up to allow the creation of a deployment container (we set them up to use Travis CI), so our repository can be launched with Binder... but the data is too large to store in the repository, so there's no data available.

Right now PUDL (=Public Utility Data Liberation) is set up to pull down the original data from government sources (as CSVs, or archaic binary database files, or Excel spreadsheets), and transform them into a single well normalized postgres database. This also isn't the friendliest interface for users. While they may appreciate that the data processing pipeline is open and publicly available for inspection, mostly they just want to get the data and start playing with it.

As a result we've started work to generate some commonly used outputs from the database, and package them up using the tabular data package standard developed by Open Knowledge International. We want to publish new data packages whenever new data is released, and provide DOIs for the data, so it can be cited in research, and so folks can know exactly what data was used in an analysis. This can work well for many of the smaller (100MB to a few GB) data sets that we've integrated, but it doesn't seem like it's going to scale up well. At the same time, if we're going to provide access to the smaller data for download anyway, being able to store those files in the same place for use in the cloud and for download would be great, and would ensure they were always in sync.

Poking around at Pangeo a little, it seems like the data storage model is file based, and primarily consists of large multi-dimensional arrays. Is that right? Are folks hosting relational data as well? Is there a database-like way to interact with the storage buckets? Could we use the data packages for the smaller data, and another format for the big data? I guess I need to get familiar with Dask. I'm having a hard time imagining reading in a 100GB CSV file every time I open up a notebook, but maybe that's normal.

I'm also wondering what the best way to have folks integrate the code they're working on with the data we would be storing. The Binder setup is nice, since anybody can deploy their own repository to the place where the data is. Is it easy for folks to commit their notebook directly back into the repository that was cloned to the Pangeo instance?

How does one manage the costs of running a Pangeo instance? Is there some kind of billing mechanism set up so you can see how much resources each user is using and charge them appropriately? Does everybody chip in some fixed amount just to have access to the data? Or is it all covered by Google & NSF at this point? We're trying to find funding to keep the work going, but at the moment we're largely a volunteer effort. I would think ~1TB of storage would be enough for our foreseeable data catalog.

Is there just one big storage pool from which different Pangeo instances select what they want access to, or does each one have its own isolated collection? Mostly I think our data is not going to be already available, but we're interested in pulling in something like the ERA Interim or MERRA-2 historical climate data to let people investigate the relationship between weather and energy use, and weather and renewable generation. And there are some datasets (PV WATTS & WINDS) form NREL that let you estimate renewable generation over time in different locations that would be good for energy systems modelers.

Anyway, I hope these questions aren't too basic. Thanks for your work on this, it seems awesome.

mrocklin commented 5 years ago

An answer to one of your questions

Are folks hosting relational data as well?

Yes.

Is there a database-like way to interact with the storage buckets? Could we use the data packages for the smaller data, and another format for the big data? I guess I need to get familiar with Dask. I'm having a hard time imagining reading in a 100GB CSV file every time I open up a notebook, but maybe that's normal.

Reading a 100GB CSV file (or many CSV files) is perfectly normal. You might also consider more modern standard file formats like Parquet, which is self describing and generally much more efficient.

Assuming that these are stored on the same cloud storage system as where the compute is running it's a pretty smooth process to read and process these each time.

How does one manage the costs of running a Pangeo instance? Is there some kind of billing mechanism set up so you can see how much resources each user is using and charge them appropriately? Does everybody chip in some fixed amount just to have access to the data? Or is it all covered by Google & NSF at this point? We're trying to find funding to keep the work going, but at the moment we're largely a volunteer effort. I would think ~1TB of storage would be enough for our foreseeable data catalog.

Currently we're just handling all of the costs and passing them along to Google/AWS who have generously donated credits. At our current scale costs are pretty low. As things grow larger we imagine that other mechanisms will develop.

Storing 1TB is pretty cheap.

I think that the next steps in your situation would be to upload some dataset to the cloud (we can probably help with this) and then create a binder image that demonstrates computation on that data. That example might help to drive further conversations.

zaneselvans commented 5 years ago

We've been thinking about using Parquet for the larger datasets that aren't going to be easily distributed as data packages anyway. I've just spent some time playing around with outputting the EPA CEMS data from pandas to Parquet now. It's so fast!

As Parquet files, it looks like the EPA CEMS should be ~10GB on disk. Can it/Should it be partitioned into different files (e.g. by year and/or by state) or just one big file? Maybe this is a good excuse to experiment with a bigger-than-memory dataframe in Dask.

What are the mechanics of uploading or updating data? Do the Pangeo administrators control the entire data catalog, or do the individual datasets have their own maintainers? Would we just write it directly to a Google cloud storage bucket?

martindurant commented 5 years ago

Can it/Should it be partitioned into different files

The data may well be partitioned into "row groups" within the file already, depending on how exactly you wrote it.

Actually using columns to partition on can be very handy and save space too, but only if those columns do split the rows into a relatively small number of parts. When there are very many parts, you have additional overhead from many tasks (in Dask) and of many small file-reads and even getting file listings and parsing the metadata can become expensive. Obviously, choosing the columns to split on will depend on how you mean to access the data.

Specifically for Dask, and other parallel systems, having multiple files for a parquet data-set is the norm, and in fact you may wish to use Dask to do the writing. The recommendation is to keep the parts >>100MB, to avoid the overheads becoming significant, with an upper limit on size such that you have enough parts to keep all workers busy while not being too big to fill memory (remember that Pandas may require a several times the on-disc size while processing).

zaneselvans commented 5 years ago

I've been trying to get this bigger-than-memory output working and so far have been failing. The parquet files output, but when I try and read them back in, there's a type mismatch on all of the object columns -- they seem to be a mix of string and null types.

Our data processing pipeline for loading the data into the database returns a generator, which generates pandas dataframes ready to be put into the database, and I'm trying to use those to build up a dask dataframe, appending each new one as it comes, and then outputting the entire dask dataframe to parquet. But maybe this is a bad hack...

I haven't been able to find a canonical example of how one might do this. Do you have any pointers?

martindurant commented 5 years ago

You should put some code of how you are writing the data (which backend, etc). For pyarrow, I don't think you can enforce dtypes on the pieces right now, but fastparquet has the object_encoding= keyword (which defaults to strings-with-nulls).

zaneselvans commented 5 years ago

I got that previous version of it working by switching from pyarrow to fastparquet as you suggested. After testing on a subset of the EPA CEMS data it seemed to be working, so I left it to run when I went to sleep last night, and it failed without error somehow.

So now, I'm trying to use the dask dataframe's ability to append to an existing parquet dataset instead.

First I'm creating a "template" pandas dataframe, which is empty, but has the right columns and datatypes associated with them:

cems_dtypes = {
    'state': 'object',
    'plant_name': 'object',
    'plant_id_eia': 'uint16',
    'unitid': 'object',
    'gross_load_mw': 'float32',
    'steam_load_1000_lbs': 'float32',
    'so2_mass_lbs': 'float32',
    'so2_mass_measurement_code': 'object',
    'nox_rate_lbs_mmbtu': 'float32',
    'nox_rate_measurement_code': 'object',
    'nox_mass_lbs': 'float32',
    'nox_mass_measurement_code': 'object',
    'co2_mass_tons': 'float32',
    'co2_mass_measurement_code': 'object',
    'heat_content_mmbtu': 'float32',
    'facility_id': 'uint16',
    'unit_id_epa': 'uint32',
    'operating_datetime': 'datetime64',
    'operating_seconds': 'float32'
}
cems_df_template = pd.DataFrame(columns=cems_dtypes.keys())
cems_df_template = cems_df_template.astype(cems_dtypes)

Then I'm iterating through the dataframes generated by the the data processing pipeline, and using that template to impose datatypes on the pandas dataframe, which is then used to create a dask dataframe that creates or appends to the exisiting parquet dataset on disk:

# To deal with larger-than-memory datasets, the extract
# function returns a generator of raw dataframes
raw_dfs = pudl.extract.epacems.extract(epacems_years=[2017],
                                       #states=['CO','WY','MT'],
                                       states=['WY','MT'],
                                       #states=pudl.constants.cems_states,
                                       verbose=True)
# Similarly, the transform function returns a generator
# of dictionaries of dataframes, with (year, month, state)
# tuples as the keys
cems_chunks = pudl.transform.epacems.transform(raw_dfs)
for tfr_df_dict in cems_chunks:
    for yr_mo_st in tfr_df_dict:
        print(yr_mo_st)
        # cems_for_parquet converts a time interval into
        # number of seconds, since parquet can't do intervals
        # and (optionally) downcasts to the smallest float
        # or int types that can represent the data. But here
        # I'm not doing that,
        tmp_df = cems_for_parquet(tfr_df_dict[yr_mo_st],
                                  downcast=False)
        # Impose the types defined above:
        tmp_df = tmp_df.astype(cems_dtypes)
        # Create a dask dataframe from our new chunk
        tmp_dd = dd.from_pandas(tmp_df, chunksize=100000)
        print("Appending to Parquet")
        tmp_dd.to_parquet('parquet', compression='gzip',
                          engine='fastparquet', append=True)

And now as soon as it tries to append, I get:

ValueError: Appended divisions overlapping with the previous ones.
Previous: 18599 | New: 0

It seems like there must be a simple way to do this, and I'm just not finding it.

zaneselvans commented 5 years ago

Okay, after a couple of frustrating days, I think I have a better sense of how to use Parquet and Dask dataframes together. Many thanks to @TomAugspurger and his Scaling Pandas post.

I have a script that takes the original EPA CEMS zipfiles (all 12,000 of them) and creates one Parquet file per state for each year, with each year (1995-2017) having its own directory. The script takes about an hour to run on my laptop, and the resulting Parquet dataset takes up about 7GB on disk. I've been able to load it into a dask dataframe and do some simple calculations. However, it also ends up using all memory and getting killed by the OS. The Parquet files on disk are between 300KB and 30MB in size (depending on what state they represent). The data uncompressed is ~10x larger. With 24GB of memory I wouldn't think that would be a problem?

mrocklin commented 5 years ago

It depends on what kinds of computations you're trying to do, and on how large the result is.

On Sat, Nov 24, 2018 at 6:42 PM Zane Selvans notifications@github.com wrote:

Okay, after a couple of frustrating days, I think I have a better sense of how to use Parquet and Dask dataframes together. Many thanks to @TomAugspurger https://github.com/TomAugspurger and his Scaling Pandas https://tomaugspurger.github.io/modern-8-scaling post.

I have a script that takes the original EPA CEMS zipfiles (all 12,000 of them) and creates one Parquet file per state for each year, with each year (1995-2017) having its own directory. The script takes about an hour to run on my laptop, and the resulting Parquet dataset takes up about 7GB on disk. I've been able to load it into a dask dataframe and do some simple calculations. However, it also ends up using all memory and getting killed by the OS. The Parquet files on disk are between 300KB and 30MB in size (depending on what state they represent). The data uncompressed is ~10x larger. With 24GB of memory I wouldn't think that would be a problem?

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/pangeo-data/pangeo/issues/489#issuecomment-441404017, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszBWN2a1pOWn5KchpJgZmNvOpIEIfks5uydlfgaJpZM4YpUTj .

zaneselvans commented 5 years ago

At first I didn't realize that the results of the compute() step would be a pandas dataframe, rather than another dask dataframe, so it was way too large. I will work on getting a notebook together that does something interesting on a small subset of the data, pulling it in from parquet, and then maybe we can get it on the Pangeo instance and see what it's like to try and make it work for all of the data at once via Binder?

mrocklin commented 5 years ago

You may want persist(), which does what you thought compute() did.

On Tue, Nov 27, 2018 at 7:24 AM Zane Selvans notifications@github.com wrote:

At first I didn't realize that the results of the compute() step would be a pandas dataframe, rather than another dask dataframe, so it was way too large. I will work on getting a notebook together that does something interesting on a small subset of the data, pulling it in from parquet, and then maybe we can get it on the Pangeo instance and see what it's like to try and make it work for all of the data at once via Binder?

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/pangeo-data/pangeo/issues/489#issuecomment-442040044, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszE3EdnFup_H7Im2tzH_8hmCsez3Vks5uzS8IgaJpZM4YpUTj .

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

zaneselvans commented 5 years ago

We would still like to get a Pangeo instance up and running for energy data work, but have been working on getting the datasets packaged up appropriately (and, uh, grant writing) so haven't made a bunch of progress lately. Since the pangeo.pydata.org instance has been retired, what's the best way to play with / test things going forward?

jhamman commented 5 years ago

Sounds good @zaneselvans - let's keep this conversation going. For now, binder.pangeo.io is a good resource for ephemeral jupyter sessions. In the coming weeks, we'll be working on a new system of jupyter hubs that will more easily support use cases like yours.

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

zaneselvans commented 5 years ago

My god everything is going soooo slowly. We've gotten good feedback on the Sloan grant (thank you @jhamman for your letter!) so hopefully we will be moving forward with this full time soon.

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 5 years ago

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.