flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] Failed to run spark task if using fast-register #3904

Open pingsutw opened 11 months ago

pingsutw commented 11 months ago

Describe the bug

Running the Spark task fails with the errors below:

Traceback (most recent call last):

      File "/usr/local/lib/python3.9/dist-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/test.py", line 33, in hello_spark
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1250, in reduce
        vals = self.mapPartitions(func).collect()
      File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1197, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
        return_value = get_return_value(
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
        return f(*a, **kw)
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(

Message:

    An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Aborting TaskSet 0.0 because task 1 (partition 1)
cannot run anywhere due to node and executor excludeOnFailure.
Most recent failure:
Lost task 1.1 in stage 0.0 (TID 3) (10.244.0.54 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'f' on <module 'test' from '/usr/lib/python3.9/test/__init__.py'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)

Expected behavior

The task works with non-fast register, so it should also work with fast register.

pyflyte run --remote flyte-example/test.py wf                            <- fails
pyflyte register --non-fast --version v11 flyte-example/test.py          <- works
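To narrow down where the two registration modes diverge, a hedged diagnostic (standard PySpark and stdlib calls only; the helper name where_is_test is made up here) can be dropped into the hello_spark task from the repro below to report where the test module resolves on each executor:

# Hypothetical diagnostic: run a trivial map so each executor reports
# where the "test" module resolves and whether it defines "f".
def where_is_test(_):
    import importlib
    try:
        mod = importlib.import_module("test")
        return (getattr(mod, "__file__", None), hasattr(mod, "f"))
    except ImportError:
        return (None, False)

# inside hello_spark, after sess is obtained:
print(sess.sparkContext.parallelize(range(2), 2).map(where_is_test).collect())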

Additional context to reproduce

test.py

import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark
from flytekit.image_spec.image_spec import ImageSpec

spark_image = ImageSpec(registry="pingsutw")

@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
    container_image=spark_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Sparkfk wifth Partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val

def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0

@task(cache_version="1", container_image=spark_image)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1

@workflow
def wf(triggered_date: datetime.datetime = datetime.datetime.now()) -> float:
    """
    Using the workflow is still as any other workflow. 
    As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi
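As a possible workaround while fast register does not appear to ship code to the Spark executor pods (an assumption, see above), defining f inside the task should make cloudpickle serialize it by value instead of by reference to the test module. A minimal sketch of that change (hypothetical task name hello_spark_local_f, trimmed spark_conf):

# Hedged workaround sketch: a nested function is pickled by value by
# cloudpickle, so the executors no longer need to import test.f.
@task(
    task_config=Spark(spark_conf={"spark.executor.instances": "2"}),
    limits=Resources(mem="2000M"),
    container_image=spark_image,
)
def hello_spark_local_f(partitions: int) -> float:
    def f(_):
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        return 1 if x**2 + y**2 <= 1 else 0

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    return 4.0 * count / n

This only sidesteps the pickling of f; renaming test.py to something that does not collide with the stdlib test package might also be worth checking, since the traceback shows the stdlib package winning the import on the executor.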

Screenshots

No response

Are you sure this issue hasn't been raised already?

Have you read the Code of Conduct?

github-actions[bot] commented 2 months ago

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable. Thank you for your contribution and understanding! 🙏