innosat-mats / MATS-L1-processing

Python code for calibrating MATS images
MIT License

Changes in read_parquet_functions.py #138

Closed donal-mur closed 1 year ago

donal-mur commented 1 year ago

📋 Background

As we know, reading the data takes time and sometimes/often times out. A while ago Björn and I made some changes so that we could filter already at the dataset stage to minimise the data transfer. This filter seems to have been moved into the .to_table function, which I suspect means that all the data is first transferred to my machine and then filtered. Even the date information is in the filter.

import pyarrow.dataset as ds

# The filter is applied in .to_table(), after the dataset has been opened
table = ds.dataset(
    path,
    filesystem=filesystem,
).to_table(filter=filterlist)

❓ Questions

@skymandr, @OleMartinChristensen

donal-mur commented 1 year ago

Sorry, I checked my old code: the syntax was different, but we were doing the filtering in the to_table part. So it has not been changed, just rearranged.

The question remains: where is the data selection done, at the source or on our machines?

OleMartinChristensen commented 1 year ago

The commit 322af9d769d9778db3f44531d7601d15deec559c was the last change, and as you said, it looks like the filtering was always done in to_table.

OleMartinChristensen commented 1 year ago

The issue now (if I understand it) is that you still need to go through the whole dataset in order to do the filtering.

donal-mur commented 1 year ago

Yes, I see that. Still wondering why things are so slow. It was nearly impossible to load data from the cottage, where I only have 10 Mb.

donal-mur commented 1 year ago

Beginning to sound like we need a fast database to find the files -- Parquet is too slow @joakimmoller

skymandr commented 1 year ago

Beginning to sound like we need a fast database to find the files -- Parquet is too slow @joakimmoller

I don't think all the data is transferred, only the columns used in the filter, but it is correct that you need to open all the files and that the comparison is performed locally, with the look-ups you currently do.

The core of the matter isn't that Parquet is too slow, however, but that you don't use the "index" (or "partitioning"). Any remote database will be slow if you don't use the index when making look-ups.

This "index" is hierarchical based on TMHeaderTime and is put in the path. PyArrow has support for reading and filtering based on the partitioning (see example in the L1A-repository), or you can do a "poor mans filter" by changing the path you read from to restrict the data access.

Two things come to mind:

My suggestion is therefore:

EDIT: The more I think about it, the more I think it is better to leave the partitioning as it is. It is fairly straightforward to implement a simple filter based on that, by comparing year * 10000 + month * 100 + day, which turns 2023-04-11 into 20230411 (or possibly year * 1000000 + month * 10000 + day * 100 + hour, making e.g. 2023041115). A simple buffer on min_time could be added to compensate for the fact that EXPDate will occur before the corresponding TMHeaderTime. Thanks to @joakimmoller for pointing this out, and see his example below!
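A sketch of what such a buffer could look like, using the same partition arithmetic as Joakim's example below (the one-hour margin is an arbitrary assumption; the actual offset between EXPDate and TMHeaderTime may be much smaller):

from datetime import timedelta

import pandas as pd
import pyarrow.dataset as ds

# Requested interval (min_time), as in the other examples in this thread
start = pd.to_datetime("2023-04-11T00:00z").to_pydatetime()

# Widen the partition window slightly so that images whose EXPDate falls just
# before the corresponding TMHeaderTime-based partition are not lost;
# the size of the buffer is an assumption
buffer = timedelta(hours=1)
part_start = start - buffer

partition_filter = (
    ds.field("year") * 1000000
    + ds.field("month") * 10000
    + ds.field("day") * 100
    + ds.field("hour")
    >= part_start.year * 1000000
    + part_start.month * 10000
    + part_start.day * 100
    + part_start.hour
)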

OleMartinChristensen commented 1 year ago

I will implement the poor man's date filter you suggested @skymandr today. That should solve the immediate issues. Then we can have a discussion on this some time in the future.

joakimmoller commented 1 year ago

When not actively using the partitions, pyarrow has to do look-ups on every file in the full dataset: it does not actually read all the data, but it has to investigate whether records matching your filter can be found in each particular Parquet file. (Lots of look-ups, which is time consuming and costly.)

By using the partitions we limit the lookups:

import pyarrow as pa
from pyarrow import fs
import pyarrow.dataset as ds
import boto3
import pandas as pd

start = pd.to_datetime("2022-12-06T00:00z").to_pydatetime()
stop = pd.to_datetime("2022-12-07T00:00z").to_pydatetime()

session = boto3.session.Session(profile_name="mats")
credentials = session.get_credentials()

s3 = fs.S3FileSystem(
    secret_key=credentials.secret_key,
    access_key=credentials.access_key,
    region=session.region_name,
    session_token=credentials.token,
)

# Directory partitioning: year/month/day/hour are encoded in the object path
partitioning = ds.partitioning(
    schema=pa.schema(
        [
            ("year", pa.int16()),
            ("month", pa.int8()),
            ("day", pa.int8()),
            ("hour", pa.int8()),
        ]
    ),
)

dataset = ds.dataset(
    "ops-payload-level0-v0.3/CCD/",
    partitioning=partitioning,
    filesystem=s3,
)

# Combine the partition fields into a single integer (yyyymmddhh) and keep
# only partitions inside the requested interval
partition_filter = (
    ds.field("year") * 1000000
    + ds.field("month") * 10000
    + ds.field("day") * 100
    + ds.field("hour")
    >= start.year * 1000000
    + start.month * 10000
    + start.day * 100
    + start.hour
) & (
    ds.field("year") * 1000000
    + ds.field("month") * 10000
    + ds.field("day") * 100
    + ds.field("hour")
    <= stop.year * 1000000
    + stop.month * 10000
    + stop.day * 100
    + stop.hour
)

table = dataset.to_table(
    filter=partition_filter & (
        ds.field("EXPDate")
        >= start
    )
    & (
        ds.field("EXPDate")
        <= stop
    )
)

Retrieving the table data in this example takes 2.5 minutes with partitioning and more than 15 minutes without - I didn't have the patience to wait longer.

EDIT: The script without partitioning ran for 75 minutes and exited with an exception: "OSError: AWS Error NETWORK_CONNECTION during GetObject operation: curlCode: 28, Timeout was reached". I.e. more than a 30x speedup with partitioning. 🚀

joakimmoller commented 1 year ago

If this isn't fast enough for larger tasks - let's say hourly temperature averages for the whole mission - I'd recommend Dask (https://www.dask.org/get-started).

In short, that means you run several computing nodes in a Dask cluster (EC2 instances in AWS) that run tasks in parallel to solve your data extraction. The cost is the total CPU time for running the cluster.
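Not tested against the actual buckets, but a rough sketch of what that could look like with dask.dataframe; the path restriction mirrors the partition layout above (zero-padding not verified), and the credentials profile and the "TEMP" column name are assumptions, while EXPDate is known from the examples above:

import dask.dataframe as dd

# Read one day of CCD data (restricting the path to a partition directory, as
# discussed above) and compute hourly means in parallel on the Dask workers
df = dd.read_parquet(
    "s3://ops-payload-level0-v0.3/CCD/2022/12/6/",
    storage_options={"profile": "mats"},  # s3fs credentials profile, assumed
    columns=["EXPDate", "TEMP"],
)

hourly_mean = df.groupby(df["EXPDate"].dt.floor("1h"))["TEMP"].mean()
result = hourly_mean.compute()  # runs the task graph on the cluster (or locally)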

OleMartinChristensen commented 1 year ago

Today I got this error before even collecting data... But the error is spurious and not reproducible

path = "ops-payload-level1a-v0.5"
s3 = fs.S3FileSystem(
        secret_key=credentials.secret_key,
        access_key=credentials.access_key,
        region=session.region_name,
        session_token=credentials.token)

filesystem = s3

partitioning = ds.partitioning(
        schema=pa.schema(
            [
                ("year", pa.int16()),
                ("month", pa.int8()),
                ("day", pa.int8()),
                ("hour", pa.int8()),
            ]
        ),
    )
dataset = ds.dataset(
        path,
        filesystem=filesystem,
        partitioning=partitioning,
    )

Traceback (most recent call last):
  File "/home/olemar/Projects/Universitetet/MATS/MATS-analysis/OleM/Donal_stuff/TheMoon.py", line 36, in <module>
    ccd_data = read_MATS_data(starttime, endtime,filter=None,level='1a',version='0.5')
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/olemar/Projects/Universitetet/MATS/MATS-utility-functions/src/mats_utils/rawdata/read_data.py", line 25, in read_MATS_data
    ccd_data = read_ccd_data_in_interval(start_date, end_date, f"ops-payload-level{level}-v{version}", s3,filter=filter)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/olemar/Projects/Universitetet/MATS/MATS-L1-processing/src/mats_l1_processing/read_parquet_functions.py", line 232, in read_ccd_data_in_interval
    dataset = ds.dataset(
              ^^^^^^^^^^^
  File "/home/olemar/miniconda3/envs/instrument_analysis/lib/python3.11/site-packages/pyarrow/dataset.py", line 752, in dataset
    return _filesystem_dataset(source, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/olemar/miniconda3/envs/instrument_analysis/lib/python3.11/site-packages/pyarrow/dataset.py", line 444, in _filesystem_dataset
    fs, paths_or_selector = _ensure_single_source(source, filesystem)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/olemar/miniconda3/envs/instrument_analysis/lib/python3.11/site-packages/pyarrow/dataset.py", line 411, in _ensure_single_source
    file_info = filesystem.get_file_info(path)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: When getting information for bucket 'ops-payload-level1a-v0.5': AWS Error NETWORK_CONNECTION during HeadBucket operation: curlCode: 28, Timeout was reached

skymandr commented 1 year ago

Today I got this error before even collecting data... But the error is spurious and not reproducible

I can't be 100% sure, but I think this is due to latency between the client and the server, rather than within the server. It may be due to your network connection, but it might also mean that the server is too busy (though I would perhaps have expected a different error in the latter case; the HeadBucket request is by its nature small, it basically just confirms that the bucket you want to request exists).

OleMartinChristensen commented 1 year ago

Seems to happen more often when I run in debug mode..

skymandr commented 1 year ago

I cannot reproduce this locally, but I have only tried it in vanilla IPython, with no debug features or such. These are my results:

In [32]: %timeit ds.dataset(path, filesystem=filesystem, partitioning=partitioning)
10.3 s ± 181 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

What the line does is download the "index" and schema for the dataset, which takes a few seconds on my connection, since it is fairly large, but I don't get any timeouts. I'm not sure of the inner workings of this function in PyArrow, but I would expect HeadBucket to be one of the first things the program does, and that should not be a large or complicated request, so I'm not sure why you should get a timeout there, unless you are doing it over a mobile network from a moving train in Norrland or something... (You're not, are you?)

skymandr commented 1 year ago

As I said, I think the problem is on the client side, but you can increase the timeouts when you initialise the S3 connection. See request_timeout and connect_timeout here: https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html
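A minimal sketch of what that could look like; the timeout values are arbitrary examples and the credentials setup follows the earlier examples in this thread:

import boto3
from pyarrow import fs

session = boto3.session.Session(profile_name="mats")
credentials = session.get_credentials()

# Raise the connection and per-request timeouts (in seconds) from their defaults
s3 = fs.S3FileSystem(
    secret_key=credentials.secret_key,
    access_key=credentials.access_key,
    region=session.region_name,
    session_token=credentials.token,
    connect_timeout=30,
    request_timeout=60,
)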