knix-microfunctions / knix

Serverless computing platform with process-based lightweight function execution and container-based application isolation. Works in Knative and bare metal/VM environments.
https://knix.io
Apache License 2.0
201 stars 26 forks source link

knix can not support parallel states. #152

Closed lambda7xx closed 1 year ago

lambda7xx commented 1 year ago

my code for this workflow is below. I think there is some bug in knix

import time
import logging
from mfn_sdk import MfnClient

host = 'myhost'

c = MfnClient(
    mfn_url=f'http://{host}',
    mfn_user="lmyemail",
    mfn_password="mypassword",
    # mfn_name="test",
    proxies={
        "http": f'http://{host}:12345',
        "https": f'http://{host}:23456'
    })

logging.basicConfig(level=logging.DEBUG)

workflow = c.add_workflow("echo_wf_parallel")
workflow.json = """
{
  "StartAt": "A",
  "States": {
    "A": {
      "Type": "Task",
      "Resource": "A",
      "Next": "ParallelState"
    },
    "ParallelState": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "B",
          "States": {
            "B": {
              "Type": "Task",
              "Resource": "B",
              "End": true
            }
          }
        },
        {
          "StartAt": "C",
          "States": {
            "C": {
              "Type": "Task",
              "Resource": "C",
              "End": true
            }
          }
        }
      ],
      "End": true
    }
  }
}

"""

fn = c.add_function("A")
fn.source = {'code': """
def handle(event, context):
    import time
    context.log("Echoing event: "+str(event))
    time.sleep(1.4)
    print("dhy")
    return event
"""}

fn = c.add_function("B")
fn.source = {'code': """
def handle(event, context):
    import time
    context.log("Echoing event: "+str(event))
    time.sleep(1.2)
    print("dhy")
    return event
"""}

fn = c.add_function("C")
fn.source = {'code': """
def handle(event, context):
    import time
    context.log("Echoing event: "+str(event))
    time.sleep(1.6)
    print("dhy")
    return event
"""}

workflow.deploy(600)

request = {"hui":"hoi","blue":True,"Five":5}
start = time.time()
response = workflow.execute(request,timeout=5)
print(time.time() - start)
print(response)
# assert response == request

logdata = workflow.logs()
print("Exceptions:")
print(logdata['exceptions'])
print("Logs:")
print(logdata['log'])
print("Progress:")
print(logdata['progress'])
lambda7xx commented 1 year ago

@iakkus sorry to bother you. thank you very much

iakkus commented 1 year ago

To create a parallel state, you need another state that is used to collect the results of the parallel branches.

Please use the tests folder examples as templates.

Your workflow has two End states, which means when one finishes before the other (most probably) and even if they finish simultaneously, the client is awaiting a single response, not two.

lambda7xx commented 1 year ago

@iakkus thank you very much

  • I think maybe something wrong. For example. example-json1 where add and substract are parallel and they both have End states. example-json2 has a branch that has three branch. Branch1Terminal and Branch2Terminal and Branch3Terminal have End states.
import time
import logging
from mfn_sdk import MfnClient

host = 'XXXX'

c = MfnClient(
    mfn_url=f'http://{host}',
    mfn_user="xxxx",
    mfn_password="xxx",
    # mfn_name="test",
    proxies={
        "http": f'http://{host}:12345',
        "https": f'http://{host}:23456'
    })

logging.basicConfig(level=logging.DEBUG)

workflow = c.add_workflow("echo_wf_parallel_test")
workflow.json = """
{
  "StartAt": "A",
  "States": {
    "A": {
      "Type": "Task",
      "Resource": "A",
      "Next": "ParallelState"
    },
    "ParallelState": {
      "Type": "Parallel",
      "End":true,
      "Branches": [
        {
          "StartAt": "B",
          "States": {
            "B": {
              "Type": "Task",
              "Resource": "B",
              "End":true
            }
          }
        },
        {
          "StartAt": "C",
          "States": {
            "C": {
              "Type": "Task",
              "Resource": "C",
              "End": true
            }
          }
        }
      ]
    }
  }
}

"""

fn = c.add_function("A")
fn.source = {'code': """
def handle(event, context):
    import time
    context.log("Echoing event: "+str(event))
    time.sleep(1.4)
    print("dhy")
    return event
"""}

fn = c.add_function("B")
fn.source = {'code': """
def handle(event, context):
    import time
    context.log("Echoing event: "+str(event))
    time.sleep(1.2)
    print("dhy")
    return event
"""}

fn = c.add_function("C")
fn.source = {'code': """
def handle(event, context):
    import time
    context.log("Echoing event: "+str(event))
    time.sleep(1.6)
    print("dhy")
    return event
"""}

workflow.deploy(600)

request = {"hui":"hoi","blue":True,"Five":5}
start = time.time()
response = workflow.execute(request,timeout=5)
print(time.time() - start)
print(response)
# # assert response == request

logdata = workflow.logs()
print("Exceptions:")
# print(logdata['exceptions'])
# print("Logs:")
# print(logdata['log'])
# print("Progress:")
# print(logdata['progress'])
iakkus commented 1 year ago

Then please use and modify one of those templates to write your workflow/functions. It is not really easy to debug from your code. If it still does not work, then reopen.

lambda7xx commented 1 year ago

Then please use and modify one of those templates to write your workflow/functions. It is not really easy to debug from your code. If it still does not work, then reopen.

thanks. I use this templates to write my workflow. But it does not work

lambda7xx commented 1 year ago

Then please use and modify one of those templates to write your workflow/functions. It is not really easy to debug from your code. If it still does not work, then reopen.

thanks. I use this templates to write my workflow. But it does not work