saurfang / spark-sas7bdat

Splittable SAS (.sas7bdat) Input Format for Hadoop and Spark SQL
http://spark-packages.org/package/saurfang/spark-sas7bdat
Apache License 2.0

Splitting Internally Compressed sas7bdat #38

Closed saurfang closed 5 years ago

saurfang commented 6 years ago

There appears to be an issue reading internally compressed sas7bdat as discussed in #32. This is a recap of what we know so far about the issue and what is required to identify the root cause and possible fix.

Background

sas7bdat is a binary data storage file format used by SAS. There is no public documentation about this file format, and different versions of SAS appear to have evolved it over the years. The best documentation of sas7bdat can be found at SAS7BDAT Database Binary Format. However, it should be taken with a grain of salt, since it accurately reflects neither the latest revision nor the internal compression used in sas7bdat. At a high level, sas7bdat stores data in pages, and each page contains rows of serialized data. This is the basis for the spark-sas7bdat package, which splits a dataset so that it can be processed in parallel. Internally, spark-sas7bdat delegates deserialization to the parso Java library, which does an amazing job of deserializing a sas7bdat file sequentially.

Problem

spark-sas7bdat contains unit tests to verify that a sas7bdat file can indeed be split and read correctly as a DataFrame. However, there have been reports that it fails for many datasets in the wild. See #32.

It has been verified that parso can read these problematic files just fine (https://github.com/saurfang/spark-sas7bdat/issues/32#issuecomment-419593441). Therefore the bug is likely in how we determine the appropriate splits to divide the sas7bdat file for parallel processing, that is, in https://github.com/saurfang/spark-sas7bdat/blob/master/src/main/scala/com/github/saurfang/sas/mapred/SasRecordReader.scala, since everything else is just a thin wrapper over Parso thanks to @mulya's contribution in #10.

Furthermore, it is likely that this issue only happens with certain versions of sas7bdat or with sas7bdat files that use internal compression. Recall that an externally compressed file (e.g. gzip) is not splittable, so spark-sas7bdat does not support reading it in parallel.

Proposal

Build Test Case

We first need to collect datasets that exhibit the issue.

@vivard has provided one: https://github.com/saurfang/spark-sas7bdat/issues/32#issuecomment-412535049

@nelson Where do you get your problematic dataset? Can you generate a dummy dataset that exhibits the same problem?

By setting a very low block size in unit test, we can force Spark to split the input data and hopefully trigger the error.

Debug Test Case

It can be fairly convoluted to debug the splitting logic in Spark and Hadoop. One potential way to debug this is to create a local multi-threaded Parso reader, without Spark, to replicate and validate the splitting logic.

See parso's unit test here, where we read row by row from sas7bdat and write to csv. The idea would be to generalize this by looking at the number of pages in the sas7bdat file, splitting the pages into chunks, creating a separate input stream for each chunk, seeking the input stream to the chunk's first page, and processing rows from all pages in the chunk. The splitting logic can be refactored from here in this package. Since the issue is unlikely to be related to concurrency, because each Spark executor creates its own input stream, one can run the above logic sequentially for each chunk, which should be easier to debug.

This might help identify and isolate the issue. We might also discover functions, interfaces, and encapsulation that can be contributed back to Parso, which could greatly simplify this package.
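The chunking step described above can be sketched without Spark or parso. This is a minimal sketch in Python and not the package's actual splitting logic: it assumes the header length, page length, and page count have already been read from the file header, and that pages are fixed-size and laid out contiguously after the header. `PageChunk`, `plan_page_chunks`, and `iter_chunk_pages` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import BinaryIO, Iterator, List


@dataclass(frozen=True)
class PageChunk:
    first_page: int    # 0-based index of the first page in the chunk
    page_count: int    # number of pages in the chunk
    start_offset: int  # byte offset to seek to before reading the chunk


def plan_page_chunks(header_length: int, page_length: int,
                     total_pages: int, num_chunks: int) -> List[PageChunk]:
    """Divide the pages into at most num_chunks contiguous, page-aligned chunks."""
    if header_length < 0 or page_length <= 0 or total_pages < 0 or num_chunks <= 0:
        raise ValueError("invalid file geometry or chunk count")
    base, extra = divmod(total_pages, num_chunks)
    chunks: List[PageChunk] = []
    page = 0
    for i in range(num_chunks):
        # The first `extra` chunks take one additional page each.
        count = base + (1 if i < extra else 0)
        if count == 0:
            break  # fewer pages than requested chunks
        chunks.append(PageChunk(page, count, header_length + page * page_length))
        page += count
    return chunks


def iter_chunk_pages(stream: BinaryIO, chunk: PageChunk,
                     page_length: int) -> Iterator[bytes]:
    """Seek to the chunk's first page and yield its raw pages one by one."""
    stream.seek(chunk.start_offset)
    for _ in range(chunk.page_count):
        page = stream.read(page_length)
        if len(page) < page_length:
            break  # truncated file: stop instead of yielding a partial page
        yield page
```

Running `iter_chunk_pages` over every chunk in order, one chunk at a time, should visit each page exactly once. Comparing the rows decoded per chunk against a plain sequential read would then show whether rows are lost at chunk boundaries.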

gitcoinbot commented 5 years ago

This issue now has a funding of 0.4 ETH (92.12 USD @ $230.29/ETH) attached to it.

nelson2005 commented 5 years ago

To clarify, this problem also appears on non-compressed sas7bdat; that is, the issue still exists with SAS code like

options compress=no;

That being said, making compressed files splittable (#35) would be most helpful. I'm only suggesting that this problem is possibly not related to SAS internal compression.

nelson2005 commented 5 years ago

Okay, I've really tried to figure out how to fund this and it's maddening. I installed MetaMask, which asks for all my permissions and a blood sample. It seems insanely complicated to set up some Ethereum system just to fund a bounty. @saurfang can I just send you some cash on paypal or something? I'd be happy to double what you started with. Perhaps you use weixin and can contact me OOB there. I haven't figured out how to send you a private message on github either- it seems my lameness knows no bounds. Thanks- tingbaide

saurfang commented 5 years ago

Sorry to hear that, @nelson2005. Cryptocurrency doesn't have a great onboarding UX yet 😢 Your experience is very valuable and I will pass it along or even contribute to a better solution.

Meanwhile you can message me on twitter or maybe on gitter which you can log in with your github account (and I believe you can click on my profile and do a private chat.) We can then figure out what's the best way to get your money into this issue. Thanks a lot for your patience!

gitcoinbot commented 5 years ago

💰 A crowdfund contribution worth 0.04000 DAI (0.04 USD @ $1.0/DAI) has been attached to this funded issue from @saurfang.💰

gitcoinbot commented 5 years ago

💰 A crowdfund contribution worth 199.80000 DAI (199.8 USD @ $1.0/DAI) has been attached to this funded issue from @saurfang.💰

saurfang commented 5 years ago

💰 A crowdfund contribution worth 199.80000 DAI (199.8 USD @ $1.0/DAI) has been attached to this funded issue from @saurfang.💰

Thanks @nelson2005 for the $200 contribution! Please consider this transaction as receipt.

kraj007 commented 5 years ago

Hi @saurfang, we tested loading .sas7bdat files of different sizes into Parquet format: a first dataset of ~420 MB (4 partitions, 4 tasks) and a second dataset of ~850 MB (7 partitions, 7 tasks), given Spark's default partition size of 128 MB.

In the output Parquet files, we observed that only one partition (usually the first) contains data, and the remaining partitions are empty files. For small files (< 128 MB) it works fine, but once the file size exceeds 128 MB we observe this issue. Can you please help or guide us to resolve it?
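The partition counts reported above are consistent with dividing the file size by a 128 MB split size and rounding up. The following is a minimal arithmetic sketch in Python. `expected_partitions` is a hypothetical helper, and the real split planner may differ slightly, for example by tolerating a somewhat oversized final split.

```python
import math


def expected_partitions(file_size_mb: float, split_size_mb: float = 128.0) -> int:
    """Rough number of input splits for a file, assuming fixed-size splits."""
    if file_size_mb < 0 or split_size_mb <= 0:
        raise ValueError("sizes must be non-negative and split size positive")
    # At least one partition, even for an empty or very small file.
    return max(1, math.ceil(file_size_mb / split_size_mb))


if __name__ == "__main__":
    for size in (100, 420, 850):
        print(size, "MB ->", expected_partitions(size), "partition(s)")
```

So the number of partitions itself looks as expected for both files; the symptom is that all but one of them come back empty.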

saurfang commented 5 years ago

@kraj007 Sorry to hear that, but like I said in the other issue, I do not have any time to work on this project anymore. Your best bet would be to identify a dataset that can replicate your issue and does not contain any private information so you can make it available to download here. Hopefully, somebody might be able to pick it up and help you either because they are solving the issue for themselves or they are attracted by the bounty attached to this issue. In the latter case, you are also welcome to contribute to the bounty to increase the visibility and attractiveness.

nelson2005 commented 5 years ago

@saurfang I ran into the problem fairly frequently, maybe 1 out of 10 datasets. Unfortunately, I'm unable to contribute one now. :(

cristamd commented 5 years ago

If anyone is looking for a workaround, changing the SasInputFormat to always return false for isSplittable prevents the file from being split and causes it to return the correct data in the DataFrame. Obviously this has a massive performance impact for large files since the processing will only use one node. An example of the necessary changes is available here: https://github.com/PerkinElmer/spark-sas7bdat/tree/no-split

nelson2005 commented 5 years ago

@cristamd If I understand correctly, this means the dataset will be read into a single partition, which is limited in size to 2GB:

https://issues.apache.org/jira/browse/SPARK-1476

cristamd commented 5 years ago

@nelson2005 I tested with a 27GB SAS file and it appears you are correct, unfortunately. Thanks for the heads up.

Update: I tested this again since apparently my SAS file got corrupted while uploading it to S3 and with a valid SAS file I am at least able to do a count of the 28G file using one partition. I'll try another job that does a data export to make sure all the data makes it through.

Tagar commented 5 years ago

@nelson2005 @cristamd did you guys try spark-sas7bdat with https://github.com/saurfang/spark-sas7bdat/pull/44 ? It also upgrades parso to 2.0.10 which has some fixes for compressed char data in sas files.

thesuperzapper commented 5 years ago

@nelson2005 @cristamd Here are some pre-built jars for Spark 2.2, with #44 applied for convenience.

spark-sas7bdat_2.11-2.1.0_SNAPSHOT.zip

Tagar commented 5 years ago

@nelson2005 @cristamd Here are some pre-built jars for Spark 2.2, with #44 applied for convenience.

spark-sas7bdat_2.11-2.1.0_SNAPSHOT.zip

Thank you @thesuperzapper. We're on Spark 2.3; do you have a version precompiled for that? We will ask our users if they can test this.

Thanks!

thesuperzapper commented 5 years ago

@Tagar, I am pretty sure that Spark 2.3 should work with those files, tell me if not.

nelson2005 commented 5 years ago

@Tagar I have not had a chance to try #44 yet.

cristamd commented 5 years ago

@Tagar I was able to try both the no split logic and the split changes in #44 with a 27G SAS file and the system was able to process, sort, and save the file as a CSV with both options. Obviously only using one partition was much slower, but both jobs completed. I'm seeing some differences in the output at first glance, but I'll have to dig in a little more to find out if the data being parsed and then saved out is actually different. Hopefully I'll have a chance to do that tomorrow.

This is what I saw in the Spark history server for the read stage when using the splitting logic: [image]

This is what I saw when using the non-splitting logic: [image]

The row count is the same for both, so I'm hopeful that the data will match as well.

PCaff commented 5 years ago

@Tagar It seems it's not quite there but it's very close! I'm able to get a row count from one of my data sets that would error out prior to the jars you shared above. However, it looks like the split size may be cutting off some rows at the beginning or end of the split. I cannot provide the data file, but I can try to help by providing some metadata. Row mismatch of: 148

Source: RowCount: 282124814, ColCount: 40, CreationDate: Jan 2016, Compressed, Number of Deleted Rows: 53700 (not sure if this affects anything)

Target: RowCount: 282124666, ColCount: 40

cristamd commented 5 years ago

I was finally able to run the comparison between the non-partitioned parser and the partitioned one in #44 and can confirm they produce the same dataframe for our 27GB SAS file. Nice work 👍

nelson2005 commented 5 years ago

@cristamd can you confirm that your 27GB file was one of the sas7bdat files that failed to load with the pre-#44 code?

Tagar commented 5 years ago

@PCaff, @thesuperzapper is driving these fixes. Hopefully he can help with this one.

I cannot provide the data file, but I can try to help by providing some metadata. Row mismatch of: 148

I think it's the only unsuccessful report ..

However, it looks like the split size may be cutting off some rows at the beginning or end of the split.

Could you force this file to be read as just one partition, so there are no splits, and see if that fixes the 148 lost records in the output? If it does not, it's probably a parso issue and not a spark-sas7bdat one.

Also, it would be nice to see exactly which records didn't make it to the target.
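One way to find those records is a multiset difference between a trusted read of the source and the Spark output. The following is a minimal sketch in Python, assuming both sides can be exported as sequences of rows. `missing_rows` is a hypothetical helper, and for a table of this size the same comparison would have to be done with a distributed job rather than in memory.

```python
from collections import Counter
from typing import Iterable, Sequence


def missing_rows(source_rows: Iterable[Sequence],
                 target_rows: Iterable[Sequence]) -> Counter:
    """Rows that occur more often in the source than in the target.

    Rows are compared as tuples, so duplicate rows are counted
    rather than collapsed as they would be in a plain set difference.
    """
    source = Counter(tuple(row) for row in source_rows)
    target = Counter(tuple(row) for row in target_rows)
    # Counter subtraction keeps only positive counts, i.e. the lost rows.
    return source - target


if __name__ == "__main__":
    src = [("1", "a"), ("2", "b"), ("2", "b"), ("3", "c")]
    tgt = [("2", "b"), ("1", "a")]
    for row, times in missing_rows(src, tgt).items():
        print(row, "missing", times, "time(s)")
```

If the lost rows cluster around particular row numbers, that would point at split boundaries; if they are spread across the file or share some property, the reader itself is the more likely suspect.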

@thesuperzapper I wonder if parso or spark-sas7bdat have an option to detect bad records? Not sure how this happens.

cristamd commented 5 years ago

@nelson2005 My 27G file didn't fail to load with the pre #44 code (job completed successfully), but it only read the first few pages of data so instead of a million plus rows it was only around 100,000 rows.

nelson2005 commented 5 years ago

@Tagar you can't read big files into one partition due to spark partition size limits. Additionally, I think it's been established that parso reads these files without incident, for example in #32

cristamd commented 5 years ago

@nelson2005 I was able to read the 27G file using one partition without any issues as long as the file was a valid SAS file. With a corrupted SAS file I got a bunch of memory errors. I think our cluster is running Spark 2.2.1 on YARN.

nelson2005 commented 5 years ago

@cristamd was that loading all the data into a single partition, or just a count() or some other operation? Does that mean there's no 2GB partition limit?

cristamd commented 5 years ago

@nelson2005 I read the 27GB SAS file with one partition, repartitioned it to 400 partitions and then wrote it out to a partitioned CSV file in S3. That worked fine without any issues. If you look at the second screenshot in my post from ~12 days ago you can see the Spark History Server output for the job with the single partition. Again, this was running Spark (2.2.1) via YARN on AWS EMR (5.12.1), so maybe the 2G partition limit applies elsewhere.

PCaff commented 5 years ago

@cristamd how did you force it to read into one partition? I would like to give this a go to help pinpoint the issue.

cristamd commented 5 years ago

@PCaff I made a small change to the code for the SasInputFormat to have isSplittable always return false. I have a branch on my fork of the repo at https://github.com/PerkinElmer/spark-sas7bdat/tree/no-split that already has the change if you want to check it out and compile it.

Tagar commented 5 years ago

I wonder if we should have an equivalent parameter maxPartitions that can be set to 1 to force one single partition.

minPartitions (Default: 0)

  • Int: The minimum number of splits to perform. (Adherence is not guaranteed)

See comment here https://github.com/saurfang/spark-sas7bdat/pull/44#issuecomment-440037414

If we would have maxPartitions, like we have minPartitions, testing would be easier for such cases.

cc @thesuperzapper

thesuperzapper commented 5 years ago

@PCaff can you try with the latest jar at the bottom of #44.

And if it still has the issue, can you please post the stderr logs for a single executor (INFO level)? The pieces I am interested in are:

XX/X/XX XX:XX:XX INFO HadoopRDD: XXXXXXXXXXXXXXXXXXXXXX
XX/X/XX XX:XX:XX INFO SasRecordReader: XXXXXXXXXXXXXXXXXXXXXX

thesuperzapper commented 5 years ago

@Tagar I will see about moving to the new HadoopRDD API, which makes use of org.apache.hadoop.mapreduce rather than org.apache.hadoop.mapred, meaning we can specify min and max split sizes (rather than a hint for how many splits we want).
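For reference, the stock org.apache.hadoop.mapreduce FileInputFormat derives the split size as max(minSize, min(maxSize, blockSize)) and keeps cutting splits while the remaining bytes exceed 1.1 times that size. Below is a Python sketch of that rule only. Whether spark-sas7bdat follows it exactly is an assumption, since the package may also need to align splits to sas7bdat page boundaries; `compute_split_size` and `split_lengths` are illustrative names.

```python
from typing import List

SPLIT_SLOP = 1.1  # the last split may be up to 10% larger than split_size
MB = 1024 * 1024


def compute_split_size(block_size: int, min_size: int, max_size: int) -> int:
    """Split size rule: max(minSize, min(maxSize, blockSize))."""
    return max(min_size, min(max_size, block_size))


def split_lengths(file_size: int, block_size: int,
                  min_size: int = 1, max_size: int = 2**63 - 1) -> List[int]:
    """Lengths in bytes of the splits produced for a single file."""
    split_size = compute_split_size(block_size, min_size, max_size)
    lengths: List[int] = []
    remaining = file_size
    while remaining / split_size > SPLIT_SLOP:
        lengths.append(split_size)
        remaining -= split_size
    if remaining > 0:
        lengths.append(remaining)  # final, possibly short or oversized split
    return lengths


if __name__ == "__main__":
    size = 420 * MB
    # A max size above the file size changes nothing: the block size still wins.
    print(len(split_lengths(size, 128 * MB, max_size=1000 * MB)))
    # A min size at or above the file size yields a single split.
    print(len(split_lengths(size, 128 * MB, min_size=size)))
```

Under this rule, a maximum split size larger than the file does not by itself force a single split, because the block size still caps the split size; raising the minimum split size to at least the file size does.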

Tagar commented 5 years ago

@thesuperzapper That would be a very good change. Thank you for all your improvements here.

PCaff commented 5 years ago

@thesuperzapper I cleansed it a little bit for obvious reasons. exec_logs.txt

thesuperzapper commented 5 years ago

@PCaff can you see if the error still happens with no splits?

Use the latest jar from: #44, and specify maxSplitSize larger than the file. (I am not 100% sure if it will let you do that)

If that doesn't work, I will make you a no split version to check.

PCaff commented 5 years ago

@thesuperzapper Sadly, it did not work. The file updated overnight and now the output is missing even more records (roughly 1 million).

Is it possible the split logic is not capturing the mix pages correctly?

Tagar commented 5 years ago

@PCaff were you able to force this to be a single split like @thesuperzapper suggested?

PCaff commented 5 years ago

@Tagar I was waiting for @thesuperzapper to provide the no split version he mentioned in his previous post.

Tagar commented 5 years ago

@PCaff got it. I wasn't sure if you tried to "specify maxSplitSize larger than the file" as it could force one single partition.

I understand now that you are waiting for @thesuperzapper to provide a separate no split build.

Thanks.

PCaff commented 5 years ago

@Tagar Oh yeah, I'm sorry. I tried that method, but with no success :(

thesuperzapper commented 5 years ago

@PCaff here is a version which will never split files, please check if it still has the issue:

spark-sas7bdat_2.11-2.1.0_SNAPSHOT_v6_NOSPLIT.zip

nelson2005 commented 5 years ago

FWIW, there are python and R implementations of sas7bdat readers in the wild that could be useful references regarding the file format.

PCaff commented 5 years ago

@thesuperzapper the no split jar did not work as well. Still have the same count of records after the dataframe is loaded.

Tagar commented 5 years ago

FWIW, there are python and R implementations of sas7bdat readers in the wild that could be useful references regarding the file format.

@nelson2005, the https://github.com/epam/parso library is superior to other known sas7bdat reader implementations. For example, it's the only one that supports reading compressed files.

See for example https://github.com/BioStatMatt/sas7bdat/issues/12#issuecomment-339378793. That project also does not seem to be actively maintained.

Tagar commented 5 years ago

@thesuperzapper the no split jar did not work as well. Still have the same count of records after the dataframe is loaded.

It's most likely an issue with parso reader then, which saurfang/spark-sas7bdat is based on.

cc parso library developers @printsev @FriedEgg @Yana-Guseva

printsev commented 5 years ago

Hi everyone, great to hear that the spark version is improved! So the next steps seem to be the following: 1) try with parso itself 2) if the number of records is less than it should be, create a new issue in parso, and we will try to help.

PS would be great if you could check the number of rows using the parso object API, without converting it to CSV to rule out potential issues in CSVWriter

thesuperzapper commented 5 years ago

@PCaff can you try reading that file with https://github.com/epam/parso directly?

kraj007 commented 5 years ago

FWIW, there are python and R implementations of sas7bdat readers in the wild that could be useful references regarding the file format.

I have tried the sas7bdat Python package. It's slow compared to the Spark package: it took hours to convert a 3 GB compressed SAS file to CSV. But it works great for small compressed SAS files.

We could also look at pandas.read_sas with Dask (http://docs.dask.org/en/latest/spark.html), which works especially well with pandas.

Tagar commented 5 years ago

Didn't know pandas has sas7bdat reader too https://github.com/pandas-dev/pandas/tree/master/pandas/io/sas

Yep, it would be much slower since it is implemented completely in Python, whereas spark-sas7bdat makes reads highly parallel.

Thanks.