flyteorg / flyte

Scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks.
https://flyte.org
Apache License 2.0
5.26k stars, 553 forks

[Core feature] ability to sample df's on serialisation #5050

Open andrewm4894 opened 4 months ago

andrewm4894 commented 4 months ago

Motivation: Why do you think this is important?

When we pass a df to and from Flyte tasks, is there any native way to ask Flyte to do sampling as it's reading and writing the data? E.g. I sometimes want to just randomly sample N rows on the way into or out of a task, and I'm wondering if Flyte could in some way do this natively.

The use case here is, say, task A makes a big df, and tasks B1 and B2 both need that as input. B1 needs it all, but B2 actually only needs a random sample of N rows from the df.

Goal: What should the final outcome look like, ideally?

maybe I could pass the sampling param to the task decorator.

Describe alternatives you've considered

Maybe it's a horrible idea idk. Just that I find myself having some tasks where first thing I do is quickly downsample the data right away.

Propose: Link/Inline OR Additional context

No response

welcome[bot] commented 4 months ago

Thank you for opening your first issue here! 🛠

dosubot[bot] commented 4 months ago

Hey there @andrewm4894! Great to see you bringing up another interesting idea around here. Hope you've been doing well!

Based on the information available in the Flyte repository, there isn't a native way in Flyte to perform sampling, such as randomly sampling or writing a specific number of rows, when reading and writing data to and from tasks. The data sampling operation is typically performed within the task's logic, using the data processing libraries of your choice (e.g., pandas, PySpark, etc.) after fetching the data. Flyte tasks support custom Python code, allowing you to use any Python library for data manipulation, including sampling, within the task functions.

However, Flyte's plugin system is designed to be extensible, allowing developers to create custom plugins or modify existing ones to meet specific requirements. For instance, you could potentially modify the OutputWriter interface methods in the plugin code to include data sampling logic when writing task outputs to storage. Here's a simplified example of how you might extend the OutputWriter interface to include data sampling:

import (
    "context"

    // Assumed import path for Flyte's plugin machinery io package; adjust to your Flyte version
    "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io"
)

// CustomOutputWriter wraps an existing OutputWriter so outputs can be sampled before they are written
type CustomOutputWriter struct {
    io.OutputWriter // Embed the existing OutputWriter for standard functionality
}

// NewCustomOutputWriter creates a new instance of CustomOutputWriter
func NewCustomOutputWriter(baseWriter io.OutputWriter) *CustomOutputWriter {
    return &CustomOutputWriter{OutputWriter: baseWriter}
}

// Put shadows the embedded writer's Put method so the outputs are sampled before serialization
func (cow *CustomOutputWriter) Put(ctx context.Context, reader io.OutputReader) error {
    // For example, read the full dataset, sample it, and expose the result as a new reader
    sampled, err := sampleData(reader)
    if err != nil {
        return err
    }

    // Use the embedded OutputWriter to write the sampled data
    return cow.OutputWriter.Put(ctx, sampled)
}

// sampleData is a hypothetical function that takes an OutputReader and returns a reader over the sampled outputs
func sampleData(reader io.OutputReader) (io.OutputReader, error) {
    // Implement the data sampling logic here; this placeholder passes the outputs through unchanged
    return reader, nil
}

In this example, CustomOutputWriter embeds the existing OutputWriter and shadows its Put method to add a sampling step (Go has no method overriding, so the wrapper's Put is simply the one callers see). Because Put takes an OutputReader rather than raw bytes, sampleData returns a reader over the sampled outputs, which is then handed to the embedded writer. This approach lets you intercept the process of writing outputs and manipulate the data as needed.
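The `sampleData` placeholder above leaves the sampling step itself open. One way to fill it in is reservoir sampling, which picks N rows uniformly at random in a single pass and so suits data that is sampled while it is being read or written. The sketch below is a minimal, stdlib-only illustration and does not use any Flyte types: the `reservoirSample` function, its row representation (`[][]string`), and the `seed` parameter are all assumptions made for the example, not part of Flyte's API.

```go
package main

import (
	"fmt"
	"math/rand"
)

// reservoirSample returns up to n rows chosen uniformly at random from rows
// (Algorithm R). The seed makes the sample reproducible. This is a
// hypothetical helper for illustration, not a Flyte function.
func reservoirSample(rows [][]string, n int, seed int64) [][]string {
	if n <= 0 {
		return nil
	}
	rng := rand.New(rand.NewSource(seed))
	sample := make([][]string, 0, n)
	for i, row := range rows {
		if i < n {
			// Fill the reservoir with the first n rows.
			sample = append(sample, row)
			continue
		}
		// Keep row i with probability n/(i+1), evicting a random earlier pick.
		if j := rng.Intn(i + 1); j < n {
			sample[j] = row
		}
	}
	return sample
}

func main() {
	rows := [][]string{{"1", "a"}, {"2", "b"}, {"3", "c"}, {"4", "d"}, {"5", "e"}}
	for _, row := range reservoirSample(rows, 3, 42) {
		fmt.Println(row)
	}
}
```

The loop only ever looks at the current row and the reservoir, so the same logic would work over a streaming reader instead of an in-memory slice. If fewer than N rows are available, all of them are returned.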

To integrate this custom output writer into a Flyte task or plugin, you would instantiate CustomOutputWriter with the base output writer and use it in place of the standard output writer in the task's execution context.

This example demonstrates the extensibility of Flyte plugins and how they can be adapted to include additional functionality like data sampling. However, please note that this would require a good understanding of the Flyte plugin system and the specific plugin you're modifying.

As for the Flyte v1.11.0 milestone release, it includes a new feature that could potentially be used for data sampling during serialization. The feature is described as "Improved Caching: Support for loading cached sublists with multiple data types," which eliminates issues related to cache retrieval across varied data formats. This improvement could be leveraged for more efficient data sampling during serialization by utilizing the enhanced caching mechanism to handle diverse data types more effectively.

I hope this information is helpful. If you have further questions or need more clarification, feel free to ask.

eapolinario commented 4 months ago

I feel like there might be a great integration with Ibis and Flyte's Structured Datasets.