apache / incubator-wayang

Apache Wayang (incubating) is the first cross-platform data processing system.
https://wayang.incubator.apache.org/
Apache License 2.0

Implement cost model abstraction to be pluggable #396

Closed (juripetersen closed this 8 months ago)

juripetersen commented 8 months ago

This PR proposes an abstraction of the current cost model. Currently, the cost model relies on a largely hardcoded calculation. To make the optimizer's cost calculation pluggable and let users optimize for the cost metric they care about, this PR adds an interface called EstimatableCost. Implementations of this interface define custom ways of calculating costs.

The existing calculation has been moved to DefaultEstimatableCost, which is used by default.
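To illustrate the general idea of a pluggable cost with a default fallback, here is a minimal, self-contained sketch. It does not use the Wayang API: CandidatePlan, PlanCost, RuntimeCost, MonetaryCost, and OptimizerConfig are hypothetical stand-ins invented for this example, and the real EstimatableCost interface may look quite different.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a plan the optimizer could choose; not a Wayang class. */
final class CandidatePlan {
    final String name;
    final double runtimeMillis;
    final double monetaryCost;

    CandidatePlan(String name, double runtimeMillis, double monetaryCost) {
        this.name = name;
        this.runtimeMillis = runtimeMillis;
        this.monetaryCost = monetaryCost;
    }
}

/** Hypothetical, simplified analogue of a pluggable cost interface. */
interface PlanCost {
    double estimate(CandidatePlan plan);
}

/** Plays the role of the default: the cost is the estimated runtime. */
final class RuntimeCost implements PlanCost {
    @Override
    public double estimate(CandidatePlan plan) {
        return plan.runtimeMillis;
    }
}

/** A user-supplied alternative that optimizes for monetary cost instead. */
final class MonetaryCost implements PlanCost {
    @Override
    public double estimate(CandidatePlan plan) {
        return plan.monetaryCost;
    }
}

/** Hypothetical configuration holder that falls back to the default cost. */
final class OptimizerConfig {
    private PlanCost costModel = new RuntimeCost();

    void setCostModel(PlanCost costModel) {
        this.costModel = Objects.requireNonNull(costModel);
    }

    PlanCost getCostModel() {
        return this.costModel;
    }
}

final class PluggableCostDemo {
    /** Returns the plan with the lowest estimate under the given cost. */
    static CandidatePlan cheapest(List<CandidatePlan> plans, PlanCost cost) {
        return Collections.min(plans, Comparator.comparingDouble(cost::estimate));
    }

    public static void main(String[] args) {
        List<CandidatePlan> plans = Arrays.asList(
                new CandidatePlan("spark", 1200.0, 0.40),
                new CandidatePlan("java", 3000.0, 0.05));

        OptimizerConfig config = new OptimizerConfig();
        System.out.println(cheapest(plans, config.getCostModel()).name); // prints spark

        config.setCostModel(new MonetaryCost());
        System.out.println(cheapest(plans, config.getCostModel()).name); // prints java
    }
}
```

The same two candidate plans yield different winners depending on which cost is plugged in, while the selection code stays unchanged.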

Proposed way to supply a cost model:

Example from guides/WordCount.java:

public class CustomEstimatableCost implements EstimatableCost {
    /* Provide concrete implementations to match desired cost function(s)
     * by implementing the interface in this class.
     */
}
public class WordCount {
    public static void main(String[] args) {
        /* Create a configuration and register an EstimatableCost implementation as the cost model. */
        Configuration config = new Configuration();
        config.setCostModel(new CustomEstimatableCost());
        /* Create a Wayang context and specify the platforms Wayang will consider. */
        WayangContext wayangContext = new WayangContext(config)
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
        /*... omitted */
    }
}

In a project, we used this abstraction to implement a proof-of-concept cost model based on ML runtime estimation. Because the ML model only needed to choose between (sub-)plans, we modified the Job class so that an implementation of EstimatableCost can override the choice between these plans.

Thus, extending the EstimatableCost interface with the following method seems logical:

    public PlanImplementation pickBestExecutionPlan(
        Collection<PlanImplementation> executionPlans,
        ExecutionPlan existingPlan,
        Set<Channel> openChannels,
        Set<ExecutionStage> executedStages);

This method could then be invoked in Job.java like this:

    public PlanImplementation pickBestExecutionPlan(
        Collection<PlanImplementation> executionPlans,
        ExecutionPlan existingPlan,
        Set<Channel> openChannels,
        Set<ExecutionStage> executedStages) {
        return this.configuration
            .getCostModel()
            .getFactory()
            .makeCost()
            .pickBestExecutionPlan(
                executionPlans,
                existingPlan,
                openChannels,
                executedStages
            );
    }
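To make the delegation concrete, the following self-contained sketch mimics the call chain above with stand-in types. PlanStub, PickingCost, PickingCostFactory, PairwiseCost, and JobStub are hypothetical names invented for this example. The existingPlan, openChannels, and executedStages parameters are left out for brevity, and the pairwise predicate is only a toy replacement for an ML model.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.BiPredicate;

/** Hypothetical stand-in for PlanImplementation; carries a label and one toy feature. */
final class PlanStub {
    final String label;
    final int operatorCount;

    PlanStub(String label, int operatorCount) {
        this.label = label;
        this.operatorCount = operatorCount;
    }
}

/** Simplified analogue of a cost interface that also decides between plans. */
interface PickingCost {
    PickingCostFactory getFactory();

    PlanStub pickBestExecutionPlan(Collection<PlanStub> executionPlans);
}

/** Simplified analogue of the factory reached via getFactory(). */
interface PickingCostFactory {
    PickingCost makeCost();
}

/** Picks a plan using only a pairwise "first is better than second" decision. */
final class PairwiseCost implements PickingCost {
    private final BiPredicate<PlanStub, PlanStub> prefersFirst;

    PairwiseCost(BiPredicate<PlanStub, PlanStub> prefersFirst) {
        this.prefersFirst = prefersFirst;
    }

    @Override
    public PickingCostFactory getFactory() {
        return () -> new PairwiseCost(this.prefersFirst);
    }

    @Override
    public PlanStub pickBestExecutionPlan(Collection<PlanStub> executionPlans) {
        Iterator<PlanStub> it = executionPlans.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException("no execution plans to choose from");
        }
        // Keep the current winner and replace it whenever a challenger is preferred.
        PlanStub best = it.next();
        while (it.hasNext()) {
            PlanStub challenger = it.next();
            if (this.prefersFirst.test(challenger, best)) {
                best = challenger;
            }
        }
        return best;
    }
}

/** Hypothetical stand-in for Job: forwards the decision to the configured cost model. */
final class JobStub {
    private final PickingCost costModel;

    JobStub(PickingCost costModel) {
        this.costModel = costModel;
    }

    PlanStub pickBestExecutionPlan(Collection<PlanStub> executionPlans) {
        return this.costModel.getFactory().makeCost().pickBestExecutionPlan(executionPlans);
    }
}

final class PickBestPlanDemo {
    public static void main(String[] args) {
        // Toy "model": prefer the plan with fewer operators.
        JobStub job = new JobStub(new PairwiseCost((a, b) -> a.operatorCount < b.operatorCount));
        PlanStub chosen = job.pickBestExecutionPlan(Arrays.asList(
                new PlanStub("A", 7), new PlanStub("B", 3), new PlanStub("C", 5)));
        System.out.println(chosen.label); // prints B
    }
}
```

With this shape, the job never computes a numeric cost itself. A model that can only compare two plans is enough to select one, which matches the use case described for the ML-based cost model.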