pangeo-data / pangeo

Pangeo website + discussion of general issues related to the project.
http://pangeo.io

Hello Astronomy #255

Closed mrocklin closed 5 years ago

mrocklin commented 6 years ago

There seems to be some interest in these tools from within the Python for Astronomy community. In the past couple of weeks there have been a few productive conversations with the following people in the community:

I'm not sure if everyone knows of each other's interest. I thought I'd raise an issue for general discussion here. Alternatively, it may be better to raise an issue on the AstroPy tracker, which would be more visible to the astronomy community.

jhamman commented 6 years ago

Hello astronomy folks! I'm interested to hear where we overlap in our current development work. For many of us, we use/develop a software stack that looks something like:

With the exception of xarray, I suspect there is a fair bit of overlap with your tools, but I'm not sure of that.

SimonKrughoff commented 6 years ago

@jhamman I've just started to look into things, but my notional stack is:

I just started playing with dask last week, so I'm super new to it, but I'm excited to learn about dask_kubernetes. One of the problems I was trying to figure out is how to let users spin up on-demand dask clusters.

I am trying to use the above stack to do interactive visualizations of the Gaia dataset. It has about 1.7 billion points, so it will not fit in memory on most machines, which makes it an interesting scale.

mrocklin commented 6 years ago

@SimonKrughoff do you have any interest in uploading your Parquet files of the Gaia dataset to Google Cloud Storage (GCS)? That might make it easier for us all to play around with it. I suspect that storage costs here could be supported by the pangeo effort during this period.

SimonKrughoff commented 6 years ago

The full dataset is around 500GB. I didn't make the files, so I'll have to check with the people who made them to make sure they are ok with me sharing them. I don't think it will be a problem.

rabernat commented 6 years ago

Sure, no problem if we want to put this in a Pangeo bucket.

sjperkins commented 6 years ago

Also, hello from Radio Astronomy and more specifically the MeerKAT telescope, which in turn is a precursor to the larger Square Kilometre Array telescope.

Our katdal access layer, written by @ludwigschwardt, exposes observational data as dask arrays. Additionally, we're putting together Codex Africanus, which aims to expose radio astronomy algorithms via both NumPy and Dask interfaces.

A brief overview of our stack might be:

rabernat commented 6 years ago

@sjperkins this sounds very cool! Yours is the first big astronomy project I'm aware of that uses xarray.

I hope that, by collecting as many such use cases as possible, we can identify certain usage patterns and needs that are common across many scientific fields. Large array storage is emerging as one such need. It has been informative to learn how various projects handle this.

stale[bot] commented 6 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 6 years ago

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.

SimonKrughoff commented 6 years ago

@rabernat moving from gitter to here, so we have it in one place. We had talked about parquet files before for contributing Gaia data. It is much easier for me to give you the CSVs (they are publicly downloadable). What is the process for doing that?

rabernat commented 6 years ago

The process is: we give you storage admin privileges on our project, then you run gsutil cp data-*.csv gs://pangeo-data/LSST (or whatever path you feel is appropriate).

I would actually be surprised if Gaia DR2 is not already in cloud storage somewhere.

martindurant commented 6 years ago

DR1 is certainly available over HTTP as CSV, FITS and VOtable. Note that dask's HTTP file-system backend will soon gain rudimentary glob/ls support.
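For illustration, a minimal sketch of what that would enable once glob support lands, using the gaia_source URL that appears later in this thread (the exact file-name wildcard is hypothetical):

import dask.dataframe as dd

# Hypothetical glob over the Gaia DR2 source CSVs; blocksize=None because gzip
# is not splittable, so each file becomes exactly one partition.
df = dd.read_csv(
    'http://cdn.gea.esac.esa.int/Gaia/gdr2/gaia_source/csv/GaiaSource_*.csv.gz',
    compression='gzip',
    blocksize=None,
)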

SimonKrughoff commented 6 years ago

OK, so the source files are available here: http://cdn.gea.esac.esa.int/Gaia/gdr2/gaia_source/csv/. They are ~550GB gzipped. I have not found them on GCS. I think we would probably want to add some of the other catalogs too: radial velocities, light curves, and variability classifications come to mind, but the source catalog is the largest, I think.

Would it be better to store these data uncompressed? I have done a spot check and got ~2x compression, so not the 2TB I mentioned on gitter, but still big enough to be interesting.

mrocklin commented 6 years ago

We might also consider storing them as Parquet files. I suspect that they will be much more efficient to access in this format.

SimonKrughoff commented 6 years ago

@mrocklin that shouldn't be hard to do as long as I'm not doing any smart sharding.

mrocklin commented 6 years ago

I suspect that @martindurant would have thoughts on the most efficient way to store this data in Parquet format.

martindurant commented 6 years ago

To answer the first specific question: storing as compressed CSV is fine, but it means no random access within a file, so you get one dask partition per file. The files appear to be only 5-10MB each, which works, but there are a great many of them. In general you want much larger individual files, since there is a fixed overhead for operating on each one, both in initiating the download and in running any processing, so it is better to have files big enough to amortize that cost. For compute and storage both on Google, we recommend >~100MB. It is not unusual to see individual uncompressed CSV files >10GB.

The majority of the columns are float, and so probably will not store particularly efficiently in any format. NULL values are generally stored very efficiently, and they might account for a good fraction of the values here. Naturally, int32 and float32 take up less space than their 64-bit counterparts; whether that is appropriate for each column depends on the data. Parquet allows you to compress different columns with different algorithms, although this is rarely done. Parquet also records min/max statistics for each column within each chunk, potentially enabling short-circuits when searching for a range of parameters, and its internal chunking allows a tradeoff between compression efficiency and random access.
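As a minimal sketch of how those per-chunk statistics and the columnar layout pay off at read time (dataset path hypothetical; ra, dec, and phot_g_mean_mag are standard Gaia columns):

import dask.dataframe as dd

# columns= prunes unneeded columns entirely; filters= uses the per-row-group
# min/max statistics to skip chunks whose range cannot satisfy the predicate.
df = dd.read_parquet(
    'Gaia.parq',  # hypothetical dataset path
    columns=['ra', 'dec', 'phot_g_mean_mag'],
    filters=[('phot_g_mean_mag', '<', 15.0)],
)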

mrocklin commented 6 years ago

For completely random data, Parquet stores it in about half the space (eight binary bytes rather than sixteen ASCII digits), though presumably compression would help to narrow this difference. I think the main advantage here is in performant access, though, not in storage size.

In [1]: import pandas as pd

In [2]: import numpy as np

In [3]: x = np.random.random((100000, 100))

In [4]: df = pd.DataFrame(x)

In [5]: df.columns = list(map(str, df.columns))

In [6]: df.to_csv('foo.csv')

In [7]: !du -hs foo.csv
185M    foo.csv

In [8]: df.to_parquet('foo.parquet')

In [9]: !du -hs foo.csv
185M    foo.csv

In [10]: !du -hs foo.parquet
98M foo.parquet

In [11]: %time _ = pd.read_csv('foo.csv')
CPU times: user 2.8 s, sys: 128 ms, total: 2.93 s
Wall time: 2.93 s

In [12]: %time _ = pd.read_parquet('foo.parquet')
CPU times: user 90.2 ms, sys: 68.2 ms, total: 158 ms
Wall time: 157 ms

In [13]: %time _ = pd.read_parquet('foo.parquet', columns=['2', '10', '50'])
CPU times: user 28.9 ms, sys: 4.04 ms, total: 33 ms
Wall time: 32 ms

That comparison is somewhat unfair, though, because I'm operating on my local disk, which has GB/s bandwidth, rather than GCS, which will be closer to 200MB/s. That is still faster than the read_csv bandwidth, but not by an order of magnitude.

martindurant commented 6 years ago

(I don't think most of the floats really have the full precision of 16 characters.) But yes, I very much agree that reading binary blocks of float/int data will be much faster than parsing text in CSV.

Naively loading and saving to parquet for the very first file produced the following sizes:

csv: 12,415,259
csv.gz: 5,350,635
parquet (uncompressed): 7,456,886
parquet (snappy): 6,329,340
parquet (gzip): 5,430,924

where the same compression is applied to all columns, and the footer metadata size is ~19k for all versions.

Note that some columns, like phot_variable_flag, will lend themselves to dictionary storage, which will save a lot.
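As a rough sketch of that from the pandas side (sample file name hypothetical), casting the flag column to a pandas category is enough for the Parquet writer to dictionary-encode it, i.e., store the handful of distinct strings once plus small integer codes per row:

import pandas as pd

df = pd.read_csv('GaiaSource_sample.csv.gz')  # hypothetical sample file
df['phot_variable_flag'] = df['phot_variable_flag'].astype('category')
df.to_parquet('GaiaSource_sample.parquet', compression='snappy')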

rabernat commented 6 years ago

My $0.02. If you’re going to go to the trouble of putting your data in cloud storage, you should really use a cloud optimized format like parquet. The performance benefits can be huge.

mrocklin commented 6 years ago

Yeah, I think the real question is "is the astronomy community comfortable enough with binary tabular formats to use something modern, or is it better to stay with CSV for accessibility and cultural reasons?" Both are fine options. Non-technical concerns often trump technical ones.

SimonKrughoff commented 6 years ago

You've all convinced me. I'm currently producing parquet files from the CSVs. Once I have one parquet file per CSV, I'll look into merging them into larger files.

I think the astronomy community is quickly realizing that adopting industry standards offers a much bigger payoff than maintaining legacy utilities and legacy file formats (though FITS continues to be a mainstay).

martindurant commented 6 years ago

In case you were curious: since the Gaia DR1 products are available as uncompressed binary FITS tables, I converted one of these to parquet and found that the size was half and the read speed more than double, again without any care in the conversion, and on my local disk. A good deal of the slowness comes from converting big- to little-endian...
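A minimal sketch of that kind of naive conversion, assuming astropy and a hypothetical file name:

from astropy.table import Table

# Table.read parses the FITS binary table; .to_pandas() converts the columns to
# native (little-endian) byte order, and the result is written straight to
# Parquet with no per-column tuning.
tab = Table.read('GaiaSource_sample.fits')  # hypothetical file name
tab.to_pandas().to_parquet('GaiaSource_sample.parquet', compression='snappy')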

mrocklin commented 6 years ago

FWIW I would probably transform this data in the following way:

import dask.dataframe as dd
df = dd.read_csv('path/to/*.csv.gz', compression='gzip', blocksize=None)  # gzip is not splittable: one partition per file
df = df.repartition(npartitions=...)  # Optional: if we wanted to change the partition size
df.to_parquet('path/to/myfile.parquet')

Doing this in one step might be nice to provide a consistent single Parquet dataset, even if it is made up of many files.

SimonKrughoff commented 6 years ago

I'm doing basically that. I used pandas.read_csv instead of dask.dataframe.read_csv.

martindurant commented 6 years ago

The reason for suggesting the dask variety is that you can repartition multiple original files into bigger chunks in parquet, which will lead to better read performance later on.

SimonKrughoff commented 6 years ago

Oh, OK. I hadn't realized that. I'll look into that.

SimonKrughoff commented 6 years ago

I just finished bundling up the Gaia DR2 data. It turns out that dealing with 60K files is a major pain. I should have made bigger files, but since I have these, I thought I'd put them here if someone wants to grab them. I've mirrored the Gaia archive directory structure from here. I'm making available three versions of these files:

I hope these can be at least minimally useful. I'll start looking into how to merge the files so they are better suited for cloud hosting.

rabernat commented 6 years ago

Excellent. Would you like us to grant you credentials to our GCS storage so you can start uploading and playing around?

martindurant commented 6 years ago

All three links appear to be single massive files with everything (553, 690, and 679GB, respectively). That doesn't allow for parallel access-in-place.

rabernat commented 6 years ago

If the goal is to read these items as a single dataset, you will want many fewer shards than 60K. For example, for dask to read them with dask.dataframe.read_parquet, I am pretty sure there will have to be an HTTP call for each file. Even with low latency, 60K calls really add up.

Maybe you actually want to put them all in a single parquet file?

rabernat commented 6 years ago

I actually don't know enough about parquet to be making recommendations here, so probably disregard what I said.

SimonKrughoff commented 6 years ago

@martindurant sorry, I should have been more explicit. Each of the three files is a tarball of a complete copy of the Gaia DR2. Inside that there are many individual datasets (each containing lots of parquet files). I agree 60K files is too many. I'll spend some time this week trying to reduce the sheer number of files in each dataset.

Even though the dataset containing the source measurements is big, I'm hoping some of the other, smaller datasets will be of use.

P.S. I gzipped the tarball, which was a complete waste of time since all the files are compressed anyway. I can make another version that doesn't have this overhead.

martindurant commented 6 years ago

Each of the three files is a tarball of a complete copy of the Gaia DR2.

Yes indeed - my point is that this is not how it should be saved to the cloud, since the only way to work with it would be to download and extract the whole thing locally.

Given the discussion above, it is surprising that the parquet version ends up larger than the CSV-gzip one. Parquet enables optimizations such as categorizing columns (e.g., phot_variable_flag, which is almost always NOT_AVAILABLE) or choosing data types appropriate for each column (float64 -> float32 or int-with-nulls or bool-with-nulls; int64 -> int32), and it's worth putting in the effort beforehand to get those things right. Of course, you are in the right place to know what precision should be used for each column.
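As one hedged illustration of the int-with-nulls option (sample file name hypothetical; rv_nb_transits is one of the integer columns in the source catalogue):

import pandas as pd

# Integer columns that contain missing values get promoted to float64 by read_csv;
# pandas' nullable Int32 keeps them as integers (missing entries become <NA>),
# which Parquet can then store as int32 with nulls.
df = pd.read_csv('GaiaSource_sample.csv.gz')  # hypothetical sample file
df['rv_nb_transits'] = df['rv_nb_transits'].astype('Int32')
df.to_parquet('GaiaSource_sample.parquet')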

mrocklin commented 6 years ago

@martindurant my guess is that your expertise would be welcome in determining how best to store this data in parquet format. Would you be willing to download a small subset of the CSV data and then write a Python script that converts it into efficiently encoded Parquet data? I suspect that @SimonKrughoff could use some help here.

mrocklin commented 6 years ago

You could then pass that script back to @SimonKrughoff, who could apply it to the full dataset and then upload to GCS with the credentials that @rabernat intends to provide.

martindurant commented 6 years ago

Is there a prescription of the precision expected in each column? I probably should not guess from the number of digits in the CSV files.

But yes, happy to help out, I can try to get something together in the next day or so.

SimonKrughoff commented 6 years ago

I wasn't intending that those tarballs would be used directly. I was just trying to make the files available in an easy to download form so that someone could help me put them in GCS in an unpacked form (e.g. tar zxvf gaia_dr2_parquet.tar.gz | upload_to_gsc.sh or something). I don't have creds to do that, but would be happy to put them someplace directly.

Indeed I could use some help with the parquet generation. As to precision, frequently single precision is fine, but for some things like RA/Dec positions, we know we need double precision. The safest thing is to treat all columns as double precision, but I could try to help discern what the appropriate precision is.

martindurant commented 6 years ago

The following is a script that I came up with, turning the first 100 files into 10 partitions of 27-57MB each. You would want to scale this up to the 61,237 files in the full dataset and maybe 2000 partitions. Note that, given the right GCS credentials, you can .to_parquet('gcs://place/Gaia2.parq') and supply your credentials either somewhere they can be found automatically or in the storage_options= kwarg.

import requests
import dask.dataframe as dd
import re

# Column dtype overrides: low-cardinality strings become 'category' (dictionary-
# encoded in Parquet); 32-bit floats/ints are used where 64 bits aren't needed.
cats = ['solution_id', 'ref_epoch', 'frame_rotator_object_type',
        'phot_proc_mode', 'phot_variable_flag']

float32 = ['parallax_over_error', 'ra_dec_corr', 'ra_parallax_corr',
           'ra_pmra_corr', 'ra_pmdec_corr', 'dec_parallax_corr',
           'dec_pmra_corr', 'dec_pmdec_corr', 'parallax_pmra_corr',
           'parallax_pmdec_corr', 'pmra_pmdec_corr', 'astrometric_gof_al',
           'astrometric_chi2_al', 'astrometric_weight_al',
           'mean_varpi_factor_al', 'astrometric_sigma5d_max',
           'phot_g_mean_flux_over_error', 'phot_g_mean_mag',
           'phot_bp_mean_flux_over_error', 'phot_bp_mean_mag',
           'phot_rp_mean_flux_over_error', 'phot_rp_mean_mag',
           'phot_bp_rp_excess_factor', 'bp_rp', 'bp_g', 'g_rp',
           'rv_template_teff', 'rv_template_logg', 'rv_template_fe_h',
           'teff_val', 'teff_percentile_lower', 'teff_percentile_upper',
           'a_g_val', 'a_g_percentile_lower', 'a_g_percentile_upper',
           'e_bp_min_rp_val', 'e_bp_min_rp_percentile_lower',
           'e_bp_min_rp_percentile_upper', 'radius_val',
           'radius_percentile_lower', 'radius_percentile_upper',
           'lum_val', 'lum_percentile_lower', 'lum_percentile_upper']

int32 = ['astrometric_n_obs_al', 'astrometric_n_obs_ac',
         'astrometric_n_good_obs_al', 'astrometric_n_bad_obs_al',
         'astrometric_params_solved', 'astrometric_matched_observations',
         'visibility_periods_used', 'frame_rotator_object_type',
         'matched_observations', 'phot_g_n_obs', 'phot_bp_n_obs',
         'phot_rp_n_obs', 'rv_nb_transits']

dtype = {k: 'category' for k in cats}
dtype.update({k: 'float32' for k in float32})
dtype.update({k: 'int32' for k in int32})

url = "http://cdn.gea.esac.esa.int/Gaia/gdr2/gaia_source/csv/"

def ls_http(url):
    """Return the absolute URLs of the files linked from an HTTP directory listing."""
    ex = re.compile(r"""<a\s+(?:[^>]*?\s+)?href=(["'])(.*?)\1""")
    r = requests.get(url)
    links = ex.findall(r.text)
    out = set()
    for u, l in links:
        if l.startswith('http'):
            if l.startswith(url):
                out.add(l)
        else:
            if l not in ['..', '../']:
                # Ignore FTP-like "parent"
                out.add('/'.join([url.rstrip('/'), l]))
    return list(sorted(out))

all_files = ls_http(url)[:100]  # first 100 files only, for this demonstration

df = dd.read_csv(all_files, dtype=dtype, compression='gzip', blocksize=None)  # one partition per gzipped file
d2 = df.repartition(npartitions=10, force=True)  # combine into 10 larger partitions
d2.to_parquet('Gaia.parq', compression='SNAPPY')

SimonKrughoff commented 6 years ago

@martindurant Thanks for the starter script. That is very useful. I'm now trying to build new parquet files in larger chunks. I'd like to do this in multiple processes. Can I merge the parquet datasets as a post-processing step? Also, I don't have SNAPPY in the environment I'm using. Is GZIP going to be ok?

martindurant commented 6 years ago

I'd like to do this in multiple processes.

Since this is using Dask, you already have parallelism. In theory, you could even set the job up on a distributed cluster, if all the nodes have direct access to the data, e.g., on an NFS mount. Probably you will be fine here using the standard threaded scheduler.

Is GZIP going to be ok?

It takes somewhat more CPU power to decompress, and in this case results in only slightly smaller file-sizes; but yes, it's fine.

mrocklin commented 6 years ago

Can I merge the parquet datasets as a post-processing step?

Note that the repartition step in @martindurant's script may already give you what you want here.

Also, I don't have SNAPPY in the environment I'm using. Is GZIP going to be ok?

You might consider installing snappy or some other efficient compression system. It makes a pretty significant difference.

I'd like to do this in multiple processes.

import dask
dask.config.set(scheduler='processes')

SimonKrughoff commented 6 years ago

@martindurant @mrocklin Thanks both for the help! This is amazing. I did as you suggested and merged the CSVs into a smaller number of files, and I'm using a cluster to repartition.

rabernat commented 6 years ago

I think Simon has seen the light. 💡

SimonKrughoff commented 6 years ago

I certainly have. Unfortunately, my workers ran out of memory and were killed (I assume by the k8s OOM killer). What is the best way to avoid that? Is it the size of the CSV files that drives the memory need, or can I get around it by setting a blocksize?

mrocklin commented 6 years ago

I noticed from the dashboard that things were under pressure (the memory plot was orange, and there were some orange read/write blocks in the task stream). It looks like data was being released from memory well during processing, though, which makes this a confusing coincidence (the colored sections of the progress bars were mostly transparent, indicating released results).

I might check the info tab and then click on one of the workers to get a sense of how large your individual chunks of data are relative to the memory limit of each worker. If each is a significant fraction (something like 10%), then you might consider widening this gap by allocating workers with higher memory limits (you would have to edit the worker-template.yml file in your session to increase the workers' memory resource limits). It could be something else, though; that's just a guess.

SimonKrughoff commented 6 years ago

I upped my worker memory limits to 12GB, and I am still getting memory pressure. Is there something I can do to reduce the memory usage even if it's a little less efficient?

mrocklin commented 6 years ago

Is there something I can do to reduce the memory usage even if it's a little less efficient?

It would depend on what's going on. In your situation I would investigate the dashboard more closely to figure that out (docs here), and then make a change based on those results. I'm busy for the next couple of days, but I would be happy to jump on a screencast later this week if it's still helpful (others are probably qualified to do this sooner, too).

SimonKrughoff commented 6 years ago

Thanks for the pointer. I'll take a look.

SimonKrughoff commented 6 years ago

Thanks for the pointer. That video is really useful. First off, the logs look dire. Lots of warnings and errors like the following:

distributed.worker - ERROR - failed during get data with tcp://10.32.9.20:45501 -> tcp://10.34.12.121:42698
Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/comm/tcp.py", line 242, in write
    yield future
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/worker.py", line 690, in get_data
    compressed = yield comm.write(msg, serializers=serializers)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/comm/tcp.py", line 246, in write
    convert_stream_closed_error(self, e)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: TimeoutError: [Errno 110] Connection timed out
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.15 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.55 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.57 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.61 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.65 GB -- Worker memory limit: 6.44 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 4.67 GB -- Worker memory limit: 6.44 GB
distributed.worker - ERROR - failed during get data with tcp://10.32.9.20:45501 -> tcp://10.38.9.171:38952
Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/comm/tcp.py", line 242, in write
    yield future
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/worker.py", line 690, in get_data
    compressed = yield comm.write(msg, serializers=serializers)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/comm/tcp.py", line 246, in write
    convert_stream_closed_error(self, e)
  File "/opt/lsst/software/stack/python/miniconda3-4.5.4/envs/lsst-scipipe-fcd27eb/lib/python3.6/site-packages/distributed/comm/tcp.py", line 124, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: TimeoutError: [Errno 110] Connection timed out

I'm assuming that is from saturating the underlying shared filesystem.

The other thing I notice is that the from_delayed function has a lot of tasks in memory. That seems bad. Here is a screenshot while the job is running: [screenshot]

So it seems like I may actually be dealing with filesystem/configuration issues.