apache / incubator-wayang

Apache Wayang (incubating) is the first cross-platform data processing system.
https://wayang.incubator.apache.org/
Apache License 2.0
184 stars 73 forks source link

Add all plugins #430

Open github-actions[bot] opened 5 months ago

github-actions[bot] commented 5 months ago

Add all plugins

  1. Add plugins

https://github.com/apache/incubator-wayang/blob/b67b404ed7cc3349991844ab6b7b5c4af29c2574/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/serialization/customserializers/MultiContextDeserializer.scala#L78


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.serialization.customserializers

import com.fasterxml.jackson.core.{JsonParser, JsonProcessingException}
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer
import org.apache.wayang.api.MultiContext
import org.apache.wayang.api.serialization.SerializationUtils.mapper
import org.apache.wayang.core.api.Configuration
import org.apache.wayang.java.Java
import org.apache.wayang.postgres.Postgres
import org.apache.wayang.spark.Spark
import org.apache.wayang.sqlite3.Sqlite3

import java.io.IOException

/**
 * Jackson deserializer that rebuilds a [[MultiContext]] from its JSON form.
 *
 * The JSON object is read as a tree and three fields are deserialized independently:
 *  - `configuration` (required): the [[Configuration]] the context is created with;
 *  - `sink` (optional): an `Option[MultiContext.UnarySink]`; absent or `null` means no sink;
 *  - `plugins` (optional): the class names of registered plugins; absent or `null` means none.
 *
 * Only plugins whose class name matches one of the plugins known here (Java, Spark,
 * Postgres, Sqlite3) are re-registered; any other name is reported and skipped.
 */
class MultiContextDeserializer extends JsonDeserializer[MultiContext] {

  /**
   * Ignores any polymorphic type information and delegates to [[deserialize]].
   */
  override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer): AnyRef = {
    this.deserialize(p, ctxt)
  }

  /**
   * Deserializes a [[MultiContext]] from the JSON object at the parser's current position.
   *
   * @param jp   parser positioned at the serialized MultiContext object
   * @param ctxt Jackson deserialization context, used to report malformed input
   * @return a new MultiContext carrying the deserialized configuration, sink and known plugins
   * @throws IOException             if the input cannot be read or parsed
   * @throws JsonProcessingException if the `configuration` field is missing or a field has the wrong shape
   */
  @throws[IOException]
  @throws[JsonProcessingException]
  override def deserialize(jp: JsonParser, ctxt: DeserializationContext): MultiContext = {

    // Deserialize each field of MultiContext separately
    val node: JsonNode = jp.getCodec.readTree(jp)

    // The configuration is mandatory: fail with a descriptive Jackson error rather than an NPE.
    val configurationNode: JsonNode = Option(node.get("configuration"))
      .getOrElse(ctxt.reportInputMismatch[JsonNode](this, "MultiContext JSON is missing the required 'configuration' field"))
    val configuration: Configuration = mapper.readValue(configurationNode.traverse(jp.getCodec), classOf[Configuration])

    // `node.get` returns null for an absent field; treat that the same as an explicit null (no sink).
    // The outer Option(...) also guards against the mapper handing back a raw null.
    val sink: Option[MultiContext.UnarySink] = Option(node.get("sink"))
      .flatMap { sinkNode =>
        Option(mapper.readValue(sinkNode.traverse(jp.getCodec), new TypeReference[Option[MultiContext.UnarySink]]() {})).flatten
      }

    // An absent field or a JSON null list would otherwise yield a null List and crash in foreach below.
    val plugins: List[String] = Option(node.get("plugins"))
      .flatMap { pluginsNode =>
        Option(mapper.readValue(pluginsNode.traverse(jp.getCodec), new TypeReference[List[String]]() {}))
      }
      .getOrElse(Nil)

    //
    // Create the whole deserialized multi context
    //
    // 1. Add configuration
    val multiContext = new MultiContext(configuration)

    // 2. Add sink
    sink match {
      case Some(MultiContext.TextFileSink(url)) =>
        println(s"It's a TextFileSink with url: $url")
        multiContext.withTextFileSink(url)
      case Some(MultiContext.ObjectFileSink(url)) =>
        println(s"It's an ObjectFileSink with url: $url")
        multiContext.withObjectFileSink(url)
      case None =>
        println("No sink defined")
      case _ =>
        println("Unknown sink type")
    }

    // TODO: Add all plugins
    // 3. Add plugins
    // Plugins are serialized by class name, so compare against the class names of freshly built instances.
    // NOTE(review): Flink is still not handled; enabling it presumably requires the Flink module
    // on this module's classpath — confirm before uncommenting the Flink lines.
    val javaPluginName = Java.basicPlugin().getClass.getName
    val sparkPluginName = Spark.basicPlugin().getClass.getName
    val postgresPluginName = Postgres.plugin().getClass.getName
    // val flinkPluginName = Flink.basicPlugin().getClass.getName
    val sqlite3PluginName = Sqlite3.plugin().getClass.getName

    plugins.foreach {
      case pluginName if pluginName == javaPluginName => multiContext.register(Java.basicPlugin())
      case pluginName if pluginName == sparkPluginName => multiContext.register(Spark.basicPlugin())
      case pluginName if pluginName == postgresPluginName => multiContext.register(Postgres.plugin())
      // case pluginName if pluginName == flinkPluginName => multiContext.register(Flink.basicPlugin())
      case pluginName if pluginName == sqlite3PluginName => multiContext.register(Sqlite3.plugin())
      // Name the skipped plugin so that a silently dropped registration can be diagnosed.
      case unknownPluginName => println(s"Unknown plugin detected: $unknownPluginName")
    }

    multiContext
  }
}

3faf7c0b35880576ada4079a1dfa5524f5522691

Aryant01 commented 2 months ago

Is this issue still under consideration? I would like to work on it. Could you please assign it to me?

Aryant01 commented 1 month ago

Hey @zkaoudi, I have added two more plugins: Hadoop and GraphChi.

```scala
// Add all plugins
val javaPluginName = Java.basicPlugin().getClass.getName
val sparkPluginName = Spark.basicPlugin().getClass.getName
val postgresPluginName = Postgres.plugin().getClass.getName
val sqlite3PluginName = Sqlite3.plugin().getClass.getName
val flinkPluginName = Flink.basicPlugin().getClass.getName
val hadoopPluginName = Hadoop.basicPlugin().getClass.getName
val graphchiPluginName = GraphChi.basicPlugin().getClass.getName

plugins.foreach {
  case pluginName if pluginName == javaPluginName => multiContext.register(Java.basicPlugin())
  case pluginName if pluginName == sparkPluginName => multiContext.register(Spark.basicPlugin())
  case pluginName if pluginName == postgresPluginName => multiContext.register(Postgres.plugin())
  case pluginName if pluginName == sqlite3PluginName => multiContext.register(Sqlite3.plugin())
  case pluginName if pluginName == flinkPluginName => multiContext.register(Flink.basicPlugin())
  case pluginName if pluginName == hadoopPluginName => multiContext.register(Hadoop.basicPlugin())
  case pluginName if pluginName == graphchiPluginName => multiContext.register(GraphChi.basicPlugin())
  case _ => println("Unknown plugin detected")
}
```

Am I missing something? If that's all I had to do, could you please tell me how to get approval to open a pull request? I am new to open-source contributions. Thank you.