coiled / benchmarks

BSD 3-Clause "New" or "Revised" License
32 stars 17 forks source link

[TPC-H] PySpark queries fail with `SparkConnectGrpcException` #1365

Closed hendrikmakait closed 9 months ago

hendrikmakait commented 9 months ago

Running any PySpark query at scale 1000 fails with SparkConnectGrpcException.

Example traceback for pytest --benchmark tests/tpch/test_pyspark.py::test_query_1 --scale 1000:

_______________________________________________________________________________________________________________________________________________________________________________________________________________________________________ test_query_1 _______________________________________________________________________________________________________________________________________________________________________________________________________________________________________

spark = <pyspark.sql.connect.session.SparkSession object at 0x16874fbd0>, dataset_path = 's3://coiled-runtime-ci/tpc-h/scale-1000/'

    def test_query_1(spark, dataset_path):
        register_table(spark, dataset_path, "lineitem")

        query = """select
                l_returnflag,
                l_linestatus,
                sum(l_quantity) as sum_qty,
                sum(l_extendedprice) as sum_base_price,
                sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
                sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
                avg(l_quantity) as avg_qty,
                avg(l_extendedprice) as avg_price,
                avg(l_discount) as avg_disc,
                count(*) as count_order
            from
                lineitem
            where
                l_shipdate <= date('1998-09-02')
            group by
                l_returnflag,
                l_linestatus
            order by
                l_returnflag,
                l_linestatus
        """
>       spark.sql(query).show()  # TODO: find better blocking method

tests/tpch/test_pyspark.py:41:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:817: in show
    print(self._show_string(n, truncate, vertical))
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:634: in _show_string
    ).toPandas()
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:1372: in toPandas
    return self._session.client.to_pandas(query)
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:679: in to_pandas
    table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:982: in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:963: in _execute_and_fetch_as_iterator
    self._handle_error(error)
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:1055: in _handle_error
    self._handle_rpc_error(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <pyspark.sql.connect.client.SparkConnectClient object at 0x1683bbc90>, rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = "Stream removed"
    debug... received from peer  {created_time:"2024-02-08T09:59:30.129973+01:00", grpc_status:2, grpc_message:"Stream removed"}"
>

    def _handle_rpc_error(self, rpc_error: grpc.RpcError) -> NoReturn:
        """
        Error handling helper for dealing with GRPC Errors. On the server side, certain
        exceptions are enriched with additional RPC Status information. These are
        unpacked in this function and put into the exception.

        To avoid overloading the user with GRPC errors, this message explicitly
        swallows the error context from the call. This GRPC Error is logged however,
        and can be enabled.

        Parameters
        ----------
        rpc_error : grpc.RpcError
           RPC Error containing the details of the exception.

        Returns
        -------
        Throws the appropriate internal Python exception.
        """
        logger.exception("GRPC Error received")
        # We have to cast the value here because, a RpcError is a Call as well.
        # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
        status = rpc_status.from_call(cast(grpc.Call, rpc_error))
        if status:
            for d in status.details:
                if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                    info = error_details_pb2.ErrorInfo()
                    d.Unpack(info)
                    raise convert_exception(info, status.message) from None

            raise SparkConnectGrpcException(status.message) from None
        else:
>           raise SparkConnectGrpcException(str(rpc_error)) from None
E           pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
E               status = StatusCode.UNKNOWN
E               details = "Stream removed"
E               debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-02-08T09:59:30.129973+01:00", grpc_status:2, grpc_message:"Stream removed"}"
E           >

/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:1095: SparkConnectGrpcException
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured stdout setup -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Spark connection established. Spark version 3.4.1.
╭────────────────────────────── Spark Dashboards ──────────────────────────────╮
│                                                                              │
│ Spark UI:     https://cluster-vtygr.dask.host/spark?token=_C4hNY3DTEP-bvMp   │
│                                                                              │
│ Spark Master:                                                                │
│ https://cluster-vtygr.dask.host/spark-master?token=_C4hNY3DTEP-bvMp          │
│                                                                              │
╰──────────────────────────────────────────────────────────────────────────────╯
+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
+---+---+-------+

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured stderr setup -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
2024-02-08 09:52:13,962 - coiled - INFO - Resolving your local Python environment...
2024-02-08 09:52:45,972 - coiled - INFO - Creating Cluster (name: tpch-pyspark-1000-657044e4, https://cloud.coiled.io/clusters/378434?account=dask-benchmarks ). This usually takes 1-2 minutes...
2024-02-08 09:56:27,343 - coiled - ERROR -    | Worker Process         | tpch-pyspark-1000-657044e4-worker-3bb876269c   | error      at 09:56:25 (CET) | Instance Stopped: Instance did not phone home before timeout, but other instances did
E0208 09:57:37.258360000 7906217984 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value
E0208 09:57:42.398123000 7906217984 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value
E0208 09:57:47.538162000 7906217984 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value
E0208 09:57:52.663522000 7906217984 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value
E0208 09:57:57.791457000 7906217984 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value
E0208 09:58:02.913073000 7906217984 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ Captured log setup ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INFO     coiled:cluster.py:908 Resolving your local Python environment...
INFO     coiled:cluster.py:1281 Creating Cluster (name: tpch-pyspark-1000-657044e4, https://cloud.coiled.io/clusters/378434?account=dask-benchmarks ). This usually takes 1-2 minutes...
ERROR    coiled:states.py:140    | Worker Process         | tpch-pyspark-1000-657044e4-worker-3bb876269c   | error      at 09:56:25 (CET) | Instance Stopped: Instance did not phone home before timeout, but other instances did
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured stderr call -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
E0208 09:59:30.115428000 6293024768 hpack_parser.cc:999]               Error parsing 'content-type' metadata: invalid value
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured stderr teardown ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2024-02-08 09:59:31,027 - coiled - INFO - Cluster 378434 deleted successfully.
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured log teardown -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
INFO     coiled:core.py:811 Cluster 378434 deleted successfully.
hendrikmakait commented 9 months ago

Using the nightly build from grpcio changes the traceback but doesn't fix the problem:

Traceback:

_______________________________________________________________________________________________________________________________________________________________________________________________________________________________________ test_query_1 _______________________________________________________________________________________________________________________________________________________________________________________________________________________________________

spark = <pyspark.sql.connect.session.SparkSession object at 0x176e47290>, dataset_path = 's3://coiled-runtime-ci/tpc-h/scale-1000/'

    def test_query_1(spark, dataset_path):
        register_table(spark, dataset_path, "lineitem")

        query = """select
                l_returnflag,
                l_linestatus,
                sum(l_quantity) as sum_qty,
                sum(l_extendedprice) as sum_base_price,
                sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
                sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
                avg(l_quantity) as avg_qty,
                avg(l_extendedprice) as avg_price,
                avg(l_discount) as avg_disc,
                count(*) as count_order
            from
                lineitem
            where
                l_shipdate <= date('1998-09-02')
            group by
                l_returnflag,
                l_linestatus
            order by
                l_returnflag,
                l_linestatus
        """
>       spark.sql(query).show()  # TODO: find better blocking method

tests/tpch/test_pyspark.py:41:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:817: in show
    print(self._show_string(n, truncate, vertical))
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:634: in _show_string
    ).toPandas()
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/dataframe.py:1372: in toPandas
    return self._session.client.to_pandas(query)
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:679: in to_pandas
    table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req)
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:982: in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:963: in _execute_and_fetch_as_iterator
    self._handle_error(error)
/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:1055: in _handle_error
    self._handle_rpc_error(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <pyspark.sql.connect.client.SparkConnectClient object at 0x176c85290>, rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Received http2 hea...ated_time:"2024-02-08T10:30:53.416109+01:00", grpc_status:14, grpc_message:"Received http2 header with status: 504"}"
>

    def _handle_rpc_error(self, rpc_error: grpc.RpcError) -> NoReturn:
        """
        Error handling helper for dealing with GRPC Errors. On the server side, certain
        exceptions are enriched with additional RPC Status information. These are
        unpacked in this function and put into the exception.

        To avoid overloading the user with GRPC errors, this message explicitly
        swallows the error context from the call. This GRPC Error is logged however,
        and can be enabled.

        Parameters
        ----------
        rpc_error : grpc.RpcError
           RPC Error containing the details of the exception.

        Returns
        -------
        Throws the appropriate internal Python exception.
        """
        logger.exception("GRPC Error received")
        # We have to cast the value here because, a RpcError is a Call as well.
        # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
        status = rpc_status.from_call(cast(grpc.Call, rpc_error))
        if status:
            for d in status.details:
                if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                    info = error_details_pb2.ErrorInfo()
                    d.Unpack(info)
                    raise convert_exception(info, status.message) from None

            raise SparkConnectGrpcException(status.message) from None
        else:
>           raise SparkConnectGrpcException(str(rpc_error)) from None
E           pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
E               status = StatusCode.UNAVAILABLE
E               details = "Received http2 header with status: 504"
E               debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-02-08T10:30:53.416109+01:00", grpc_status:14, grpc_message:"Received http2 header with status: 504"}"
E           >

/opt/homebrew/Caskroom/mambaforge/base/envs/tpch/lib/python3.11/site-packages/pyspark/sql/connect/client.py:1095: SparkConnectGrpcException

Nightly installed via pip install --pre --upgrade --force-reinstall --extra-index-url \ https://packages.grpc.io/archive/2024/02/038215b504b9027ac85527f5fdcd85c76b7e3a1f-42921616-8960-4c25-bcf2-9e79dcff3df0/python \ grpcio grpcio-{tools,health-checking,reflection,testing}

hendrikmakait commented 9 months ago

It looks like grpc-status is not part of the nighthlies, so installing doesn't help.

milesgranger commented 9 months ago

Fixed by https://github.com/coiled/platform/pull/4623