sakamomo554101 / sample_data_quality


Trying out Deequ #1

Closed sakamomo554101 closed 2 years ago

sakamomo554101 commented 2 years ago

https://github.com/awslabs/python-deequ/blob/master/tutorials/basic_example.ipynb

I'll write some trial code, using the above as a reference.

sakamomo554101 commented 2 years ago

https://dev.classmethod.jp/articles/deequ/

The setup instructions here look useful.

sakamomo554101 commented 2 years ago

Should installing the following be enough?

sakamomo554101 commented 2 years ago

https://pydeequ.readthedocs.io/en/latest/

For now, I'll read through the docs (PyDeequ).

sakamomo554101 commented 2 years ago

https://pydeequ.readthedocs.io/en/latest/README.html#contributing-developer-setup

The developer setup seems to require installing Spark and friends. Hmm, so what happens if I install only pydeequ?

sakamomo554101 commented 2 years ago

Tried the following.

from pyspark.sql import SparkSession, Row
import pydeequ

# Pull in the Deequ jar via its Maven coordinates and exclude a conflicting dependency
spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())

# Toy DataFrame; note the null in column c
df = spark.sparkContext.parallelize([
            Row(a="foo", b=1, c=5),
            Row(a="bar", b=2, c=6),
            Row(a="baz", b=3, c=None)]).toDF()

It fails with the following error:

Please set env variable SPARK_VERSION
JAVA_HOME is not set
Traceback (most recent call last):
  File "/home/test/test.py", line 4, in <module>
    spark = (SparkSession
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/session.py", line 228, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/usr/local/lib/python3.9/site-packages/pyspark/context.py", line 392, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/lib/python3.9/site-packages/pyspark/context.py", line 144, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/usr/local/lib/python3.9/site-packages/pyspark/context.py", line 339, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/usr/local/lib/python3.9/site-packages/pyspark/java_gateway.py", line 108, in launch_gateway
    raise RuntimeError("Java gateway process exited before sending its port number")
RuntimeError: Java gateway process exited before sending its port number
sakamomo554101 commented 2 years ago

Need to look into how to run PySpark locally. https://qiita.com/yoshiyama_hana/items/3acb13deebb5c5f3daa1

sakamomo554101 commented 2 years ago

https://github.com/awslabs/python-deequ/blob/9bcc6bc69f450b5459866448ebcbc1f8d65d65a2/tests/conftest.py#L22

Looking at the above, it seems to grab a local Spark session. I'll experiment while reading through the test code.
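
For reference, a minimal local-session sketch along the lines of that conftest.py (the master and appName values here are my assumptions, not copied from the test code):

from pyspark.sql import SparkSession
import pydeequ

# Request a local master explicitly so no external cluster is needed
spark = (SparkSession
    .builder
    .master("local[*]")
    .appName("pydeequ-local-test")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())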

sakamomo554101 commented 2 years ago

https://github.com/yk-st/pyspark_settings/blob/main/Dockerfile This is for building a PySpark environment, and looks like a useful reference.

sakamomo554101 commented 2 years ago

Some examples specify appName as below. Should I set this for local runs? (What is appName, anyway?)

spark = SparkSession.builder \
    .appName("chapter1") \
    .getOrCreate()

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.builder.appName.html I see, it's just a name for the application, so it has nothing to do with running locally or not.

sakamomo554101 commented 2 years ago

https://kazuhira-r.hatenablog.com/entry/2021/09/11/224140

This article looks quite useful (for running some kind of job with PySpark).

Also, it seems the processing code gets executed on a Spark cluster via spark-submit.

sakamomo554101 commented 2 years ago

And with spark-shell you can also run things interactively on Spark.

sakamomo554101 commented 2 years ago

https://rinoguchi.net/2020/04/pyspark-samples.html Oh? So pyspark can be run locally as-is?

sakamomo554101 commented 2 years ago

https://spark.apache.org/docs/latest/api/python/

sakamomo554101 commented 2 years ago

Installing a JDK with the command below cleared the error. Though, is Spark actually running now?

sdk install java 11.0.10.hs-adpt
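
For reference, the earlier "Please set env variable SPARK_VERSION" message suggests PyDeequ also wants the Spark version exported. A minimal sketch, assuming a Spark 3.0.x target (the JAVA_HOME path is a hypothetical SDKMAN location):

import os

# Assumed values; adjust to the actual environment
os.environ["SPARK_VERSION"] = "3.0"  # version PyDeequ should target
os.environ.setdefault("JAVA_HOME", "/root/.sdkman/candidates/java/current")  # hypothetical path

import pydeequ  # import only after the env vars are set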
sakamomo554101 commented 2 years ago

So Spark uses Log4j. Probably worth being careful about that.

sakamomo554101 commented 2 years ago

https://github.com/awslabs/python-deequ/blob/master/tutorials/basic_example.ipynb

For now, the above tutorial seems to run end to end as expected.

sakamomo554101 commented 2 years ago

Hmm, I want to run the notebook below, but just installing sagemaker_pyspark and running it produces the following error. https://github.com/awslabs/python-deequ/blob/master/tutorials/analyzers.ipynb

Traceback (most recent call last):
  File "/home/test/test.py", line 1, in <module>
    from pyspark.sql import SparkSession, Row
  File "/usr/local/lib/python3.9/site-packages/pyspark/__init__.py", line 51, in <module>
    from pyspark.context import SparkContext
  File "/usr/local/lib/python3.9/site-packages/pyspark/context.py", line 31, in <module>
    from pyspark import accumulators
  File "/usr/local/lib/python3.9/site-packages/pyspark/accumulators.py", line 97, in <module>
    from pyspark.serializers import read_int, PickleSerializer
  File "/usr/local/lib/python3.9/site-packages/pyspark/serializers.py", line 71, in <module>
    from pyspark import cloudpickle
  File "/usr/local/lib/python3.9/site-packages/pyspark/cloudpickle.py", line 145, in <module>
    _cell_set_template_code = _make_cell_set_template_code()
  File "/usr/local/lib/python3.9/site-packages/pyspark/cloudpickle.py", line 126, in _make_cell_set_template_code
    return types.CodeType(
TypeError: an integer is required (got type bytes)
sakamomo554101 commented 2 years ago

https://github.com/aws/sagemaker-spark/blob/master/sagemaker-pyspark-sdk/README.rst Hmm, looking at the above, it's pretty old.

It seems to support only up to around Python 3.6.

sakamomo554101 commented 2 years ago

https://github.com/aws/sagemaker-spark/issues/144 The same issue has been reported above, but it looks like it's been left unaddressed.

sakamomo554101 commented 2 years ago

Maybe I'll refer to the Data Science on AWS sample code below. https://github.com/data-science-on-aws/data-science-on-aws/blob/652cc92580b56dda02938f87e1bb42fbbef333db/05_explore/preprocess-deequ-pyspark.py

sakamomo554101 commented 2 years ago

Why is the following needed in the first place? Couldn't we just load local jars?

import sagemaker_pyspark

classpath = ":".join(sagemaker_pyspark.classpath_jars())  # aws-specific jars
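
If a local Deequ jar were available, a sketch of pointing Spark at it directly instead (the jar path is hypothetical):

from pyspark.sql import SparkSession

# Load Deequ from a local jar rather than sagemaker_pyspark's bundled classpath
spark = (SparkSession
    .builder
    .config("spark.jars", "/opt/jars/deequ.jar")  # hypothetical local path
    .getOrCreate())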
sakamomo554101 commented 2 years ago

Setup steps

Use python:3.9.12 as the Docker base image.

sakamomo554101 commented 2 years ago

With the setup steps above, the pydeequ processing (validation) ran, so I'll consolidate everything into a Dockerfile.

Also, the script seems to keep running afterwards (the Spark session stays alive?), so I'll look for the proper shutdown procedure.

sakamomo554101 commented 2 years ago

https://github.com/apache/spark/blob/ec2bfa566ed3c796e91987f7a158e8b60fbd5c42/python/pyspark/sql/session.py#L357

With the approach above I could read isStopped, but after calling spark.stop() the context information seems to be gone, so the same call no longer works (it raises an error).
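
What I tried looks roughly like this (it goes through Py4J into the JVM SparkContext; _jsc is internal API, so treat this as a sketch):

# Works while the session is alive
print(spark.sparkContext._jsc.sc().isStopped())  # False

spark.stop()
# After stop(), the context information is gone and the same call raises an error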

sakamomo554101 commented 2 years ago

Even after calling sparkSession.stop, the threads don't go away...

sakamomo554101 commented 2 years ago

After calling the following, the threads no longer stop.

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

# `check` was undefined in the snippet as pasted; a definition like the tutorial's is assumed
check = Check(spark, CheckLevel.Warning, "Review Check")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3) \
        .hasMin("b", lambda x: x == 0) \
        .isComplete("c")  \
        .isUnique("a")  \
        .isContainedIn("a", ["foo", "bar", "baz"]) \
        .isNonNegative("b")) \
    .run()
sakamomo554101 commented 2 years ago

https://pydeequ.readthedocs.io/en/latest/README.html#wrapping-up

Doing the above made the process stop properly.
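
For the record, the shutdown sequence from that section is:

# Shut down the Py4J callback server before stopping Spark;
# otherwise its threads can keep the Python process alive
spark.sparkContext._gateway.shutdown_callback_server()
spark.stop()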

sakamomo554101 commented 2 years ago

https://pydeequ.readthedocs.io/en/latest/README.html#set-up-a-pyspark-session From the section above onward, I tried the following, but some of it doesn't run (errors out).

Profile error:

Traceback (most recent call last):
  File "/home/sample_deequ/src/sample_deequ.py", line 37, in <module>
    result = ColumnProfilerRunner(spark).onData(df).run()
  File "/usr/local/lib/python3.9/site-packages/pydeequ/profiles.py", line 121, in run
    run = self._ColumnProfilerRunBuilder.run()
  File "/usr/local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o70.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
    at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
    at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
    at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
    at scala.collection.immutable.List.flatMap(List.scala:366)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
    at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
    at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
    at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
    at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:834)

Suggestion error:

Traceback (most recent call last):
  File "/home/sample_deequ/src/sample_deequ.py", line 98, in <module>
    main()
  File "/home/sample_deequ/src/sample_deequ.py", line 49, in main
    ConstraintSuggestionRunner(spark).onData(df).addConstraintRule(DEFAULT()).run()
  File "/usr/local/lib/python3.9/site-packages/pydeequ/suggestions.py", line 81, in run
    result = self._ConstraintSuggestionRunBuilder.run()
  File "/usr/local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o80.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
    at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
    at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
    at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
    at scala.collection.immutable.List.flatMap(List.scala:366)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
    at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
    at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
    at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
    at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.profileAndSuggest(ConstraintSuggestionRunner.scala:203)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.run(ConstraintSuggestionRunner.scala:102)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunBuilder.run(ConstraintSuggestionRunBuilder.scala:226)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:834)
sakamomo554101 commented 2 years ago

Hmm, I could dig into the Spark code for details, but most likely the Spark version doesn't match what pydeequ expects. Looking at the link below, the supported Spark version is 3.0.2 (I currently have 3.2.1 installed). https://pydeequ.readthedocs.io/en/latest/README.html#setup-apache-spark

After switching pyspark to 3.0.2, both profile and suggestion ran fine.
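
A quick guard to catch this kind of mismatch early; just a sketch (the 3.0.x prefix reflects what worked here):

import pyspark

# Fail fast if the installed pyspark doesn't match what pydeequ expects
assert pyspark.__version__.startswith("3.0"), (
    "expected Spark 3.0.x for this pydeequ version, got " + pyspark.__version__
)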

sakamomo554101 commented 2 years ago

Merged the sample code in #2.

Next, I'll try building a notebook environment as well.

sakamomo554101 commented 2 years ago

https://zenn.dev/ushknn/articles/19e9aa500cb1e7 I'll use the above as a reference.

sakamomo554101 commented 2 years ago

Built a trial notebook environment.

sakamomo554101 commented 2 years ago

Got the notebook environment running in #3.

sakamomo554101 commented 2 years ago

https://github.com/apache/spark/tree/master/python#python-packaging I see, so PySpark itself really is an interface to Spark, and it needs the Spark jar files.
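
A quick way to see that the pip-installed pyspark bundles the Spark jars (output paths are environment-dependent):

import os
import pyspark

# The pip package ships the Spark runtime jars alongside the Python code
jars_dir = os.path.join(os.path.dirname(pyspark.__file__), "jars")
print(len(os.listdir(jars_dir)), "jar files under", jars_dir)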

sakamomo554101 commented 2 years ago

Handled, so closing.