flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0

[Core feature] map_task should be able to handle a partitioned StructuredDataset #3226

Open cosmicBboy opened 1 year ago

cosmicBboy commented 1 year ago

Motivation: Why do you think this is important?

As a data practitioner, I should be able to apply a map_task to a partitioned StructuredDataset automatically so that I can process the partitions in an embarrassingly parallel fashion without too much extra code.

Goal: What should the final outcome look like, ideally?

Suppose we have a task that produces a StructuredDataset

import random
import string
from typing import List

import pandas as pd
from flytekit import map_task, task, workflow
from flytekit.types.structured import StructuredDataset

@task
def make_df() -> StructuredDataset:
    df = pd.DataFrame.from_records([
        {
            "id": i,
            "partition": (i % 10) + 1,
            "name": "".join(
                random.choices(string.ascii_uppercase + string.digits, k=10)
            )
        }
        for i in range(1000)
    ])
    return StructuredDataset(dataframe=df, partition_col=["partition"])

Ideally, I should be able to do something like this:

@task
def process_df(dataset: StructuredDataset) -> StructuredDataset:
    df = dataset.open(pd.DataFrame).read_partition()  # read the partition (read_partition is part of the proposed API)
    ... # do stuff

@task
def use_processed_df(dataset: List[StructuredDataset]) -> ...:
    ...

@workflow
def wf() -> StructuredDataset:
    structured_dataset = make_df()
    # where structured_dataset.partitions is a list of unpartitioned StructuredDatasets
    results: List[StructuredDataset] = map_task(process_df)(dataset=structured_dataset.partitions)
    return use_processed_df(dataset=results)

Note that in this example a few magical things are happening:

  1. We pass structured_dataset.partitions into the map task, which indicates that we want to apply process_df to each of the partitions defined in make_df.
  2. The fact that the mapped results ultimately come back as a single StructuredDataset implies that using map tasks with structured datasets does an implicit reduction, i.e. the outputs of map_task(process_df) are written to the same blob store prefix.

Ideally, the solution enables processing a StructuredDataset without having to manually handle reading partitions inside the map task, and automatically reduces the results into a StructuredDataset without having to explicitly write a coalesce/reduction task.
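
For contrast, this is roughly the kind of explicit reduction task the feature would make unnecessary (a minimal sketch reusing the imports from the example above; the coalesce name and the pandas-concat approach are just illustrative, not part of the proposal):

@task
def coalesce(results: List[StructuredDataset]) -> StructuredDataset:
    # read each mapped result back into pandas and concatenate into one dataset
    frames = [r.open(pd.DataFrame).all() for r in results]
    return StructuredDataset(dataframe=pd.concat(frames, ignore_index=True))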

Describe alternatives you've considered

Users would have to roll their own way of processing partitions of a structured dataset using dynamic tasks.
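
For example, a minimal sketch of that workaround, assuming the partition column is known up front (process_partition and split_and_process are illustrative names, not existing APIs):

from typing import List

import pandas as pd
from flytekit import dynamic, task
from flytekit.types.structured import StructuredDataset

@task
def process_partition(partition: pd.DataFrame) -> pd.DataFrame:
    ...  # do stuff with a single partition
    return partition

@dynamic
def split_and_process(dataset: StructuredDataset) -> List[pd.DataFrame]:
    # materialize the whole dataset, split it on the partition column in Python,
    # and fan out one task per partition
    df = dataset.open(pd.DataFrame).all()
    return [
        process_partition(partition=part)
        for _, part in df.groupby("partition")
    ]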

Propose: Link/Inline OR Additional context

Slack context: https://flyte-org.slack.com/archives/CP2HDHKE1/p1673380243923279

Related to https://github.com/flyteorg/flyte/issues/3219

hamersaw commented 1 year ago

@cosmicBboy the discussion was to implement this entirely in flytekit then? So processing a single StructuredDataset partition does not necessarily have to align with a map_task but could be performed with any task - just mapping over the partitions is an ergonomic API?

kumare3 commented 1 year ago

@hamersaw / @cosmicBboy I feel this is a larger story than just structured datasets. I would love to be able to map over any mappable entity: for example, a flat directory of files, a List of values, a Map of key-values, a StructuredDataset of partitions, etc.

Thus, for me, it feels like a materialized trait on the Literal that makes it possible to map over that object.

cosmicBboy commented 1 year ago

yep, the code example in the description is kinda gross because it special-cases StructuredDataset. However, if we had a notion of mappable types in Flyte, with natural (but overridable) ways of reducing the results of applying a function via map tasks, that would be 🔥
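
Purely to illustrate that idea (none of these names exist in flytekit today; this is just a sketch of the trait concept): a mappable type would know how to split itself into independently processable pieces and how to fold mapped results back into one value, with the reduction overridable per call site.

from typing import List, Protocol, TypeVar

T = TypeVar("T")

class Mappable(Protocol[T]):
    # hypothetical trait: how a literal could declare that it can be mapped over
    def shards(self) -> List[T]:
        """Split this value into independently processable pieces."""
        ...

    def reduce(self, results: List[T]) -> T:
        """Default way to fold mapped results back into one value; overridable."""
        ...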

github-actions[bot] commented 1 year ago

Hello 👋, This issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will close the issue if we detect no activity in the next 7 days. Thank you for your contribution and understanding! 🙏