saurfang / spark-sas7bdat

Splittable SAS (.sas7bdat) Input Format for Hadoop and Spark SQL
http://spark-packages.org/package/saurfang/spark-sas7bdat
Apache License 2.0

Support reading multiple SAS files, and merging of schema. #45


thesuperzapper commented 5 years ago

We should allow multiple sas files to be read into a single dataframe, merging their schema if possible.

This is a lot easier with Parso 2.0.10, as we can specify which column names we want to read, and in what order, when calling readNext().

My recommendation is that we copy how core Spark merges schemas, which is effectively taking the union of all columns, filling in nulls for rows from files that lack a given column, and throwing an error for columns with the same name but incompatible types (e.g. string and int).

The only issue with this is that, because we infer types from SAS column formatters, the merge will fail if the user has inferDecimal enabled and the column precision has changed between two files.

Parquet Example:

spark.read.option("mergeSchema", "true").parquet("table*.parquet")
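
For illustration, here is a minimal sketch of that union-of-columns merge written against the public PySpark API; the read_sas and merge_frames helpers and the file names are hypothetical, not existing package functionality:

from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

def read_sas(path):
    # assumes the spark-sas7bdat package is on the classpath
    return spark.read.format("com.github.saurfang.sas.spark").load(path)

def merged_fields(schemas):
    # union of all columns; error on same name with incompatible types
    merged = {}
    for schema in schemas:
        for field in schema.fields:
            seen = merged.get(field.name)
            if seen is not None and seen.dataType != field.dataType:
                raise ValueError(
                    "column '%s' has incompatible types: %s vs %s"
                    % (field.name, seen.dataType, field.dataType))
            merged.setdefault(field.name, field)
    return list(merged.values())

def merge_frames(dfs):
    fields = merged_fields([df.schema for df in dfs])
    aligned = []
    for df in dfs:
        cols = [df[f.name] if f.name in df.columns
                else lit(None).cast(f.dataType).alias(f.name)  # null-fill missing columns
                for f in fields]
        aligned.append(df.select(cols))
    return reduce(lambda a, b: a.unionByName(b), aligned)

# hypothetical input paths
df = merge_frames([read_sas(p) for p in ["table1.sas7bdat", "table2.sas7bdat"]])
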
nelson2005 commented 5 years ago

SAS column formatters are for display only... I'd strongly suggest not using them for inferring numeric datatypes, with the exception of date/timestamp. SAS has only two datatypes: char and double. Trying to layer much magic structure on top of the randomness with which people assign column formats is bound to devolve into a festering pool of lameness.

In that vein, I'd suggest removing the numeric type inference (excepting date/timestamp), which masquerades as adding functionality that doesn't actually exist in SAS. I'm happy to talk about it more, but I feel like this feature will be a gift that keeps on giving without much payoff.

Tagar commented 5 years ago

I see your point @nelson2005 and it makes a lot of sense.

Since non-double data type discovery is not on by default currently, I think a developer who leverages formatter-based schema discovery should be aware of its limitations.

So I think it's good to have this functionality in, as long as it's disabled by default and we have a proper disclaimer in the documentation...

Spark, for example, has schema inference for the CSV data source, and while it's not perfect (https://github.com/apache/spark/blob/f982ca07e80074bdc1e3b742c5e21cf368e4ede2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala#L99), it works very well in many cases.

thesuperzapper commented 5 years ago

@nelson2005, the main issue is that sas7bdat files sometimes store integer values with precision errors, e.g. 1.999999 instead of 2, and while SAS itself seems to correct for that when displaying, the file still stores 1.999999.

But yeah, we chose to disable non-date inference by default for the reasons you provided.
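
As a rough illustration of the kind of correction described above (not part of the package itself), a value stored as 1.999999 can be rounded before casting rather than truncated; here df is assumed to be a DataFrame already read from a sas7bdat file and "x" is a placeholder column name:

from pyspark.sql import functions as F

# round the near-integer double before casting, so 1.999999 becomes 2 rather than 1
df_fixed = df.withColumn("x", F.round(F.col("x")).cast("long"))
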

nelson2005 commented 5 years ago

I'm not sure I would call it an integer precision error, since integer is not a SAS data type.

Agreed that SAS has lots of easter eggs there that don't translate well to the Spark world. Fuzzing numeric comparisons such that 1.999999 == 2 is one of them. Since formats are a display-only concept, I'd think getting the same number that's displayed in SAS to show up in Spark might be a big task. For example, suppose the user has 1.99999 formatted as 8.3. What number would that convert to in Spark? DecimalType(8,3)? Will that display the same thing to the user in Spark and SAS?

At any rate, I think schema inference is much easier here than for CSV, since the SAS type system lacks the richness of Spark's datatypes. A simple first cut might be to require all of the globbed files to have identical schemas (char, date, or numeric)... after the user converts them to, say, Parquet, they can use the schema merging that's already available there.
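
A sketch of that Parquet route, under the assumption that each file is written out once with the existing reader and the merging is left to Parquet (the input paths and output directory are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sas_files = ["table1.sas7bdat", "table2.sas7bdat"]  # hypothetical inputs
out_dir = "/tmp/sas_as_parquet"                     # hypothetical output location

# one Parquet directory per SAS file, written with the existing reader
for f in sas_files:
    (spark.read
        .format("com.github.saurfang.sas.spark")
        .load(f)
        .write
        .mode("overwrite")
        .parquet("%s/%s" % (out_dir, f)))

# Spark's built-in Parquet schema merging then takes the union of the columns
df = spark.read.option("mergeSchema", "true").parquet(out_dir + "/*")
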

nelson2005 commented 5 years ago

@saurfang @thesuperzapper @Tagar I'd be willing to put up a bonus on this feature, say $100 USD.

dmoore247 commented 4 years ago

Hmmm, one approach is N writes to the Delta Lake format (delta.io) with .option("mergeSchema", "true"). My use case didn't call for schema merging, but it did call for multiple parallel writes. Below is my PySpark code with the merge schema option... I'd rather see this built into the reader.

from multiprocessing.pool import ThreadPool
from pyspark.sql.functions import input_file_name

# `spark`, `path`, and `target_table` are assumed to be defined elsewhere
cols = ["ID", ..., "FILE_PATH"]  # full column list truncated in the original
files = ["x1.sas7bdat", "x2.sas7bdat", "x3.sas7bdat", "x4.sas7bdat",
         "x5.sas7bdat", "x6.sas7bdat", "x7.sas7bdat", "x8.sas7bdat"]

# load one SAS file with Spark and append it to the Delta table
def sas_read_write_delta(file_name):
  print(file_name)
  (
    spark
      .read
      .format("com.github.saurfang.sas.spark")
      .load(path + file_name,
            forceLowercaseNames=True,
            inferLong=True,
            metadataTimeout=60)
      .withColumn("FILE_PATH", input_file_name())
      .select(cols)
      .write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(target_table)
  )

# run 4 loads in parallel; each load is itself parallelized by splitting the source file
# Delta Lake tables support concurrent writes
if __name__ == '__main__':
    pool = ThreadPool(4)
    pool.map(sas_read_write_delta, files)
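
As a note on this design: each thread appends to the same Delta table, and because the mergeSchema write option lets Delta evolve the table schema, files with extra or missing columns would still land in the table, with nulls filling the gaps, which roughly approximates the reader-level merge requested in this issue.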