HiroIshida / hifuku

Code for paper https://arxiv.org/abs/2405.02968
3 stars 0 forks source link

refactor: incorporate skmp and rpbench #6

Closed HiroIshida closed 1 year ago

HiroIshida commented 1 year ago

graph

HiroIshida commented 1 year ago

Maybe show_library module will be used later

import copy
import logging
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Set

import numpy as np
import torch
from skplan.viewer.skrobot_viewer import set_robot_config
from skrobot.model.primitives import LineString
from skrobot.models.pr2 import PR2

from hifuku.library import SolutionLibrary
from hifuku.threedim.tabletop import TabletopPlanningProblem
from hifuku.threedim.viewer import SceneWrapper, set_robot_alpha

logger = logging.getLogger(__name__)

@dataclass
class LibraryVisualizer:
    viewer: SceneWrapper
    problem: TabletopPlanningProblem
    linestrings: List[LineString]
    robots: List[PR2]
    success_guess: bool
    success_actual: bool
    trajectory_guess: np.ndarray
    trajectory_actual: np.ndarray
    added_ids: Set

    @classmethod
    def create(
        cls, lib: SolutionLibrary[TabletopPlanningProblem], problem: TabletopPlanningProblem
    ):
        viewer = SceneWrapper()
        viewer.camera_transform = cls.camera_transform_from_top()

        efkin, colkin = lib.problem_type.setup_kinmaps()

        linestrings = []
        n_waypoint: Optional[int] = None
        for pred in lib.predictors:
            assert pred.initial_solution is not None
            avs = pred.initial_solution.reshape(-1, 10)
            n_waypoint = len(avs)
            endpoints, _ = efkin.map(avs)
            linestrings.append(LineString(endpoints[:, 0, :3]))

        # make the activated trajectry red
        result_infer = lib.infer(problem)[0]
        success_guess = result_infer.nit < lib.success_iter_threshold()
        trajectory_guess = result_infer.init_solution.reshape(-1, 10)
        linestrings[result_infer.idx].visual_mesh.colors = [[255, 0, 0, 255]]

        # solve
        result_actual = problem.solve(result_infer.init_solution)[0]
        success_actual = result_actual.success
        trajectory_actual = result_actual.traj_solution.numpy().reshape(-1, 10)

        pr2 = lib.problem_type.setup_pr2()
        assert n_waypoint is not None
        pr2_list = [copy.deepcopy(pr2) for _ in range(n_waypoint)]
        return cls(
            viewer,
            problem,
            linestrings,
            pr2_list,
            success_guess,
            success_actual,
            trajectory_guess,
            trajectory_actual,
            set(),
        )

    @classmethod
    def camera_transform_from_top(cls) -> np.ndarray:
        transform = np.array(
            [
                [0.83342449, 0.51809572, -0.19230299, -0.06748249],
                [-0.55223975, 0.76765106, -0.32518165, -0.77247049],
                [-0.02085362, 0.37721171, 0.92589225, 3.05147158],
                [0.0, 0.0, 0.0, 1.0],
            ]
        )
        return transform

    def render_initial(self, file_path: Path):
        for linestring in self.linestrings:
            if linestring not in self.added_ids:
                self.viewer.add(linestring)
                self.added_ids.add(linestring)

        set_robot_alpha(self.robots[0], 255)
        if self.robots[0] not in self.added_ids:
            self.viewer.add(self.robots[0])
            self.added_ids.add(self.robots[0])
        # somehow this must come at the end
        self.problem.add_elements_to_viewer(self.viewer)  # type: ignore

        png = self.viewer.save_image(resolution=[640, 480], visible=True)
        with file_path.open(mode="wb") as f:
            f.write(png)
        self.add_texts(file_path)

    def render_initial_solution(self, file_path: Path):
        for linestring in self.linestrings:
            if linestring not in self.added_ids:
                self.viewer.add(linestring)

        efkin, colkin = self.problem.setup_kinmaps()
        for i, av in enumerate(self.trajectory_guess):
            robot = self.robots[i]
            set_robot_config(robot, efkin.control_joint_names, av, with_base=True)
            set_robot_alpha(robot, 150)
            if robot not in self.added_ids:
                self.viewer.add(robot)
                self.added_ids.add(robot)
        # somehow this must come at the end
        self.problem.add_elements_to_viewer(self.viewer)  # type: ignore

        png = self.viewer.save_image(resolution=[640, 480], visible=True)
        with file_path.open(mode="wb") as f:
            f.write(png)
        self.add_texts(file_path)

    def render_solution(self, file_path: Path):

        for linestring in self.linestrings:
            if linestring in self.added_ids:
                self.viewer.delete_geometry(str(id(linestring)))

        efkin, colkin = self.problem.setup_kinmaps()
        assert len(self.trajectory_actual) == len(self.robots)
        for i, av in enumerate(self.trajectory_actual):
            robot = self.robots[i]
            set_robot_config(robot, efkin.control_joint_names, av, with_base=True)
            set_robot_alpha(robot, 100)
            if robot not in self.added_ids:
                self.viewer.add(robot)
                self.added_ids.add(robot)
            else:
                self.viewer.redraw()
        set_robot_alpha(self.robots[-1], 255)
        # somehow this must come at the end
        self.problem.add_elements_to_viewer(self.viewer)  # type: ignore

        png = self.viewer.save_image(resolution=[640, 480], visible=True)
        with file_path.open(mode="wb") as f:
            f.write(png)
        self.add_texts(file_path)

    def add_texts(self, file_path: Path):
        import subprocess

        text_expectation = "expected: {}".format("success" if self.success_guess else "fail")
        color_expectation = "blue" if self.success_guess else "red"

        text_actual = "actual: {}".format("success" if self.success_actual else "fail")
        color_actual = "blue" if self.success_actual else "red"
        fontsize = 30

        subprocess.run(
            'convert {0} -gravity NorthWest -pointsize 30 -fill {1} -annotate +10+10 "{2}" {0}'.format(
                file_path, color_expectation, text_expectation
            ),
            shell=True,
        )
        subprocess.run(
            'convert {0} -gravity NorthWest -pointsize 30 -fill {1} -annotate +10+{pos} "{2}" {0}'.format(
                file_path, color_actual, text_actual, pos=fontsize + 20
            ),
            shell=True,
        )

if __name__ == "__main__":
    p = Path("~/.mohou/tabletop_solution_library").expanduser()
    lib = SolutionLibrary.load(p, TabletopPlanningProblem, device=torch.device("cpu"))[0]
    lib.limit_thread = False

    for _ in range(30):
        problem = TabletopPlanningProblem.sample(1)
        base_name = str(uuid.uuid4())[-8:]

        vis = LibraryVisualizer.create(lib, problem)
        vis.render_initial(Path(base_name + "-a.png"))

        vis = LibraryVisualizer.create(lib, problem)
        vis.render_initial_solution(Path(base_name + "-b.png"))

        vis = LibraryVisualizer.create(lib, problem)
        vis.render_solution(Path(base_name + "-c.png"))