renz011tzar / Omniverse_crazyfie


Code 2.0 #2

Open Wangshengyang2004 opened 6 months ago

Wangshengyang2004 commented 6 months ago
import numpy as np
import torch
from omni.isaac.core.objects import DynamicSphere
from omni.isaac.core.prims import RigidPrimView
from omni.isaac.core.utils.prims import get_prim_at_path
from omni.isaac.core.utils.torch.rotations import *
from omniisaacgymenvs.tasks.base.rl_task import RLTask
from omniisaacgymenvs.robots.articulations.crazyflie2 import Crazyflie2
from omniisaacgymenvs.robots.articulations.views.crazyflie2_view import Crazyflie2View

EPS = 1e-6  # small constant to avoid divisions by 0 and log(0)

class Crazyflie2Task(RLTask):
    def __init__(self, name, sim_config, env, offset=None) -> None:
        self.update_config(sim_config)
        self._device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
        self._num_observations = 18
        self._num_actions = 4

        self.stage = 1

        self._crazyflie_position = torch.tensor([0, 0, 1.0], device=self._device)
        self._red_ball_position = torch.tensor([0, 0, 2.0], device=self._device)
        self._blue_ball_position = torch.tensor([-4.0, 5.0, 3.0], device=self._device)
        self._yellow_ball_position = torch.tensor([17.0, 8.0, 4.0], device=self._device)

        RLTask.__init__(self, name=name, env=env)

        return

    def update_config(self, sim_config):
        self._sim_config = sim_config
        self._cfg = sim_config.config
        self._task_cfg = sim_config.task_config

        self._num_envs = self._task_cfg["env"]["numEnvs"]
        self._env_spacing = self._task_cfg["env"]["envSpacing"]
        self._max_episode_length = self._task_cfg["env"]["maxEpisodeLength"]

        self.dt = self._task_cfg["sim"]["dt"]

        # parameters for the crazyflie
        self.arm_length = 0.05

        # parameters for the controller
        self.motor_damp_time_up = 0.15
        self.motor_damp_time_down = 0.15

        # I use the multiplier 4, since 4*T ~ time for a step response to finish, where
        # T is a time constant of the first-order filter
        self.motor_tau_up = 4 * self.dt / (self.motor_damp_time_up + EPS)
        self.motor_tau_down = 4 * self.dt / (self.motor_damp_time_down + EPS)
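        # Sketch of the intended discrete first-order filter update (assumption, for reference):
        #     y[k+1] = y[k] + motor_tau * (u[k] - y[k])
        # With motor_tau = 4 * dt / T, the output reaches ~98% of a step input after roughly
        # T seconds (T / dt simulation steps), which matches the "4*T" rule of thumb above.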

        # thrust max
        self.mass = 0.028
        self.thrust_to_weight = 1.9

        self.motor_assymetry = np.array([1.0, 1.0, 1.0, 1.0])
        # re-normalizing to sum-up to 4
        self.motor_assymetry = self.motor_assymetry * 4.0 / np.sum(self.motor_assymetry)

        self.grav_z = -1.0 * self._task_cfg["sim"]["gravity"][2]

    # Remember to call `get_targets` instead of `get_target` in your `set_up_scene` method.
    def set_up_scene(self, scene) -> None:
        self.get_crazyflie()
        self.get_targets()  # Updated method call
        RLTask.set_up_scene(self, scene)
        self._copters = Crazyflie2View(prim_paths_expr="/World/envs/.*/Crazyflie2", name="crazyflie2_view")
        self._red_ball = RigidPrimView(prim_paths_expr="/World/envs/.*/red_ball", name="red_ball_view")
        self._blue_ball = RigidPrimView(prim_paths_expr="/World/envs/.*/blue_ball", name="blue_ball_view")
        self._yellow_ball = RigidPrimView(prim_paths_expr="/World/envs/.*/yellow_ball", name="yellow_ball_view")
        scene.add(self._copters)
        scene.add(self._red_ball)
        scene.add(self._blue_ball)
        scene.add(self._yellow_ball)
        for i in range(4):
            scene.add(self._copters.physics_rotors[i])
        return

    def initialize_views(self, scene):
        super().initialize_views(scene)
        if scene.object_exists("crazyflie2_view"):
            scene.remove_object("crazyflie2_view", registry_only=True)
        if scene.object_exists("red_ball_view"):
            scene.remove_object("red_ball_view", registry_only=True)
        if scene.object_exists("blue_ball_view"):
            scene.remove_object("blue_ball_view", registry_only=True)
        if scene.object_exists("yellow_ball_view"):
            scene.remove_object("yellow_ball_view", registry_only=True)

        for i in range(1, 5):
            scene.remove_object(f"m{i}_prop_view", registry_only=True)
        self._copters = Crazyflie2View(prim_paths_expr="/World/envs/.*/Crazyflie2", name="crazyflie2_view")
        self._red_ball = RigidPrimView(prim_paths_expr="/World/envs/.*/red_ball", name="red_ball_view")
        self._blue_ball = RigidPrimView(prim_paths_expr="/World/envs/.*/blue_ball", name="blue_ball_view")
        self._yellow_ball = RigidPrimView(prim_paths_expr="/World/envs/.*/yellow_ball", name="yellow_ball_view")
        scene.add(self._copters)
        scene.add(self._red_ball)
        scene.add(self._blue_ball)
        scene.add(self._yellow_ball)
        for i in range(4):
            scene.add(self._copters.physics_rotors[i])

    def get_crazyflie(self):
        copter = Crazyflie2(
            prim_path=self.default_zero_env_path + "/Crazyflie2", name="crazyflie2", translation=self._crazyflie_position
        )
        self._sim_config.apply_articulation_settings(
            "crazyflie2", get_prim_at_path(copter.prim_path), self._sim_config.parse_actor_config("crazyflie2")
        )

    def get_targets(self):
        # Define positions for the red, blue and yellow balls
        radius = 0.1
        red_color = torch.tensor([1, 0, 0])
        blue_color = torch.tensor([0, 0, 1])
        yellow_color = torch.tensor([1, 1, 0])
        # NOTE: a typo here previously caused the red ball to fall down
        red_ball = DynamicSphere(
            prim_path=self.default_zero_env_path + "/red_ball",
            translation=self._red_ball_position.cpu().numpy(),  # Assuming simulation uses CPU numpy arrays
            name="red_ball",
            radius=radius,
            color=red_color,
        )
        self._sim_config.apply_articulation_settings(
            "red_ball", get_prim_at_path(red_ball.prim_path), self._sim_config.parse_actor_config("red_ball")
        )

        # Create the blue ball
        blue_ball = DynamicSphere(
            prim_path=self.default_zero_env_path + "/blue_ball",
            translation=self._blue_ball_position.cpu().numpy(),  # Assuming simulation uses CPU numpy arrays
            name="blue_ball",
            radius=radius,
            color=blue_color  # Blue color
        )
        self._sim_config.apply_articulation_settings(
            "blue_ball", get_prim_at_path(blue_ball.prim_path), self._sim_config.parse_actor_config("blue_ball")
        )

        # Create the yellow ball
        yellow_ball = DynamicSphere(
            prim_path=self.default_zero_env_path + "/yellow_ball",
            translation=self._yellow_ball_position.cpu().numpy(),  # Assuming simulation uses CPU numpy arrays
            name="yellow_ball",
            radius=radius,
            color=yellow_color # Yellow color
        )
        self._sim_config.apply_articulation_settings(
            "yellow_ball", get_prim_at_path(yellow_ball.prim_path), self._sim_config.parse_actor_config("yellow_ball")
        )

        # Optionally disable collision if these are purely visual/positional targets
        red_ball.set_collision_enabled(False)
        blue_ball.set_collision_enabled(False)
        yellow_ball.set_collision_enabled(False)

    def get_observations(self) -> dict:
        self.root_pos, self.root_rot = self._copters.get_world_poses(clone=False)
        self.root_velocities = self._copters.get_velocities(clone=False)   

        root_positions = self.root_pos - self._env_pos
        root_quats = self.root_rot

        rot_x = quat_axis(root_quats, 0)
        rot_y = quat_axis(root_quats, 1)
        rot_z = quat_axis(root_quats, 2)

        root_linvels = self.root_velocities[:, :3]
        root_angvels = self.root_velocities[:, 3:]

        self.obs_buf[..., 0:3] = self.target_positions - root_positions

        self.obs_buf[..., 3:6] = rot_x
        self.obs_buf[..., 6:9] = rot_y
        self.obs_buf[..., 9:12] = rot_z

        self.obs_buf[..., 12:15] = root_linvels
        self.obs_buf[..., 15:18] = root_angvels

        observations = {self._copters.name: {"obs_buf": self.obs_buf}}
        return observations

    def pre_physics_step(self, actions) -> None:
        if not self.world.is_playing():
            return
        self.update_stage()
        print(f"Current Stage:{self.stage} Target0 Position:{self.target_positions[0]}Root Position0:{self.root_pos[0]}")
        reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(reset_env_ids) > 0:
            self.reset_idx(reset_env_ids)

        set_target_ids = (self.progress_buf % 500 == 0).nonzero(as_tuple=False).squeeze(-1)
        if len(set_target_ids) > 0:
            self.set_targets(set_target_ids)

        actions = actions.clone().to(self._device)
        self.actions = actions

        # clamp to [-1.0, 1.0]
        thrust_cmds = torch.clamp(actions, min=-1.0, max=1.0)
        # scale to [0.0, 1.0]
        thrust_cmds = (thrust_cmds + 1.0) / 2.0
        # filtering the thruster and adding noise
        motor_tau = self.motor_tau_up * torch.ones((self._num_envs, 4), dtype=torch.float32, device=self._device)
        motor_tau[thrust_cmds < self.thrust_cmds_damp] = self.motor_tau_down
        motor_tau[motor_tau > 1.0] = 1.0

        # Since NN commands thrusts we need to convert to rot vel and back
        thrust_rot = thrust_cmds**0.5
        self.thrust_rot_damp = motor_tau * (thrust_rot - self.thrust_rot_damp) + self.thrust_rot_damp
        self.thrust_cmds_damp = self.thrust_rot_damp**2
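        # (Rotor thrust is roughly proportional to the square of rotor speed, so filtering is
        #  done on the square root, i.e. in "rotation speed" space, and the result is squared
        #  to map back to thrust space; this is a common modelling assumption.)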

        ## Adding noise
        thrust_noise = 0.01 * torch.randn(4, dtype=torch.float32, device=self._device)
        thrust_noise = thrust_cmds * thrust_noise
        self.thrust_cmds_damp = torch.clamp(self.thrust_cmds_damp + thrust_noise, min=0.0, max=1.0)

        thrusts = self.thrust_max * self.thrust_cmds_damp

        # thrusts given rotation
        root_quats = self.root_rot
        rot_x = quat_axis(root_quats, 0)
        rot_y = quat_axis(root_quats, 1)
        rot_z = quat_axis(root_quats, 2)
        rot_matrix = torch.cat((rot_x, rot_y, rot_z), 1).reshape(-1, 3, 3)

        force_x = torch.zeros(self._num_envs, 4, dtype=torch.float32, device=self._device)
        force_y = torch.zeros(self._num_envs, 4, dtype=torch.float32, device=self._device)
        force_xy = torch.cat((force_x, force_y), 1).reshape(-1, 4, 2)
        thrusts = thrusts.reshape(-1, 4, 1)
        thrusts = torch.cat((force_xy, thrusts), 2)

        thrusts_0 = thrusts[:, 0]
        thrusts_0 = thrusts_0[:, :, None]

        thrusts_1 = thrusts[:, 1]
        thrusts_1 = thrusts_1[:, :, None]

        thrusts_2 = thrusts[:, 2]
        thrusts_2 = thrusts_2[:, :, None]

        thrusts_3 = thrusts[:, 3]
        thrusts_3 = thrusts_3[:, :, None]

        mod_thrusts_0 = torch.matmul(rot_matrix, thrusts_0)
        mod_thrusts_1 = torch.matmul(rot_matrix, thrusts_1)
        mod_thrusts_2 = torch.matmul(rot_matrix, thrusts_2)
        mod_thrusts_3 = torch.matmul(rot_matrix, thrusts_3)

        self.thrusts[:, 0] = torch.squeeze(mod_thrusts_0)
        self.thrusts[:, 1] = torch.squeeze(mod_thrusts_1)
        self.thrusts[:, 2] = torch.squeeze(mod_thrusts_2)
        self.thrusts[:, 3] = torch.squeeze(mod_thrusts_3)

        # clear actions for reset envs
        self.thrusts[reset_env_ids] = 0

        # spin spinning rotors
        prop_rot = self.thrust_cmds_damp * self.prop_max_rot
        self.dof_vel[:, 0] = prop_rot[:, 0]
        self.dof_vel[:, 1] = -1.0 * prop_rot[:, 1]
        self.dof_vel[:, 2] = prop_rot[:, 2]
        self.dof_vel[:, 3] = -1.0 * prop_rot[:, 3]

        self._copters.set_joint_velocities(self.dof_vel)

        # apply actions
        for i in range(4):
            self._copters.physics_rotors[i].apply_forces(self.thrusts[:, i], indices=self.all_indices)

    def post_reset(self):
        thrust_max = self.grav_z * self.mass * self.thrust_to_weight * self.motor_assymetry / 4.0
        self.thrusts = torch.zeros((self._num_envs, 4, 3), dtype=torch.float32, device=self._device)
        self.thrust_cmds_damp = torch.zeros((self._num_envs, 4), dtype=torch.float32, device=self._device)
        self.thrust_rot_damp = torch.zeros((self._num_envs, 4), dtype=torch.float32, device=self._device)
        self.thrust_max = torch.tensor(thrust_max, device=self._device, dtype=torch.float32)

        self.motor_linearity = 1.0
        self.prop_max_rot = 433.3

        # CHECK HERE!
        self.target_positions = torch.zeros((self._num_envs, 3), device=self._device, dtype=torch.float32)
        self.target_positions[:, 2] = 1
        self.actions = torch.zeros((self._num_envs, 4), device=self._device, dtype=torch.float32)

        self.all_indices = torch.arange(self._num_envs, dtype=torch.int32, device=self._device)

        # Extra info
        self.extras = {}

        torch_zeros = lambda: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False)
        self.episode_sums = {
            "rew_pos": torch_zeros(),
            "rew_orient": torch_zeros(),
            "rew_effort": torch_zeros(),
            "rew_spin": torch_zeros(),
            "raw_dist": torch_zeros(),
            "raw_orient": torch_zeros(),
            "raw_effort": torch_zeros(),
            "raw_spin": torch_zeros(),
        }

        self.root_pos, self.root_rot = self._copters.get_world_poses()
        self.root_velocities = self._copters.get_velocities()
        self.dof_pos = self._copters.get_joint_positions()
        self.dof_vel = self._copters.get_joint_velocities()

        self.initial_red_ball_pos, self.initial_red_ball_rot = self._red_ball.get_world_poses(clone=False)
        self.initial_blue_ball_pos, self.initial_blue_ball_rot = self._blue_ball.get_world_poses(clone=False)
        self.initial_yellow_ball_pos, self.initial_yellow_ball_rot = self._yellow_ball.get_world_poses(clone=False)
        self.initial_root_pos, self.initial_root_rot = self.root_pos.clone(), self.root_rot.clone()

        # control parameters
        self.thrusts = torch.zeros((self._num_envs, 4, 3), dtype=torch.float32, device=self._device)
        self.thrust_cmds_damp = torch.zeros((self._num_envs, 4), dtype=torch.float32, device=self._device)
        self.thrust_rot_damp = torch.zeros((self._num_envs, 4), dtype=torch.float32, device=self._device)

        self.set_targets(self.all_indices)

    # NOTE: be careful when modifying this method; changes here can easily introduce bugs.
    def set_targets(self, env_ids):
        # Adjust the target positions based on the current stage.
        # env_ids is currently unused because the stage (and hence the target) is global.
        if self.stage == 1:
            # Target is the red ball's position
            self.target_positions[:] = self._red_ball_position
        elif self.stage == 3:
            # Target is the yellow ball's position
            self.target_positions[:] = self._yellow_ball_position
        # No need to update target positions for stage 2, which focuses on the flips

    def reset_idx(self, env_ids):
        num_resets = len(env_ids)

        self.dof_pos[env_ids, :] = torch_rand_float(-0.0, 0.0, (num_resets, self._copters.num_dof), device=self._device)
        self.dof_vel[env_ids, :] = 0

        root_pos = self.initial_root_pos.clone()
        root_pos[env_ids, 0] += torch_rand_float(-0.0, 0.0, (num_resets, 1), device=self._device).view(-1)
        root_pos[env_ids, 1] += torch_rand_float(-0.0, 0.0, (num_resets, 1), device=self._device).view(-1)
        root_pos[env_ids, 2] += torch_rand_float(-0.0, 0.0, (num_resets, 1), device=self._device).view(-1)
        root_velocities = self.root_velocities.clone()
        root_velocities[env_ids] = 0

        # apply resets
        self._copters.set_joint_positions(self.dof_pos[env_ids], indices=env_ids)
        self._copters.set_joint_velocities(self.dof_vel[env_ids], indices=env_ids)

        self._copters.set_world_poses(root_pos[env_ids], self.initial_root_rot[env_ids].clone(), indices=env_ids)
        self._copters.set_velocities(root_velocities[env_ids], indices=env_ids)

        # bookkeeping
        self.reset_buf[env_ids] = 0
        self.progress_buf[env_ids] = 0

        self.thrust_cmds_damp[env_ids] = 0
        self.thrust_rot_damp[env_ids] = 0

        # fill extras
        self.extras["episode"] = {}
        for key in self.episode_sums.keys():
            self.extras["episode"][key] = torch.mean(self.episode_sums[key][env_ids]) / self._max_episode_length
            self.episode_sums[key][env_ids] = 0.0

    def evaluate_double_flip(self, initial_orientation, final_orientation):
        """
        Evaluate the drone's performance in executing two consecutive 180-degree flips.

        Args:
            initial_orientation (torch.Tensor): The drone's initial orientation as a quaternion (wxyz format).
            final_orientation (torch.Tensor): The drone's final orientation after attempting two flips, as a quaternion (wxyz format).

        Returns:
            torch.Tensor: A scalar tensor representing the reward based on the accuracy of the flips.
        """
        # Convert orientations to xyzw format for compatibility with utility functions
        initial_orientation_xyzw = wxyz2xyzw(initial_orientation)
        final_orientation_xyzw = wxyz2xyzw(final_orientation)

        # Expected change in orientation after two 180-degree flips should ideally bring the drone back to its initial orientation
        # However, due to potential inaccuracies, we assess the difference from the initial orientation
        diff_rad = quat_diff_rad(initial_orientation_xyzw, final_orientation_xyzw)

        # Reward based on the difference, with smaller differences resulting in higher rewards.
        # The exponential shaping exp(-10 * |diff|) falls to roughly exp(-pi) (~4%) at a
        # difference of pi / 10 radians (18 degrees), the intended accuracy tolerance.
        accuracy_measure = torch.exp(-10 * torch.abs(diff_rad))

        # Clamp the reward to a maximum of 1.0
        flip_reward = torch.clamp(accuracy_measure, max=1.0)

        return flip_reward

    def flips_executed_successfully(self):
        """
        Determine if two consecutive 180-degree flips were executed successfully.
        This function checks if the drone's orientation after the flips closely matches
        its initial orientation, accounting for a reasonable tolerance.
        """
        # Calculate the difference in orientation in radians
        diff_rad = quat_diff_rad(self.initial_orientation, self.root_rot)

        # Define a tolerance level for the flip accuracy
        # Considering a tolerance that accounts for slight inaccuracies in flip execution
        tolerance_rad = torch.tensor(np.pi / 18, device=self.device)  # Tolerance of 10 degrees in radians

        # Check if the orientation difference is within the tolerance
        successful_flips = diff_rad <= tolerance_rad

        # Return True if flips are successful
        return successful_flips.all()

    def calculate_flip_reward(self, root_angvels):
        """
        Calculate the reward for executing flips based on the drone's angular velocities.

        Args:
            root_angvels (torch.Tensor): The drone's angular velocities in the world frame.

        Returns:
            torch.Tensor: A scalar tensor representing the reward based on the drone's angular velocities.
        """
        # Calculate the magnitude of the angular velocities
        angvel_magnitude = torch.norm(root_angvels, dim=1)

        # Reward increases with the magnitude of the angular velocity and saturates at 1.0;
        # the sigmoid crosses 0.5 at the 0.5 rad/s threshold above which the drone is
        # considered to be flipping.
        flipping_reward = torch.sigmoid(10.0 * (angvel_magnitude - 0.5))

        return flipping_reward

    def update_stage(self):
        # Update the stage based on the drone's current position and task progress.
        # The ball positions are defined in env-local coordinates, so compare them against
        # the env-local drone position (world pose minus the per-env origin).
        root_positions = self.root_pos - self._env_pos
        distance_to_red = torch.norm(self._red_ball_position - root_positions, dim=1)
        distance_to_yellow = torch.norm(self._yellow_ball_position - root_positions, dim=1)

        # Transition from stage 1 to stage 2: When the drone reaches the red ball
        if self.stage == 1 and distance_to_red.min() < 0.2:
            self.stage = 2
            self.initial_orientation = self.root_rot.clone()  # Save initial orientation for flips

        # Transition from stage 2 to stage 3: Check if flips have been successfully executed
        if self.stage == 2:
            # Assuming flips_executed_successfully is a method to check if the flips are done
            if self.flips_executed_successfully():
                self.stage = 3

        # Transition from stage 3 to completion: When the drone reaches the yellow ball
        if self.stage == 3 and distance_to_yellow.min() < 0.5:
            self.stage = 4  # Assuming stage 4 signifies task completion

        # You might need to adjust the logic in flips_executed_successfully method
        # to accurately detect successful flips based on the drone's orientation change.

    def calculate_metrics(self) -> None:
        self.root_positions = self.root_pos - self._env_pos
        root_quats = self.root_rot
        root_angvels = self.root_velocities[:, 3:]
        ups = quat_axis(root_quats, 2)
        self.orient_z = ups[..., 2]
        # Distance to the current target
        if self.stage == 1:
            target_dist = torch.norm(self._red_ball_position - self.root_positions, dim=1)
        elif self.stage == 3:
            target_dist = torch.norm(self._yellow_ball_position - self.root_positions, dim=1)
        else:
            # During the flips in stage 2 the distance metric is not relevant;
            # keep a per-env zero tensor so downstream shapes stay consistent.
            target_dist = torch.zeros(self._num_envs, device=self._device)

        pos_reward = torch.exp(-0.1 * target_dist)  # Encourage moving closer to the target
        self.target_dist = target_dist
        # Orientation reward, relevant in all stages but calculated differently in stage 2
        if self.stage != 2:

            up_reward = torch.clamp(ups[..., 2], min=0.0, max=1.0)  # Encourage staying upright
        else:
            # In stage 2, the reward is based on executing flips
            flip_reward = self.evaluate_double_flip(self.initial_orientation, root_quats)
            up_reward = flip_reward

        # Effort reward to encourage efficient movement
        effort_reward = torch.exp(-0.05 * torch.square(self.actions).sum(-1))

        # Spin reward to discourage unnecessary spinning
        # Spin reward modification
        if self.stage == 2:
            # During flips, spinning is desired - modify or remove penalties for spinning
            spin_reward = self.calculate_flip_reward(root_angvels)
        else:
            # In stages 1 and 3, penalize excessive spinning
            spin = torch.square(root_angvels).sum(-1)
            spin_reward = 0.01 * torch.exp(-1.0 * spin)

        # Combined reward
        if self.stage == 2:
            self.rew_buf[:] = flip_reward  # Prioritize flip accuracy in stage 2
        else:
            self.rew_buf[:] = pos_reward + pos_reward * (up_reward + spin_reward) - effort_reward

        # Log episode reward sums and raw info for debugging
        # (ensure the "episode" sub-dict exists before the first reset populates it)
        self.extras.setdefault("episode", {})
        self.extras["episode"]["rew_pos"] = pos_reward.mean().item()
        self.extras["episode"]["rew_orient"] = up_reward.mean().item()
        self.extras["episode"]["rew_effort"] = effort_reward.mean().item()
        self.extras["episode"]["rew_spin"] = spin_reward.mean().item()
        self.extras["episode"]["raw_dist"] = target_dist.mean().item()

    def is_done(self) -> None:
        ones = torch.ones_like(self.reset_buf)
        die = torch.zeros_like(self.reset_buf)

        # Update conditions for episode termination based on distance and boundaries
        die = torch.where(self.target_dist > 5.0, ones, die)
        die = torch.where(self.root_positions[..., 2] < 0.5, ones, die)
        die = torch.where(self.root_positions[..., 2] > 5.0, ones, die)
        die = torch.where(self.orient_z < 0.0, ones, die)

        # Check if the stage for all environments is 4 (completed)
        # Since self.stage is a scalar, directly comparing might not work for batched environments
        # Instead, manually set the condition for environments that have completed the task
        if self.stage == 4:
            completed = ones  # Mark all environments as completed if global stage is 4
        else:
            completed = torch.zeros_like(self.reset_buf)  # None are completed

        # Conditions for resetting environments
        # Mark for reset if the task is completed or any termination conditions are met
        self.reset_buf = torch.where(completed.bool() | (die.bool()), ones, torch.zeros_like(self.reset_buf))
Wangshengyang2004 commented 6 months ago

Prompt:

Ask GPT-4 for revision:
Constant:
• task_obs_code_string: The whole piece of code above.
• task_description: The drone needs to complete a three-stage movement. It starts at point (0,0,0), where the red ball is located for better visualization. In stage 1, the drone first heads straight toward a blue ball at (2,2,2). In stage 2, the drone needs to perform a two-flip rotation to turn its head 180 degrees.
Initialize:
You are a reward engineer trying to write reward functions that solve reinforcement learning tasks as effectively as possible.
Your goal is to write a reward function for the environment that will help the agent learn the task described in the text. If necessary, please also provide the modifications needed in other parts of the code.
Your reward function should use useful variables from the environment as inputs. As an example,
the reward function signature can be: 
    def calculate_metrics(self) -> None:
Since the reward function will be decorated with @torch.jit.script,
please make sure that the code is compatible with TorchScript (e.g., use torch tensors instead of numpy arrays).
Make sure any new tensor or variable you introduce is on the same device as the input tensors. 
For quaternion conversion, you should use the official functions:
{quat_functions}
The Python environment is {task_obs_code_string}. Write a reward function for the following task: {task_description}. The current reward function is: {cur_reward_func}

The final lines of the reward function should consist of two items:
    (1) the computation of the total reward,
    (2) a dictionary of each individual reward component and of the raw information values that are added into lists (a minimal illustrative skeleton is sketched after the tips below).
The code output should be formatted as a python code string: "```python ... ```".

Some helpful tips for writing the reward function code:
    (1) You may find it helpful to normalize the reward to a fixed range by applying transformations like torch.exp to the overall reward or its components.
    (2) If you choose to transform a reward component, then you must also introduce a temperature parameter inside the transformation function; this parameter must be a named variable in the reward function, not an input variable. Each transformed reward component should have its own temperature variable. This is for tuning purposes.
    (3) Make sure the type of each input variable is correctly specified; a float input variable should not be specified as torch.Tensor
    (4) Most importantly, the reward code's input variables must contain only attributes of the provided environment class definition (namely, variables that have prefix self.). Under no circumstance can you introduce new input variables.
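
For reference, here is a minimal sketch of a reward function with the requested shape, written against the environment attributes used in the task code above (the temperature values and weights are placeholder assumptions, not tuned):

    def calculate_metrics(self) -> None:
        # Minimal sketch only; assumes the attributes defined by the task above.
        root_positions = self.root_pos - self._env_pos
        root_angvels = self.root_velocities[:, 3:]
        target_dist = torch.norm(self.target_positions - root_positions, dim=1)
        self.target_dist = target_dist                        # used by is_done
        self.orient_z = quat_axis(self.root_rot, 2)[..., 2]   # used by is_done

        # Each transformed component has its own named temperature variable (tip 2).
        pos_temp: float = 0.5
        spin_temp: float = 2.0
        effort_temp: float = 20.0

        pos_reward = torch.exp(-target_dist / pos_temp)
        spin_reward = torch.exp(-torch.square(root_angvels).sum(-1) / spin_temp)
        effort_reward = torch.exp(-torch.square(self.actions).sum(-1) / effort_temp)

        # (1) compute the total reward
        self.rew_buf[:] = pos_reward + 0.1 * spin_reward + 0.05 * effort_reward

        # (2) accumulate each individual component and raw info for logging
        self.episode_sums["rew_pos"] += pos_reward
        self.episode_sums["rew_spin"] += spin_reward
        self.episode_sums["rew_effort"] += effort_reward
        self.episode_sums["raw_dist"] += target_dist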

Run into error:
Executing the reward function code above has the following error: {traceback_msg}. Please fix the bug and provide a new, improved reward function!

For improvement:
We trained an RL policy using the provided reward function code and, after every {epoch_freq} epochs, tracked the values of the individual reward components as well as global policy metrics such as success rates and episode lengths, recording the maximum, mean, and minimum values encountered:

Please carefully analyze the policy feedback and provide a new, improved reward function that can better solve the task. Some helpful tips for analyzing the policy feedback:
    (1) If the success rates are always near zero, then you must rewrite the entire reward function
    (2) If the values for a certain reward component are near identical throughout, then this means RL is not able to optimize this component as it is written. You may consider
        (a) Changing its scale or the value of its temperature parameter
        (b) Re-writing the reward component 
        (c) Discarding the reward component
    (3) If some reward components' magnitudes are significantly larger than the others', then you must re-scale their values to a proper range (see the small example after this list)
Please analyze each existing reward component in the suggested manner above first, and then write the reward function code.
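
As a concrete illustration of tips (2a) and (3), a component that dominates or barely changes can often be reshaped by adjusting its temperature and weight rather than rewriting it; the values below are placeholders, not tuned:

    # Before: a component whose gradient is weak at the distances that matter
    pos_reward = torch.exp(-0.1 * target_dist)

    # After: a sharper temperature plus an explicit weight keep it in a useful range
    pos_temp: float = 0.5
    pos_weight: float = 1.0
    pos_reward = pos_weight * torch.exp(-target_dist / pos_temp)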