apache / incubator-wayang

Apache Wayang (incubating) is the first cross-platform data processing system.
https://wayang.incubator.apache.org/
Apache License 2.0

Add TensorFlow operators to Python API #454

Closed: juripetersen closed this 1 month ago

juripetersen commented 4 months ago

Changes to Configuration

So that users of the Python API no longer have to edit wayang.properties files by hand, the Python API now supports a version of the Configuration class familiar from Wayang:

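# Import paths below are assumed; the PR description does not show them.
from pywy.dataquanta import Configuration, WayangContext
from pywy.platforms.java import JavaPlugin
from pywy.platforms.spark import SparkPlugin

config = Configuration()
config.set_property("wayang.api.python.worker", "/var/www/html/python/src/pywy/execution/worker.py")

ctx = WayangContext(config) \
    .register({JavaPlugin, SparkPlugin})

# fm_func, filter_func, map_func, key_func, reduce_func and sort_func are named
# functions (defined elsewhere) whose type-annotated signatures provide the
# input and output types, so no explicit types are passed to these operators.
ctx.textfile("file:///var/www/html/README.md") \
    .flatmap(fm_func) \
    .filter(filter_func) \
    .map(map_func) \
    .reduce_by_key(key_func, reduce_func) \
    .sort(sort_func) \
    .store_textfile("file:///var/www/html/data/wordcount-out-python.txt", (str, int))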
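
Conceptually, the Configuration above is a key-value store for settings that would otherwise be written into a wayang.properties file. A minimal, hypothetical sketch of that idea (the class `PropertyStore` and its `to_properties` method are illustrative names, not part of pywy) that renders the entries as the `key = value` lines of a Java-style properties file:

```python
from typing import Dict, Optional


class PropertyStore:
    """Hypothetical stand-in for a Configuration-like key-value store."""

    def __init__(self) -> None:
        self._props: Dict[str, str] = {}

    def set_property(self, key: str, value: str) -> None:
        # A later call for the same key overwrites the earlier value.
        self._props[key] = value

    def get_property(self, key: str, default: Optional[str] = None) -> Optional[str]:
        return self._props.get(key, default)

    def to_properties(self) -> str:
        # Render entries as sorted `key = value` lines, the shape of a .properties file.
        return "\n".join(f"{k} = {v}" for k, v in sorted(self._props.items()))


store = PropertyStore()
store.set_property("wayang.api.python.worker", "/var/www/html/python/src/pywy/execution/worker.py")
print(store.to_properties())
```

Escaping of special characters in keys and values is deliberately ignored in this sketch.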

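Named functions with signatures let the type annotations stand in for the input and output types that would otherwise be passed explicitly. A minimal sketch of reading those types with the standard `typing.get_type_hints`; the helper `infer_io_types` and the word-count bodies given to `fm_func` and `map_func` are hypothetical, not pywy's implementation:

```python
from typing import Iterable, Tuple, get_type_hints


def infer_io_types(func):
    """Hypothetical helper: (input_type, output_type) of a one-argument annotated function."""
    hints = get_type_hints(func)           # parameter annotations plus "return"
    output_type = hints.pop("return", None)
    if len(hints) != 1:
        raise TypeError(f"{func.__name__} needs exactly one annotated parameter")
    (input_type,) = hints.values()
    return input_type, output_type


def fm_func(line: str) -> Iterable[str]:
    # Illustrative body: split a line of text into words.
    return line.split()


def map_func(word: str) -> Tuple[str, int]:
    # Illustrative body: pair each lower-cased word with a count of 1.
    return word.lower(), 1


print(infer_io_types(fm_func))
print(infer_io_types(map_func))
```

A lambda carries no annotations, so this helper raises `TypeError` for it, which is why the lambdas in the TensorFlow example pass their types explicitly.
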
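TensorFlow Operators

Both DLTrainingOperator and PredictOperator have been added to the TensorFlow platform of the Python API. They come with a set of Ops (such as Linear, Sigmoid, CrossEntropyLoss, Mean, ArgMax and Cast) and Models (DLModel) for describing a network, its loss and its accuracy metric.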
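
The example below builds its graphs by nesting `with_ops` calls, which only works if `with_ops` attaches its arguments as inputs and returns the op itself. A minimal sketch of that builder pattern, assuming nothing about pywy's real `Op` class beyond what the example's usage implies; `GraphOp` and `topological_order` are hypothetical names:

```python
from typing import List


class GraphOp:
    """Hypothetical op node: a name plus an ordered list of input ops."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.inputs: List["GraphOp"] = []

    def with_ops(self, *ops: "GraphOp") -> "GraphOp":
        # Attach inputs and return self so calls can be nested or chained.
        self.inputs.extend(ops)
        return self


def topological_order(root: GraphOp) -> List[str]:
    """Return op names so that every op appears after all of its inputs."""
    order: List[str] = []
    seen = set()

    def visit(op: GraphOp) -> None:
        if id(op) in seen:  # a shared input is emitted only once
            return
        seen.add(id(op))
        for child in op.inputs:
            visit(child)
        order.append(op.name)

    visit(root)
    return order


# Mirrors the model's shape: Input -> Linear -> Sigmoid -> Linear.
features = GraphOp("Input(FEATURES)")
l1 = GraphOp("Linear(4, 64)").with_ops(features)
s1 = GraphOp("Sigmoid").with_ops(l1)
l2 = GraphOp("Linear(64, 3)").with_ops(s1)
print(topological_order(l2))
```

Returning `self` is what makes an expression such as `Cast(...).with_ops(Eq().with_ops(...))` usable directly as an argument to another `with_ops` call.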

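      from typing import List

      # Linear, Sigmoid, Input, DLModel and the other Ops come from pywy's TensorFlow
      # support; their import paths are not shown here.
      # Network: Input(FEATURES) -> Linear(4 -> 64) -> Sigmoid -> Linear(64 -> 3).
      # The third Linear argument is assumed to enable a bias term.
      l1 = Linear(4, 64, True)
      s1 = Sigmoid()
      l2 = Linear(64, 3, True)

      # with_ops(...) attaches input ops and returns the op, so calls can be nested.
      s1.with_ops(l1.with_ops(Input(Input.Type.FEATURES)))
      l2.with_ops(s1)

      model = DLModel(l2)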

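      # Loss: cross-entropy over 3 classes between the predictions and the INT32 labels.
      criterion = CrossEntropyLoss(3)
      criterion.with_ops(
          Input(Input.Type.PREDICTED),
          Input(Input.Type.LABEL, Op.DType.INT32)
      )
      # Accuracy: presumably the mean (axis 0) of float(argmax(predicted, axis 1) == label).
      acc = Mean(0)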
      acc.with_ops(
          Cast(Op.DType.FLOAT32).with_ops(
              Eq().with_ops(
                  ArgMax(1).with_ops(
                      Input(Input.Type.PREDICTED)
                  ),
                  Input(Input.Type.LABEL, Op.DType.INT32)
              )
          )
      )

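      optimizer = GradientDescent(0.02)  # learning rate 0.02
      option = Option(criterion, optimizer, 6, 100)  # assumed: batch size 6, 100 epochs

      # Placeholder data; these values are floats, so the annotation is List[List[float]].
      floats: List[List[float]] = [[5.1, 3.5, 1.4, 0.2]]

      ints: List[List[int]] = [[0, 0, 1, 1, 2, 2]]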

      ctx = WayangContext() \
          .register({JavaPlugin, SparkPlugin, TensorflowPlugin})
      # Each source reads README.md and replaces every line with the placeholder sample.
      trainXSource = ctx.textfile("file:///var/www/html/README.md").map(lambda x: floats, str, List[List[float]])
      trainYSource = ctx.textfile("file:///var/www/html/README.md").map(lambda x: floats, str, List[List[float]])
      testXSource = ctx.textfile("file:///var/www/html/README.md").map(lambda x: floats, str, List[List[float]])

      # Train on (trainXSource, trainYSource), predict on testXSource, and write the
      # string "Test" for each prediction; the final map outputs str, so the sink takes str.
      trainXSource.dlTraining(model, option, trainYSource, List[List[float]], List[List[float]]) \
          .predict(testXSource, List[List[float]], List[List[float]]) \
          .map(lambda x: "Test", List[List[float]], str) \
          .store_textfile("file:///var/www/html/data/wordcount-out-python.txt", str)
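
To make the graph concrete, a plain-Python sketch of what the model, `criterion` and `acc` compute for one batch: two affine layers with a sigmoid in between, softmax cross-entropy against integer labels, and accuracy as the mean of `float(argmax(prediction) == label)`. The weights are made up, and treating CrossEntropyLoss as softmax cross-entropy averaged over the batch is an assumption about its semantics, not something stated in the PR:

```python
import math
from typing import List

Matrix = List[List[float]]


def linear(x: Matrix, w: Matrix, b: List[float]) -> Matrix:
    # y[i][j] = sum_k x[i][k] * w[k][j] + b[j]; w has shape (in_features, out_features).
    return [[sum(row[k] * w[k][j] for k in range(len(w))) + b[j] for j in range(len(b))] for row in x]


def sigmoid(x: Matrix) -> Matrix:
    return [[1.0 / (1.0 + math.exp(-v)) for v in row] for row in x]


def cross_entropy(logits: Matrix, labels: List[int]) -> float:
    # Softmax cross-entropy averaged over the batch (assumed CrossEntropyLoss semantics).
    total = 0.0
    for row, label in zip(logits, labels):
        m = max(row)  # subtract the max for numerical stability
        log_sum = m + math.log(sum(math.exp(v - m) for v in row))
        total += log_sum - row[label]
    return total / len(labels)


def accuracy(logits: Matrix, labels: List[int]) -> float:
    # Mean over the batch of Cast(FLOAT32)(Eq(ArgMax over each row, label)).
    hits = [float(max(range(len(row)), key=row.__getitem__) == label) for row, label in zip(logits, labels)]
    return sum(hits) / len(hits)


# Made-up weights for a 4 -> 2 -> 3 network (the PR's model uses 64 hidden units).
w1 = [[0.1, -0.2], [0.0, 0.3], [-0.1, 0.2], [0.05, 0.0]]
b1 = [0.0, 0.1]
w2 = [[0.5, -0.5, 0.0], [-0.3, 0.2, 0.4]]
b2 = [0.0, 0.0, 0.1]

batch = [[5.1, 3.5, 1.4, 0.2]]
labels = [0]
logits = linear(sigmoid(linear(batch, w1, b1)), w2, b2)
print(cross_entropy(logits, labels), accuracy(logits, labels))
```

Training, as configured by Option above, would repeatedly adjust `w1`, `b1`, `w2` and `b2` to lower the cross-entropy on mini-batches of labeled data.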

I would argue that there is still work to do to open up broader multi-language API support through the current JSON API, but I will create follow-up issues toward a more streamlined use of both the JSON API and the Python API.

juripetersen commented 2 months ago

@zkaoudi I made the changes needed to pass the build workflows (GitHub Actions) and added the features described in this PR's description.