HiroIshida / hifuku

Code for paper https://arxiv.org/abs/2405.02968
3 stars 0 forks source link

tmp scripts #1

Open HiroIshida opened 1 year ago

HiroIshida commented 1 year ago

measure "true" feasible rate by birrt

import time

from skplan.robot.pr2 import PR2Paramter
from skplan.solver.constraint import (
    ConstraintSatisfactionFail,
    ObstacleAvoidanceConstraint,
    PoseConstraint,
    TrajectoryEqualityConstraint,
    TrajectoryInequalityConstraint,
    batch_project_to_manifold,
)
from skplan.solver.rrt import BidirectionalRRT, RRTConfig, StartConstraintRRT
from skplan.space import ConfigurationSpace, TaskSpace
from skplan.viewer.skrobot_viewer import get_robot_config

from hifuku.library import SimpleFixedProblemPool
from hifuku.threedim.tabletop import TabletopPlanningProblem

validation_pool = SimpleFixedProblemPool.initialize(TabletopPlanningProblem, 100)
n_success = 0

pr2 = TabletopPlanningProblem.setup_pr2()
efkin, colkin = TabletopPlanningProblem.setup_kinmaps()
start = get_robot_config(pr2, efkin.control_joint_names, with_base=True)

counter = 0
for problem in validation_pool:
    counter += 1
    tspace = TaskSpace(3, sdf=problem.world.get_union_sdf())  # type: ignore
    cspace = ConfigurationSpace(tspace, colkin, PR2Paramter.rarm_default_bounds(with_base=True))

    n_wp = 15
    target_pose = problem.target_pose_list[0]
    eq_const = TrajectoryEqualityConstraint.from_start(start, n_wp)
    pose_const = PoseConstraint.from_skrobot_coords(target_pose, efkin, cspace=cspace)
    eq_const.add_goal_constraint(pose_const)

    obstacle_const = ObstacleAvoidanceConstraint(cspace)
    ineq_const = TrajectoryInequalityConstraint.create_homogeneous(obstacle_const, n_wp, 10)
    ts = time.time()
    try:
        samples = batch_project_to_manifold(
            20,
            cspace,
            eq_const=pose_const,
            ineq_const=obstacle_const,
            focus_weight=0.0,
            max_sample_per_sample=1000,
        )
    except ConstraintSatisfactionFail:
        print("satisuction fail")
        continue

    goal_tree = StartConstraintRRT.from_samples(samples, cspace)
    rrt_config = RRTConfig(n_max_iter=5000)
    rrt = BidirectionalRRT.create_default(start, goal_tree, cspace, rrt_config)
    rrt_solution = rrt.solve()
    print("{} sec".format(time.time() - ts))
    if rrt_solution is None:
        print("fail")
        continue
    n_success += 1
    print(len(rrt_solution))
    print("{}/ {}".format(n_success, counter))

test

from pathlib import Path

from hifuku.library import SolutionLibrary
from hifuku.threedim.tabletop import TabletopPlanningProblem

p = Path("~/.mohou/tabletop_solution_library").expanduser()

lib = SolutionLibrary.load(p, TabletopPlanningProblem)[0]
prob = TabletopPlanningProblem.sample(1)
num = lib.infer_iteration_num(prob)
print(num)

result = prob.solve()[0]
HiroIshida commented 1 year ago

compare performance vs rrt

import time
from pathlib import Path

import numpy as np
import torch

from hifuku.library import SolutionLibrary
from hifuku.threedim.tabletop import TabletopPlanningProblem

np.random.seed(0)
p = Path("~/.mohou/tabletop_solution_library").expanduser()

# comapre
lib = SolutionLibrary.load(p, TabletopPlanningProblem, device=torch.device("cpu"))[0]
for _ in range(40):
    problem = TabletopPlanningProblem.sample(1)

    ts = time.time()
    result = lib.infer(problem)[0]
    res = problem.solve(result.init_solution)[0]
    if not res.success:
        continue
    print("============== compare ==============")
    print("ours: success {}, time {}".format(res.success, time.time() - ts))

    ts = time.time()
    try:
        res = problem.solve()[0]
        success = res.success
    except:
        success = False
    print("rrt: success {}, time {}".format(success, time.time() - ts))
HiroIshida commented 1 year ago
import copy
import logging
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Set

import numpy as np
import torch
from skplan.viewer.skrobot_viewer import set_robot_config
from skrobot.model.primitives import LineString
from skrobot.models.pr2 import PR2

from hifuku.library import SolutionLibrary
from hifuku.threedim.tabletop import TabletopPlanningProblem
from hifuku.threedim.viewer import SceneWrapper, set_robot_alpha

logger = logging.getLogger(__name__)

@dataclass
class LibraryVisualizer:
    viewer: SceneWrapper
    problem: TabletopPlanningProblem
    linestrings: List[LineString]
    robots: List[PR2]
    success_guess: bool
    success_actual: bool
    trajectory_guess: np.ndarray
    trajectory_actual: np.ndarray
    added_ids: Set

    @classmethod
    def create(
        cls, lib: SolutionLibrary[TabletopPlanningProblem], problem: TabletopPlanningProblem
    ):
        viewer = SceneWrapper()
        viewer.camera_transform = cls.camera_transform_from_top()

        efkin, colkin = lib.problem_type.setup_kinmaps()

        linestrings = []
        n_waypoint: Optional[int] = None
        for pred in lib.predictors:
            assert pred.initial_solution is not None
            avs = pred.initial_solution.reshape(-1, 10)
            n_waypoint = len(avs)
            endpoints, _ = efkin.map(avs)
            linestrings.append(LineString(endpoints[:, 0, :3]))

        # make the activated trajectry red
        result_infer = lib.infer(problem)[0]
        success_guess = result_infer.nit < lib.success_iter_threshold()
        trajectory_guess = result_infer.init_solution.reshape(-1, 10)
        linestrings[result_infer.idx].visual_mesh.colors = [[255, 0, 0, 255]]

        # solve
        result_actual = problem.solve(result_infer.init_solution)[0]
        success_actual = result_actual.success
        trajectory_actual = result_actual.traj_solution.numpy().reshape(-1, 10)

        pr2 = lib.problem_type.setup_pr2()
        assert n_waypoint is not None
        pr2_list = [copy.deepcopy(pr2) for _ in range(n_waypoint)]
        return cls(
            viewer,
            problem,
            linestrings,
            pr2_list,
            success_guess,
            success_actual,
            trajectory_guess,
            trajectory_actual,
            set(),
        )

    @classmethod
    def camera_transform_from_top(cls) -> np.ndarray:
        transform = np.array(
            [
                [0.83342449, 0.51809572, -0.19230299, -0.06748249],
                [-0.55223975, 0.76765106, -0.32518165, -0.77247049],
                [-0.02085362, 0.37721171, 0.92589225, 3.05147158],
                [0.0, 0.0, 0.0, 1.0],
            ]
        )
        return transform

    def render_initial(self, file_path: Path):
        for linestring in self.linestrings:
            if linestring not in self.added_ids:
                self.viewer.add(linestring)
                self.added_ids.add(linestring)

        set_robot_alpha(self.robots[0], 255)
        if self.robots[0] not in self.added_ids:
            self.viewer.add(self.robots[0])
            self.added_ids.add(self.robots[0])
        # somehow this must come at the end
        self.problem.add_elements_to_viewer(self.viewer)  # type: ignore

        png = self.viewer.save_image(resolution=[640, 480], visible=True)
        with file_path.open(mode="wb") as f:
            f.write(png)
        self.add_texts(file_path)

    def render_initial_solution(self, file_path: Path):
        for linestring in self.linestrings:
            if linestring not in self.added_ids:
                self.viewer.add(linestring)

        efkin, colkin = self.problem.setup_kinmaps()
        for i, av in enumerate(self.trajectory_guess):
            robot = self.robots[i]
            set_robot_config(robot, efkin.control_joint_names, av, with_base=True)
            set_robot_alpha(robot, 150)
            if robot not in self.added_ids:
                self.viewer.add(robot)
                self.added_ids.add(robot)
        # somehow this must come at the end
        self.problem.add_elements_to_viewer(self.viewer)  # type: ignore

        png = self.viewer.save_image(resolution=[640, 480], visible=True)
        with file_path.open(mode="wb") as f:
            f.write(png)
        self.add_texts(file_path)

    def render_solution(self, file_path: Path):

        for linestring in self.linestrings:
            if linestring in self.added_ids:
                self.viewer.delete_geometry(str(id(linestring)))

        efkin, colkin = self.problem.setup_kinmaps()
        assert len(self.trajectory_actual) == len(self.robots)
        for i, av in enumerate(self.trajectory_actual):
            robot = self.robots[i]
            set_robot_config(robot, efkin.control_joint_names, av, with_base=True)
            set_robot_alpha(robot, 100)
            if robot not in self.added_ids:
                self.viewer.add(robot)
                self.added_ids.add(robot)
            else:
                self.viewer.redraw()
        set_robot_alpha(self.robots[-1], 255)
        # somehow this must come at the end
        self.problem.add_elements_to_viewer(self.viewer)  # type: ignore

        png = self.viewer.save_image(resolution=[640, 480], visible=True)
        with file_path.open(mode="wb") as f:
            f.write(png)
        self.add_texts(file_path)

    def add_texts(self, file_path: Path):
        import subprocess

        text_expectation = "expected: {}".format("success" if self.success_guess else "fail")
        color_expectation = "blue" if self.success_guess else "red"

        text_actual = "actual: {}".format("success" if self.success_actual else "fail")
        color_actual = "blue" if self.success_actual else "red"
        fontsize = 30

        subprocess.run(
            'convert {0} -gravity NorthWest -pointsize 30 -fill {1} -annotate +10+10 "{2}" {0}'.format(
                file_path, color_expectation, text_expectation
            ),
            shell=True,
        )
        subprocess.run(
            'convert {0} -gravity NorthWest -pointsize 30 -fill {1} -annotate +10+{pos} "{2}" {0}'.format(
                file_path, color_actual, text_actual, pos=fontsize + 20
            ),
            shell=True,
        )

if __name__ == "__main__":
    p = Path("~/.mohou/tabletop_solution_library").expanduser()
    lib = SolutionLibrary.load(p, TabletopPlanningProblem, device=torch.device("cpu"))[0]
    lib.limit_thread = False

    for _ in range(30):
        problem = TabletopPlanningProblem.sample(1)
        base_name = str(uuid.uuid4())[-8:]

        vis = LibraryVisualizer.create(lib, problem)
        vis.render_initial(Path(base_name + "-a.png"))

        vis = LibraryVisualizer.create(lib, problem)
        vis.render_initial_solution(Path(base_name + "-b.png"))

        vis = LibraryVisualizer.create(lib, problem)
        vis.render_solution(Path(base_name + "-c.png"))
HiroIshida commented 1 year ago

compare performance plot

import pickle

import matplotlib.pyplot as plt
import numpy as np

with open("/tmp/hifuku_bench.pkl", "rb") as f:
    (naives, mines) = pickle.load(f)

print(np.mean(naives))
print(np.mean(mines))

fig, ax = plt.subplots()
ax.plot(naives, "o", label="naive")
ax.plot(mines, "x", label="proposed")
ax.legend()
ax.set_xlabel("each trial [-]")
ax.set_ylabel("computation time [sec]")
plt.savefig("result.png")
# plt.show()