Status: closed by MarkGaox 2 years ago.
We want to support customized ETL processing of data, which has three stages: E (Extract), T (Transform), and L (Load). One way to achieve this is to allow users to inject a Python script so that they can process the data in any way they want.
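One way the injection could work on the server side is sketched below, using only the standard `importlib` machinery: the uploaded file is loaded as a module and a named function is looked up in it. The function names `load_user_script` and `get_transform`, and the default entry-point name `transform`, are hypothetical and not part of the existing API.

```python
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Any, Callable


def load_user_script(path: str) -> ModuleType:
    """Load a user-supplied .py file as a module without touching sys.path."""
    script = Path(path)
    spec = importlib.util.spec_from_file_location(script.stem, str(script))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load script: {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the user's top-level code
    return module


def get_transform(path: str, func_name: str = "transform") -> Callable[[Any], Any]:
    """Return the callable named func_name from the user's script (name is hypothetical)."""
    module = load_user_script(path)
    func = getattr(module, func_name, None)
    if not callable(func):
        raise AttributeError(f"{path} does not define a callable {func_name!r}")
    return func
```

A caller could then do `get_transform("transform.py", "load_mysql_data_into_kafka")`, assuming the token after the script name in a task expression is read as the function to call. Note that `exec_module` runs the user's code with the server process's privileges, so a hosted deployment would likely need sandboxing or review of uploaded scripts.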
We might have to change the API input expression (TaskList) accordingly, for example:
Task1
> transform.py
> Task2
Task1 is an extraction command that extracts data. ">" means the output of the previous task is used as the input of this task, so the Python script can ingest the data from the previous task and transform it. Task2 is a loading task: it takes the transformed data and loads it into another data store, e.g., MySQL.
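The ">" semantics amount to a left-to-right fold over the stages. A minimal sketch, assuming each stage string can be resolved to a Python callable that takes the previous stage's output (`None` for the first stage) and returns its own; `split_pipeline`, `run_pipeline`, and the `resolve` callback are hypothetical names.

```python
from typing import Any, Callable, Dict, List

# A stage receives the previous stage's output (None for the first stage).
Stage = Callable[[Any], Any]


def split_pipeline(expression: str) -> List[str]:
    """Split 'Task1 > transform.py > Task2' into its stage strings."""
    stages = [part.strip() for part in expression.split(">")]
    if any(not stage for stage in stages):
        raise ValueError(f"empty stage in pipeline: {expression!r}")
    return stages


def run_pipeline(expression: str, resolve: Callable[[str], Stage]) -> Any:
    """Run stages left to right, feeding each output into the next stage."""
    data: Any = None
    for stage_text in split_pipeline(expression):
        data = resolve(stage_text)(data)
    return data


if __name__ == "__main__":
    # Stub handlers standing in for real extract / transform / load tasks.
    handlers: Dict[str, Stage] = {
        "Task1": lambda _: [{"id": 6, "name": "jiarui2"}],  # extract
        "transform.py": lambda rows: [{**r, "name": r["name"].upper()} for r in rows],  # transform
        "Task2": lambda rows: len(rows),  # load: report row count
    }
    print(run_pipeline("Task1 > transform.py > Task2", handlers.__getitem__))  # prints 1
```

Splitting on a bare ">" keeps the sketch short, but it would misparse any task argument that itself contains ">"; handling that case would need quoting or a different separator.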
heroku test1: ETL processing √ { "TaskList" : ["Product:mysql:table:Operations:GET id:6 > transform.py load_mysql_data_into_kafka > Product:kafka:topic:Operations:POST *"], "MultiThread" : false }
heroku test2: Mysql get - multi-thread √ { "TaskList" : ["Product:mysql:table:Operations:GET_ALL", "Product:mysql:table:Operations:GET id:6", "Product:mysql:table:Operations:GET id:7", "Product:mysql:table:Operations:GET id:8", "Product:mysql:table:Operations:GET id:9"], "MultiThread" : true }
heroku test3: Mysql get - sequential √ { "TaskList" : ["Product:mysql:table:Operations:GET_ALL", "Product:mysql:table:Operations:GET id:6", "Product:mysql:table:Operations:GET id:7", "Product:mysql:table:Operations:GET id:8", "Product:mysql:table:Operations:GET id:9"], "MultiThread" : false }
heroku test4: Mysql post & delete - sequential √ { "TaskList": ["Product:mysql:table:Operations:POST resource:[{name:jiarui2},{name:junyu2}]", "Product:mysql:table:Operations:GET_ALL", "Product:mysql:table:Operations:GET id:7", "Product:mysql:table:Operations:DELETE id:7", "Product:mysql:table:Operations:GET_ALL" ], "MultiThread" : false }
heroku test5: Mysql post & delete - multi-thread √ { "TaskList": ["Product:mysql:table:Operations:POST resource:[{name:jiarui2},{name:junyu2}]", "Product:mysql:table:Operations:GET_ALL", "Product:mysql:table:Operations:DELETE id:43", "Product:mysql:table:Operations:DELETE id:44"], "MultiThread" : true }
heroku test6: mysql delete - multi-thread √ { "TaskList": ["Product:mysql:table:Operations:DELETE id:34","Product:mysql:table:Operations:DELETE id:35","Product:mysql:table:Operations:DELETE id:36","Product:mysql:table:Operations:DELETE id:37","Product:mysql:table:Operations:DELETE id:38", "Product:mysql:table:Operations:DELETE id:41", "Product:mysql:table:Operations:DELETE id:42", "Product:mysql:table:Operations:DELETE id:44"], "MultiThread" : true }
heroku test7: kafka get & post sequential √ { "TaskList" : ["Product:kafka:topic:Operations:GET_ALL", "Product:kafka:topic:Operations:POST topic_name:t3", "Product:kafka:topic:Operations:GET_ALL", "Product:kafka:topic:Operations:POST topic_name:t4", "Product:kafka:topic:Operations:GET_ALL"], "MultiThread" : false }
heroku test8: kafka get & post multi-thread √ { "TaskList" : ["Product:kafka:topic:Operations:GET_ALL", "Product:kafka:topic:Operations:POST topic_name:t5", "Product:kafka:topic:Operations:GET_ALL", "Product:kafka:topic:Operations:POST topic_name:t6", "Product:kafka:topic:Operations:GET_ALL"], "MultiThread" : true }
heroku test9: kafka delete sequential √ { "TaskList" : [ "Product:kafka:topic:Operations:DELETE topic_name:t5", "Product:kafka:topic:Operations:DELETE topic_name:t6"], "MultiThread" : false }
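The `MultiThread` flag in these tests suggests two execution modes for the same `TaskList`. The sketch below parses plain (non-pipeline) task strings of the form `Product:<backend>:<kind>:Operations:<OP> [key:value]` and then runs them either one after another or on a thread pool. The `Task` field names, the functions `parse_task` and `run_task_list`, and the `execute` callback are hypothetical readings of the format, not the project's actual code; pipeline entries such as test1 would need to be split on ">" first.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class Task:
    product: str                     # e.g. "Product"
    backend: str                     # e.g. "mysql" or "kafka"
    kind: str                        # e.g. "table" or "topic"
    operation: str                   # e.g. "GET", "GET_ALL", "POST", "DELETE"
    arg_key: Optional[str] = None    # e.g. "id", "topic_name", "resource"
    arg_value: Optional[str] = None  # e.g. "6", "t3", "*" (kept as a raw string)


def parse_task(text: str) -> Task:
    """Parse 'Product:mysql:table:Operations:GET id:6' into a Task."""
    head, _, arg = text.strip().partition(" ")
    parts = head.split(":")
    if len(parts) != 5 or parts[3] != "Operations":
        raise ValueError(f"unrecognised task: {text!r}")
    product, backend, kind, _, operation = parts
    arg = arg.strip()
    if not arg:
        return Task(product, backend, kind, operation)
    key, sep, value = arg.partition(":")
    if not sep:  # bare argument such as "*"
        return Task(product, backend, kind, operation, None, arg)
    # Split on the first ":" only, so "resource:[{name:a}]" keeps its inner colons.
    return Task(product, backend, kind, operation, key, value)


def run_task_list(task_list: List[str], multi_thread: bool,
                  execute: Callable[[Task], object]) -> List[object]:
    """Run every task sequentially or on a thread pool; results keep input order."""
    tasks = [parse_task(t) for t in task_list]
    if not multi_thread:
        return [execute(t) for t in tasks]
    with ThreadPoolExecutor() as pool:
        return list(pool.map(execute, tasks))
```

`ThreadPoolExecutor.map` returns results in input order, but the tasks themselves may run in any order. A multi-thread run such as test5, whose `DELETE` calls target rows the `POST` presumably creates, is therefore only guaranteed to see the `POST` first in sequential mode.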
user-defined injected python script