conductor-sdk / conductor-python

Conductor OSS SDK for Python programming language
Apache License 2.0
62 stars 29 forks source link

Refactor unit tests, to be compliant with recent client changes #76

Closed gardusig closed 2 years ago

gardusig commented 2 years ago

Aim to test all possible ways of application startup, with and without some parameters.

Tool used to test by hand:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.http.api.task_resource_api import TaskResourceApi
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from typing import List
import logging

logger = logging.getLogger(
    Configuration.get_logging_formatted_name(
        __name__
    )
)

class SimplePythonWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('key1', 'value')
        task_result.add_output_data('key2', 42)
        task_result.add_output_data('key3', False)
        task_result.status = TaskResultStatus.COMPLETED
        return task_result

def get_python_task_definition_example() -> List[dict]:
    return [
        {
            "createTime": 1650595379661,
            "createdBy": "",
            "name": "python_task_example_from_code",
            "description": "Python task example from code",
            "retryCount": 3,
            "timeoutSeconds": 300,
            "inputKeys": [],
            "outputKeys": [],
            "timeoutPolicy": "TIME_OUT_WF",
            "retryLogic": "FIXED",
            "retryDelaySeconds": 10,
            "responseTimeoutSeconds": 180,
            "inputTemplate": {},
            "rateLimitPerFrequency": 0,
            "rateLimitFrequencyInSeconds": 1,
            "ownerEmail": "gustavo.gardusi@orkes.io",
            "backoffScaleFactor": 1
        },
    ]

def get_python_workflow_definition_example() -> dict:
    return {
        "updateTime": 1650595431465,
        "name": "workflow_with_python_task_example_from_code",
        "description": "Workflow with python task example from code",
        "version": 1,
        "tasks": [
            {
                "name": "python_task_example_from_code",
                "taskReferenceName": "python_task_example_from_code_ref_0",
                "inputParameters": {

                },
                "type": "SIMPLE",
                "decisionCases": {

                },
                "defaultCase": [

                ],
                "forkTasks":[

                ],
                "startDelay":0,
                "joinOn":[

                ],
                "optional":False,
                "defaultExclusiveJoinTask":[

                ],
                "asyncComplete":False,
                "loopOver":[

                ]
            }
        ],
        "inputParameters": [

        ],
        "outputParameters": {
            "workerOutput": "${python_task_example_from_code_ref_0.output}"
        },
        "schemaVersion": 2,
        "restartable": True,
        "workflowStatusListenerEnabled": False,
        "ownerEmail": "gustavo.gardusi@orkes.io",
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0,
        "variables": {

        },
        "inputTemplate": {

        }
    }

def define_task_and_workflow(api_client: ApiClient) -> None:
    metadata_client = MetadataResourceApi(api_client)
    try:
        metadata_client.register_task_def1(
            body=get_python_task_definition_example()
        )
        metadata_client.create(
            body=get_python_workflow_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define task/workflow, reason: {e}')

def start_workflow(api_client: ApiClient, workflow_name: str) -> str:
    workflow_client = WorkflowResourceApi(api_client)
    workflowId = workflow_client.start_workflow(
        body={},
        name=workflow_name
    )
    return workflowId

def start_workflows(api_client: ApiClient, workflow_name: str, qty: int) -> List[str]:
    workflowIdList = []
    for _ in range(qty):
        try:
            workflowId = start_workflow(api_client, workflow_name)
            workflowIdList.append(workflowId)
            logger.debug(
                f'Started workflow: {workflow_name}, with id: {workflowId}'
            )
        except Exception as e:
            logger.debug(
                f'Failed to start workflow: {workflow_name}, reason: {e}'
            )
    return workflowIdList

def main():
    configuration = Configuration(
        base_url='https://play.orkes.io',
        debug=True,
        authentication_settings=AuthenticationSettings(
            key_id='',
            key_secret=''
        )
    )
    configuration.apply_logging_config()

    api_client = ApiClient(configuration)

    workflow_id = start_workflow(
        api_client,
        'workflow_with_python_task_example_from_code'
    )
    logger.debug(f'workflow_id: {workflow_id}')

    task_api = TaskResourceApi(api_client)
    response = task_api.update_task_by_ref_name(
        output={'hello': 'world'},
        workflow_id=workflow_id,
        task_ref_name='python_task_example_from_code_ref_0',
        status=TaskResultStatus.COMPLETED.value,
    )
    logger.debug(f'task update response: {response}')

    workers = [
        SimplePythonWorker('python_task_example_from_code'),
    ]
    workflow_ids = start_workflows(
        api_client,
        'workflow_with_python_task_example_from_code',
        10
    )
    metrics_settings = MetricsSettings()
    with TaskHandler(workers, configuration, metrics_settings) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()

if __name__ == '__main__':
    main()
gardusig commented 2 years ago

Issue will be continued here: https://github.com/conductor-sdk/conductor-python/issues/38