saurfang / spark-sas7bdat

Splittable SAS (.sas7bdat) Input Format for Hadoop and Spark SQL
http://spark-packages.org/package/saurfang/spark-sas7bdat
Apache License 2.0
89 stars · 40 forks

Unable to load SAS data set into Spark 2.3.0 #53

Closed SrikaranJangidi closed 3 years ago

SrikaranJangidi commented 4 years ago

Hi, we have a Spark cluster running Spark 2.3.0. The jar spark-sas7bdat-2.1.0-s_2.11.jar is not working with Spark 2.3.0, although it seems to work with Spark 2.2.0. Is there a workaround for Spark 2.3.0, or do we have to downgrade to Spark 2.2.0?

thesuperzapper commented 4 years ago

@SrikaranJangidi This package should work with all versions of Spark 2. Can you describe the error you are getting (logs, etc.)?

SrikaranJangidi commented 4 years ago

@thesuperzapper Thanks for responding. Below is the information. Please let us know if more information is needed.

We are trying to read the file datetime.sas7bdat, which is available in this GitHub repository: https://github.com/saurfang/spark-sas7bdat, and are getting the error below. Code used:

# create spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("yarn").appName("sort data")\
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.1.0-s_2.11")\
    .config('spark.driver.memory', '3g')\
    .config('spark.executor.memory', '3g')\
    .config('spark.executor.cores', '2')\
    .config('spark.executor.instances', '1')\
    .getOrCreate()

spark.read.format("com.github.saurfang.sas.spark").load("/hdfs/path/datetime.sas7bdat", inferLong=True)

Error log:

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 df =spark.read.format("com.github.saurfang.sas.spark").load("/user/rtripa1/datetime.sas7bdat",inferLong=True, forceLowercaseNames=True)

/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    164         self.options(**options)
    165         if isinstance(path, basestring):
--> 166             return self._df(self._jreader.load(path))
    167         elif path is not None:
    168             if type(path) != list:

/opt/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/anaconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o163.load.
: java.util.concurrent.TimeoutException: Timed out after 60 sec while reading file metadata, file might be corrupt. (Change timeout with 'metadataTimeout' paramater)
    at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:189)
    at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:62)
    at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:43)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:209)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:42)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
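
For reference, the error message itself names a metadataTimeout option that can be raised when schema inference is slow. A minimal sketch of passing it (the option name is taken from the error text above; the 600-second value is only an example, and raising it will not help if the real problem lies elsewhere):

# Sketch: raise the metadata read timeout named in the error message.
# The 600-second value is an arbitrary example; the path is the same one used above.
df = spark.read.format("com.github.saurfang.sas.spark") \
    .load("/hdfs/path/datetime.sas7bdat", inferLong=True, metadataTimeout=600)
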
thesuperzapper commented 4 years ago

@SrikaranJangidi That error is usually associated with a corrupt file. Can you:

SrikaranJangidi commented 4 years ago

@thesuperzapper I checked with my developer and below is his response. He tried with another file as well. Please advise.

The file is not corrupted as I am able to read it using pandas.

import pandas as pd
df = pd.read_sas('datetime.sas7bdat', format='sas7bdat')

Also, I tried to read another file, ag121a_supp.sample (715 KB), and am still getting the same error. This file is also available on GitHub: https://github.com/saurfang/spark-sas7bdat/tree/master/src/test/resources

Py4JJavaError: An error occurred while calling o120.load.
: java.util.concurrent.TimeoutException: Timed out after 60 sec while reading file metadata, file might be corrupt. (Change timeout with 'metadataTimeout' paramater)
    at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:189)
    at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:62)
    at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:43)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:209)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:42)
    at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/anaconda3/lib/python3.7/site-packages/pyspark/jars/spark-sas7bdat-2.1.0-s_2.11.jar pyspark-shell'

import findspark
findspark.init("/opt/cloudera/parcels/SPARK2/lib/spark2/")

# create spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("yarn").appName("sup_data")\
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.1.0-s_2.11")\
    .config('spark.driver.memory', '3g')\
    .config('spark.executor.memory', '3g')\
    .config('spark.executor.cores', '2')\
    .config('spark.executor.instances', '1')\
    .getOrCreate()

df = spark.read.format("com.github.saurfang.sas.spark").load("ag121a_supp_sample.sas7bdat")

Srikaran

SrikaranJangidi commented 4 years ago

Hi. Any update on this please?

Thanks, Srikaran.

chonterry commented 4 years ago

Hi, I am running into this issue as well.

java.util.concurrent.TimeoutException: Timed out after 60 sec while reading file metadata

Thanks, Terry

nelson2005 commented 4 years ago

Make sure the parso library is available, though I wouldn't expect that to be the cause of this particular error. FWIW, I haven't had any problems with this library on Spark 2.2, 2.3, or 2.4.
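
For reference, one way to make parso available when building the session from Python is to list its Maven coordinates alongside the package coordinates. A minimal sketch (the com.epam:parso:2.0.10 coordinates are an assumption based on Maven Central, and the file path is illustrative):

from pyspark.sql import SparkSession

# Sketch: pull in both the reader and its parso dependency by Maven coordinates.
# Adjust the parso version to whatever your spark-sas7bdat release expects.
spark = SparkSession.builder \
    .appName("sas-read-check") \
    .config("spark.jars.packages",
            "saurfang:spark-sas7bdat:2.1.0-s_2.11,com.epam:parso:2.0.10") \
    .getOrCreate()

df = spark.read.format("com.github.saurfang.sas.spark") \
    .load("/hdfs/path/datetime.sas7bdat")  # illustrative path
df.printSchema()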

chonterry commented 4 years ago

Hello - Thanks for the comment re: parso library inclusion. After I included parso, I now get the following:

py4j.protocol.Py4JJavaError: An error occurred while calling o66.load. : java.lang.ClassCastException: java.util.Arrays$ArrayList cannot be cast to java.util.Set

chonterry commented 4 years ago

Hi - the above was caused by using a parso version older than 2.0.10; version 2.0.10 is required, and I had missed that requirement. The issue above was resolved after referencing parso 2.0.10.

Also, I initially did not reference the parso jar at all in the execution, which produced:

java.util.concurrent.TimeoutException: Timed out after 60 sec while reading file metadata

After referencing parso, the above exception went away.

dmoore247 commented 4 years ago

Importing parso 2.0.10 with Spark 2.4.5 and restarting fixed this type of error for me. This works really well now!

Speccles96 commented 4 years ago

Can anyone tell me how to download and import parso? I am getting this same issue and think it is because I don't have parso installed properly.

thesuperzapper commented 4 years ago

@Speccles96, just download the jar from Maven and pass it with spark-submit --jars.

You can find the link at the top of the README.md under "requirements"

EDIT: or, if your Spark cluster has internet access, you can pass the Maven coordinates with spark-submit --packages.
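
A sketch of both options (the jar paths, script name, and parso coordinates below are illustrative; adjust the versions to match your cluster):

# Option 1: pass locally downloaded jars explicitly.
spark-submit \
  --jars /path/to/spark-sas7bdat-2.1.0-s_2.11.jar,/path/to/parso-2.0.10.jar \
  your_script.py

# Option 2: let Spark resolve Maven coordinates (requires internet access from the cluster).
spark-submit \
  --packages saurfang:spark-sas7bdat:2.1.0-s_2.11,com.epam:parso:2.0.10 \
  your_script.py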

Speccles96 commented 4 years ago

@thesuperzapper Maybe I'm not fully understanding, but how would I run spark-submit --jars from my Python script? What I am doing is starting up a Jupyter notebook and running the code below. I put parso-2.0.10.jar in my Java\lib path with all of the other jars.

Spark version: 2.4.6, Scala version: 2.12.2, Java version: 1.8.0_261

import findspark
findspark.init()
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df=spark.read.format('com.github.saurfang.sas.spark')\
.load(r'D:\IvyDB\opprcd\opprcd2019.sas7bdat')

I am trying to replicate what was done in this article http://blog.rubypdf.com/2018/10/12/how-two-read-sas-data-with-pyspark/
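
For a notebook-launched session, one way to make locally downloaded jars visible is to set PYSPARK_SUBMIT_ARGS before the session is created, as shown earlier in this thread. A rough sketch under that assumption (the jar locations are illustrative and would need to point at wherever the jars actually live):

import os

# Must be set before findspark.init() / SparkSession creation; paths are illustrative.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--jars D:\\jars\\spark-sas7bdat-2.1.0-s_2.11.jar,D:\\jars\\parso-2.0.10.jar '
    'pyspark-shell'
)

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.read.format('com.github.saurfang.sas.spark') \
    .load(r'D:\IvyDB\opprcd\opprcd2019.sas7bdat')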

thesuperzapper commented 3 years ago

@saurfang can you close this?

saurfang commented 3 years ago

@thesuperzapper I added you as a collaborator, so feel free to close any issues that you feel are already addressed or stale. ty!