twosigma / flint

A Time Series Library for Apache Spark
Apache License 2.0

Using flint with pyspark on yarn #19

Open dadokkio opened 6 years ago

dadokkio commented 6 years ago

Hi, I'm trying to use flint by submitting a pyspark job on yarn.

>> ./bin/pyspark --master yarn --deploy-mode client --jars /opt/flint-assembly-0.2.0-SNAPSHOT.jar --py-files /opt/flint-assembly-0.2.0-SNAPSHOT.jar
[..]
SparkSession available as 'spark'.
>>> import ts.flint
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ModuleNotFoundError: No module named 'ts'
>>> import sys
>>> sys.path
['', '/tmp/spark-1d453f8f-379a-4f22-a7e4-cabe9dad15c5/userFiles-0b6ed883-15b1-4893-acb8-977f74b09913/flint-assembly-0.2.0-SNAPSHOT.jar', '/tmp/spark-1d453f8f-379a-4f22-a7e4-cabe9dad15c5/userFiles-0b6ed883-15b1-4893-acb8-977f74b09913', '/usr/hdp/2.6.1.0-129/spark2/python/lib/py4j-0.10.4-src.zip', '/usr/hdp/2.6.1.0-129/spark2/python', '/usr/hdp/2.6.1.0-129/spark2', '/opt/miniconda3/envs/lhd_spark/lib/python36.zip', '/opt/miniconda3/envs/lhd_spark/lib/python3.6', '/opt/miniconda3/envs/lhd_spark/lib/python3.6/lib-dynload', '/opt/miniconda3/envs/lhd_spark/lib/python3.6/site-packages', '/opt/miniconda3/envs/lhd_spark/lib/python3.6/site-packages/setuptools-27.2.0-py3.6.egg']

The same approach works properly with a local master, but on yarn it seems to refer to an invalid path and the import fails.

I was able to use the library by following this similar topic: extracting the Python code from the jar and copying it into my working directory.

Is that OK, or is there a better way to proceed?
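
For reference, the workaround described above can be sketched in a few lines of Python. This is only a minimal sketch: it assumes the Python sources sit under a top-level "ts/" directory inside the assembly jar (the jar layout is not shown in this thread), and the function name extract_python_sources is made up for illustration.

```python
import zipfile


def extract_python_sources(jar_path, dest_dir, prefix="ts/"):
    """Extract every jar entry under `prefix` into `dest_dir`.

    A jar is a zip archive, so the standard zipfile module can read it.
    The "ts/" prefix is an assumption about where the Python code lives.
    """
    with zipfile.ZipFile(jar_path) as jar:
        members = [name for name in jar.namelist() if name.startswith(prefix)]
        jar.extractall(dest_dir, members)
    return members


# Hypothetical usage, run from the directory where pyspark is started:
# extract_python_sources("/opt/flint-assembly-0.2.0-SNAPSHOT.jar", ".")
```

After extraction the "ts" directory sits in the working directory, which is the first entry ('') of the sys.path shown above.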

icexelloss commented 6 years ago

Hi, yeah that seems fine.

mattomatic commented 5 years ago

The issue is actually how the wheel file gets unpacked on the YARN worker nodes. If you unzip the wheel file you'll see that there isn't an __init__.py file underneath the top-level "ts" directory -- so the worker nodes that try to import "ts.anythingelse" will fail.

msanders in ~/Downloads
> mv ts_flint-0.6.0-py3-none-any.whl flint.zip

msanders in ~/Downloads
> unzip flint.zip
Archive:  flint.zip
  inflating: ts/flint/__init__.py
  inflating: ts/flint/_version.py
  inflating: ts/flint/clocks.py
  inflating: ts/flint/context.py
  inflating: ts/flint/dataframe.py
  inflating: ts/flint/error.py
  inflating: ts/flint/functions.py
  inflating: ts/flint/group.py
  inflating: ts/flint/java.py
  inflating: ts/flint/readwriter.py
  inflating: ts/flint/serializer.py
  inflating: ts/flint/summarizers.py
  inflating: ts/flint/udf.py
  inflating: ts/flint/utils.py
  inflating: ts/flint/windows.py
  inflating: ts_flint-0.6.0.dist-info/top_level.txt
  inflating: ts_flint-0.6.0.dist-info/WHEEL
  inflating: ts_flint-0.6.0.dist-info/METADATA
  inflating: ts_flint-0.6.0.dist-info/RECORD
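
The same check can be done without unzipping by hand. The sketch below is not part of flint; the helper names are made up for illustration. It lists every directory in an archive that contains Python files, directly or in a subdirectory, but has no __init__.py of its own.

```python
import posixpath
import zipfile


def packages_missing_init(names):
    """Return directories that hold .py files but lack an __init__.py.

    `names` is an iterable of archive entry names using "/" separators,
    such as the result of ZipFile.namelist().
    """
    names = set(names)
    dirs = set()
    for name in names:
        if not name.endswith(".py"):
            continue
        # Collect the file's directory and every ancestor directory.
        parent = posixpath.dirname(name)
        while parent:
            dirs.add(parent)
            parent = posixpath.dirname(parent)
    return sorted(d for d in dirs if d + "/__init__.py" not in names)


def archive_packages_missing_init(path):
    """Run the check against a wheel, zip, or jar file on disk."""
    with zipfile.ZipFile(path) as archive:
        return packages_missing_init(archive.namelist())


# Entry names taken from the 0.6.0 listing above (abbreviated).
listing = [
    "ts/flint/__init__.py",
    "ts/flint/_version.py",
    "ts/flint/clocks.py",
    "ts_flint-0.6.0.dist-info/top_level.txt",
]
print(packages_missing_init(listing))  # ['ts']
```

For the 0.6.0 wheel this reports "ts" as the directory without an __init__.py, which matches the listing.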

The fix is to tell pip that flint also provides a top-level "ts" directory: https://github.com/twosigma/flint/pull/65

This results in a happier situation for the workers:

msanders in ~/code/flint/python/dist on master
> unzip ts_flint-0+untagged.302.g5aa2ada.dirty-py3-none-any.whl
Archive:  ts_flint-0+untagged.302.g5aa2ada.dirty-py3-none-any.whl
  inflating: ts/__init__.py
  inflating: ts/flint/__init__.py
  inflating: ts/flint/_version.py
  inflating: ts/flint/clocks.py
  inflating: ts/flint/context.py
  inflating: ts/flint/dataframe.py
  inflating: ts/flint/error.py
  inflating: ts/flint/functions.py
  inflating: ts/flint/group.py
  inflating: ts/flint/java.py
  inflating: ts/flint/readwriter.py
  inflating: ts/flint/serializer.py
  inflating: ts/flint/summarizers.py
  inflating: ts/flint/udf.py
  inflating: ts/flint/utils.py
  inflating: ts/flint/windows.py
  inflating: ts_flint-0+untagged.302.g5aa2ada.dirty.dist-info/METADATA
  inflating: ts_flint-0+untagged.302.g5aa2ada.dirty.dist-info/WHEEL
  inflating: ts_flint-0+untagged.302.g5aa2ada.dirty.dist-info/top_level.txt
  inflating: ts_flint-0+untagged.302.g5aa2ada.dirty.dist-info/RECORD
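
If you are stuck with a wheel that was built without ts/__init__.py, one possible stopgap is to copy it into a plain zip and add the empty file yourself. This is only a sketch and not what the pull request does; the function name add_top_level_init is made up, and the output is meant to be passed to --py-files rather than installed with pip, since the RECORD file inside is not updated.

```python
import zipfile


def add_top_level_init(src_path, dst_path, package="ts"):
    """Copy an archive to `dst_path`, adding an empty <package>/__init__.py.

    Returns True if the file had to be added, False if it was already there.
    """
    init_name = package + "/__init__.py"
    with zipfile.ZipFile(src_path) as src, \
            zipfile.ZipFile(dst_path, "w", zipfile.ZIP_DEFLATED) as dst:
        names = src.namelist()
        for name in names:
            # Copy every existing entry unchanged.
            dst.writestr(name, src.read(name))
        if init_name not in names:
            dst.writestr(init_name, "")
            return True
    return False


# Hypothetical usage:
# add_top_level_init("ts_flint-0.6.0-py3-none-any.whl", "flint.zip")
```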