Tartan-AUV / tauv-mono

16 stars 2 forks source link

Implement mission planning framework #103

Open theochemel opened 1 year ago

theochemel commented 1 year ago

Need to meet theo + sam to talk about design

SamuelSchaub commented 1 year ago

Design iteration 1:

class TaskResult(Enum):
    SUCCESS = 1
    FAILURE = 2

@dataclass
class TaskParams:
    mu: MotionUtils
    ac: AlarmClient
    retries: int

class Task(metaclass=ABCMeta):
    @abstractclassmethod
    def __init__(self, params: TaskParams) -> None:
        pass

    @abstractclassmethod
    def run(self) -> TaskResult:
        pass

class TaskManager():
    def __init__(self) -> None:
        #initialize everything
        pass

    def runTasks(self) -> TaskResult:
        pass
theochemel commented 1 year ago

retries probably shouldn't be part of TaskParams if TaskParams is what a task needs to run, right? My understanding is that retrying a task would be handled by the TaskManager and one execution of a task would correspond to one retry. So a task wouldn't know how many retries it is allowed, it would just be run multiple times by the TaskManager.

theochemel commented 1 year ago

Otherwise looks real good. How are you planning on structuring the task graph / decision tree / whatever we're calling it?

SamuelSchaub commented 1 year ago

Your right about the retries I'll take it out.

SamuelSchaub commented 1 year ago

For implementation, I think it depends on what we need. We should discuss.

SamuelSchaub commented 1 year ago

Design iteration 2:

from abc import ABCMeta, abstractclassmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict
from collections.abc import Callable

@dataclass
class TaskParams:
    mu: MotionUtils
    ac: AlarmClient

class Task(metaclass=ABCMeta):
    @abstractclassmethod
    def __init__(self, params: TaskParams, **kwargs) -> None:
        pass

    @abstractclassmethod
    def run(self) -> Enum:
        pass

class Mission(metaclass=ABCMeta):
    @abstractclassmethod
    def __init__(self, params: TaskParams) -> None:
        pass

class TaskManager():
    def __init__(self, mission: Mission) -> None:
        #initialize everything
        pass

    def runTasks(self) -> None:
        pass
SamuelSchaub commented 1 year ago

Not the greatest example but you get the idea

class Dive(Task):
    class DiveResults(Enum):
        SUCCESS = 1
        FAILURE = 2

    def __init__(self, params: TaskParams, **kwargs) -> None:
        self.mu = params.mu
        self.ac = params.ac
        self.depth = kwargs.depth

    def run(self) -> DiveResults:
        self.mu.goDown(self.depth)
        if self.mu.currentDepth >= self.depth:
            return self.DiveResults.SUCCESS
        else:
            return self.DiveResults.FAILURE

class Cups(Task):
    class CupsResults(Enum):
        SUCCESS = 1
        DROPPED_CUP = 2
        LOST_TRACK_OF_CUP = 3
        WRONG_BIN = 4
        FAIL = 5

    def __init__(self, params: TaskParams, **kwargs) -> None:
        self.mu = params.mu
        self.ac = params.ac

    def run(self) -> CupsResults:
        self.mu.goToCups()
        if self.mu.cannotSeeCup():
            return self.CupsResults.LOST_TRACK_OF_CUP

        self.mu.pickUpCup()
        if self.mu.doesNotHaveCup():
            return self.CupsResults.DROPPED_CUP

        self.mu.placeCupInBin()
        if self.mu.cupInWrongBin():
            return self.CupsResults.WRONG_BIN

        if self.mu.cupNotInBin():
            return self.CupsResults.FAIL

        return self.CupsResults.SUCCESS

class TestMission(Mission):
    def __init__(self, params: TaskParams) -> None:
        self.mu = params.mu
        self.ac = params.ac
        self.startState = 'dive'

        self.tasks = {
            'dive': Dive(params, { 'depth': 5 }),
            'cups': Cups(params)
        }

        self.transitions = {
            'dive': self.diveTransition,
            'cups': self.cupsTransition
        }

    def diveTransition(status: Dive.DiveResults):
        if status == Dive.DiveResults.FAILURE:
            return 'dive'
        return 'cups'

    def cupsTransition(status: Cups.CupsResults):
        match status:
            case Cups.CupsResults.SUCCESS:
                return 'finish'
            case Cups.CupsResults.DROPPED_CUP:
                self.mu.pickUpCup()
                return 'cups'
            case Cups.CupsResults.LOST_TRACK_OF_CUP:
                self.mu.backUp()
                return 'cups'
            case Cups.CupsResults.WRONG_BIN:
                self.mu.pickUpCup()
                return 'cups'
            case Cups.CupsResults.FAIL:
                return 'finish'

class TaskManager():
    def __init__(self, mission: Mission):
        self.mission = mission
        self.mu = MotionUtils()
        self.ac = Alarm()
        # initialize rest of stuff

    def runMission():
        state = self.mission.startState
        while state != 'finish':
            status = self.mission.tasks[state].run()
            state = self.mission.transitions[state](status)

tm = TaskManager(TestMission)
tm.runMission()
SamuelSchaub commented 1 year ago

We can come up with a better way to use it but this is the general idea

theochemel commented 1 year ago

Looks fantastic!

theochemel commented 1 year ago

Looks fantastic!

On Thu, Feb 9, 2023 at 1:13 AM Samuel Schaub @.***> wrote:

We can come up with a better way to use it but this is the general idea

— Reply to this email directly, view it on GitHub https://github.com/Tartan-AUV/TAUV-ROS-Packages/issues/103#issuecomment-1423692531, or unsubscribe https://github.com/notifications/unsubscribe-auth/ADDVVKCPZKBYSPAOMJVVWGLWWSDKPANCNFSM6AAAAAAT7WEOSU . You are receiving this because you authored the thread.Message ID: @.***>

theochemel commented 1 year ago

Maybe can enum the task identifiers too so there aren't random strings floating around