kube-HPC / hkube

🐟 High Performance Computing over Kubernetes - Core Repo 🎣
http://hkube.io
MIT License

Doc #1327

Open nassiharel opened 2 years ago

nassiharel commented 2 years ago

@yehiyam

@golanha

@maty21

@NassiHarel

@tamir321

intro

this feature is.....

screenshots

maty21 commented 2 years ago

Template proposal

Feature name

Introduction

Here we will write, in a few sentences, a short explanation of the feature and what it is good for

Use Cases (Optional)

Feature List(Optional)

How to work with

Advanced (Optional)

maty21 commented 2 years ago

We can use the k8s style guide

maty21 commented 2 years ago

Streaming

Introduction

While batch processing is needed in some cases, such as data gathering and enrichment, there are other cases where the data is generated continuously, typically as many small records sent simultaneously. Streaming data includes a wide variety of data, such as log files generated by customers using your mobile or web applications, eCommerce purchases, in-game player activity, information from social networks, financial trading floors, geospatial services, and telemetry from connected devices or instrumentation in data centers.

HKube's data streaming is an extension of the HKube batch-processing pipeline architecture that handles millions of events at scale, in real time. As a result, you can collect, analyze, and store large amounts of information.

That capability allows for applications, analytics, and reporting in real-time.

Use Cases - Stream Tweets in real-time

So what are HKube data streams good for? Let's take a stream from Twitter as an example. In this particular case, we want to enrich the data from other sources, for example Facebook, LinkedIn, and other internal databases, before saving it.


Features

The HKube streaming pipeline supports:

Unique data transportation

HKube has its own data transportation, which allows sending data directly between nodes. This enables the capabilities described below.

Autoscaling

The throughput of a stream can vary over time, so HKube is able to handle bursts and also free resources for other jobs when they are not needed.

With its own heuristic system, HKube is able to recognize changes in throughput and react quickly to meet demand.

To understand this, let's look at a scenario that demonstrates how HKube handles pressure.

Conditional data flows

With streaming data, most of the time we want the data to move along a specific flow, but there are scenarios where we want to change the flow dynamically. To understand this, let's take the Twitter use case: most of the time we just want to enrich the data with more data from other sources, but if, for example, we cannot recognize the post's writer for some reason, we want to add other prerequisites before the enrichment. HKube helps you handle this situation with conditional data flows; we explain below how to create and work with this feature.

How to work with

Stateful algorithm

Stateless algorithm

Streaming Flow

e.g.:

"streaming": {
    "flows": {
        "analyze": "sort >> A",
        "master": "twitt >> sort >> B"
    },
    "defaultFlow": "master"
}

Advanced

HKube API streaming methods for stateful algorithms

The onMessage signature is onMessage(msg, origin) where the origin is the name of the previous node.
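For illustration, here is a minimal sketch of a stateful node's start function. The registration and send helpers used below (registerInputListener, startMessageListening, sendMessage) and the flowName argument for switching to the "analyze" flow are assumptions about the Python wrapper's streaming API, not confirmed signatures:

def start(args, hkubeapi):
    # Called for every message arriving from a previous node.
    def on_message(msg, origin):
        # origin is the name of the previous node, e.g. 'twitt'
        if msg.get('writer'):
            # writer recognized: stay on the default flow ('master')
            hkubeapi.sendMessage({'tweet': msg, 'enriched': True})
        else:
            # assumption: an optional flow name redirects the message
            # through the 'analyze' flow defined in the example above
            hkubeapi.sendMessage({'tweet': msg}, flowName='analyze')

    hkubeapi.registerInputListener(onMessage=on_message)
    hkubeapi.startMessageListening()  # block and process the stream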

tamir321 commented 2 years ago

CodeAPI

Introduction

The HKube Code API is a set of Java/Python/JavaScript APIs that you can invoke in your algorithm code in order to start a sub-pipeline or an algorithm.

Python

Start algorithm

Starts algorithm execution with input within the current pipeline and waits for results

Start stored sub pipeline

Starts sub-pipeline execution with input and waits for results

Start raw sub pipeline

Starts sub-pipeline execution with input, nodes, options, and waits for results

Code example

def start(args, hkubeapi):
    input = args['input'][0]
    if input and input["action"] == "start_alg":
        ret = hkubeapi.start_algorithm("black-alg", [7], includeResult=True)
        return ret

    if input and input["action"] == "start_stored_subpipeline":
        ret = hkubeapi.start_stored_subpipeline("simple", {"files": {"link": "links-1"}}, includeResult=True)
        return ret

    if input and input["action"] == "start_raw_subpipeline":
        subPipeOp = {
            "batchTolerance": "100",
            "concurrentPipelines": {
                "amount": "10",
                "rejectOnFailure": "true"
            },
            "progressVerbosityLevel": "info",
            "ttl": "3600"
        }
        flowInput = {
            "files": {
                "link": "links-1"
            }
        }
        nodes = [{"nodeName": "one",
                  "algorithmName": "green-alg",
                  "input": []},
                 {"nodeName": "two",
                  "algorithmName": "black-alg",
                  "input": ["@one"]}]
        ret = hkubeapi.start_raw_subpipeline("raw-sub-pipeline", nodes, flowInput, options=subPipeOp, webhooks={},
                                             includeResult=True)
        return ret

    return 42

Java

Start algorithm

Starts algorithm execution with input within the current pipeline and waits for results

Start stored sub pipeline

Starts sub-pipeline execution with input and waits for results

Start raw sub pipeline

Starts sub-pipeline execution with input, nodes, options and waits for results

Code example

import hkube.algo.wrapper.IAlgorithm;

import hkube.api.IHKubeAPI;
import hkube.api.INode;
import org.json.JSONObject;
import java.util.*;

public class javaCodeApi implements IAlgorithm {

@Override
public void Init(Map args) {

}

@Override
public Object Start(Map args, IHKubeAPI hkubeAPI) throws Exception {

    Collection<Object> input = (Collection<Object>)args.get("input");
    Map action = (Map) input.iterator().next();

    String act = (String)action.get("action");

    if(act.equals("startAlg")){
        List<String> list=new ArrayList<String>();
        list.add("4");
        Object res =  hkubeAPI.startAlgorithm("green-alg",list ,true);
        return res;
    }

    if(act.equals("startStored")){
        Object res =   hkubeAPI.startStoredPipeLine("simple",new HashMap());
        return res;
    }

    if(act.equals("startRaw")){
        INode jnkNode = createNode("one","green-alg" , "42");
        INode[] nodes ={jnkNode};
        Object res =   hkubeAPI.startRawSubPipeLine("raw-pipe",nodes,new HashMap(),new HashMap(),new HashMap());
        return res;
    }

    return  42;
}

private INode createNode(String nodeName,String algName,String nodeInput) {
    JSONObject obj=new JSONObject();
    obj.put("input",nodeInput);
    JSONObject[] obj1 =new JSONObject[1];
    obj1[0]=obj;

    INode node = new INode() {
        @Override
        public String getName() {
            return nodeName;
        }

        @Override
        public JSONObject[] getInput() {
            return obj1;
        }

        @Override
        public void setInput(JSONObject[] input) {

        }

        @Override
        public String getAlgorithmName() {
            return algName;
        }

        @Override
        public void setAlgorithmName(String algName) {

        }

    };
    return node;
}

@Override
public void Stop() {

}

@Override
public void Cleanup() {

}

}

Start algorithm asynchronously

Starts algorithm execution with input within the current pipeline and returns a Future that will later be filled with the result value.

Start stored sub pipeline asynchronously

Starts sub-pipeline execution with input and returns a Future that will later be filled with the result value.

Start raw sub pipeline asynchronously

Starts sub-pipeline execution with input, nodes, options and returns a Future that will later be filled with the result value.

Code example

import hkube.algo.wrapper.IAlgorithm;
import hkube.api.IHKubeAPI;
import hkube.api.INode;
import org.json.JSONObject;
import java.util.*;
import java.util.concurrent.ExecutionException; // required for future.get() error handling
// APIExecutionFuture is the future type provided by the hkube wrapper

public class javaCodeApi implements IAlgorithm {

    @Override
    public void Init(Map args) {

    }
    private Map returnWhenExecDone(APIExecutionFuture future) {
        while (!future.isDone()) {
           try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

@Override
public Object Start(Map args, IHKubeAPI hkubeAPI) throws Exception {

    Collection<Object> input = (Collection<Object>)args.get("input");
    Map action = (Map) input.iterator().next();

    String act = (String)action.get("action");

    if(act.equals("startAlg")){
        List<String> list=new ArrayList<String>();
        list.add("4");
        APIExecutionFuture futureResult =  hkubeAPI.startAlgorithmAsynch("green-alg",list ,true);
        Object res = returnWhenExecDone(futureResult);
        return res;
    }

    if(act.equals("startStored")){
        APIExecutionFuture futureResult =   hkubeAPI.startStoredPipeLineAsynch("simple",new HashMap());
        Object res = returnWhenExecDone(futureResult);
        return res;
    }

    if(act.equals("startRaw")){
        INode jnkNode = createNode("one","green-alg" , "42");
        INode[] nodes ={jnkNode};
        APIExecutionFuture futureResult = hkubeAPI.startRawSubPipeLineAsynch("raw-pipe",nodes,new HashMap(),new HashMap(),new HashMap());
        Object res = returnWhenExecDone(futureResult);
        return res;
    }

    return  42;
}

private INode createNode(String nodeName,String algName,String nodeInput) {
    JSONObject obj=new JSONObject();
    obj.put("input",nodeInput);
    JSONObject[] obj1 =new JSONObject[1];
    obj1[0]=obj;

    INode node = new INode() {
        @Override
        public String getName() {
            return nodeName;
        }

        @Override
        public JSONObject[] getInput() {
            return obj1;
        }

        @Override
        public void setInput(JSONObject[] input) {

        }

        @Override
        public String getAlgorithmName() {
            return algName;
        }

        @Override
        public void setAlgorithmName(String algName) {

        }

    };
    return node;
}

@Override
public void Stop() {

}

@Override
public void Cleanup() {

}

}    

Node JS

Start algorithm

Starts algorithm execution with input within the current pipeline and waits for results

Start stored sub pipeline

Starts sub-pipeline execution with input and waits for results

Start raw sub pipeline

Starts sub-pipeline execution with input, nodes, options, and waits for results

Code example

const start = async (args, api) => {
    let ret = "did zero action";

    const input = args['input'][0];
    if (input.hasOwnProperty("action")) {
        if (input.action === "startAlg") {
            ret = await api.startAlgorithm("green-alg", [4]);
        }
        if (input.action === "startStored") {
            ret = await api.startStoredSubpipeline("simple", { "files": { "link": "links-1" } });
        }
        if (input.action === "startRaw") {
            const subPipeOption = {
                "batchTolerance": "100",
                "concurrentPipelines": {
                    "amount": "10",
                    "rejectOnFailure": "true"
                },
                "progressVerbosityLevel": "info",
                "ttl": "3600"
            };
            const flowInput = {
                "files": {
                    "link": "links-1"
                }
            };
            const nodes = [
                { "nodeName": "one", "algorithmName": "green-alg", "input": [] },
                { "nodeName": "two", "algorithmName": "black-alg", "input": ["@one"] }
            ];

            ret = await api.startRawSubpipeline("raw-pipe", nodes, flowInput, subPipeOption);
        }
    }
    return ret;
};

module.exports = { start };

tamir321 commented 2 years ago

Caching / Run Node

Introduction

HKube pipeline execution can be long and complex.

HKube caching allows you to run a job from a specific node, taking the data of its predecessors from the cache of a previous run. This saves time and resources, and enables you to monitor or debug the relevant node without waiting for the preceding processes to complete.

Note: caching is only relevant to batch pipelines.

Use Cases

After executing the following job, you suspect that the "yellow" node does not calculate its output properly, and you wish to rerun the job with the same input while monitoring the yellow node during execution. To avoid rerunning the "red" and "green" processes, you can select the yellow node and run it using caching.

simple.png

The "red" and "green" outputs will be taken from storage, saving processing time and resources. simple2.png

How to activate cache

UI

Rest API
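As a rough sketch of the REST option, a cached run can be triggered by posting the previous run's jobId and the node to start from. The endpoint path and payload below are assumptions and should be verified against your cluster's api-server Swagger UI:

import requests

# Hypothetical sketch: endpoint path and payload are assumptions,
# verify against your cluster's api-server Swagger UI.
API_BASE = "https://<your-cluster>/hkube/api-server/api/v1"

response = requests.post(
    f"{API_BASE}/exec/caching",
    json={
        "jobId": "<jobId of the previous run>",  # run whose cached outputs are reused
        "nodeName": "yellow"                     # node to start the new execution from
    },
)
print(response.json())  # expected to contain the new jobId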

golanha commented 2 years ago

Setting up a Java run/debug configuration

run configuraion.jpg

a. Main class.

Java main class is hkube.algo.wrapper.Main from artifact io.hkube:wrapper:2.0-SNAPSHOT (also found in https://oss.sonatype.org/content/repositories/snapshots)

b. Runtime ClassPath

   1) io.hkube:wrapper:2.0-SNAPSHOT and its dependencies.

   2) The written algorithm module.

image.png

c. Environment variables

When running the algorithm locally you need to set two environment variables:

  1) ALGORITHM_ENTRY_POINT - The name of the class you wrote implementing IAlgorithm.

  2) WORKER_SOCKET_URL - the path obtained from the debug algorithm defined on the HKube deployment (example: ws://63.34.172.241/hkube/debug/something).

  Instead of setting these environment variables, you can also add a config.properties file to the root directory of the running classpath, using the same variable names as keys:

            WORKER_SOCKET_URL=ws://cd.hkube.io/hkube/debug/green-alg
            ALGORITHM_ENTRY_POINT=Algorithm

d. Program argument 'debug'

 To avoid the program attempting to load the algorithm from a jar, 'debug' should be given as a program argument.
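
Putting a, b, and d together, the launch boils down to something like the following command (jar names and paths are illustrative, not exact):

ALGORITHM_ENTRY_POINT=Algorithm \
WORKER_SOCKET_URL=ws://cd.hkube.io/hkube/debug/green-alg \
java -cp wrapper-2.0-SNAPSHOT.jar:dependencies/*:my-algorithm.jar \
     hkube.algo.wrapper.Main debug
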
tamir321 commented 2 years ago

Debug algorithm

Introduction

HKube enables debugging an algorithm in your local IDE as part of a pipeline executed on the cluster.

Use Cases

After executing the following job, you suspect that the "yellow" node does not calculate its output properly, and you wish to debug it in your local IDE.

simpleDebug.png

How to Debug your algorithm

HKube has four options for debugging an algorithm:

Obtaining the debugging URL

TBD

On your IDE

In order to debug your algorithm locally, install the "HKUBE wrapper" package

After installing the "HKUBE wrapper" package, add the following code to your project and start debugging:

Python:


from hkube_python_wrapper import Algorunner

import algorithm.main as algorithm  # the path of your "start" function

def main():
    print("starting algorithm runner")
    Algorunner.Debug("<the debugging URL>", algorithm.start)

if __name__ == "__main__":
    main()

NodeJS

const NodejsWrapper = require('@hkube/nodejs-wrapper');
const alg = require("./hkubeApiImage") // the path of your "start" function
const main = async () => {
    NodejsWrapper.debug("<the debugging URL>",alg)
}

main()
golanha commented 2 years ago

Tensorboard integration

HKube allows you to view data generated by TensorFlow in a user's algorithm in the TensorBoard UI.

logdir

An algorithm using TensorFlow generates TensorBoard log data. In the algorithm code, a logDir parameter is passed when the TensorBoard object is created. For integration with HKube, the logDir value passed must be obtained from the environment variable ALGO_METRICS_DIR.

import os
import tensorflow as tf

log_dir = os.environ['ALGO_METRICS_DIR']
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
model.fit(x=x_train,
          y=y_train,
          epochs=5,
          validation_data=(x_test, y_test),
          callbacks=[tensorboard_callback])

Loading metrics in Tensorboard

tenosrboard1.jpg In the HKube UI pipelines tab, it is possible to identify which pipelines have run and created TensorBoard metrics by the orange "metrics" label. Hovering over the actions icon pops up a "Create tensorboard for selected nodes" window. You can now select which nodes to create the board for, and click "create board".

Link to Tensorboard UI

tenosr2.jpg Once the board is created, a "boards" label is added. Hovering over "boards" pops up the link to the board displaying the collected metrics. tensor3.jpg

golanha commented 2 years ago

Output node

A user can define a custom structure and content for a pipeline's output. This output may contain output from nodes that are not leaves and have child nodes, and it may contain a subset of the pipeline's default output. To define such an output, a node of kind "output" is added to the pipeline. The output node's output is equal to its input: whatever is defined in the output node's input attribute will be in the algorithm result. An output node cannot have an empty input attribute, and no other node can depend on it; it must be the pipeline's last node.
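For illustration, such an output node might look like the following (node names are illustrative; the "@" input syntax follows the node-input examples above):

{
    "nodeName": "result",
    "kind": "output",
    "input": ["@green", "@yellow"]
}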

golanha commented 2 years ago

hyperParam Node

A hyperparams node invokes an objective pipeline with a range of different hyperparameter values, according to its configuration, and helps to determine the optimal hyperparameters.

{
    "kind": "hyperparamsTuner",
    "nodeName": "optimizerNode",
    "spec": {
        "numberOfTrials": 9,
        "objectivePipeline": "Green",
        "hyperParams": [
            {
                "suggest": "uniform",
                "name": "x",
                "low": -10,
                "high": 10
            }
        ]
    }
}

numberOfTrials - the number of times the objective pipeline will be invoked.

hyperParams - a set of hyperparameter definitions. For each hyperparameter:

suggest - the method by which the hyperParams node suggests different values. There are five suggestion methods:

  - uniform (also requires low and high numbers)
  - loguniform (also requires low and high numbers)
  - int (also requires low and high numbers)
  - discrete_uniform (also requires low and high numbers)
  - categorical (also requires a choices array)

objectivePipeline - the name of the pipeline to invoke with the suggested hyperparameter values.

{
    "kind": "hyperparamsTuner",
    "nodeName": "optimizerNode",
    "spec": {
        "numberOfTrials": 100,
        "objectivePipeline": "Green",
        "sampler": { "name": "Grid", "search_space": { "x": [1, 2], "y": [3, 5] } }
    }
}

sampler - a different way to get the suggested hyperparameter values. For the sampler, the following attributes must be defined:

  name - currently only the value Grid is supported.

  search_space - a dictionary with hyperparameter names as keys and choice arrays as values.

Objective pipeline

The flowInput of the objective pipeline contains an object named hyperParams, with each hyperparameter's name as the attribute name and its suggested value as the value.
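
For example, with the "uniform" configuration shown earlier, the flowInput for a single trial might look like this (the suggested value is illustrative):

{
    "hyperParams": {
        "x": 3.7
    }
}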