runabol / piper

piper - a distributed workflow engine
Apache License 2.0

java.lang.IllegalArgumentException: Unknown task handler: subflow #29

Open omartrigui opened 5 years ago

omartrigui commented 5 years ago

As the title says, I'm running the latest version of the Docker image "creactiviti/piper". I wanted to use the subflow task to make my pipelines modular, but an exception was raised:

$ docker pull creactiviti/piper
Using default tag: latest
latest: Pulling from creactiviti/piper
Digest: sha256:1d9e5465304805191e0a7322e7c7b36ca2a4f3dc577f02d7bb6f7d46f82726ef
Status: Image is up to date for creactiviti/piper:latest
        {
            "jobId": "3559b511ff3e48ff9baa5548c04c785d",
            "createTime": "2019-01-25T12:08:26.118+0000",
            "inputs": [
                {
                    "source": "/path/to/source/dir"
                },
                {
                    "destination": "/path/to/destination/dir"
                }
            ],
            "taskNumber": 2,
            "id": "c072543876024b89a50d753164d0201e",
            "endTime": "2019-01-25T12:08:26.128+0000",
            "type": "subflow",
            "priority": 0,
            "error": {
                "stackTrace": [
                    "java.lang.IllegalArgumentException: Unknown task handler: subflow",
                    "\tat org.springframework.util.Assert.notNull(Assert.java:134)",
                    "\tat com.creactiviti.piper.core.task.DefaultTaskHandlerResolver.resolve(DefaultTaskHandlerResolver.java:33)",
                    "\tat com.creactiviti.piper.core.Worker.lambda$handle$0(Worker.java:90)",
                    "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)",
                    "\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)",
                    "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)",
                    "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)",
                    "\tat java.lang.Thread.run(Thread.java:748)"
                ],
                "message": "Unknown task handler: subflow"
            },
            "pipelineId": "job",
            "status": "FAILED"
        }

The image on Docker Hub seems to be outdated.
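
For readers unfamiliar with this kind of failure, here is a minimal sketch of how a type-keyed handler lookup can produce the message seen in the stack trace. It assumes the resolver keeps handlers in a map keyed by the task's `type`; the class `HandlerLookupSketch` and its methods are hypothetical and are not Piper's actual code. A build that predates the subflow feature would simply have nothing registered under that type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a task handler resolver; not Piper's real class.
class HandlerLookupSketch {

  // Handlers keyed by task type, e.g. "print" or "bash".
  private final Map<String, Function<Map<String, Object>, Object>> handlers = new HashMap<>();

  public void register(String type, Function<Map<String, Object>, Object> handler) {
    handlers.put(type, handler);
  }

  // Fails the same way as the stack trace when no handler exists for the type.
  public Function<Map<String, Object>, Object> resolve(String type) {
    Function<Map<String, Object>, Object> handler = handlers.get(type);
    if (handler == null) {
      throw new IllegalArgumentException("Unknown task handler: " + type);
    }
    return handler;
  }

  public static void main(String[] args) {
    HandlerLookupSketch resolver = new HandlerLookupSketch();
    resolver.register("print", task -> task.get("text"));
    try {
      resolver.resolve("subflow");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints "Unknown task handler: subflow"
    }
  }
}
```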

runabol commented 5 years ago

Yeah, looks like I need to make a new docker image build.

omartrigui commented 5 years ago

I wonder when the new docker image build will be scheduled.

runabol commented 5 years ago

Try now

omartrigui commented 5 years ago

@creactiviti: The image is now up to date, but I get an exception every time I use "subflow" in my pipeline. Am I missing something?

2019-01-27 20:06:43.823 DEBUG 1 --- [cTaskExecutor-1] c.c.piper.core.event.LogEventListener    : {jobId=fecd16a663f54dd0a9ecdf8f6105009e, createTime=2019-01-27T20:06:43.755+0000, id=ff070892e93a4a1eb956ba779efdaac9, type=job.status, status=CREATED}
2019-01-27 20:06:43.848  WARN 1 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'start' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:395) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.util.Map
    at com.creactiviti.piper.core.MapObject.getMap(MapObject.java:239) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.MapObject.getMap(MapObject.java:248) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.task.SubflowTaskDispatcher.dispatch(SubflowTaskDispatcher.java:45) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.task.SubflowTaskDispatcher.dispatch(SubflowTaskDispatcher.java:34) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.task.TaskDispatcherChain.dispatch(TaskDispatcherChain.java:32) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.DefaultJobExecutor.executeNextTask(DefaultJobExecutor.java:77) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.DefaultJobExecutor.execute(DefaultJobExecutor.java:52) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.Coordinator.start(Coordinator.java:133) ~[classes!/:0.0.1-SNAPSHOT]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:265) ~[spring-core-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:387) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
    ... 12 common frames omitted

2019-01-27 20:06:43.850  WARN 1 --- [cTaskExecutor-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"createTime":"2019-01-27T20:06:43.658+0000","webhooks":[],"inputs":{"input":"lol"},"id":"fecd16a663f54dd0a9ecdf8f6105009e","label":"Transcode","priority":0,"pipelineId":"video/transcode","status":"CREATED","tags":[]}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.String, __TypeId__=com.creactiviti.piper.core.job.SimpleJob}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=piper.tasks, receivedRoutingKey=jobs, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-MYsssvVE8heBKBDbZBMsdg, consumerQueue=jobs])

video/transcode.yaml :

label: Transcode 

inputs:
  - name: input
    label: Input File
    type: string
    required: true

tasks: 

  - type: bash
    label: some_label
    script: date

  - type: parallel
    tasks: 

      - type: subflow
        pipelineId: video/flow1
        inputs: 
          - source: source
          - destination: destination

video/flow1.yaml :

label: Flow1

inputs:

  - name: source
    label: Input File
    type: string
    required: true

  - name: destination
    label: Input File
    type: string
    required: true

tasks: 

  - label: labell
    type: print
    text: "flow1"

EDIT :

The problem doesn't seem to be related to the subflow task handler. The following pipeline raises the same exception:

label: Transcode 

inputs:
  - name: input
    label: Input File
    type: string
    required: true

tasks: 

- type: each
  list: [1000,2000,3000]
  iteratee:
    - type: bash
      label: First executed command
      script: echo hello

anshumanr commented 5 years ago

Hi, were you able to figure out why this is happening? I am getting the same error using subflow. My script is very simple.

label: Inbound

inputs:
  - name: dnis
    type: string
    required: true    

tasks:
  - type: subflow
    pipelineId: ivr
    inputs:
      - name: ${dnis}

The ivr script:

label: Ivr 

inputs:
  - name: name
    type: string
    required: true

outputs:
  - name: myMagicNumber
    value: ${transferTo}

tasks:
  - name: playPrompt 
    label: Play welcome message 
    text: Play welcome message 
    type: playFile
    fileName: ${name}

  - name: dtmf
    label: Collect DTMF 
    type: dtmf 
    text: Collecting dtmf digits
    digits: "32342@p2p"

  - name: transferTo
    label: Transfering to queue 
    type: transfer 
    queue: ${dtmf}

  - label: Print a farewell
    type: print
    text: Goodbye ${transferTo}

The ivr script executes on its own, but I get the same error as above when I try to run it from a subflow.

2019-05-14 11:43:23.801 WARN 14366 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Execution of JMS message listener failed, and no ErrorHandler has been set.

org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method 'start' threw exception; nested exception is java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.util.Map
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:322) ~[spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:243) ~[spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719) ~[spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) ~[spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649) ~[spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317) [spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255) [spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168) [spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160) [spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057) [spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.util.Map
    at com.creactiviti.piper.core.MapObject.getMap(MapObject.java:239) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.MapObject.getMap(MapObject.java:248) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.task.SubflowTaskDispatcher.dispatch(SubflowTaskDispatcher.java:45) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.task.SubflowTaskDispatcher.dispatch(SubflowTaskDispatcher.java:34) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.task.TaskDispatcherChain.dispatch(TaskDispatcherChain.java:32) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.DefaultJobExecutor.executeNextTask(DefaultJobExecutor.java:77) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.DefaultJobExecutor.execute(DefaultJobExecutor.java:52) ~[classes!/:0.0.1-SNAPSHOT]
    at com.creactiviti.piper.core.Coordinator.start(Coordinator.java:133) ~[classes!/:0.0.1-SNAPSHOT]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:265) ~[spring-core-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:314) ~[spring-jms-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
    ... 10 common frames omitted

EDIT: The problem seems to be that the subflow's inputs are parsed as an array (a list) rather than as a map. To unblock myself, I changed MapObject.getMap as follows:

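// Requires java.util.Collections, java.util.HashMap, java.util.List and java.util.Map.
@SuppressWarnings("unchecked")
public Map<String,Object> getMap (Object aKey) {
  Object value = get(aKey);
  if (value == null) {
    return null;
  }
  if (value instanceof List) {
    // The value arrived as a list of maps; merge its elements into a single map.
    Map<String,Object> map = new HashMap<>();
    ((List<Map<String,Object>>) value).forEach(map::putAll);
    return Collections.unmodifiableMap(map);
  }
  return Collections.unmodifiableMap((Map<String,Object>) value);
}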

If this is acceptable, I can submit a PR.
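
To make the diagnosis above concrete, here is a small standalone sketch in plain Java, assuming that an `inputs` block written as a sequence of `- key: value` items is parsed into a list of single-entry maps. The class `InputsShapeDemo` and its `flatten` helper are illustrative names, not part of Piper. It first reproduces the failing cast and then merges the list into one map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only; mirrors the shape of the parsed "inputs", not Piper's classes.
class InputsShapeDemo {

  // Accepts either a map or a list of maps and returns a single merged map.
  // On duplicate keys, later list elements overwrite earlier ones.
  public static Map<String, Object> flatten(Object value) {
    Map<String, Object> merged = new LinkedHashMap<>();
    if (value instanceof Map) {
      ((Map<?, ?>) value).forEach((k, v) -> merged.put(String.valueOf(k), v));
    } else if (value instanceof List) {
      for (Object element : (List<?>) value) {
        if (!(element instanceof Map)) {
          throw new IllegalArgumentException("Expected a map but got: " + element);
        }
        ((Map<?, ?>) element).forEach((k, v) -> merged.put(String.valueOf(k), v));
      }
    }
    return merged;
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    // The assumed shape of the subflow inputs after parsing: a list of single-entry maps.
    List<Map<String, Object>> inputs = new ArrayList<>();
    inputs.add(Collections.singletonMap("source", "/path/to/source/dir"));
    inputs.add(Collections.singletonMap("destination", "/path/to/destination/dir"));

    Object raw = inputs;
    try {
      Map<String, Object> broken = (Map<String, Object>) raw; // same kind of cast as in the stack trace
      System.out.println(broken);
    } catch (ClassCastException e) {
      System.out.println("cast failed: " + e.getClass().getSimpleName());
    }

    // prints {source=/path/to/source/dir, destination=/path/to/destination/dir}
    System.out.println(flatten(raw));
  }
}
```

Whether the engine should accept both shapes, or the pipeline should declare the subflow inputs as a plain mapping instead of a sequence, is a design decision for the maintainers.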