apache / incubator-wayang

Apache Wayang (incubating) is the first cross-platform data processing system.
https://wayang.incubator.apache.org/
Apache License 2.0
174 stars 70 forks source link

this should iterate through previous REDESIGN #443

Open github-actions[bot] opened 2 months ago

github-actions[bot] commented 2 months ago

`getIterator` should iterate through all `previous` operators instead of only the first one (REDESIGN).

https://github.com/apache/incubator-wayang/blob/c25c6561bf786d76d0dc717b28cf15885c269232/python/old_code/pywayang/src/pywy/orchestrator/operator.py#L109


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pickle
import cloudpickle
from pywy.config.config_reader import get_source_types
from pywy.config.config_reader import get_sink_types
from pywy.config.config_reader import get_boundary_types
import logging

# Highest pickle protocol supported by the running interpreter.
# NOTE(review): the Operator class below never reads this constant
# (serialize_udf calls cloudpickle.dumps without a protocol argument);
# presumably it is consumed by importing modules -- confirm before removing.
pickle_protocol = pickle.HIGHEST_PROTOCOL

# Describes an Operation over an intermediate result
# Each operation could be processed by Python or Java platforms
class Operator:
    """One operation applied over an intermediate result.

    Each operation could be processed by the Python or Java platforms.
    Operators are linked to each other through ``previous`` (the raw
    constructor argument) and through the ``predecessor`` / ``successor``
    lists that are derived from it.
    """

    def __init__(
            self, operator_type=None, udf=None, previous=None,
            iterator=None, python_exec=False
    ):
        """Build the operator and wire it to its previous operators.

        Args:
            operator_type: Type identifier; it is looked up in the
                boundary, source and sink type collections returned by the
                config reader to set the corresponding flags.
            udf: User defined function. ``getIterator`` calls it with the
                iterator of the first previous operator.
            previous: Iterable of upstream operators, or ``None``. ``None``
                entries inside the iterable are skipped.
            iterator: Data iterator; mandatory for source operator types.
            python_exec: Stored as-is in ``self.python_exec`` and reported
                in the creation log line.

        Raises:
            RuntimeError: If ``operator_type`` is a source type and no
                ``iterator`` was given.
        """
        # Operator ID
        self.id = id(self)

        # Operator Type
        self.operator_type = operator_type

        # Set Boundaries
        self.boundary = self.operator_type in get_boundary_types()

        # UDF Function
        self.udf = udf

        # Source types must come with an Iterator
        self.iterator = iterator
        self.source = operator_type in get_source_types()
        if self.source and iterator is None:
            # The original code used a bare ``raise`` here, which surfaces as
            # "RuntimeError: No active exception to reraise". RuntimeError is
            # kept so existing handlers still match, but it now carries the
            # actual reason.
            raise RuntimeError("Source Operator Type without an Iterator")

        # Sink Operators
        self.sink = operator_type in get_sink_types()

        # TODO Why are previous and predecessors managed separately?
        self.previous = previous

        self.successor = []
        self.predecessor = []

        self.parameters = {}

        # Set predecessors and successors from previous
        if self.previous:
            for prev in self.previous:
                if prev is not None:
                    prev.set_successor(self)
                    self.set_predecessor(prev)

        self.python_exec = python_exec

        # Lazy %-style arguments: string concatenation raised TypeError when
        # operator_type was not a str (e.g. the default None).
        logging.info(
            "Operator:%s, type:%s, PythonExecutable: %s"
            ", is boundary: %s, is source: %s, is sink: %s",
            self.getID(), self.operator_type, self.python_exec,
            self.is_boundary(), self.source, self.sink,
        )

    def getID(self):
        """Return the operator ID assigned at construction time."""
        return self.id

    def is_source(self):
        """Return True when the operator type is one of the source types."""
        return self.source

    def is_sink(self):
        """Return True when the operator type is one of the sink types."""
        return self.sink

    def is_boundary(self):
        """Return True when the operator type is one of the boundary types."""
        return self.boundary

    def serialize_udf(self):
        """Replace ``self.udf`` in place with its cloudpickle serialization.

        After this call ``self.udf`` holds the value returned by
        ``cloudpickle.dumps`` and is no longer the original callable.
        """
        self.udf = cloudpickle.dumps(self.udf)

    def getIterator(self):
        """Return the iterator that yields this operator's output.

        Source operators return their own iterator. Any other operator
        applies its UDF to the iterator of its first previous operator;
        further entries of ``previous`` are ignored.
        """
        if self.is_source():
            return self.iterator
        # TODO this should iterate through previous REDESIGN
        return self.udf(self.previous[0].getIterator())

    def set_parameter(self, key, value):
        """Store ``value`` under ``key`` in the operator's parameters."""
        self.parameters[key] = value

    def set_successor(self, suc):
        """Register ``suc`` as a successor once; sinks never get successors."""
        if not self.is_sink() and suc not in self.successor:
            self.successor.append(suc)

    def set_predecessor(self, suc):
        """Register ``suc`` as a predecessor, ignoring duplicates."""
        if suc not in self.predecessor:
            self.predecessor.append(suc)

4baec02253eb7358e50ee51940e28e5b8b26a89f