fugue-project / tutorials

Tutorials for Fugue - A unified interface for distributed computing. Fugue executes SQL, Python, and Pandas code on Spark and Dask without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

Recipe for loading multiple files #170

Closed · rdmolony closed this 1 year ago

rdmolony commented 1 year ago

Hey @kvnkho

I'm back using fugue. I was wondering: what is the canonical fugue method for loading multiple CSVs? I can write this up as a recipe afterwards if you like

I have a directory of CSVs that I want to load, where the header row is on the 2nd line & the data starts on the 5th

EDIT: I've experimented with a few different methods; in example 4 I explicitly infer the schema & pass it to FugueWorkflow().load ...

import linecache
import tempfile
import typing
import pathlib

from fugue import FugueWorkflow
import pandas as pd
import pyarrow as pa

# --- setup ---
def create_temporary_file(prefix: str, suffix: str) -> pathlib.Path:
    return pathlib.Path(
        tempfile.NamedTemporaryFile(prefix=prefix, suffix=suffix, delete=False).name
    )
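
# NOTE: delete=False keeps these temporary files on disk so they can be
# read back below; they are unlinked in the teardown at the end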

sample_logger_1 = create_temporary_file(prefix="campbell_scientific_", suffix=".csv")
sample_logger_1.write_text(
'''"SITE_NAME"
"TIMESTAMP","RECORD","WS_80m_90deg_Avg","WS_80m_90deg_Std","WS_80m_90deg_3sGust_Max","WS_80m_90deg_Max","WS_80m_270deg_Avg","WS_80m_270deg_Std","WS_80m_270deg_3sGust_Max","WS_80m_270deg_Max","WS_65m_90deg_Avg","WS_65m_90deg_Std","WS_65m_90deg_3sGust_Max","WS_65m_90deg_Max","WS_65m_270deg_Avg","WS_65m_270deg_Std","WS_65m_270deg_3sGust_Max","WS_65m_270deg_Max","WS_50m_90deg_Avg","WS_50m_90deg_Std","WS_50m_90deg_3sGust_Max","WS_50m_90deg_Max","WS_50m_270deg_Avg","WS_50m_270deg_Std","WS_50m_270deg_3sGust_Max","WS_50m_270deg_Max","WS_30m_90deg_Avg","WS_30m_90deg_Std","WS_30m_90deg_3sGust_Max","WS_30m_90deg_Max","WS_30m_270deg_Avg","WS_30m_270deg_Std","WS_30m_270deg_3sGust_Max","WS_30m_270deg_Max","Dir_78m_90deg_avg","Dir_78m_90deg_std","Dir_63m_90deg_avg","Dir_63m_90deg_std","Dir_28m_90deg_avg","Dir_28m_90deg_std","Batt_Volt_Min","Press_Avg","Temp_C80_Avg","Temp_C15_Avg","Hum_Avg"
"TS","RN","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","Volts","mB","Deg C","Deg C","%"
"","","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","WVc","WVc","WVc","WVc","WVc","WVc","Min","Avg","Avg","Avg","Avg"
"2012-05-31 12:20:00",1,1.383,0.6,2.75,3.37,1.368,0.439,2.673,2.74,1.332,0.478,2.75,2.75,1.242,0.379,2.74,2.79,1.162,0.535,2.337,2.75,1.159,0.354,2.34,2.39,1.27,0.614,2.337,2.75,1.322,0.416,2.157,2.24,240.3,46,242,45.39,222,33.45,13.79,1009,13.84,14.08,65.67
"2012-05-31 12:30:00",2,1.183,0.449,1.923,2.13,1.135,0.324,1.94,1.99,0.948,0.524,1.923,2.13,1.068,0.303,1.723,1.74,0.701,0.547,1.923,2.13,0.913,0.308,1.673,1.74,0.771,0.539,1.717,2.13,0.997,0.28,1.657,1.74,282,26.79,264.3,30.25,278.5,62.87,13.73,1009,14.04,14.45,64.51
"2012-05-31 12:40:00",3,1.762,0.502,2.75,2.75,1.784,0.458,2.873,2.94,1.512,0.486,2.543,2.75,1.633,0.402,2.44,2.49,1.484,0.423,2.13,2.13,1.508,0.343,2.457,2.49,1.561,0.416,2.337,2.75,1.605,0.311,2.09,2.19,351.9,27.08,342.4,25.69,321.4,29.48,13.81,1009,14.14,14.35,63.45
"2012-05-31 12:50:00",4,1.926,0.434,2.957,3.37,1.957,0.405,2.923,2.99,1.871,0.446,2.75,2.75,1.883,0.384,2.84,2.94,1.826,0.511,3.163,3.37,1.833,0.441,3.173,3.24,1.762,0.489,3.163,3.37,1.762,0.432,2.99,3.09,15.54,18.22,15.95,18.5,20.55,27.99,13.82,1009,14.19,14.39,63.79
"2012-05-31 13:00:00",5,1.606,0.424,2.543,2.75,1.616,0.366,2.54,2.59,1.83,0.406,2.543,2.75,1.81,0.365,2.873,2.79,1.956,0.385,2.75,2.75,1.895,0.345,2.69,2.74,2.012,0.431,2.75,2.75,1.965,0.366,2.857,2.94,40,26.7,44.79,18.04,37.4,9.99,13.82,1009,14.27,14.43,62.13
'''
)

sample_logger_2 = create_temporary_file(prefix="campbell_scientific_", suffix=".csv")
sample_logger_2.write_text(
'''"SITENAME"
"TIMESTAMP","RECORD","WS_80m_90deg_Avg","WS_80m_90deg_Std","WS_80m_90deg_3sGust_Max","WS_80m_90deg_Max","WS_80m_270deg_Avg","WS_80m_270deg_Std","WS_80m_270deg_3sGust_Max","WS_80m_270deg_Max","WS_65m_90deg_Avg","WS_65m_90deg_Std","WS_65m_90deg_3sGust_Max","WS_65m_90deg_Max","WS_65m_270deg_Avg","WS_65m_270deg_Std","WS_65m_270deg_3sGust_Max","WS_65m_270deg_Max","WS_50m_90deg_Avg","WS_50m_90deg_Std","WS_50m_90deg_3sGust_Max","WS_50m_90deg_Max","WS_50m_270deg_Avg","WS_50m_270deg_Std","WS_50m_270deg_3sGust_Max","WS_50m_270deg_Max","WS_30m_90deg_Avg","WS_30m_90deg_Std","WS_30m_90deg_3sGust_Max","WS_30m_90deg_Max","WS_30m_270deg_Avg","WS_30m_270deg_Std","WS_30m_270deg_3sGust_Max","WS_30m_270deg_Max","Dir_78m_90deg_avg","Dir_78m_90deg_std","Dir_63m_90deg_avg","Dir_63m_90deg_std","Dir_28m_90deg_avg","Dir_28m_90deg_std","Batt_Volt_Min","Press_Avg","Temp_C80_Avg","Temp_C15_Avg","Hum_Avg"
"TS","RN","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","Volts","mB","Deg C","Deg C","%"
"","","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","WVc","WVc","WVc","WVc","WVc","WVc","Min","Avg","Avg","Avg","Avg"
"2012-05-31 13:10:00",6,1.11,0.315,1.51,1.51,1.082,0.166,1.473,1.49,1.144,0.355,1.717,2.13,1.115,0.231,1.673,1.74,0.944,0.565,1.923,2.13,0.969,0.381,1.79,1.84,0.98,0.48,1.717,2.13,1.041,0.301,1.657,1.69,128.2,50.79,115.9,50.08,70.53,18.46,13.82,1008,14.62,14.54,61.17
"2012-05-31 13:20:00",7,0.769,0.517,1.717,2.13,0.894,0.348,1.69,1.74,0.694,0.482,1.51,1.51,0.781,0.274,1.523,1.59,0.513,0.548,1.923,2.13,0.674,0.277,1.49,1.49,0.786,0.463,1.51,1.51,0.852,0.284,1.757,1.79,193.1,42,183.3,56.55,99.9,36.49,13.8,1008,14.75,15.04,60.12
"2012-05-31 13:30:00",8,0.347,0.439,1.097,1.51,0.522,0.249,1.09,1.09,0.431,0.47,1.303,1.51,0.606,0.2,1.273,1.29,0.661,0.434,1.303,1.51,0.715,0.231,1.44,1.44,1.063,0.333,1.51,1.51,1.058,0.201,1.507,1.54,231.6,73.03,313.8,54.89,54.87,12.13,13.8,1008,15.12,14.86,60.72
"2012-05-31 13:40:00",9,0.787,0.552,1.717,2.13,0.839,0.406,1.673,1.74,0.615,0.512,1.51,1.51,0.825,0.306,1.573,1.59,0.746,0.503,1.51,1.51,0.793,0.317,1.49,1.54,0.949,0.347,1.51,1.51,0.931,0.197,1.34,1.39,28.53,48.27,2.072,13.8,44.24,7.636,13.64,1008,15.29,14.82,59.53
"2012-05-31 13:50:00",10,1.163,0.396,2.13,2.13,1.184,0.278,2.04,2.09,1.156,0.412,1.923,2.13,1.161,0.246,2.123,2.14,1.065,0.475,2.13,2.13,1.129,0.326,2.09,2.14,1.044,0.421,1.923,2.13,1.075,0.301,1.723,1.74,24.75,19.63,28.06,19.57,31,22.96,13.54,1008,14.92,14.94,60.76
'''
)

csv_filepaths = [sample_logger_1, sample_logger_2]

# --- example 1 ---
# `fugue` doesn't treat `header` the same way as `pandas`
try:
    with FugueWorkflow() as dag:
        df = dag.load(str(csv_filepaths[0]), header=1)
except Exception:
    # fugue's `load` doesn't accept a pandas-style row index for `header`,
    # so this raises
    pass
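
# For contrast (an illustrative aside, not from the original thread): in
# pandas, `header=1` is a row index meaning "use the 2nd line as column
# names"; skipping the units & aggregation rows by index then yields the
# desired frame:
pd.read_csv(csv_filepaths[0], header=1, skiprows=[2, 3]).head()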

# --- example 2 ---
# this works!
def read_campbell_scientific_textfile(filepath: str) -> pd.DataFrame:
    headers = [c.strip('"') for c in linecache.getline(filepath, lineno=2).strip().split(",")]
    return pd.read_csv(filepath, names=headers, skiprows=5)

with FugueWorkflow() as dag:
    df = dag.create(
        read_campbell_scientific_textfile, params={"filepath": str(csv_filepaths[0])}
    )

# --- example 3 ---
# this also works!
def load_campbell_scientific_textfile_to_parquet(filepath: str) -> pathlib.Path:
    headers = [c.strip('"') for c in linecache.getline(filepath, lineno=2).strip().split(",")]
    output = create_temporary_file(prefix="campbell_scientific_", suffix=".parquet")
    pd.read_csv(filepath, names=headers, skiprows=5, parse_dates=[0]).to_parquet(output)
    return output

parquet_filepaths = [
    load_campbell_scientific_textfile_to_parquet(str(filepath))
    for filepath in csv_filepaths
]
parquet_glob = pathlib.Path(tempfile.gettempdir()) / "campbell_scientific_*.parquet" 

with FugueWorkflow() as dag:
    df = dag.load(str(parquet_glob))
    df.show()

# --- example 4 ---

def infer_fugue_schema(
    filepath: str, # Path to a campbell scientific text file
) -> str: # A string associating column names with their corresponding data types (ex: 'TIMESTAMP:datetime, WS_80m_90deg_Avg:float') 
    headers = [c.strip('"') for c in linecache.getline(filepath, lineno=2).strip().split(",")]
    sample = pd.read_csv(filepath, names=headers, skiprows=5, parse_dates=[0], nrows=100)
    schema = pa.Table.from_pandas(sample).schema
    pyarrow_schema = {
        column_name: str(schema.field(column_name).type) for column_name in sample.columns
    }
    map_to_fugue = {"timestamp[ns]": "datetime"}
    return ",".join(
        column_name + ":" + map_to_fugue.get(dtype, dtype) for column_name, dtype in pyarrow_schema.items()
    )
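
# e.g. (illustrative, not from the original thread; exact arrow type names
# depend on the sampled data):
# infer_fugue_schema(str(csv_filepaths[0]))
# -> 'TIMESTAMP:datetime,RECORD:int64,WS_80m_90deg_Avg:double,...'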

csv_glob = pathlib.Path(tempfile.gettempdir()) / "campbell_scientific_*.csv"
inferred_schema = infer_fugue_schema(str(csv_filepaths[0]))
with FugueWorkflow() as dag:
    df = dag.load(str(csv_glob), columns=inferred_schema, skiprows=4)
    df.show()

# --- teardown ---
for fp in csv_filepaths + parquet_filepaths:
    fp.unlink()

I want to load multiple text files & persist the initial read as parquet; what would you recommend?

rdmolony commented 1 year ago

I reckon a recipe on this topic might help clarify how to get data in & out of fugue. I've always found it a little confusing to understand how to get data into fugue & how the execution engine impacts this

kvnkho commented 1 year ago

So with CSV specifically, it's a bit problematic because you normally need a bunch of keyword arguments that may not be unified across execution engines. That is why you feel like you need a custom creator. If you stick with Pandas, you won't notice this, but the problem is compatibility with other execution engines. This is why parquet is the preferred file format.
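
To illustrate (a minimal sketch, not from the original comment; the parquet path is the one written below): loading parquet needs no engine-specific kwargs, so the same workflow can run unchanged on another engine:

from fugue import FugueWorkflow

dag = FugueWorkflow()
df = dag.load("/tmp/test.parquet")  # parquet needs no CSV-style kwargs
df.show()
dag.run()            # default (pandas-based) engine
# dag.run("duckdb")  # hypothetically, the same graph on another engine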

The other thing I want to bring up is that DuckDB should be way faster than Pandas for loading a bunch of files with a wildcard. We can use DuckDB to preprocess this and write out the parquet file for further processing. The current CSV setup is a bit hard to work with, so I would just make a parquet file like this:

from fugue_sql import fsql

headers = [c.strip('"') for c in linecache.getline(str(sample_logger_1), lineno=2).strip().split(",")]

res = fsql("""
LOAD "/var/folders/w2/91_v34nx0xs2npnl3zsl9tmm0000gn/T/campbell_scientific_*.csv" (header=TRUE, skip=3 , infer_schema=TRUE)
YIELD DATAFRAME AS result
""").run("duckdb")

df = res['result'].as_pandas()
df.columns = headers
df.to_parquet("/tmp/test.parquet")

And then use the parquet for downstream work. You might be able to get it working on SparkSQL too with a few tweaks for big data; I just didn't try because Spark didn't have access to the tempdir for me. If you use Spark, just write out the parquet with Spark too.
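
For example, a hedged sketch of that Spark variant (untested, paths hypothetical), using the same LOAD ... SAVE pattern from Fugue SQL:

from fugue_sql import fsql

fsql("""
LOAD "/data/campbell_scientific_*.csv" (header=TRUE, infer_schema=TRUE)
SAVE OVERWRITE "/data/campbell_scientific.parquet"
""").run("spark")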

The Creator you made will work for sure; it's just tied to Pandas (unavoidably) unless you take in an engine and fork the logic of that function. If the purpose is just to read a bunch of small files and collect them, this setup should be the most helpful for preprocessing.

Does that answer your question?

goodwanghan commented 1 year ago

@rdmolony I think this dataset is special; it is not in standard CSV format. So special handling like what you did:

def read_campbell_scientific_textfile(filepath: str) -> pd.DataFrame:
    headers = [c.strip('"') for c in linecache.getline(filepath, lineno=2).strip().split(",")]
    return pd.read_csv(filepath, names=headers, skiprows=5)

it makes perfect sense (skiprows should be 4 in your case).

Reading CSV is already very challenging for any backend; unifying them is harder, and adding special handling is almost impossible. So using a creator is a great way to get data into fugue.

@kvnkho has shown how to get data out of Fugue using yield. You can yield multiple dataframes in one workflow or Fugue SQL. Programmatically, you can do:

dag = FugueWorkflow()
df = dag.create(
    read_campbell_scientific_textfile, params={"filepath": str(csv_filepaths[0])}
)
df.yield_dataframe_as("result")

dag.run()["result"].as_pandas()

Note that we may deprecate the with statement for FugueWorkflow soon; it's a confusing design. Please use dag = FugueWorkflow() ... dag.run() instead.
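
For instance, a minimal sketch (not from the original comment) of yielding multiple dataframes from one workflow, reusing the creator and filepaths defined earlier:

dag = FugueWorkflow()
for i, filepath in enumerate(csv_filepaths):
    df = dag.create(
        read_campbell_scientific_textfile, params={"filepath": str(filepath)}
    )
    df.yield_dataframe_as(f"logger_{i}")

results = dag.run()
logger_1 = results["logger_0"].as_pandas()
logger_2 = results["logger_1"].as_pandas()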

rdmolony commented 1 year ago

Thanks a lot @kvnkho & @goodwanghan for your detailed explanations. You've cleared it up. I really like the DuckDB option, Kevin, though I personally prefer the functional API for now!

I found that adding a step to infer a Fugue schema from sample data helps when loading data with Fugue, as I can use skiprows in pandas or skip in DuckDB alongside columns in the functional API to load this CSV without a pandas intermediate step.

Do you think infer_fugue_schema or similar is something that Fugue would be interested in supporting? Or is it better in general for users to just use custom creators for non-standard data formats like above?

# See below for full solution!

# --- fsql ---

inferred_duckdb_schema = infer_duckdb_schema(str(csv_filepaths[0]))

# I couldn't pass a fugue schema to `columns`; 
# looks like a conflict between fugue `columns` & `DuckDB` `COLUMNS`
result = fsql(f"""
LOAD '{csv_glob}' (HEADER=TRUE, SKIP=3, COLUMNS={inferred_duckdb_schema}, infer_schema=TRUE)
YIELD DATAFRAME AS result
SAVE OVERWRITE '{parquet_file}'
""").run("duckdb")

# --- functional api ---

inferred_fugue_schema = infer_fugue_schema(str(csv_filepaths[0]))

load_csvs_to_parquet = FugueWorkflow(engine="duckdb")
raw_sensors = load_csvs_to_parquet.load(
    csv_glob, columns=inferred_fugue_schema, skip=4, header=False
)
raw_sensors.save(parquet_file)
load_csvs_to_parquet.run(engine="duckdb")
sample = pd.read_parquet(parquet_file)
Full solution:

import tempfile
import pathlib

from fugue import FugueWorkflow
from fugue_sql import fsql
import pandas as pd

# --- setup ---
def create_temporary_file(
    prefix: str, # The output file name will begin with the prefix
    suffix: str, # The output file name will end with the suffix
) -> pathlib.Path: # Path to the newly created temporary output file
    return pathlib.Path(
        tempfile.NamedTemporaryFile(prefix=prefix, suffix=suffix, delete=False).name
    )

def infer_fugue_schema(
    filepath: str, # Path to a campbell scientific text file
) -> str: # A string associating column names with their corresponding data types (ex: 'TIMESTAMP:datetime, WS_80m_90deg_Avg:float')
    headers = list(pd.read_csv(filepath, skiprows=1, nrows=1).columns)
    sample = pd.read_csv(
        filepath, names=headers, skiprows=4, parse_dates=[0], nrows=100,
    )
    map_pandas_dtype_to_fugue = {
        "datetime64[ns]": "datetime",
        "float64": "double",
        "int64": "long",
    }
    dtypes = {
        column_name: map_pandas_dtype_to_fugue[str(dtype)]
        for column_name, dtype in sample.dtypes.to_dict().items()
    }
    return ", ".join(
        column_name + ":" + dtype for column_name, dtype in dtypes.items()
    )

def infer_duckdb_schema(
    filepath: str, # Path to a campbell scientific text file
) -> dict: # A mapping of column names to DuckDB data types (ex: {'TIMESTAMP': 'TIMESTAMP', 'WS_80m_90deg_Avg': 'DOUBLE'})
    headers = list(pd.read_csv(filepath, skiprows=1, nrows=1).columns)
    sample = pd.read_csv(
        filepath, names=headers, skiprows=4, parse_dates=[0], nrows=100,
    )
    map_pandas_dtype_to_duckdb = {
        "datetime64[ns]": "TIMESTAMP",
        "float64": "DOUBLE",
        "int64": "SMALLINT",
    }
    dtypes = {
        column_name: map_pandas_dtype_to_duckdb[str(dtype)]
        for column_name, dtype in sample.dtypes.to_dict().items()
    }
    return dtypes

campbell_scientific_textfiles = [
'''"SITENAME"
"TIMESTAMP","RECORD","WS_80m_90deg_Avg","WS_80m_90deg_Std","WS_80m_90deg_3sGust_Max","WS_80m_90deg_Max","WS_80m_270deg_Avg","WS_80m_270deg_Std","WS_80m_270deg_3sGust_Max","WS_80m_270deg_Max","WS_65m_90deg_Avg","WS_65m_90deg_Std","WS_65m_90deg_3sGust_Max","WS_65m_90deg_Max","WS_65m_270deg_Avg","WS_65m_270deg_Std","WS_65m_270deg_3sGust_Max","WS_65m_270deg_Max","WS_50m_90deg_Avg","WS_50m_90deg_Std","WS_50m_90deg_3sGust_Max","WS_50m_90deg_Max","WS_50m_270deg_Avg","WS_50m_270deg_Std","WS_50m_270deg_3sGust_Max","WS_50m_270deg_Max","WS_30m_90deg_Avg","WS_30m_90deg_Std","WS_30m_90deg_3sGust_Max","WS_30m_90deg_Max","WS_30m_270deg_Avg","WS_30m_270deg_Std","WS_30m_270deg_3sGust_Max","WS_30m_270deg_Max","Dir_78m_90deg_avg","Dir_78m_90deg_std","Dir_63m_90deg_avg","Dir_63m_90deg_std","Dir_28m_90deg_avg","Dir_28m_90deg_std","Batt_Volt_Min","Press_Avg","Temp_C80_Avg","Temp_C15_Avg","Hum_Avg"
"TS","RN","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","Volts","mB","Deg C","Deg C","%"
"","","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","WVc","WVc","WVc","WVc","WVc","WVc","Min","Avg","Avg","Avg","Avg"
"2012-05-31 13:10:00",6,1.11,0.315,1.51,1.51,1.082,0.166,1.473,1.49,1.144,0.355,1.717,2.13,1.115,0.231,1.673,1.74,0.944,0.565,1.923,2.13,0.969,0.381,1.79,1.84,0.98,0.48,1.717,2.13,1.041,0.301,1.657,1.69,128.2,50.79,115.9,50.08,70.53,18.46,13.82,1008,14.62,14.54,61.17
"2012-05-31 13:20:00",7,0.769,0.517,1.717,2.13,0.894,0.348,1.69,1.74,0.694,0.482,1.51,1.51,0.781,0.274,1.523,1.59,0.513,0.548,1.923,2.13,0.674,0.277,1.49,1.49,0.786,0.463,1.51,1.51,0.852,0.284,1.757,1.79,193.1,42,183.3,56.55,99.9,36.49,13.8,1008,14.75,15.04,60.12
"2012-05-31 13:30:00",8,0.347,0.439,1.097,1.51,0.522,0.249,1.09,1.09,0.431,0.47,1.303,1.51,0.606,0.2,1.273,1.29,0.661,0.434,1.303,1.51,0.715,0.231,1.44,1.44,1.063,0.333,1.51,1.51,1.058,0.201,1.507,1.54,231.6,73.03,313.8,54.89,54.87,12.13,13.8,1008,15.12,14.86,60.72
"2012-05-31 13:40:00",9,0.787,0.552,1.717,2.13,0.839,0.406,1.673,1.74,0.615,0.512,1.51,1.51,0.825,0.306,1.573,1.59,0.746,0.503,1.51,1.51,0.793,0.317,1.49,1.54,0.949,0.347,1.51,1.51,0.931,0.197,1.34,1.39,28.53,48.27,2.072,13.8,44.24,7.636,13.64,1008,15.29,14.82,59.53
"2012-05-31 13:50:00",10,1.163,0.396,2.13,2.13,1.184,0.278,2.04,2.09,1.156,0.412,1.923,2.13,1.161,0.246,2.123,2.14,1.065,0.475,2.13,2.13,1.129,0.326,2.09,2.14,1.044,0.421,1.923,2.13,1.075,0.301,1.723,1.74,24.75,19.63,28.06,19.57,31,22.96,13.54,1008,14.92,14.94,60.76
''',
'''"SITENAME"
"TIMESTAMP","RECORD","WS_80m_90deg_Avg","WS_80m_90deg_Std","WS_80m_90deg_3sGust_Max","WS_80m_90deg_Max","WS_80m_270deg_Avg","WS_80m_270deg_Std","WS_80m_270deg_3sGust_Max","WS_80m_270deg_Max","WS_65m_90deg_Avg","WS_65m_90deg_Std","WS_65m_90deg_3sGust_Max","WS_65m_90deg_Max","WS_65m_270deg_Avg","WS_65m_270deg_Std","WS_65m_270deg_3sGust_Max","WS_65m_270deg_Max","WS_50m_90deg_Avg","WS_50m_90deg_Std","WS_50m_90deg_3sGust_Max","WS_50m_90deg_Max","WS_50m_270deg_Avg","WS_50m_270deg_Std","WS_50m_270deg_3sGust_Max","WS_50m_270deg_Max","WS_30m_90deg_Avg","WS_30m_90deg_Std","WS_30m_90deg_3sGust_Max","WS_30m_90deg_Max","WS_30m_270deg_Avg","WS_30m_270deg_Std","WS_30m_270deg_3sGust_Max","WS_30m_270deg_Max","Dir_78m_90deg_avg","Dir_78m_90deg_std","Dir_63m_90deg_avg","Dir_63m_90deg_std","Dir_28m_90deg_avg","Dir_28m_90deg_std","Batt_Volt_Min","Press_Avg","Temp_C80_Avg","Temp_C15_Avg","Hum_Avg"
"TS","RN","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","meters/second","Volts","mB","Deg C","Deg C","%"
"","","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","Avg","Std","Max","Max","WVc","WVc","WVc","WVc","WVc","WVc","Min","Avg","Avg","Avg","Avg"
"2012-05-31 12:20:00",1,1.383,0.6,2.75,3.37,1.368,0.439,2.673,2.74,1.332,0.478,2.75,2.75,1.242,0.379,2.74,2.79,1.162,0.535,2.337,2.75,1.159,0.354,2.34,2.39,1.27,0.614,2.337,2.75,1.322,0.416,2.157,2.24,240.3,46,242,45.39,222,33.45,13.79,1009,13.84,14.08,65.67
"2012-05-31 12:30:00",2,1.183,0.449,1.923,2.13,1.135,0.324,1.94,1.99,0.948,0.524,1.923,2.13,1.068,0.303,1.723,1.74,0.701,0.547,1.923,2.13,0.913,0.308,1.673,1.74,0.771,0.539,1.717,2.13,0.997,0.28,1.657,1.74,282,26.79,264.3,30.25,278.5,62.87,13.73,1009,14.04,14.45,64.51
"2012-05-31 12:40:00",3,1.762,0.502,2.75,2.75,1.784,0.458,2.873,2.94,1.512,0.486,2.543,2.75,1.633,0.402,2.44,2.49,1.484,0.423,2.13,2.13,1.508,0.343,2.457,2.49,1.561,0.416,2.337,2.75,1.605,0.311,2.09,2.19,351.9,27.08,342.4,25.69,321.4,29.48,13.81,1009,14.14,14.35,63.45
"2012-05-31 12:50:00",4,1.926,0.434,2.957,3.37,1.957,0.405,2.923,2.99,1.871,0.446,2.75,2.75,1.883,0.384,2.84,2.94,1.826,0.511,3.163,3.37,1.833,0.441,3.173,3.24,1.762,0.489,3.163,3.37,1.762,0.432,2.99,3.09,15.54,18.22,15.95,18.5,20.55,27.99,13.82,1009,14.19,14.39,63.79
"2012-05-31 13:00:00",5,1.606,0.424,2.543,2.75,1.616,0.366,2.54,2.59,1.83,0.406,2.543,2.75,1.81,0.365,2.873,2.79,1.956,0.385,2.75,2.75,1.895,0.345,2.69,2.74,2.012,0.431,2.75,2.75,1.965,0.366,2.857,2.94,40,26.7,44.79,18.04,37.4,9.99,13.82,1009,14.27,14.43,62.13
''',
]

csv_filepaths = [
    create_temporary_file(prefix="campbell_scientific_", suffix=".csv")
    for _ in range(2)
]

for fp, content in zip(csv_filepaths, campbell_scientific_textfiles):
    fp.write_text(content)

csv_glob = pathlib.Path(tempfile.gettempdir()) / "campbell_scientific*.csv"
parquet_file = pathlib.Path(tempfile.gettempdir()) / "campbell_scientific.parquet"

# --- fsql ---
inferred_duckdb_schema = infer_duckdb_schema(str(csv_filepaths[0]))

result = fsql(f"""
LOAD '{csv_glob}' (HEADER=TRUE, SKIP=3, COLUMNS={inferred_duckdb_schema}, infer_schema=TRUE)
YIELD DATAFRAME AS result
SAVE OVERWRITE '{parquet_file}'
""").run("duckdb")

# --- functional api ---
inferred_fugue_schema = infer_fugue_schema(str(csv_filepaths[0]))

load_csvs_to_parquet = FugueWorkflow(engine="duckdb")
raw_sensors = load_csvs_to_parquet.load(
    csv_glob, columns=inferred_fugue_schema, skip=4, header=False
)
raw_sensors.save(parquet_file)
load_csvs_to_parquet.run(engine="duckdb")
sample = pd.read_parquet(parquet_file)
kvnkho commented 1 year ago

Hey @rdmolony ,

Sorry for the late reply. I understand the intention here, but I am hesitant to include it until I see the use case more in the wild. Our goal is to keep the Fugue interface as minimal as possible, and Fugue's loading already has a kwarg infer_schema=TRUE, so this would be kind of confusing. It seems the issue here is that the file being ingested has multiple headers.

I will look more into that COLUMNS thing.

As an immediate answer though, you can consider contributing your example under the Recipes here: https://fugue-tutorials.readthedocs.io/tutorials/applications/recipes/index.html

You could make a page that is something like "How to read multi-header files". What do you think?

rdmolony commented 1 year ago

Sure thing Kevin, that makes sense to me ;)