DataThirstLtd / Databricks-Connect-PySpark

A guide on how to build good Data Pipelines with Databricks Connect using best practices

Doesn't work with pandas UDF #6

Open amoyrand opened 3 years ago

amoyrand commented 3 years ago

Hello, I'm trying to replicate your example in my own project, but I have an issue with a pandas UDF: I always run into this error: ModuleNotFoundError: No module named 'pipelines'

I simply changed your code as follows:

amazon.py:

# Example ETL with no parameters - see etl() function

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, current_timestamp
from pipelines.utils import transformations, configmanagement as cm
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

spark = SparkSession.builder.getOrCreate()

def extract_Amazon(filePath):
    return spark.read.format("parquet").load(filePath)

def transform_Amazon(df):
    df = df.withColumn("meta_timestamp", lit(current_timestamp()))
    df = transformations.addDummyColumn(df)
    return df

def load_Amazon(df):
    spark.sql("DROP TABLE IF EXISTS amazon")
    df.write.format("parquet").mode("overwrite").saveAsTable("amazon")
    return

def addone(df):
    df['price'] = df['price'] + 1
    return df

def etl():
    df = extract_Amazon("/databricks-datasets/amazon/test4K")
    df = transform_Amazon(df)

    schema = StructType([
        StructField("brand", StringType(), True),
        StructField("price", DoubleType(), True),
    ])

    df = df.select('brand', 'price').groupBy("brand").applyInPandas(transformations.addone, schema)
    print(df.show())
    # load_Amazon(df)

and it gives me this error:

pyspark.sql.utils.PythonException: An exception was thrown from a UDF: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pipelines''. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pipelines'

During handling of the above exception, another exception occurred:

pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pipelines'

Any idea how to solve this?

simondmorias commented 3 years ago

The pipelines folder is not on your path; see main.py. Basically, it can't find the module.
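
For context, a minimal sketch of what "on your path" means here, assuming main.py sits in the project root next to the pipelines package (the repo's actual main.py may differ):

# Sketch: for `from pipelines.utils import transformations` to resolve locally,
# the directory that CONTAINS the pipelines package must be on sys.path.
import os
import sys

dirname = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, dirname)  # the parent of the pipelines folder

from pipelines import amazon  # resolvable once the parent directory is importable

if __name__ == "__main__":
    amazon.etl()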

amoyrand commented 3 years ago

In main.py, I have these lines:

dirname = os.path.abspath(os.path.dirname(__file__))
sys.path.insert(0, os.path.join(dirname, 'pipelines'))

But I still get the ModuleNotFoundError: No module named 'pipelines' error...

simondmorias commented 3 years ago

Have you renamed the pipelines folder, or moved the location relative to main? What version of Python are you using?
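
A hedged aside: the /databricks/spark/python/... paths in the traceback suggest the failure happens on the cluster while unpickling the pandas UDF, so a local sys.path tweak alone never reaches the workers. One possible sketch, assuming a classic databricks-connect setup where spark.sparkContext is available and pipelines is a proper package with an __init__.py (packaging pipelines as a wheel and installing it on the cluster as a library is another common route):

# Sketch: ship the local pipelines package to the cluster so that executors can
# unpickle UDFs that reference it. Paths below are illustrative.
import os
import shutil

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

project_root = os.path.dirname(os.path.abspath(__file__))

# Zip the pipelines folder into <project_root>/pipelines.zip.
archive = shutil.make_archive(
    os.path.join(project_root, "pipelines"), "zip",
    root_dir=project_root, base_dir="pipelines",
)

# Distribute the archive; executors add it to their sys.path before running UDFs.
spark.sparkContext.addPyFile(archive)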

amoyrand commented 3 years ago

Hello @simondmorias. I finally got this working, thanks for your tips.

I'm now facing another problem:

I'm using Sedona with Databricks. When running my code in a notebook everything works (I installed the third-party jars on the cluster),

but when running with databricks-connect I'm getting a TypeError: 'JavaPackage' object is not callable when running:

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import KryoSerializer, SedonaKryoRegistrator

spark = SparkSession. \
    builder. \
    appName('appName'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.0'). \
    getOrCreate()

SedonaRegistrator.registerAll(spark)

I guess the jars are not properly loaded locally.

Have you ever experienced this? Do you know how to load local jars with databricks-connect?

Thank you

simondmorias commented 3 years ago

On your local machine, run databricks-connect get-jar-dir and add the jars to that directory.
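
For reference, a hedged sketch of that workflow in Python; the jar file names are assumptions about what the Sedona and geotools artifacts look like once downloaded locally:

# Sketch: copy the Sedona jars into the databricks-connect jar directory so the
# local client JVM can load them. Jar file names below are illustrative.
import shutil
import subprocess

jar_dir = subprocess.run(
    ["databricks-connect", "get-jar-dir"],
    capture_output=True, text=True, check=True,
).stdout.strip()

for jar in [
    "sedona-python-adapter-3.0_2.12-1.0.0-incubating.jar",  # assumed local file name
    "geotools-wrapper-geotools-24.0.jar",                   # assumed local file name
]:
    shutil.copy(jar, jar_dir)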

amoyrand commented 3 years ago

Hello. I got registerAll working, but I then hit another issue with databricks-connect:

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import KryoSerializer, SedonaKryoRegistrator

sparkSession = SparkSession. \
    builder. \
    master("local[*]").\
    appName('appName'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.0'). \
    getOrCreate()

SedonaRegistrator.registerAll(sparkSession)

print(sparkSession.sql('describe function st_point').show())

print(sparkSession.sql("SELECT st_point(41.40338, 2.17403) AS geometry").show())

Here I can describe the function st_point, but when trying to use it, it fails with:

Undefined function: 'st_point'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

full log here: https://filebin.net/yzy0tn58myzso8l4/log.txt?t=cntz46u4

Any idea what is happening here?

Thanks a lot for your help

simondmorias commented 3 years ago

I would post on StackOverflow; that is more of a general Spark problem than something specific to this repo.

VellStef commented 1 year ago

@amoyrand how did you solve the original problem where the pipelines module was not detected?