apache / incubator-wayang

Apache Wayang (incubating) is the first cross-platform data processing system.
https://wayang.incubator.apache.org/
Apache License 2.0

Feature kafka source and sink mk 002 #403

Open kamir opened 5 months ago

kamir commented 5 months ago

This is a draft pull request.

I wanted to test the added function readKafkaTopic(topicName: String), which is new in the JavaPlanBuilder.

Here is my test code:

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;

import java.util.Arrays;
import java.util.Collection;

public class KafkaTopicWordCount {

public static void main(String[] args){

    System.out.println( ">>> Apache Wayang Test #01");
    System.out.println( "    We use a Kafka topic and a 'Java Context'.");
    int i = 0;
    for (String arg : args) {
        String line = String.format("  %d    - %s", i, arg);
        System.out.println(line);
        i++;
    }

    // Settings: the topic name is expected as the first CLI argument.
    String topicName = args[0];

    // Get a plan builder.
    WayangContext wayangContext = new WayangContext(new Configuration())
            .withPlugin(Java.basicPlugin());
    //        .withPlugin(Spark.basicPlugin());
    JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
            .withJobName(String.format("WordCount (%s)", topicName))
            .withUdfJarOf(KafkaTopicWordCount.class);

    // Start building the WayangPlan.
    Collection<Tuple2<String, Integer>> wordcounts = planBuilder
            // Read the records from the Kafka topic.
            .readKafkaTopic(topicName).withName("Load data from topic")

            // Split each line by non-word characters.
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .withSelectivity(10, 100, 0.9)
            .withName("Split words")

            // Filter empty tokens.
            .filter(token -> !token.isEmpty())
            .withSelectivity(0.99, 0.99, 0.99)
            .withName("Filter empty words")

            // Attach counter to each word.
            .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

            // Sum up counters for every word.
            .reduceByKey(
                    Tuple2::getField0,
                    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
            )
            .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
            .withName("Add counters")

            // Execute the plan and collect the results.
            .collect();

    System.out.println(wordcounts);
    System.out.println( "### Done. ###" );

}

}

kamir commented 5 months ago

The visibility issue has been solved. My end-to-end test, as shown in the comment above, can now be started. It finds the function now, but it still fails with an error due to a missing WayangPlan.

[INFO] --- exec:3.0.0:java (default-cli) @ wayang-test-01 ---

>>> Apache Wayang Test #01
    We use a Kafka topic and a 'Java Context'.
*** Use default topic name: banking-tx-small-csv
16:39:45.821 [KafkaTopicWordCount.main()] ERROR org.apache.wayang.core.optimizer.enumeration.PlanEnumerator - No comprehensive PlanEnumeration.
16:39:45.829 [KafkaTopicWordCount.main()] ERROR org.apache.wayang.core.optimizer.enumeration.PlanEnumerator - Pending enumerations: []
16:39:45.829 [KafkaTopicWordCount.main()] ERROR org.apache.wayang.core.optimizer.enumeration.PlanEnumerator - Pending concatenations: []
[WARNING] org.apache.wayang.core.api.exception.WayangException: Could not find a single execution plan.
    at org.apache.wayang.core.optimizer.enumeration.PlanEnumerator.enumerate (PlanEnumerator.java:305)
    at org.apache.wayang.core.api.Job.createInitialExecutionPlan (Job.java:417)
    at org.apache.wayang.core.api.Job.doExecute (Job.java:290)
    at org.apache.wayang.core.util.OneTimeExecutable.tryExecute (OneTimeExecutable.java:41)
    at org.apache.wayang.core.util.OneTimeExecutable.execute (OneTimeExecutable.java:54)
    at org.apache.wayang.core.api.Job.execute (Job.java:244)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:120)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:108)
    at org.apache.wayang.api.PlanBuilder.buildAndExecute (PlanBuilder.scala:105)
    at org.apache.wayang.api.DataQuanta.collect (DataQuanta.scala:758)
    at org.apache.wayang.api.DataQuantaBuilder.collect (DataQuantaBuilder.scala:369)
    at org.apache.wayang.api.DataQuantaBuilder.collect$ (DataQuantaBuilder.scala:367)
    at org.apache.wayang.api.BasicDataQuantaBuilder.collect (DataQuantaBuilder.scala:448)
    at KafkaTopicWordCount.main (KafkaTopicWordCount.java:75)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.777 s
[INFO] Finished at: 2024-02-06T16:39:45+01:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project wayang-test-01: An exception occured while executing the Java class. Could not find a single execution plan. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]

zkaoudi commented 5 months ago

I think you missed the mapping from the KafkaSourceOperator which is a platform-agnostic Wayang operator to its Java-specific implementation. See, for example, here for the FlatMap Operator: https://github.com/apache/incubator-wayang/blob/main/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/FlatMapMapping.java

Then this mapping needs to be registered in the Java platform. https://github.com/apache/incubator-wayang/blob/main/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
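For readers following along, here is a minimal sketch of what such a mapping could look like, modeled on the FlatMapMapping linked above. The operator names (KafkaTopicSource, JavaKafkaTopicSource) and their constructors are assumptions for illustration, not necessarily the code in this PR:

import java.util.Collection;
import java.util.Collections;
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
import org.apache.wayang.java.platform.JavaPlatform;

public class KafkaTopicSourceMapping implements Mapping {

    @Override
    public Collection<PlanTransformation> getTransformations() {
        return Collections.singleton(new PlanTransformation(
                this.createSubplanPattern(),
                this.createReplacementSubplanFactory(),
                JavaPlatform.getInstance()
        ));
    }

    // Match the platform-agnostic KafkaTopicSource anywhere in the WayangPlan;
    // the pattern operator's fields are irrelevant for matching.
    private SubplanPattern createSubplanPattern() {
        final OperatorPattern pattern = new OperatorPattern(
                "source", new KafkaTopicSource(null), false);
        return SubplanPattern.createSingleton(pattern);
    }

    // Replace every match with the Java-specific execution operator.
    private ReplacementSubplanFactory createReplacementSubplanFactory() {
        return new ReplacementSubplanFactory.OfSingleOperators<KafkaTopicSource>(
                (matchedOperator, epoch) -> new JavaKafkaTopicSource(matchedOperator).at(epoch));
    }
}

The new mapping then has to be added to the mapping list in the Mappings class linked above, so that the Java platform actually registers it.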

Hope with these it works.

kamir commented 5 months ago

Yes, very good. Thanks for that pointer. I added the mapping, updated my credentials, and voilà ...

We get a new error, this time related to the "load estimation" procedure:

[INFO] --- exec:3.0.0:java (default-cli) @ wayang-test-01 ---

>>> Apache Wayang Test #01
    We use a Kafka topic and a 'Java Context'.
*** Use default topic name: banking-tx-small-csv

7 ...

Create consumer from DEFAULT PROPERTIES.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/mkaempf/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/mkaempf/.m2/repository/org/slf4j/slf4j-simple/1.7.13/slf4j-simple-1.7.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
KafkaTopicSource isInitialized=true

8 ...

[WARNING] org.apache.wayang.core.api.exception.WayangException: Job execution failed.
    at org.apache.wayang.core.api.Job.doExecute (Job.java:330)
    at org.apache.wayang.core.util.OneTimeExecutable.tryExecute (OneTimeExecutable.java:41)
    at org.apache.wayang.core.util.OneTimeExecutable.execute (OneTimeExecutable.java:54)
    at org.apache.wayang.core.api.Job.execute (Job.java:244)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:120)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:108)
    at org.apache.wayang.api.PlanBuilder.buildAndExecute (PlanBuilder.scala:105)
    at org.apache.wayang.api.DataQuanta.collect (DataQuanta.scala:758)
    at org.apache.wayang.api.DataQuantaBuilder.collect (DataQuantaBuilder.scala:369)
    at org.apache.wayang.api.DataQuantaBuilder.collect$ (DataQuantaBuilder.scala:367)
    at org.apache.wayang.api.BasicDataQuantaBuilder.collect (DataQuantaBuilder.scala:463)
    at KafkaTopicWordCount.main (KafkaTopicWordCount.java:78)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.apache.wayang.core.platform.AtomicExecution.estimateLoad (AtomicExecution.java:59)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.lambda$estimateLoad$0 (AtomicExecutionGroup.java:90)
    at java.util.stream.ReferencePipeline$3$1.accept (ReferencePipeline.java:195)
    at java.util.LinkedList$LLSpliterator.forEachRemaining (LinkedList.java:1239)
    at java.util.stream.AbstractPipeline.copyInto (AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:474)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential (ReduceOps.java:913)
    at java.util.stream.AbstractPipeline.evaluate (AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.reduce (ReferencePipeline.java:558)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.estimateLoad (AtomicExecutionGroup.java:91)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.estimateExecutionTime (AtomicExecutionGroup.java:108)
    at org.apache.wayang.core.platform.AtomicExecutionGroup.estimateExecutionTime (AtomicExecutionGroup.java:117)
    at org.apache.wayang.core.platform.PartialExecution.lambda$getOverallTimeEstimate$3 (PartialExecution.java:173)
    at java.util.stream.ReferencePipeline$3$1.accept (ReferencePipeline.java:195)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining (ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto (AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:474)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential (ReduceOps.java:913)
    at java.util.stream.AbstractPipeline.evaluate (AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.reduce (ReferencePipeline.java:553)
    at org.apache.wayang.core.platform.PartialExecution.getOverallTimeEstimate (PartialExecution.java:174)
    at org.apache.wayang.core.profiling.PartialExecutionMeasurement.<init> (PartialExecutionMeasurement.java:62)
    at org.apache.wayang.core.api.Job.logExecution (Job.java:705)
    at org.apache.wayang.core.api.Job.doExecute (Job.java:325)
    at org.apache.wayang.core.util.OneTimeExecutable.tryExecute (OneTimeExecutable.java:41)
    at org.apache.wayang.core.util.OneTimeExecutable.execute (OneTimeExecutable.java:54)
    at org.apache.wayang.core.api.Job.execute (Job.java:244)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:120)
    at org.apache.wayang.core.api.WayangContext.execute (WayangContext.java:108)
    at org.apache.wayang.api.PlanBuilder.buildAndExecute (PlanBuilder.scala:105)
    at org.apache.wayang.api.DataQuanta.collect (DataQuanta.scala:758)
    at org.apache.wayang.api.DataQuantaBuilder.collect (DataQuantaBuilder.scala:369)
    at org.apache.wayang.api.DataQuantaBuilder.collect$ (DataQuantaBuilder.scala:367)
    at org.apache.wayang.api.BasicDataQuantaBuilder.collect (DataQuantaBuilder.scala:463)
    at KafkaTopicWordCount.main (KafkaTopicWordCount.java:78)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:254)
    at java.lang.Thread.run (Thread.java:829)

zkaoudi commented 5 months ago

Hmmm, that's interesting, but at least we're moving forward :)

I think it throws the exception because you added the following without adding these keys in the properties files:

@Override
public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
    return Arrays.asList("wayang.java.kafkatopicsource.load.prepare", "wayang.java.kafkatopicsource.load.main");
}

Maybe try to add a cost function for the kafkatopicsource in the Java properties file: https://github.com/apache/incubator-wayang/blob/cd4936e665c2978394943117b44853f7ebabbec8/wayang-platforms/wayang-java/src/main/resources/wayang-java-defaults.properties You can copy the one for the collectionsource for now.
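For illustration, the two entries could then look roughly like the snippet below; the cost expressions are placeholders in the generic estimator-spec syntax, not tuned values, so copy the actual specification from the collectionsource entries in the linked file:

wayang.java.kafkatopicsource.load.prepare = {"in":0, "out":1, "cpu":"${500}", "ram":"10"}
wayang.java.kafkatopicsource.load.main = {"in":0, "out":1, "cpu":"${100*out0}", "ram":"${10*out0}"}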

To check it quickly, you could also try not overriding the getLoadProfileEstimatorConfigurationKeys() method.

kamir commented 5 months ago

By not overriding getLoadProfileEstimatorConfigurationKeys() it works. And with the right definitions in the wayang-java-defaults.properties file it works as well.

KafkaWordCount Example

KafkaTopicSource isInitialized=true
[(address, 9), (city, 9), (orderid, 9), (18, 9), (94041, 9), (ordertime, 9), (zipcode, 9), (itemid, 9), (view, 9), (mountain, 9), (item_184, 9), (1497014222380, 9), (state, 9), (ca, 9)]
### Done. ###

zkaoudi commented 5 months ago

That's awesome! The best way is to provide the keys and add the cost function in the properties file.

Now there are some conflicts in some files. I will check them and see if I can resolve them over the weekend.

kamir commented 5 months ago

KafkaSource and KafkaSink are ready.

I will add tests later this week, or maybe next weekend.

But my external KafkaWordCountDemo works.

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.java.Java;

import java.util.Arrays;
import java.util.Collection;

public class KafkaTopicWordCount {

// Define the lambda function for formatting the output
private static final FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> udf = tuple -> {
    return tuple.getField0() + ": " + tuple.getField1();
};

public static void main(String[] args){

    System.out.println( ">>> Apache Wayang Test #01");
    System.out.println( "    We use a Kafka topic and a 'Java Context'.");

    // Default topic name
    String topicName = "banking-tx-small-csv";

    // Check if at least one argument is provided
    if (args.length > 0) {
        // Assuming the first argument is the topic name
        topicName = args[0];

        int i = 0;
        for (String arg : args) {
            String line = String.format("  %d    - %s", i, arg);
            System.out.println(line);
            i++;
        }

    }
    else {
        System.out.println( "*** Use default topic name: " + topicName );
    }

    Configuration configuration = new Configuration();
    // Get a plan builder.
    WayangContext wayangContext = new WayangContext(configuration)
            .withPlugin(Java.basicPlugin());
    //        .withPlugin(Spark.basicPlugin());
    JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
            .withJobName(String.format("WordCount (%s)", topicName))
            .withUdfJarOf(KafkaTopicWordCount.class);

/*
    // Start building the WayangPlan.
    Collection<Tuple2<String, Integer>> wordcounts_collection = planBuilder
            // Read the records from the Kafka topic.
            .readKafkaTopic(topicName).withName("Load data from topic")

            // Split each line by non-word characters.
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .withSelectivity(10, 100, 0.9)
            .withName("Split words")

            // Filter empty tokens.
            .filter(token -> !token.isEmpty())
            .withSelectivity(0.99, 0.99, 0.99)
            .withName("Filter empty words")

            // Attach counter to each word.
            .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

            // Sum up counters for every word.
            .reduceByKey(
                    Tuple2::getField0,
                    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
            )
            .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
            .withName("Add counters")

            // Execute the plan and collect the results.
            .collect();

    System.out.println( wordcounts_collection );
    System.out.println( "### Done. ###" );
 */

    // Start building the WayangPlan.
    planBuilder
            // Read the records from the Kafka topic.
            .readKafkaTopic(topicName).withName("Load data from topic")

            // Split each line by non-word characters.
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .withSelectivity(10, 100, 0.9)
            .withName("Split words")

            // Filter empty tokens.
            .filter(token -> !token.isEmpty())
            .withSelectivity(0.99, 0.99, 0.99)
            .withName("Filter empty words")

            // Attach counter to each word.
            .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

            // Sum up counters for every word.
            .reduceByKey(
                    Tuple2::getField0,
                    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
            )
            .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
            .withName("Add counters")

            // Execute the plan and store the results in Kafka.
            //.writeKafkaTopic("file:///Users/mkaempf/GITHUB.private/open-sustainability-data/bin/test_23456.txt", d -> String.format("%.2f, %d", d.getField1(), d.getField0()), "job_test_1",
            .writeKafkaTopic("test_23456", d -> String.format("%d, %s", d.getField1(), d.getField0()), "job_test_1",
                    LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration) );

}

}
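For reference, a hypothetical way to launch this demo with the exec-maven-plugin seen in the logs above (the main class and topic name are just examples):

mvn exec:java -Dexec.mainClass=KafkaTopicWordCount -Dexec.args=banking-tx-small-csv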

kamir commented 4 months ago

For testing the SparkKafka integration I used this test class:

import org.apache.wayang.api.DataQuantaBuilder;
import org.apache.wayang.api.FilterDataQuantaBuilder;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.api.ReduceByDataQuantaBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;

public class KafkaTopicWordCountSpark implements Serializable {

    public KafkaTopicWordCountSpark() {}

    // Define the lambda function for formatting the output
    private static final FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> udf = tuple -> {
        return tuple.getField0() + ": " + tuple.getField1();
    };

    public static void main(String[] args){

        System.out.println( ">>> Apache Wayang Test #02");
        System.out.println( "    We use a Kafka topic and an 'Apache Spark Context'.");

        // Default topic name
        String topicName = "banking-tx-small-csv";

        // Check if at least one argument is provided
        if (args.length > 0) {
            // Assuming the first argument is the topic name
            topicName = args[0];

            int i = 0;
            for (String arg : args) {
                String line = String.format("  %d    - %s", i, arg);
                System.out.println(line);
                i++;
            }
        }
        else {
            System.out.println( "*** Use default topic name: " + topicName );
        }

        Configuration configuration = new Configuration();
        // Get a plan builder.
        WayangContext wayangContext = new WayangContext(configuration)
                .withPlugin(Spark.basicPlugin());

        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount via Spark on Kafka topic (%s)", topicName))
                .withUdfJarOf(KafkaTopicWordCountSpark.class);

        // Start building the WayangPlan.
        //Collection<Tuple2<String, Integer>> wordcounts_collection =

        planBuilder
                // Read the records from a Kafka topic.
                .readKafkaTopic(topicName).withName("Load-data-from-topic")

                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split-words")

                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")
                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To-lower-case, add-counter")

                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")
                // .collect();

                // Execute the plan and store the results in Kafka.
                //.writeKafkaTopic("file:///Users/mkaempf/GITHUB.private/open-sustainability-data/bin/test_23456789.txt", d -> String.format("%.2f, %d", d.getField1(), d.getField0()), "job_test_1",
                .writeKafkaTopic("test_23456", d -> Util.formatData( d.getField0(), d.getField1() ), "job_test_2",
                        LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration) );

    }
}

With this utility class:

import java.io.Serializable;

public class Util implements Serializable {
    public static String formatData( String f1, Integer f2 ) {
        // Swap the arguments so that %d gets the Integer and %s gets the String.
        return String.format("%d, %s", f2, f1);
    }
}

Currently I still run into NotSerializableExceptions.

More details have been shared on the dev mailing list.

I pushed the code so that others can see where I made changes so far, and maybe someone has an idea of how to remove the ugly blocker.
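A common workaround for such NotSerializableExceptions — sketched here only as an idea, not as what this PR ships — is to move the formatting UDF into a small named top-level class, so that the lambda does not capture any non-serializable enclosing state:

import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.function.FunctionDescriptor;

// Hypothetical standalone formatter UDF; SerializableFunction already extends Serializable.
public class FormatUdf implements FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> {
    @Override
    public String apply(Tuple2<String, Integer> tuple) {
        // Produce "<count>, <word>", matching the other demo's output format.
        return String.format("%d, %s", tuple.getField1(), tuple.getField0());
    }
}

It would then be passed to writeKafkaTopic(...) as new FormatUdf() instead of the d -> Util.formatData(...) lambda.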

2pk03 commented 3 months ago

@zkaoudi @kamir - can you please review?

zkaoudi commented 3 months ago

@2pk03 for some reason there are conflicts which must be resolved before merging.