nedbat / coveragepy

The code coverage tool for Python
https://coverage.readthedocs.io
Apache License 2.0

Coverage of pyspark user defined function #658

Open nedbat opened 6 years ago

nedbat commented 6 years ago

Originally reported by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I have some PySpark code in my code base and I am trying to test it. When I do, any Python UDF I call with Spark is not reported as covered, even though it runs. Note that I am running Spark in local mode.

Reproducible example:

#!python

def get_new_col(spark, df):
    def myadd(x, y):
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))

    spark.udf.register('myadd', myadd)
    return df.selectExpr(['*', 'myadd(x, y) as newcol'])

def run():
    try:
        import findspark
        findspark.init()
    except ImportError:
        pass
    import pyspark
    spark = pyspark.sql.SparkSession.Builder().master("local[2]").getOrCreate()
    df = spark.createDataFrame([
        [1.0, 1.0],
        [1.0, 2.0],
        [1.0, 2.0]
    ], ['x', 'y'])

    outdf = get_new_col(spark, df)
    outdf.show()
    outdf.printSchema()
    assert outdf.columns == (df.columns + ['newcol'])

    spark.stop()

if __name__ == '__main__':
    run()

This says the UDF was not covered even though it did run.

Here are the logs when I run it:

#!python
$ coverage run example.py
2018-05-04 14:58:29 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-05-04 14:58:30 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:>                                                          (0 + 1) / 1]sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
+---+---+------+
|  x|  y|newcol|
+---+---+------+
|1.0|1.0|   2.0|
|1.0|2.0|   3.0|
|1.0|2.0|   3.0|
+---+---+------+

root
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- newcol: string (nullable = true)

Relevant packages: Python 3.6.4 :: Anaconda, Inc. coverage (4.5.1)

Edit 1: Simplified the reproducible example to remove unittest and pytest.


nedbat commented 6 years ago

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Any thoughts on this Ned ?

I'm not sure if I'm doing something wrong with the subprocess setup. Or does subprocess measurement only work if the Python process started by coverage run creates the subprocess itself?

nedbat commented 6 years ago

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I am running it with my root Anaconda, so I think this is the right one. (Considering it gave me an error of "invalid config path '1'" when I set COVERAGE_PROCESS_START=1, I believe it is the right one.)

And I do not have any .coveragerc file (just default configs)

nedbat commented 6 years ago

@AbdealiJK Thanks for doing all this! One thing that looks wrong to me: the COVERAGE_PROCESS_START environment variable needs to refer to the location of the .coveragerc file to use:

export COVERAGE_PROCESS_START=/path/to/.coveragerc

And you need to create a .pth file in the Python environment that is running the subprocesses.
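
For readers who prefer to script that step, here is a minimal sketch of creating such a .pth file from Python. The helper name write_coverage_pth and the file name coverage_startup.pth are made up for illustration; the line written into the file is the documented process_startup hook. By default the sketch targets the site-packages directory of the interpreter that runs it, which is only correct if that same interpreter also runs the subprocesses.

```python
import sysconfig
import tempfile
from pathlib import Path

# The one line that site.py executes at interpreter startup.
PTH_LINE = "import coverage; coverage.process_startup()\n"


def write_coverage_pth(site_dir=None, name="coverage_startup.pth"):
    """Write a .pth file that calls coverage.process_startup() at startup.

    site_dir defaults to this interpreter's site-packages ("purelib").
    The file name is arbitrary (hypothetical here); only the .pth suffix matters.
    """
    target_dir = Path(site_dir) if site_dir else Path(sysconfig.get_paths()["purelib"])
    pth_path = target_dir / name
    pth_path.write_text(PTH_LINE)
    return pth_path


# Demonstration against a throwaway directory, so nothing real is modified.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = write_coverage_pth(tmp)
    print(demo_path.name, "->", demo_path.read_text().strip())
```

Calling write_coverage_pth() with no arguments would write into the real site-packages. The hook does nothing unless COVERAGE_PROCESS_START is set to a non-empty config path in the subprocess's environment.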

nedbat commented 6 years ago

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I read over http://coverage.readthedocs.io/en/latest/subprocess.html and tried:

EDIT: Realized that setting the env variable to 1 was causing some issues, as the value is taken as the coverage configuration file to use. export COVERAGE_PROCESS_START= fixed that error, but it didn't cover the UDF :(

What I did:

But this didn't increase my coverage. Inside the function, when I do print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')}) I can see {'COVERAGE_PROCESS_START': ''} which does seem correct.

For debugging I even tried:

    def myadd(x, y):
        import coverage
        cov = coverage.Coverage(config_file=None)
        cov.start()
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))

but the coverage did not increase.

nedbat commented 6 years ago

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Ned, I am trying to see if I can understand what spark exactly does so we can figure this out. Here are the steps:

I ran the following in the background:

#!bash

while sleep 0.1; do echo date=$(date) py=$(ps aux | grep pytho[n] | wc -l) java=$(ps aux | grep jav[a] | wc -l) cov=$(ps aux | grep coverag[e] | wc -l); done

And verified that the sequence is:

So, the question I think can boil down to how to make all these python processes use coverage.

EDIT: The processes are:

/Users/abdealijk/anaconda3/bin/python /Users/abdealijk/anaconda3/bin/coverage run example.py
/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/bin/java -cp /usr/local/hadoop/spark/conf/:/usr/local/hadoop/spark/jars/*:/usr/local/hadoop/hadoop/etc/hadoop/ -Xmx1g org.apache.spark.deploy.SparkSubmit --conf spark.master=local[1] pyspark-shell
/Users/abdealijk/anaconda3/bin/python -m pyspark.daemon

nedbat commented 6 years ago

Issue #657 is also about PySpark.

nedbat commented 6 years ago

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Hm, it may be a bit complicated to set up (Spark can get messy to install).

To reproduce, install:

For Spark you could try: https://medium.freecodecamp.org/installing-scala-and-apache-spark-on-mac-os-837ae57d283f

I have not had much luck getting it to work with brew though - but my setup is a little more complicated than just spark. It's never worked for me in one shot :P We can talk on gitter or IRC if you like if you run into issues trying to reproduce.

A quick note if you're not familiar with this system: PySpark uses Py4J, which calls internal Java routines. So the df.selectExpr you see is actually calling a Java function internally, and that Java function calls back into the UDF registered with spark.udf.register().

Hence I believe the function runs in a different Python process, one started from the JVM.

It's Python Shell > JVM > Python Shell

nedbat commented 6 years ago

Thanks for the report. I've never used PySpark. Before I try to reproduce this, what packages do I need to install to be able to run this code? I'm on a Mac, with Python 3.6. Give me the complete details of what I need to do to see the problem.

ketgo commented 4 years ago

Hi,

Any update on this issue? I am facing the same problem when I run pytest-cov to test python methods that use @udf decorated nested methods.

Thanks!

nedbat commented 3 years ago

Do these help?

kristen-cape commented 3 years ago

I'm running into the same problem. I tried your two suggestions, @nedbat , and had no luck.

I'm invoking coverage.py via pytest-cov. I'm using Python 3.8 and have the following pyspark, pytest, pytest-cov, and coverage modules

Name: pyspark
Version: 2.4.7

Name: pytest-cov
Version: 2.11.1

Name: coverage
Version: 5.4

RaccoonForever commented 2 years ago

Hello, running into the same problem too:

pyspark==3.2.1 pytest==7.1.1 pytest-cov==3.0.0

nedbat commented 2 years ago

If someone could provide very specific step-by-step instructions to reproduce the failure, that would help move this forward (though no guarantees...)

AndrewLane commented 2 years ago

If someone could provide very specific step-by-step instructions to reproduce the failure, that would help move this forward (though no guarantees...)

@nedbat I tried to provide that here: https://github.com/AndrewLane/repro-coverage-issue-with-pyspark-udf

nedbat commented 2 years ago

@AndrewLane experimenting a bit with this, my guess is that the code is running in a subprocess, but that process is started in a way that doesn't get coverage started on it, perhaps because it's started from Java.

nedbat commented 2 years ago

Here is what I have tried. I created a doit.sh file in my copy of the repo to configure subprocess measurement, and to simplify re-running it:

cd /home
rm -f debug.txt
pip install pytest==7.0.1 coverage==6.2
echo "import coverage; coverage.process_startup()" > /usr/local/lib/python3.6/site-packages/00coverage.pth
export COVERAGE_PROCESS_START=$(pwd)/.coveragerc
coverage run -m pytest tests/test.py --disable-pytest-warnings

I created a .coveragerc file in /home:

[run]
parallel = true
source = ./src

I changed code.py to add some debugging like this:

import os, sys
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

with open("debug.txt", "a") as f:
    print(f"outside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)

def translate(english):
    with open("debug.txt", "a") as f:
        print(f"inside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)

    if english == "1":
        return "uno"
    elif english == "2":
        return "dos"
    else:
        return f"Cannot translate {english}"

translation_udf = udf(lambda english: translate(english), StringType())

def transform_data(df):
    return df.withColumn("spanish", translation_udf(col("english")))

When I run (with source /home/doit.sh), I have a debug.txt that looks like this:

outside: 14: trace: <coverage.CTracer object at 0x7fb34c1483f0>

outside: 172: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 183: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 180: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 177: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 188: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 177: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 188: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 183: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 136: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 136: trace: <coverage.CTracer object at 0x7fc301112030>

and the data files are:

root@5fcaa35b647e:/home# ls -al .coverage*
-rw-r--r-- 1 root root 53248 May  7 12:50 .coverage
-rw-r--r-- 1 root root 53248 May  7 13:19 .coverage.5fcaa35b647e.118.982518
-rw-r--r-- 1 root root 53248 May  7 13:19 .coverage.5fcaa35b647e.14.449311
-rw-r--r-- 1 root root    37 May  7 12:46 .coveragerc

That should mean that process id's 118 and 14 recorded data, but the debug output doesn't show those ids. Also, the id of the CTracer object is the same for many different processes, so maybe we are dealing with something similar to #310?
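
The comparison between data-file PIDs and debug-log PIDs can be automated. The sketch below is not part of coverage.py. It assumes the data file names follow the .coverage.<host>.<pid>.<random> pattern seen in the listing above, and that debug.txt uses the "inside:"/"outside:" format from the modified code.py. The helper names are made up for illustration.

```python
import re

# Matches the debug lines written by the instrumented code.py above.
DEBUG_LINE = re.compile(r"^(inside|outside): (\d+): trace:")


def data_file_pid(filename):
    """Return the PID embedded in a parallel data file name, or None.

    Splits from the right so that a host name containing dots still works.
    """
    parts = filename.rsplit(".", 2)
    if len(parts) == 3 and parts[0].startswith(".coverage.") and parts[1].isdigit():
        return int(parts[1])
    return None


def pids_by_label(debug_text):
    """Group the PIDs found in debug.txt by their 'inside'/'outside' label."""
    pids = {"inside": set(), "outside": set()}
    for line in debug_text.splitlines():
        match = DEBUG_LINE.match(line)
        if match:
            pids[match.group(1)].add(int(match.group(2)))
    return pids


# A few entries copied from the output shown above.
files = [".coverage", ".coverage.5fcaa35b647e.118.982518",
         ".coverage.5fcaa35b647e.14.449311", ".coveragerc"]
debug = """\
outside: 14: trace: <coverage.CTracer object at 0x7fb34c1483f0>
outside: 186: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 136: trace: <coverage.CTracer object at 0x7fc301112030>
"""

data_pids = {pid for pid in map(data_file_pid, files) if pid is not None}
inside_pids = pids_by_label(debug)["inside"]

# Processes that ran the UDF body but left no data file behind.
print(sorted(inside_pids - data_pids))
```

On this excerpt, every process that logged an "inside" line is missing from the set of data-file PIDs. That is the mismatch described above.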

RaccoonForever commented 2 years ago

Hi Ned, sorry for the lack of info.

I quickly made a repo to reproduce the lack of coverage on UDFs.

https://github.com/RaccoonForever/py-cov-potential-issue

Don't look at code, it was just to reproduce :) Thanks for your help!

RaccoonForever commented 2 years ago

Does someone have a workaround ? :'(

shsab commented 2 years ago

I have the same issue when testing pySpark UDFs using pytest. But as a workaround, the UDFs are python functions and I can create tests that specifically test that function only.
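
A minimal sketch of that workaround, assuming the UDF logic lives in a plain function like the translate function from the earlier comment (debugging lines removed). The tests call the function directly, so no Spark session or worker process is involved and coverage measures the lines in the test process itself. The test names are illustrative.

```python
def translate(english):
    """Plain Python function that would later be wrapped with udf(...)."""
    if english == "1":
        return "uno"
    elif english == "2":
        return "dos"
    else:
        return f"Cannot translate {english}"


def test_translate_known_words():
    # Exercises the two explicit branches without going through Spark.
    assert translate("1") == "uno"
    assert translate("2") == "dos"


def test_translate_unknown_word():
    # Exercises the fallback branch.
    assert translate("3") == "Cannot translate 3"
```

This covers the function body but not the Spark wiring (the udf(...) wrapper and the withColumn call), which still needs a separate integration test.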

RechaviaAmit commented 8 months ago

python 3.8 pyspark==3.2.3 coverage ==6.5.0

Although this is not fully operational, I've uncovered something that might provide insight into how to make it work. example:

file: module1.py

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

spark = SparkSession.builder \
    .appName("SimpleSparkUDF") \
    .getOrCreate()

data = [("John", 25)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

def add_one_udf(age):
    import os
    import coverage
    import module2

    # set up coverage   
    content = f"""
    [run]
    branch = True
    cover_pylib = False
    concurrency = multiprocessing,thread
    parallel = True
    data_file = {os.getcwd()}/.coverage
    """

    with open("coveragerc_temp", "w") as file:
        file.write(content.strip())

    os.environ["COVERAGE_PROCESS_START"] = "coveragerc_temp"

    cov = coverage.process_startup()

    print("This line isn't covered by coverage.py")
    module2.foo()    

    cov.stop()
    cov.save()

    import subprocess  # needed for the scp call below

    # directory where the .coverage data files were written
    current_directory = os.getcwd()
    coverage_data_files = [
        f"{current_directory}/{name}" for name in os.listdir(current_directory) if name.startswith(".coverage")
    ]

    # send back the .coverage files to my local machine
    subprocess.run(["scp", "-o", "StrictHostKeyChecking=no", *coverage_data_files, os.environ["localhost"]])

    return age + 1

add_one_udf_spark = udf(add_one_udf, IntegerType())

result = df.withColumn("AgePlusOne", add_one_udf_spark(df["Age"]))

result.show()

file: module2.py

def foo():
    print("This line is covered by coverage.py")
    print("This line is covered by coverage.py")
    print("This line is covered by coverage.py")

On my local host, I received 1 .coverage file. After mapping the paths to my local machine and executing coverage combine + coverage report, I can easily see that the lines from module2.py are covered (75% of the lines, excluding the function signature). However, it seems like module1.py isn't covered at all. Additionally, I tried to debug it with the trace flag in the debug section, and module1.py isn't mentioned at all.

Does anyone have insights into why module1.py isn't covered at all, unlike module2.py?

nedbat commented 8 months ago

Does anyone have insights into why module1.py isn't covered at all, unlike module2.py?

You have to show us the steps you used to run the code, and how you used coverage on it. module1 starts coverage then calls foo(). This is why the three lines in the body of foo() are covered. Are you using coverage in any other way? If not, then module1 would not be measured.

RechaviaAmit commented 8 months ago

@nedbat thanks for the fast response! No, in this example, I don't use coverage in any other way.

This is why the three lines in the body of foo() are covered.

I totally understand why these 3 lines are covered.

If not, then module1 would not be measured.

I'm not sure I get it: why isn't module1 covered when module2 is? I'm calling coverage.process_startup() in module1, inside the PySpark UDF.

nedbat commented 8 months ago

Coverage can't measure code that ran before it was started. This is why the def line in module2 isn't covered. It will only measure code executed after coverage has been started, and additionally, only in functions called, not in the current function.

You are using the API in a really unusual way: process_startup() isn't meant for you to call from your code: it's for something that runs very early in the process, like sitecustomize or a .pth file.
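
The "only in functions called, not in the current function" behavior can be seen with Python's own sys.settrace, without coverage installed. This is a sketch of the general mechanism, on the assumption that coverage's tracer behaves like an ordinary trace function in this respect. The names tracer, callee and caller are made up for the demonstration.

```python
import sys

traced = []  # (function name, event) pairs seen by the trace function


def tracer(frame, event, arg):
    traced.append((frame.f_code.co_name, event))
    return tracer  # keep receiving line/return events for this frame


def callee():
    x = 1
    return x + 1


def caller():
    sys.settrace(tracer)  # tracing starts here, in the middle of caller()
    y = 2                 # not reported: caller's frame predates the tracer
    result = callee()     # reported: callee gets a new frame after tracing began
    sys.settrace(None)
    return y + result


caller()
names = {name for name, _ in traced}
print(names)
```

Only callee shows up in the recorded events. The frame of caller already existed when tracing was switched on, so its remaining lines produce no events. The body of add_one_udf in module1.py is in the same position.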

nedbat commented 8 months ago

The best way to help get pyspark support is to provide a complete runnable example, with detailed explicit instructions.

RechaviaAmit commented 8 months ago

and additionally, only in functions called, not in the current function.

You are correct; this was my issue. Thank you very much!

For those attempting to edit the sitecustomize within the Spark executor, it didn't work for me. Please share if someone manages to collect coverage from the UDF itself using alternative methods.

The best way to help get pyspark support is to provide a complete runnable example, with detailed explicit instructions.

You are correct. Unfortunately, it can be challenging to reproduce the Spark environment, as it runs on distributed machines.