capitalone / datacompy

Pandas, Polars, and Spark DataFrame comparison for humans and more!
https://capitalone.github.io/datacompy/
Apache License 2.0

Python 3.11 support #227

Closed · fdosani closed 6 months ago

fdosani commented 1 year ago

Add Python 3.11 support. There might be some issues with Spark and 3.11 that would need some looking into. Updates are required to setup.cfg, the testing suite (potentially), and GH Actions.
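A minimal sketch of the kind of changes involved (illustrative only; the exact option names and workflow layout depend on the repo's existing setup.cfg and GH Actions files):

# setup.cfg (declare 3.11 as supported)
[metadata]
classifiers =
    Programming Language :: Python :: 3.10
    Programming Language :: Python :: 3.11

The GH Actions test matrix would gain a matching entry, e.g. python-version: ["3.9", "3.10", "3.11"].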

luzhangyi319 commented 12 months ago

@fdosani Is someone working on this issue? If not, can you please assign it to me? Thanks.

fdosani commented 12 months ago

Done! Appreciate your help here. Feel free to ask questions or comments in the issue.

luzhangyi319 commented 12 months ago

> Done! Appreciate your help here. Feel free to ask questions or comments in the issue.

@fdosani Can you please share more details about this issue? Some guidance on how to reproduce it would be great. Thanks.

fdosani commented 12 months ago

We currently only support up to Python 3.10. I think there are some limitations with Python 3.11 and Spark support, so this would be more of an investigation. You could install 3.11 and see if the testing etc. works, but the last time I tried there were some incompatibility issues. That's where I'd start: a new env with 3.11, and see if we can get datacompy working for Spark in particular.
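A minimal sketch of that starting point (the env name is illustrative; any virtualenv manager works):

python3.11 -m venv venv311
source venv311/bin/activate
pip install -e '.[dev]'
python -m pytest tests/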

luzhangyi319 commented 11 months ago

@fdosani I am trying to run pytest in a virtualenv. I followed the developer instructions and ran pip install -e .[dev] to install the dependencies, but got the error zsh: no matches found: .[dev]. Any idea what the issue is?

fdosani commented 11 months ago

try: pip install -e ."[dev]"
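For context, zsh treats unquoted square brackets as glob patterns, so the extras specifier has to be quoted or escaped. Any of these forms should work:

pip install -e ."[dev]"
pip install -e '.[dev]'
pip install -e .\[dev\]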

luzhangyi319 commented 11 months ago

@fdosani I ran the test cases in Python 3.11, with the dependencies installed via pip install -e ."[tests]". All the Spark test cases passed except for two fugue test cases, which failed. Here are the test results.

=========================== short test summary info ===========================
FAILED tests/test_fugue.py::test_is_match_spark - py4j.protocol.Py4JJavaError: An error occurred while calling o199.getResult.
FAILED tests/test_fugue.py::test_report_spark - py4j.protocol.Py4JJavaError: An error occurred while calling o701.getResult.
================= 2 failed, 140 passed, 129 warnings in 47.79s =================

I further tested in Python 3.10, and the test results are the same: the error logs of the two failed cases match those from Python 3.11. It looks like this project supports Python 3.11, and the two failed test cases are caused by a separate, existing issue. Here are the error logs if you need them to create another issue.

def test_report_spark(
    spark_session,
    simple_diff_df1,
    simple_diff_df2,
    no_intersection_diff_df1,
    no_intersection_diff_df2,
    large_diff_df1,
    large_diff_df2,
):
    simple_diff_df1.iteritems = simple_diff_df1.items  # pandas 2 compatibility
    simple_diff_df2.iteritems = simple_diff_df2.items  # pandas 2 compatibility
    no_intersection_diff_df1.iteritems = (
        no_intersection_diff_df1.items
    )  # pandas 2 compatibility
    no_intersection_diff_df2.iteritems = (
        no_intersection_diff_df2.items
    )  # pandas 2 compatibility
    large_diff_df1.iteritems = large_diff_df1.items  # pandas 2 compatibility
    large_diff_df2.iteritems = large_diff_df2.items  # pandas 2 compatibility

    df1 = spark_session.createDataFrame(simple_diff_df1)
    df2 = spark_session.createDataFrame(simple_diff_df2)
    comp = Compare(simple_diff_df1, simple_diff_df2, join_columns="aa")
>   a = report(df1, df2, ["aa"])

tests/test_fugue.py:357:


datacompy/fugue.py:290: in report
    res = _distributed_compare(
datacompy/fugue.py:599: in _distributed_compare
    objs = fa.as_array(
venv/lib/python3.11/site-packages/triad/utils/dispatcher.py:111: in __call__
    return self.run_top(*args, **kwds)
venv/lib/python3.11/site-packages/triad/utils/dispatcher.py:268: in run_top
    return list(itertools.islice(self.run(*args, **kwargs), 1))[0]
venv/lib/python3.11/site-packages/triad/utils/dispatcher.py:261: in run
    yield self._func(*args, **kwargs)
venv/lib/python3.11/site-packages/fugue/dataframe/api.py:77: in as_array
    return as_fugue_df(df).as_array(columns=columns, type_safe=type_safe)
venv/lib/python3.11/site-packages/fugue_spark/dataframe.py:150: in as_array
    return sdf.as_local().as_array(type_safe=type_safe)
venv/lib/python3.11/site-packages/fugue/dataframe/dataframe.py:90: in as_local
    return self.as_local_bounded()
venv/lib/python3.11/site-packages/fugue_spark/dataframe.py:99: in as_local_bounded
    res = PandasDataFrame(self.as_pandas(), self.schema)
venv/lib/python3.11/site-packages/fugue_spark/dataframe.py:131: in as_pandas
    return to_pandas(self.native)
venv/lib/python3.11/site-packages/fugue_spark/_utils/convert.py:149: in to_pandas
    return df.toPandas()
venv/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py:131: in toPandas
    batches = self._collect_as_arrow(split_batches=self_destruct)
venv/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py:284: in _collect_as_arrow
    jsocket_auth_server.getResult()
venv/lib/python3.11/site-packages/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:179: in deco
    return f(*a, **kw)


answer = 'xro702', gateway_client = <py4j.clientserver.JavaClient object at 0x123c8f550>, target_id = 'o701', name = 'getResult'

def get_return_value(answer, gateway_client, target_id=None, name=None):
    """Converts an answer received from the Java gateway into a Python object.

    For example, string representation of integers are converted to Python
    integer, string representation of objects are converted to JavaObject
    instances, etc.

    :param answer: the string returned by the Java gateway
    :param gateway_client: the gateway client used to communicate with the Java
        Gateway. Only necessary if the answer is a reference (e.g., object,
        list, map)
    :param target_id: the name of the object from which the answer comes from
        (e.g., *object1* in `object1.hello()`). Optional.
    :param name: the name of the member from which the answer comes from
        (e.g., *hello* in `object1.hello()`). Optional.
    """
    if is_error(answer)[0]:
        if len(answer) > 1:
            type = answer[1]
            value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
            if answer[1] == REFERENCE_TYPE:
              raise Py4JJavaError(

"An error occurred while calling {0}{1}{2}.\n". format(target_id, ".", name), value) E py4j.protocol.Py4JJavaError: An error occurred while calling o701.getResult. E : org.apache.spark.SparkException: Exception thrown in awaitResult: E at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56) E at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310) E at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:98) E at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:94) E at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) E at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75) E at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52) E at java.base/java.lang.reflect.Method.invoke(Method.java:580) E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) E at py4j.Gateway.invoke(Gateway.java:282) E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) E at py4j.commands.CallCommand.execute(CallCommand.java:79) E at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) E at py4j.ClientServerConnection.run(ClientServerConnection.java:106) E at java.base/java.lang.Thread.run(Thread.java:1583) E Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 5) (192.168.1.240 executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available E at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174) E at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229) E at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224) E at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133) E at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303) E at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276) E at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147) E at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133) E at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140) E at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124) E at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30) E at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96) E at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) E at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) E at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102) E at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451) E at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928) E at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282) E E Driver stacktrace: E at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844) E at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780) E at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779) E at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) E at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) E at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) E at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779) E at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242) E at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242) E at scala.Option.foreach(Option.scala:407) E at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242) E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048) E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982) E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971) E at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) E at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984) E at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398) E at org.apache.spark.SparkContext.runJob(SparkContext.scala:2493) E at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:4274) E at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) E at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) E at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:4278) E at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:4254) E at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334) E at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546) E at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332) E at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125) E at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201) E at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108) E at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) E at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) E at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332) E at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:4254) E at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:4253) E at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:140) E at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) E at 
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) E at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:142) E at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:137) E at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:114) E at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:108) E at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:69) E at scala.util.Try$.apply(Try.scala:213) E at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:69) E Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available E at org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174) E at org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229) E at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224) E at org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133) E at org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303) E at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276) E at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:147) E at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:133) E at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream(PythonArrowInput.scala:140) E at org.apache.spark.sql.execution.python.BasicPythonArrowInput.writeIteratorToArrowStream$(PythonArrowInput.scala:124) E at org.apache.spark.sql.execution.python.ArrowPythonRunner.writeIteratorToArrowStream(ArrowPythonRunner.scala:30) E at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.$anonfun$writeIteratorToStream$1(PythonArrowInput.scala:96) E at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) E at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) E at org.apache.spark.sql.execution.python.PythonArrowInput$$anon$1.writeIteratorToStream(PythonArrowInput.scala:102) E at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451) E at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928) E at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

venv/lib/python3.11/site-packages/py4j/protocol.py:326: Py4JJavaError
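For reference, the root cause in both failures is the same line: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available. That is the known Arrow Java limitation on newer JDKs: Arrow's memory code needs access to java.nio internals that JDK 16+ encapsulates by default, so it affects any Arrow-backed toPandas() call regardless of Python version. A sketch of the commonly documented workaround (the --add-opens flag is from the Arrow Java docs; passing it via spark.driver.extraJavaOptions when building the test SparkSession is one possible wiring, not something datacompy configures today):

from pyspark.sql import SparkSession

# Build a local session whose driver JVM opens java.nio to Arrow.
# The flag must be in place before the JVM starts, i.e. on a fresh session.
spark = (
    SparkSession.builder.master("local[2]")
    .config(
        "spark.driver.extraJavaOptions",
        "--add-opens=java.base/java.nio=ALL-UNNAMED",
    )
    .getOrCreate()
)

Exporting the same flag through the JDK_JAVA_OPTIONS environment variable before launching pytest should achieve the same effect.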

fdosani commented 11 months ago

Are you able to dive into what the issue might be and look into potential fixes?

luzhangyi319 commented 11 months ago

@fdosani Thanks for the feedback. As I mentioned in my last comment, the unit test results are exactly the same between Python 3.11 and Python 3.10, so I believe the failing tests are not caused by the Python version upgrade. I am happy to dive into the issue, but can you please create another issue for these failing unit tests and assign it to me? They do not seem related to Python 3.11 support.

gliptak commented 11 months ago

https://github.com/fugue-project/fugue might not have Python 3.11 support

luzhangyi319 commented 11 months ago

> https://github.com/fugue-project/fugue might not have Python 3.11 support

Python 3.10 has the same issue.

fdosani commented 6 months ago

Fixed in #263