ow2-proactive / scheduling

Multi-platform Scheduling and Workflows Engine
http://www.activeeon.com/workflows-scheduling
GNU Affero General Public License v3.0

Dependency between workflow and task queue control? #2319

Closed: winghc closed this issue 8 years ago

winghc commented 9 years ago

1. Does this product support dependencies at the taskFlow level? Here is my scenario: taskFlow A contains 100 tasks that download 100 tables from DB2 to CSV files, and taskFlow B imports those 100 files exported by taskFlow A into another database through 100 defined tasks. Here taskFlow B depends on taskFlow A's success.

2. Still for the above scenario: in taskFlow A, I don't want the agent to open 100 connections to DB2 and download data all at the same time. I would prefer to do the download work with a limited number of connections (resources), such as 5 connections per loop, and keep the other download jobs in pending status.

Appreciate your reply.

fviale commented 9 years ago

Currently, using the ProActive workflow model, it is not possible to express dependencies at the job level (i.e. between complete TaskFlows). But the use case you describe can be expressed entirely within a single workflow. Consider, for example, the following workflow: [screenshot: workflow with two replicated phases]

1) The task Import100Tables is replicated 100 times and will do your database imports. To answer your second question: even though there are 100 tasks, not all of them will run in parallel. The number of parallel executions is determined by the number of ProActive Node resources you allocate, so if you allocate 5 ProActive Nodes, the 100 tasks will run 5 at a time, sequentially. You can see the number of ProActive Node resources in the Resource Manager interface: [screenshot: Resource Manager interface]

2) After those 100 tasks have terminated, the second part of the workflow is executed: the replicated Export100Tables tasks will do the export to your second database.
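
For reference, here is a minimal sketch of how one such replicate construct could look in the workflow XML. Task names and print statements are illustrative; PA_TASK_REPLICATION is the per-replica index, analogous to the PA_TASK_ITERATION variable used in the cron example further down:

<task name="Split">
  <scriptExecutable>
    <script>
      <code language="javascript">
        <![CDATA[
print('Starting the import phase')
]]>
      </code>
    </script>
  </scriptExecutable>
  <controlFlow>
    <replicate>
      <script>
        <code language="javascript">
          <![CDATA[
runs = 100; // number of replicas of the following task
]]>
        </code>
      </script>
    </replicate>
  </controlFlow>
</task>
<task name="Import100Tables">
  <depends>
    <task ref="Split"/>
  </depends>
  <scriptExecutable>
    <script>
      <code language="javascript">
        <![CDATA[
// each replica gets its own index (0..99) to pick the table it handles
print('Importing table #' + variables.get('PA_TASK_REPLICATION'))
]]>
      </code>
    </script>
  </scriptExecutable>
  <controlFlow block="none"></controlFlow>
</task>
<task name="Merge">
  <depends>
    <task ref="Import100Tables"/>
  </depends>
  <scriptExecutable>
    <script>
      <code language="javascript">
        <![CDATA[
print('All imports finished')
]]>
      </code>
    </script>
  </scriptExecutable>
</task>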

winghc commented 9 years ago

Thank you so much for your quick response. I will try playing with it.

fviale commented 9 years ago

You're welcome. It might be a bit tricky to route the tasks properly using the Studio. Simply use two "replicate" constructs and bind them together by:
1) removing the first construct's "Merge" task,
2) renaming the second construct's "Split" task to "Merge",
3) connecting the first construct to the new "Merge" task.
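
Concretely, the rewired middle task might look something like the sketch below (task names follow the example above; this is an illustration, not a verbatim export from the Studio). It closes the first construct by depending on the replicated Import100Tables task, and opens the second construct with its own replicate script:

<task name="Merge">
  <depends>
    <task ref="Import100Tables"/>
  </depends>
  <scriptExecutable>
    <script>
      <code language="javascript">
        <![CDATA[
// runs once all 100 import replicas have finished
print('Import phase done, fanning out the export phase')
]]>
      </code>
    </script>
  </scriptExecutable>
  <controlFlow>
    <replicate>
      <script>
        <code language="javascript">
          <![CDATA[
runs = 100; // fan out again for the 100 export tasks
]]>
        </code>
      </script>
    </replicate>
  </controlFlow>
</task>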

winghc commented 9 years ago

In my environment, a data warehouse built on DB2 and Hadoop, there are hundreds of jobs for different tasks, and it is almost impossible to put all of them into one big job. I am wondering whether this product has any protocol that lets those jobs talk to each other. For example, JobA would fire an event or change some shared variables on the fly, and JobB, scheduled to run in a given time period, would only proceed once it receives the signal fired by JobA or finds the variables' values matched.

fviale commented 9 years ago

There are no shared variables across different jobs (only variables shared within a job), but something can be done with files. Our framework deploys two shared file storage servers, the UserSpace (specific to one user) and the GlobalSpace (available to all users). Normally they are used to transfer files before or after a given task execution, but they can also be queried directly through the Dataspace API. For instance, the last task of TaskFlow A could publish a marker file that TaskFlow B checks for, as sketched below.
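
A rough illustration of the publishing side, written as the javascript code of a final task in TaskFlow A. This is not the Dataspace API itself; the path is an assumption standing in for a location both jobs can reach, such as the GlobalSpace or a mounted share:

var dir = new java.io.File('/shared/exports');       // assumed shared location
if (!dir.exists()) { dir.mkdirs(); }                 // create it on first run
var out = new java.io.FileWriter(new java.io.File(dir, '_SUCCESS'));
out.write(new java.util.Date().toString());          // record the completion time
out.close();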

Does your TaskFlow A job need to execute completely and produce all CSV files before TaskFlow B starts to run?

Does the TaskFlow A job need to continue executing after it has imported all tables (for example, to download more tables when they are updated)?

winghc commented 9 years ago

Shared file storage is a good alternative. So I need to embed the Dataspace API in the start node of the job, right? Can I call the Dataspace API from a shell script or from Java?

Yes. TaskFlow A exports files from the application system, and TaskFlow B needs to load those files into the data warehouse and do other ETL jobs. Since our network environment is a little bit complicated, we cannot put the export and the upload into one job for each table.

TaskFlow A will run daily, in a given time period, to download the updated records.

I have used Control-M and Informatica before, and have researched Oozie and LinkedIn's Azkaban. This product's framework impresses me.

fviale commented 9 years ago

Thanks for your nice compliment :-)

Maybe it will not be necessary to use the DataSpace API directly.

I don't know whether you have already connected to our demo platform at http://try.activeeon.com, but I would suggest using it for some tryouts.

An additional feature that will be very useful in your scenario is CRON tasks. Here is a workflow that integrates a cron loop running Task1, Task2, Task3 and Task4 every minute.

You can import it into the Studio on the try platform by clicking on Import (after having saved the XML below to a file).

<?xml version="1.0" encoding="UTF-8"?>
<job
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="urn:proactive:jobdescriptor:3.3"
    xsi:schemaLocation="urn:proactive:jobdescriptor:3.3 http://www.activeeon.com/public_content/schemas/proactive/jobdescriptor/3.3/schedulerjob.xsd"
    name="Cron Example Workflow"
    priority="normal"
    cancelJobOnError="false">
  <description>
    <![CDATA[ A loop is a set of tasks that will be duplicated and rescheduled.
        A script controls whether the loop continues by setting the variable 'loop' to a CRON expression (or to false to stop). ]]>
  </description>
  <taskFlow>
    <task name="Start">
      <scriptExecutable>
        <script>
          <code language="javascript">
            <![CDATA[
print('Loop block start ' + variables.get('PA_TASK_ITERATION'))
]]>
          </code>
        </script>
      </scriptExecutable>
      <controlFlow block="start"></controlFlow>
    </task>
    <task name="Loop">
      <depends>
        <task ref="Task1"/>
        <task ref="Task2"/>
        <task ref="Task3"/>
        <task ref="Task4"/>
      </depends>
      <scriptExecutable>
        <script>
          <code language="javascript">
            <![CDATA[
print('Loop block end ' + variables.get('PA_TASK_ITERATION'))
]]>
          </code>
        </script>
      </scriptExecutable>
      <controlFlow block="end">
        <loop target="Start">
          <script>
            <code language="javascript">
              <![CDATA[
// You can use a Cron Expression here
// examples http://www.sauronsoftware.it/projects/cron4j/manual.php#p02

loop = "* * * * *";
]]>
            </code>
          </script>
        </loop>
      </controlFlow>
    </task>
    <task name="Task1">
      <depends>
        <task ref="Start"/>
      </depends>
      <scriptExecutable>
        <script>
          <code language="javascript">
            <![CDATA[
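// 'pas.task.name' is a system property holding the name of the current task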
print(java.lang.System.getProperty('pas.task.name'))
]]>
          </code>
        </script>
      </scriptExecutable>
      <controlFlow block="none"></controlFlow>
    </task>
    <task name="Task2">
      <depends>
        <task ref="Start"/>
      </depends>
      <scriptExecutable>
        <script>
          <code language="javascript">
            <![CDATA[
print(java.lang.System.getProperty('pas.task.name'))
]]>
          </code>
        </script>
      </scriptExecutable>
      <controlFlow block="none"></controlFlow>
    </task>
    <task name="Task3">
      <depends>
        <task ref="Start"/>
      </depends>
      <scriptExecutable>
        <script>
          <code language="javascript">
            <![CDATA[
print(java.lang.System.getProperty('pas.task.name'))
]]>
          </code>
        </script>
      </scriptExecutable>
      <controlFlow block="none"></controlFlow>
    </task>
    <task name="Task4">
      <depends>
        <task ref="Start"/>
      </depends>
      <scriptExecutable>
        <script>
          <code language="javascript">
            <![CDATA[
print(java.lang.System.getProperty('pas.task.name'))
]]>
          </code>
        </script>
      </scriptExecutable>
      <controlFlow block="none"></controlFlow>
    </task>
  </taskFlow>
</job>

In this workflow you can add as many tasks as you want in the middle part without changing the overall behaviour. You can also change the loop expression to use a different CRON schedule than every minute, as shown below.
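
For instance (cron4j syntax, as in the comment inside the Loop task above), running the loop once a day at 02:00 instead of every minute would just mean changing the loop script to:

loop = "0 2 * * *"; // minute hour day-of-month month day-of-week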

What I think is easily feasible with ProActive in your use case is to have two such workflow jobs that run at different times (which should be reasonable considering your ETL); you can optionally add a buffering wait in the starting task of your TaskFlow B in case the ETL was delayed.
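
Such a buffering wait could be a javascript script in TaskFlow B's starting task that polls for the marker file written by TaskFlow A; the path and the timeout below are assumptions to adapt to your setup:

var marker = new java.io.File('/shared/exports/_SUCCESS'); // assumed marker path
var waitedSeconds = 0;
while (!marker.exists() && waitedSeconds < 3600) {         // give up after 1 hour
    java.lang.Thread.sleep(60 * 1000);                     // re-check every minute
    waitedSeconds += 60;
}
if (!marker.exists()) {
    throw 'TaskFlow A did not signal completion in time';
}
print('Marker found, starting the load phase');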