flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[BUG] Failed to run BQ task when cache is enabled #2864

Closed pingsutw closed 1 year ago

pingsutw commented 2 years ago

Describe the bug

Slack Thread

Failed to run BQ task when the cache is enabled because type validation is failing.

{"json":{"exec_id":"atfkcwwv5cfr7wzhqq94","node":"n1","ns":"flytesnacks-development","res_ver":"5501760","routine":"worker-3","tasktype":"python-task","wf":"flytesnacks:development:example_test.wf"},"level":"error","msg":"DataCatalog failed to get outputs from artifact 45bd1d68-a013-43b1-a56b-b7597b559125, err: unexpected artifactData: [o0] type: [structured_dataset_type:\u003c\u003e ] does not match any task output type: [structured_dataset_type:\u003cformat:\"parquet\" \u003e ]","ts":"2022-09-12T06:56:41Z"}
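The escaped `\u003c` and `\u003e` sequences in the log make the mismatch hard to read. A minimal sketch for decoding the message with Python's standard `json` module follows; the `"json"` context object is abbreviated here for brevity, and the rest of the line is copied from the log above.

```python
import json

# Log line from above; the "json" context object is abbreviated for brevity.
raw = (
    r'{"json":{"exec_id":"atfkcwwv5cfr7wzhqq94","node":"n1"},'
    r'"level":"error",'
    r'"msg":"DataCatalog failed to get outputs from artifact '
    r'45bd1d68-a013-43b1-a56b-b7597b559125, err: unexpected artifactData: [o0] type: '
    r'[structured_dataset_type:\u003c\u003e ] does not match any task output type: '
    r'[structured_dataset_type:\u003cformat:\"parquet\" \u003e ]",'
    r'"ts":"2022-09-12T06:56:41Z"}'
)

# json.loads turns \u003c / \u003e back into "<" / ">".
msg = json.loads(raw)["msg"]
print(msg)
```

Once decoded, the cached artifact's type reads `structured_dataset_type:<>` (no format), while the task output type reads `structured_dataset_type:<format:"parquet" >`.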

When the cache is enabled, we'll retrieve artifacts from datacatalog and check if the structured dataset's schema and format match the expected type.

However, the default format of the structured dataset in the expected type is always Parquet, but the format of the output structured dataset is "".

@task(cache=True, cache_version="1.0")
def t2(sd: StructuredDataset) -> StructuredDataset: # The default format of structured dataset is Parquet here
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    return StructuredDataset(df, uri=bq_uri) # The format of structured dataset is "" 

Two ways to fix it.

  1. Change these lines to
    if len(structuredDatasetType.Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
        return false
    }
  2. Change the default format of the expected type to "" in flytekit, and change these lines to the below. However, it will break existing users. If users upgrade flytekit, they have to upgrade the propeller as well.
    if len(t.literalType.GetStructuredDatasetType().Format) != 0 && !strings.EqualFold(structuredDatasetType.Format, t.literalType.GetStructuredDatasetType().Format) {
        return false
    }

    Here, structuredDatasetType is the input type, and t.literalType.GetStructuredDatasetType() is the expected type.
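To compare the two options side by side, here is a small Python model of the two Go conditions. It is only a sketch: the function names are hypothetical, and `str.casefold()` is used as an approximation of Go's `strings.EqualFold`.

```python
def accepts_option_1(input_format: str, expected_format: str) -> bool:
    # Option 1: an empty *input* format skips the comparison entirely.
    if len(input_format) != 0 and input_format.casefold() != expected_format.casefold():
        return False
    return True


def accepts_option_2(input_format: str, expected_format: str) -> bool:
    # Option 2: an empty *expected* format skips the comparison entirely.
    if len(expected_format) != 0 and input_format.casefold() != expected_format.casefold():
        return False
    return True


# The failing case from the log: the cached artifact has format "",
# while the task's declared output type has format "parquet".
print(accepts_option_1("", "parquet"))  # True
print(accepts_option_2("", "parquet"))  # False
print(accepts_option_2("", ""))         # True
```

In this model, option 2 accepts the failing case only once the expected format is also "", which is why it depends on the flytekit default changing as well.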

Expected behavior

BQ task should run successfully even if the cache is enabled

Additional context to reproduce

import uuid
import pandas as pd
from typing_extensions import Annotated
from flytekit import task, workflow, StructuredDataset, kwtypes

@task(cache=True, cache_version="2.0")
def t1() -> StructuredDataset:
    df = pd.DataFrame({
        "name": ["dylan", "steve"],
        "age": [33, 32]
    })
    return StructuredDataset(df)

@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset) -> StructuredDataset:
    df = pd.DataFrame({"len": [len(sd.open(pd.DataFrame).all())]})
    table_id = str(uuid.uuid4())
    bq_uri = f"bq://flyte-test-340607.dataset.{table_id}"
    return StructuredDataset(df, uri=bq_uri)

@workflow
def wf() -> StructuredDataset:
    return t2(sd=t1())

if __name__ == "__main__":
    wf()

pingsutw commented 2 years ago

cc @wild-endeavor

hamersaw commented 2 years ago

I'm not sure I have enough context on StructuredDatasets to completely understand this issue. It sounds like this type supports many different formats (i.e. parquet, csv, etc.), but the column definitions are not known at compile time. So currently, we type check only the format. The issue is that sometimes the format can be defined as empty (i.e. "")? It seems that, given how opinionated Flyte is about statically typing data, allowing an empty format to satisfy everything is only asking for problems downstream. Where does the default parquet format come from? Where exactly does the empty format come from?

pingsutw commented 2 years ago

Type validation happens at compile (register) time and at runtime.

@task(cache=True, cache_version="2.0")
def t1() -> StructuredDataset:
    ... 
    return StructuredDataset(df=df, uri="bq://...")

@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset) -> Annotated[StructuredDataset, "csv"]:
    return StructuredDataset(df=df)

@workflow
def wf():
    t2(sd=t1())

hamersaw commented 2 years ago

OK, last two questions: (1) Does the format ever matter in a StructuredDataset? It seems that it can be used arbitrarily, and if it doesn't matter, why do we include it in type checking? (2) If I understand correctly, this breaks because the function returns a different type; although not to the same extent, this is the same as:

@task
def foo() -> int:
    return "a"

In this case it will obviously break in local execution. When executed in a cluster, how does this break? Will flytekit try to read / write the data and fail? If we just remove the runtime check on the StructuredDataset format, will this fail the same way?

hamersaw commented 2 years ago

Quick update - the following example:

@task
def say_hello() -> str:
    return 0

@workflow
def my_wf() -> str:
    res = say_hello()
    return res

fails with:

[2/2] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 526, in dispatch_execute
        raise TypeError(

Message:

    Failed to convert return value for var o0 for function flyte.workflows.example.say_hello with error <class 'flytekit.core.type_engine.TypeTransformerFailedError'>: Type of Val '0' is not an instance of <class 'str'>

SYSTEM ERROR! Contact platform administrators.

I'm still wondering if it makes sense to remove the runtime check on the data type during the cache lookup and just let flytekit fail if there's an issue?!?!

pingsutw commented 2 years ago

"I'm still wondering if it makes sense to remove the runtime check on the data type during the cache lookup and just let flytekit fail if there's an issue?!?!"

There are still some benefits to doing a runtime check on the data type during the cache lookup. For example, if the error is caught on the propeller side, we don't need to spend additional time and resources running the task.

"we just remove the runtime check on StructuredDataset format"

I think you're probably right.

@task(cache=True, cache_version="2.0")
def t1() -> Annotated[StructuredDataset, "csv"]:
    ... 
    return StructuredDataset(df=df) # here we already have "format" and "URI" in StructuredDatasetMetadata

@task(cache=True, cache_version="2.0")
def t2(sd: StructuredDataset):
    ...     

@workflow
def wf():
    t2(sd=t1())

In the above example, it fails at compile time because the format in the SD doesn't match. However, I think it is not necessary for users to specify the format when using an SD as input, because we already have the SD metadata in the upstream literal, and the SD transformer is able to find the specific decoder to deserialize the SD.

As a result, I think we can remove this line, i.e. just remove the check on the format.
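The idea that the decoder can be chosen from the upstream metadata, rather than from the declared input type, can be sketched roughly as below. This is an illustration only: `DECODERS` and `pick_decoder` are hypothetical names and do not reflect flytekit's actual transformer code.

```python
from typing import Callable, Dict

# Hypothetical registry mapping a format name to a decoder function.
DECODERS: Dict[str, Callable[[str], str]] = {
    "parquet": lambda uri: f"read parquet from {uri}",
    "csv": lambda uri: f"read csv from {uri}",
}


def pick_decoder(metadata_format: str, declared_format: str = "") -> Callable[[str], str]:
    # Prefer the format recorded in the upstream literal's metadata;
    # fall back to the format declared on the task input, if any.
    fmt = metadata_format or declared_format
    if fmt not in DECODERS:
        raise ValueError(f"no decoder registered for format {fmt!r}")
    return DECODERS[fmt]


# The upstream task wrote "csv"; the downstream input declares no format.
decoder = pick_decoder(metadata_format="csv", declared_format="")
print(decoder("s3://bucket/data"))
```

Under this assumption, the declared input format only matters when the upstream literal carries no format of its own.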

cc @wild-endeavor

gitgraghu commented 1 year ago

Any update on the fix for this issue?

pingsutw commented 1 year ago

@gitgraghu we have fixed this issue.