eakmanrq / sqlframe

Turning PySpark Into a Universal DataFrame API
https://sqlframe.readthedocs.io/en/stable/
MIT License
173 stars 2 forks

Improve DuckDB .csv Loading #11

Closed djouallah closed 1 month ago

djouallah commented 1 month ago

Is this supported when using DuckDB?

from sqlframe.duckdb.session import DuckDBSession
user_schema="""
 I  STRING,UNIT  STRING,XX  STRING,VERSION  STRING,SETTLEMENTDATE  STRING,RUNNO  STRING,DUID  STRING,
 INTERVENTION  STRING,DISPATCHMODE  STRING,AGCSTATUS  STRING,INITIALMW  STRING,TOTALCLEARED  STRING,RAMPDOWNRATE  STRING,
 RAMPUPRATE  STRING,LOWER5MIN  STRING,LOWER60SEC  STRING,LOWER6SEC  STRING,RAISE5MIN  STRING,RAISE60SEC  STRING,RAISE6SEC  STRING,
 MARGINAL5MINVALUE  STRING,MARGINAL60SECVALUE  STRING,MARGINAL6SECVALUE  STRING,MARGINALVALUE  STRING,VIOLATION5MINDEGREE  STRING,
 VIOLATION60SECDEGREE  STRING,VIOLATION6SECDEGREE  STRING,VIOLATIONDEGREE  STRING,LOWERREG  STRING,RAISEREG  STRING,AVAILABILITY  STRING,
 RAISE6SECFLAGS  STRING,RAISE60SECFLAGS  STRING,RAISE5MINFLAGS  STRING,RAISEREGFLAGS  STRING,LOWER6SECFLAGS  STRING,LOWER60SECFLAGS STRING,
 LOWER5MINFLAGS  STRING,LOWERREGFLAGS  STRING,RAISEREGAVAILABILITY  STRING,RAISEREGENABLEMENTMAX  STRING,RAISEREGENABLEMENTMIN  STRING,
 LOWERREGAVAILABILITY  STRING,LOWERREGENABLEMENTMAX  STRING,LOWERREGENABLEMENTMIN  STRING,RAISE6SECACTUALAVAILABILITY  STRING,
 RAISE60SECACTUALAVAILABILITY  STRING,RAISE5MINACTUALAVAILABILITY  STRING,RAISEREGACTUALAVAILABILITY  STRING,
 LOWER6SECACTUALAVAILABILITY  STRING,LOWER60SECACTUALAVAILABILITY  STRING,LOWER5MINACTUALAVAILABILITY  STRING,
 LOWERREGACTUALAVAILABILITY  STRING
 """
df = spark.read.format("csv")\
     .option("header","true") \
     .schema(user_schema)\
     .load(['file1.csv','file2.csv'])\
     .filter("unit='DUNIT' and version =3 and I='D'")\
     .drop('xx')\
     .drop('I')\
     .withColumn('SETTLEMENTDATE',f.to_timestamp('SETTLEMENTDATE','yyyy/MM/dd HH:mm:ss'))\
     .withColumn("file", f.regexp_extract(f.input_file_name(), r"Daily_Reports\/([^\W'\.']+\.CSV)", 1))\
     .withColumn("PRIORITY", f.lit(1))
df_cols = list(set(df.columns) - {'SETTLEMENTDATE','DUID','file','UNIT','transactionId','PRIORITY'})
for col_name in df_cols:
    df = df.withColumn(col_name, f.col(col_name).cast('double'))
df = df.withColumn('DATE', f.to_date(f.col('SETTLEMENTDATE')))\
      .withColumn('YEAR', f.year(f.col('SETTLEMENTDATE')))
eakmanrq commented 1 month ago

Thanks for sharing your example @djouallah. SQLFrame currently focuses more on the transformation (reading from / writing to tables) and data analysis use cases. Your example is more in the ETL space since you are reading from files. That being said, SQLFrame does have basic support, which you can see some tests for here: https://github.com/eakmanrq/sqlframe/blob/296ca4cb4ed0c6d1d9c6fec2edb968eabd465324/tests/integration/engines/test_engine_reader.py

The key point is that if this were to be supported, SQLFrame would need a place to write intermediate tables with the loaded data, since it couldn't be represented in pure SQL due to its size. Curious if other users want this; if so, I could prioritize this feature.
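
In the meantime, one rough workaround sketch (the file name is a placeholder): materialize the CSV into a temp table with a plain DuckDB connection, pass that connection to the session, and reference the table from SQLFrame:

import duckdb
from sqlframe.duckdb.session import DuckDBSession

conn = duckdb.connect()
# Materialize the CSV into a temp table so the loaded data has a home DuckDB can query
conn.sql("CREATE TEMP TABLE raw AS SELECT * FROM read_csv_auto('file1.csv')")
session = DuckDBSession(conn=conn)
df = session.table("raw")  # now usable with the PySpark-style DataFrame API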

eakmanrq commented 1 month ago

Oh and here is how you would adjust your imports for DuckDB:

from sqlframe.duckdb.session import DuckDBSession
from sqlframe.duckdb import functions as f

session = DuckDBSession()

df = session.read.load("<filepath>", format="csv")\
...
djouallah commented 1 month ago

Never mind, I think I get it: just read the CSV as a lazy DataFrame and do the transformations using the Spark API. But I am getting an error:

ParserException: Parser Error: SELECT clause without selection list

Here is a reproducible notebook:

https://colab.research.google.com/drive/1ZZJJ1ymOFN9NPudC39XEhaS_hsmwrFp5?usp=sharing

import duckdb
from sqlframe.duckdb.session import DuckDBSession
from sqlframe.duckdb import functions as f
session = DuckDBSession()
# files_to_upload_full_Path is a list of CSV paths defined earlier in the notebook
raw = duckdb.sql(f"""from read_csv({files_to_upload_full_Path},
    skip=1, header=0, all_varchar=1,
    columns={{
    'I': 'VARCHAR','UNIT': 'VARCHAR','XX': 'VARCHAR','VERSION': 'VARCHAR','SETTLEMENTDATE': 'VARCHAR','RUNNO': 'VARCHAR',
    'DUID': 'VARCHAR','INTERVENTION': 'VARCHAR','DISPATCHMODE': 'VARCHAR','AGCSTATUS': 'VARCHAR','INITIALMW': 'VARCHAR',
    'TOTALCLEARED': 'VARCHAR','RAMPDOWNRATE': 'VARCHAR','RAMPUPRATE': 'VARCHAR','LOWER5MIN': 'VARCHAR',
    'LOWER60SEC': 'VARCHAR','LOWER6SEC': 'VARCHAR','RAISE5MIN': 'VARCHAR','RAISE60SEC': 'VARCHAR',
    'RAISE6SEC': 'VARCHAR','MARGINAL5MINVALUE': 'VARCHAR','MARGINAL60SECVALUE': 'VARCHAR',
    'MARGINAL6SECVALUE': 'VARCHAR','MARGINALVALUE': 'VARCHAR','VIOLATION5MINDEGREE': 'VARCHAR',
    'VIOLATION60SECDEGREE': 'VARCHAR','VIOLATION6SECDEGREE': 'VARCHAR','VIOLATIONDEGREE': 'VARCHAR',
    'LOWERREG': 'VARCHAR','RAISEREG': 'VARCHAR','AVAILABILITY': 'VARCHAR','RAISE6SECFLAGS': 'VARCHAR',
    'RAISE60SECFLAGS': 'VARCHAR','RAISE5MINFLAGS': 'VARCHAR','RAISEREGFLAGS': 'VARCHAR',
    'LOWER6SECFLAGS': 'VARCHAR','LOWER60SECFLAGS': 'VARCHAR','LOWER5MINFLAGS': 'VARCHAR',
    'LOWERREGFLAGS': 'VARCHAR','RAISEREGAVAILABILITY': 'VARCHAR','RAISEREGENABLEMENTMAX': 'VARCHAR',
    'RAISEREGENABLEMENTMIN': 'VARCHAR','LOWERREGAVAILABILITY': 'VARCHAR','LOWERREGENABLEMENTMAX': 'VARCHAR',
    'LOWERREGENABLEMENTMIN': 'VARCHAR','RAISE6SECACTUALAVAILABILITY': 'VARCHAR',
    'RAISE60SECACTUALAVAILABILITY': 'VARCHAR','RAISE5MINACTUALAVAILABILITY': 'VARCHAR',
    'RAISEREGACTUALAVAILABILITY': 'VARCHAR','LOWER6SECACTUALAVAILABILITY': 'VARCHAR',
    'LOWER60SECACTUALAVAILABILITY': 'VARCHAR','LOWER5MINACTUALAVAILABILITY': 'VARCHAR','LOWERREGACTUALAVAILABILITY': 'VARCHAR'
    }},
    filename=1, null_padding=true, ignore_errors=1, auto_detect=false)""")

df = session.table("raw")\
     .filter("unit='DUNIT' and version =3 and I='D'")\
     .drop('xx')\
     .drop('I')\
     .withColumn('SETTLEMENTDATE',f.to_timestamp('SETTLEMENTDATE','yyyy/MM/dd HH:mm:ss'))\
     .withColumn("PRIORITY", f.lit(1))
df_cols = list(set(df.columns) - {'SETTLEMENTDATE','DUID','file','UNIT','transactionId','PRIORITY'})
for col_name in df_cols:
    df = df.withColumn(col_name, f.col(col_name).cast('double'))
df = df.withColumn('DATE', f.to_date(f.col('SETTLEMENTDATE')))\
      .withColumn('YEAR', f.year(f.col('SETTLEMENTDATE')))
df.show()
eakmanrq commented 1 month ago

Thanks for the additional details. By the way, if you create something in a DuckDB connection outside of SQLFrame, just make sure to pass that connection in for the session to use. Something like this:

import duckdb
from sqlframe.duckdb.session import DuckDBSession
from sqlframe.duckdb import functions as f

conn = duckdb.connect()
conn.sql("<whatever>")
session = DuckDBSession(conn=conn)

For your specific issue, it looks like there is actually a bug in SQLGlot: it doesn't respect the additional arguments you have, like skip and header. So let me get that fixed and I will see where that gets us.

eakmanrq commented 1 month ago

Thanks @djouallah for sharing the Colab notebook, which made it easy for me to repro locally.

I just released 1.1.0, which fixed a variety of issues. I've got a version of your pipeline working, but I think the remaining problem is the column list provided: it doesn't seem to match the underlying data, so I am getting casting errors when trying to cast strings that contain non-numeric characters to double. Can you take a look and see if there is a bug in the pipeline itself?

import duckdb
from sqlframe.duckdb.session import DuckDBSession
from sqlframe.duckdb import functions as F

files_to_upload_full_path = "/Users/eakmanrq/repos/sqlframe/env/PUBLIC_DAILY_202404210000_20240422040503.CSV"
session = DuckDBSession()
df = session.read.load(
    files_to_upload_full_path, 
    schema="I VARCHAR, UNIT VARCHAR, XX VARCHAR, VERSION VARCHAR, SETTLEMENTDATE VARCHAR, RUNNO VARCHAR, DUID VARCHAR, INTERVENTION VARCHAR, DISPATCHMODE VARCHAR, AGCSTATUS VARCHAR, INITIALMW VARCHAR, TOTALCLEARED VARCHAR, RAMPDOWNRATE VARCHAR, RAMPUPRATE VARCHAR, LOWER5MIN VARCHAR, LOWER60SEC VARCHAR, LOWER6SEC VARCHAR, RAISE5MIN VARCHAR, RAISE60SEC VARCHAR, RAISE6SEC VARCHAR, MARGINAL5MINVALUE VARCHAR, MARGINAL60SECVALUE VARCHAR, MARGINAL6SECVALUE VARCHAR, MARGINALVALUE VARCHAR, VIOLATION5MINDEGREE VARCHAR, VIOLATION60SECDEGREE VARCHAR, VIOLATION6SECDEGREE VARCHAR, VIOLATIONDEGREE VARCHAR, LOWERREG VARCHAR, RAISEREG VARCHAR, AVAILABILITY VARCHAR, RAISE6SECFLAGS VARCHAR, RAISE60SECFLAGS VARCHAR, RAISE5MINFLAGS VARCHAR, RAISEREGFLAGS VARCHAR, LOWER6SECFLAGS VARCHAR, LOWER60SECFLAGS VARCHAR, LOWER5MINFLAGS VARCHAR, LOWERREGFLAGS VARCHAR, RAISEREGAVAILABILITY VARCHAR, RAISEREGENABLEMENTMAX VARCHAR, RAISEREGENABLEMENTMIN VARCHAR, LOWERREGAVAILABILITY VARCHAR, LOWERREGENABLEMENTMAX VARCHAR, LOWERREGENABLEMENTMIN VARCHAR, RAISE6SECACTUALAVAILABILITY VARCHAR, RAISE60SECACTUALAVAILABILITY VARCHAR, RAISE5MINACTUALAVAILABILITY VARCHAR, RAISEREGACTUALAVAILABILITY VARCHAR, LOWER6SECACTUALAVAILABILITY VARCHAR, LOWER60SECACTUALAVAILABILITY VARCHAR, LOWER5MINACTUALAVAILABILITY VARCHAR, LOWERREGACTUALAVAILABILITY VARCHAR",
    format="csv", skip=1, header=1, all_varchar=1, filename=1, null_padding=True, ignore_errors=1, auto_detect=False)\
     .drop('xx')\
     .drop('I')\
     .withColumn('SETTLEMENTDATE',F.to_timestamp('SETTLEMENTDATE','%Y/%m/%d %H:%M:%S'))\
     .withColumn("PRIORITY", F.lit(1))
df_cols = list(set(df.columns) - {'settlementdate','duid','file','unit','transactionid','priority'})
for col_name in df_cols:
    df = df.withColumn(col_name, F.col(col_name).cast('string'))
df = df.withColumn('DATE', F.to_date(F.col('SETTLEMENTDATE')))\
      .withColumn('YEAR', F.year(F.col('SETTLEMENTDATE')))
df.show()
djouallah commented 1 month ago

Yes, sorry, my fault, I forgot to add the filter. But it does not seem to be working:

.filter("unit='DUNIT' and version =3 and i='D'")

eakmanrq commented 1 month ago

Adding that filter seems to work for me on 1.1.0. You can't view the SQL due to a SQLGlot bug, but it should execute fine. Let me know if that is not the case for you.

import duckdb
from sqlframe.duckdb.session import DuckDBSession
from sqlframe.duckdb import functions as F

files_to_upload_full_path = "/Users/eakmanrq/repos/sqlframe/env/PUBLIC_DAILY_202404210000_20240422040503.CSV"
session = DuckDBSession()
df = session.read.load(
    files_to_upload_full_path, 
    schema="I VARCHAR, UNIT VARCHAR, XX VARCHAR, VERSION VARCHAR, SETTLEMENTDATE VARCHAR, RUNNO VARCHAR, DUID VARCHAR, INTERVENTION VARCHAR, DISPATCHMODE VARCHAR, AGCSTATUS VARCHAR, INITIALMW VARCHAR, TOTALCLEARED VARCHAR, RAMPDOWNRATE VARCHAR, RAMPUPRATE VARCHAR, LOWER5MIN VARCHAR, LOWER60SEC VARCHAR, LOWER6SEC VARCHAR, RAISE5MIN VARCHAR, RAISE60SEC VARCHAR, RAISE6SEC VARCHAR, MARGINAL5MINVALUE VARCHAR, MARGINAL60SECVALUE VARCHAR, MARGINAL6SECVALUE VARCHAR, MARGINALVALUE VARCHAR, VIOLATION5MINDEGREE VARCHAR, VIOLATION60SECDEGREE VARCHAR, VIOLATION6SECDEGREE VARCHAR, VIOLATIONDEGREE VARCHAR, LOWERREG VARCHAR, RAISEREG VARCHAR, AVAILABILITY VARCHAR, RAISE6SECFLAGS VARCHAR, RAISE60SECFLAGS VARCHAR, RAISE5MINFLAGS VARCHAR, RAISEREGFLAGS VARCHAR, LOWER6SECFLAGS VARCHAR, LOWER60SECFLAGS VARCHAR, LOWER5MINFLAGS VARCHAR, LOWERREGFLAGS VARCHAR, RAISEREGAVAILABILITY VARCHAR, RAISEREGENABLEMENTMAX VARCHAR, RAISEREGENABLEMENTMIN VARCHAR, LOWERREGAVAILABILITY VARCHAR, LOWERREGENABLEMENTMAX VARCHAR, LOWERREGENABLEMENTMIN VARCHAR, RAISE6SECACTUALAVAILABILITY VARCHAR, RAISE60SECACTUALAVAILABILITY VARCHAR, RAISE5MINACTUALAVAILABILITY VARCHAR, RAISEREGACTUALAVAILABILITY VARCHAR, LOWER6SECACTUALAVAILABILITY VARCHAR, LOWER60SECACTUALAVAILABILITY VARCHAR, LOWER5MINACTUALAVAILABILITY VARCHAR, LOWERREGACTUALAVAILABILITY VARCHAR",
    format="csv", skip=1, header=1, all_varchar=1, filename=1, null_padding=True, ignore_errors=1, auto_detect=False)\
     .filter("unit='DUNIT' and version =3 and i='D'")\
     .drop('xx')\
     .drop('I')\
     .withColumn('SETTLEMENTDATE',F.to_timestamp('SETTLEMENTDATE','%Y/%m/%d %H:%M:%S'))\
     .withColumn("PRIORITY", F.lit(1))
df_cols = list(set(df.columns) - {'settlementdate','duid','file','unit','transactionid','priority'})
for col_name in df_cols:
    df = df.withColumn(col_name, F.col(col_name).cast('string'))
df = df.withColumn('DATE', F.to_date(F.col('SETTLEMENTDATE')))\
      .withColumn('YEAR', F.year(F.col('SETTLEMENTDATE')))
df.show()
djouallah commented 1 month ago

Shouldn't version be 3?

[screenshot of the unexpected output]
eakmanrq commented 1 month ago

Ah good call. This should be fixed in 1.1.1. Can you please give it a try?

djouallah commented 1 month ago

Perfect!!! Last point: filename=1 does not seem to be working.

djouallah commented 1 month ago

Last question: how do I export that df to Arrow or pandas? How does that work?

djouallah commented 1 month ago

Sorry, I did not notice there is a documentation site :) https://sqlframe.readthedocs.io/en/latest/duckdb/#example-usage

djouallah commented 1 month ago

Thanks for the filename fix, it works now. Now export to pandas is broken:

from deltalake.writer import write_deltalake
write_deltalake("sqlframe",df.toPandas(), mode="append")
[screenshot of the error]
eakmanrq commented 1 month ago

OK, thanks. Will look at this tonight.

eakmanrq commented 1 month ago

@djouallah Can you give 1.1.3 a try?

djouallah commented 1 month ago

OK, it is better, but the df dtypes come back as object and I can't use it to write a Delta table. Here is xx.dtypes:

unit                                    object
version                                 object
settlementdate                  datetime64[ns]
runno                                   object
duid                                    object
intervention                            object
dispatchmode                            object
agcstatus                               object
initialmw                               object
totalcleared                            object
rampdownrate                            object
rampuprate                              object
lower5min                               object
lower60sec                              object
lower6sec                               object
raise5min                               object
raise60sec                              object
raise6sec                               object
marginal5minvalue                       object
marginal60secvalue                      object
marginal6secvalue                       object
marginalvalue                           object
violation5mindegree                     object
violation60secdegree                    object
violation6secdegree                     object
violationdegree                         object
lowerreg                                object
raisereg                                object
availability                            object
raise6secflags                          object
raise60secflags                         object
raise5minflags                          object
raiseregflags                           object
lower6secflags                          object
lower60secflags                         object
lower5minflags                          object
lowerregflags                           object
raiseregavailability                    object
raiseregenablementmax                   object
raiseregenablementmin                   object
lowerregavailability                    object
lowerregenablementmax                   object
lowerregenablementmin                   object
raise6secactualavailability             object
raise60secactualavailability            object
raise5minactualavailability             object
raiseregactualavailability              object
lower6secactualavailability             object
lower60secactualavailability            object
lower5minactualavailability             object
lowerregactualavailability              object
filename                                object
priority                                 int64
date                                    object
year                                     int64
dtype: object

When trying to use the Delta Lake writer:

from deltalake.writer import write_deltalake
write_deltalake("sqlframe",xx, mode="append")
SchemaMismatchError                       Traceback (most recent call last)
<ipython-input-14-c2b45991ba34> in <cell line: 2>()
      1 from deltalake.writer import write_deltalake
----> 2 write_deltalake("sqlframe",xx, mode="append")

/usr/local/lib/python3.10/dist-packages/deltalake/writer.py in write_deltalake(table_or_uri, data, schema, partition_by, mode, file_options, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, name, description, configuration, overwrite_schema, schema_mode, storage_options, partition_filters, predicate, large_dtypes, engine, writer_properties, custom_metadata)
    556 
    557         if table is None:
--> 558             write_deltalake_pyarrow(
    559                 table_uri,
    560                 schema,

SchemaMismatchError: Invalid data type for Delta Lake: Null

Another error: df.sql() does not work.

FileNotFoundError                         Traceback (most recent call last)
<ipython-input-18-a6bed05ee79d> in <cell line: 1>()
----> 1 df.sql()

7 frames
/usr/local/lib/python3.10/dist-packages/sqlglot/helper.py in open_file(file_name)
    255 def open_file(file_name: str) -> t.TextIO:
    256     """Open a file that may be compressed as gzip and return it in universal newline mode."""
--> 257     with open(file_name, "rb") as f:
    258         gzipped = f.read(2) == b"\x1f\x8b"
    259 

FileNotFoundError: [Errno 2] No such file or directory: ''
djouallah commented 1 month ago

I know Spark infamously doesn't support to_arrow(), but I would really appreciate it if you could add it to the DuckDB backend. I'd much prefer not to deal with pandas DataFrames at all.
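
In the meantime, I guess something like this rough sketch could work once df.sql() is fixed, assuming it returns the generated query as a string, since DuckDB relations can hand back Arrow directly (the input file here is a placeholder):

import duckdb
from sqlframe.duckdb.session import DuckDBSession

conn = duckdb.connect()
session = DuckDBSession(conn=conn)
df = session.read.load("data.csv", format="csv")  # placeholder input
# Render the DataFrame to SQL, then let DuckDB materialize it as a pyarrow Table
arrow_table = conn.sql(df.sql()).arrow()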

eakmanrq commented 1 month ago

The sql() error is expected; this is a bug in SQLGlot. I'm trying to prioritize issues within SQLFrame right now.

For the deltalake writer issue I think I may end up solving part of it with this issue: https://github.com/eakmanrq/sqlframe/issues/25

I think your issue is that I don't know the data types when I go to create the pandas DataFrame, but if I used temp views in DuckDB, I could. So I will look into that.
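
For illustration, a minimal sketch of the temp view idea (the view and columns here are made up): DuckDB keeps the inferred column types on the view, and DESCRIBE can report them back.

import duckdb

conn = duckdb.connect()
# The temp view preserves the types DuckDB inferred for the query
conn.sql("CREATE TEMP VIEW pipeline AS SELECT 1.5::DOUBLE AS x, 'a' AS y")
for name, dtype, *_ in conn.sql("DESCRIBE pipeline").fetchall():
    print(name, dtype)  # prints: x DOUBLE, y VARCHAR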

djouallah commented 1 month ago

Any news?

eakmanrq commented 1 month ago

I'm hoping I can take a look towards the end of the week. I will keep you posted.

eakmanrq commented 1 month ago

For the pandas issue, I tried a few different things but couldn't seem to get the correct data types back. I started wondering if this is a DuckDB issue, and it looks like it may actually be a pandas issue: https://github.com/duckdb/duckdb/issues/10305

As suggested in that issue, can you try doing df.toPandas().convert_dtypes() and see what you get?
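
For reference, that would look like this; convert_dtypes() asks pandas to re-infer nullable extension dtypes:

pdf = df.toPandas().convert_dtypes()
print(pdf.dtypes)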

djouallah commented 1 month ago

Thanks, I understand the issue now: fields where all the values are null don't have a type, which makes Arrow unhappy.

import pyarrow as pa

table = pa.Table.from_pandas(xx)
table

marginal5minvalue: null
marginal60secvalue: null
marginal6secvalue: null
marginalvalue: null
violation5mindegree: null
violation60secdegree: null
violation6secdegree: null
violationdegree: null
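
A possible workaround sketch until this is fixed: cast the all-null columns to an explicit type before handing the table to the Delta writer (float64 is an assumption, based on these being numeric fields):

import pyarrow as pa

table = pa.Table.from_pandas(xx)
# Replace null-typed fields with an explicit type so Delta Lake accepts them
fields = [
    pa.field(f.name, pa.float64()) if pa.types.is_null(f.type) else f
    for f in table.schema
]
table = table.cast(pa.schema(fields))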

eakmanrq commented 1 month ago

Hmm, yeah, I see. I think the underlying issue is with duckdb/pandas, right?

djouallah commented 1 month ago

I think I will close it. Thanks for your help!