I've been trying to make a collaboration between human and autonomous system in a way that human can take and give back the control of the vehicle often. In order to achieve this, I've modified Manual_Control_SteeringWheel script in a way that switching between the autopilot and manual control mode is done by pressing a button on the steeringwheel (added under pygame.JOYBUTTONDOWN instead of pygame.KEYUP).
Switching between manual and autopilot has not caused any problem. However, switching from autopilot to manual control does cause an issue. However, this is a random issue, meaning that once the system sets to Autopilot, switching to manual causes the pygame to quit without a specific pattern. Sometimes it crashes after 3rd time, sometimes after 5th time and sometimes even after 2nd time. Since it neither has a pattern nor make any error (just quit the game), I have not been able to debug and find out what causing the issue. So, I'm hoping you guys (@jackbart94 , @bernatx ) know the solution
In advance, I truly appreciate your help.
CARLA Version: 0.9.10
OS: Windows
Processor: Intel i9-9900k 3.6GHz
GPU: GeForce RTX2080 Super
from __future__ import print_function
try:
sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
import carla
from carla import ColorConverter as cc
import argparse
import collections
import datetime
import logging
import math
import random
import re
import weakref
import time
if sys.version_info >= (3, 0):
from configparser import ConfigParser
else:
from ConfigParser import RawConfigParser as ConfigParser
try:
import glob
import os
import sys
import pygame
from pygame.locals import KMOD_CTRL
from pygame.locals import KMOD_SHIFT
from pygame.locals import K_0
from pygame.locals import K_9
from pygame.locals import K_BACKQUOTE
from pygame.locals import K_BACKSPACE
from pygame.locals import K_COMMA
from pygame.locals import K_DOWN
from pygame.locals import K_ESCAPE
from pygame.locals import K_F1
from pygame.locals import K_LEFT
from pygame.locals import K_PERIOD
from pygame.locals import K_RIGHT
from pygame.locals import K_SLASH
from pygame.locals import K_SPACE
from pygame.locals import K_TAB
from pygame.locals import K_UP
from pygame.locals import K_a
from pygame.locals import K_b
from pygame.locals import K_c
from pygame.locals import K_d
from pygame.locals import K_h
from pygame.locals import K_m
from pygame.locals import K_p
from pygame.locals import K_q
from pygame.locals import K_r
from pygame.locals import K_s
from pygame.locals import K_w
from pygame.locals import K_z
from pygame.locals import K_t
except ImportError:
raise RuntimeError('cannot import pygame, make sure pygame package is installed')
import datetime
import numpy as np
import math, os, sys, zipfile
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense
from sklearn.compose import ColumnTransformer, \
make_column_transformer # labelencoder class takes cat. var. and assign value to them
from sklearn.pipeline import Pipeline
from sklearn.utils import resample
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
def find_weather_presets():
rgx = re.compile('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)')
name = lambda x: ' '.join(m.group(0) for m in rgx.finditer(x))
presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)]
return [(getattr(carla.WeatherParameters, x), name(x)) for x in presets]
def get_actor_display_name(actor, truncate=250):
name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:])
return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name
class World(object):
def __init__(self, carla_world, hud, udpsrv, actor_filter):
self.world = carla_world
self.hud = hud
self.udpsrv = udpsrv
self.player = None
self.collision_sensor = None
self.lane_invasion_sensor = None
self.gnss_sensor = None
self.camera_manager = None
self._weather_presets = find_weather_presets()
self._weather_index = 0
self._actor_filter = actor_filter
self.restart()
self.world.on_tick(hud.on_world_tick)
def restart(self):
cam_index = self.camera_manager.index if self.camera_manager is not None else 0
cam_pos_index = self.camera_manager.transform_index if self.camera_manager is not None else 0
# blueprint = random.choice(self.world.get_blueprint_library().filter(self._actor_filter))
blueprint_library = self.world.get_blueprint_library()
blueprint= blueprint_library.filter('model3')[0]
blueprint.set_attribute('role_name', 'hero')
if blueprint.has_attribute('color'):
color = random.choice(blueprint.get_attribute('color').recommended_values)
blueprint.set_attribute('color', color)
# Spawn the player.
if self.player is not None:
print("player is not None")
spawn_point = self.player.get_transform()
spawn_point.location.z += 2.0
spawn_point.rotation.roll = 0.0
spawn_point.rotation.pitch = 0.0
self.destroy()
self.player = self.world.try_spawn_actor(blueprint, spawn_point)
# Where the spawn point starts from, either random or specific
while self.player is None:
print("player is none")
# spawn_points = self.world.get_map().get_spawn_points()
# spawn_point = random.choice(spawn_points) if spawn_points else carla.Transform()
# print("List of SPawn:----",spawn_points)
spawn_point = carla.Transform(carla.Location(x=128.79713867, y=-23.40604248, z=0.300000),
carla.Rotation(pitch=0.000000, yaw=179.791321, roll=0.000000))
self.player = self.world.try_spawn_actor(blueprint, spawn_point)
# Set up the sensors
self.collision_sensor = CollisionSensor(self.player, self.hud)
self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud)
self.gnss_sensor = GnssSensor(self.player)
self.camera_manager = CameraManager(self.player, self.hud)
self.camera_manager.transform_index = cam_pos_index
self.camera_manager.set_sensor(cam_index, notify=False)
actor_type = get_actor_display_name(self.player)
self.hud.notification(actor_type)
def next_weather(self, reverse=False):
self._weather_index += -1 if reverse else 1
self._weather_index %= len(self._weather_presets)
preset = self._weather_presets[self._weather_index]
self.hud.notification('Weather: %s' % preset[1])
self.player.get_world().set_weather(preset[0])
def tick(self, clock, ctrlobj):
self.hud.tick(self, clock, self.udpsrv, ctrlobj) #self.cnn,
def render(self, display):
self.camera_manager.render(display)
self.hud.render(display)
def destroy(self):
sensors = [
self.camera_manager.sensor,
self.collision_sensor.sensor,
self.lane_invasion_sensor.sensor,
self.gnss_sensor.sensor]
for sensor in sensors:
if sensor is not None:
sensor.stop()
sensor.destroy()
if self.player is not None:
self.player.destroy()
class DualControl(object):
def __init__(self, world, start_in_autopilot):
self._autopilot_enabled = start_in_autopilot
self._b_pressed = False
self._z_pressed = False
self._t_pressed = False
self.cnt = 0
if isinstance(world.player, carla.Vehicle):
self._control = carla.VehicleControl()
world.player.set_autopilot(self._autopilot_enabled)
# print("testttt", world.player.get_wheel_steer_angle())
# print("wheel Angle",carla.Vehicle.get_wheel_steer_angle(self._carla.VehicleWheelLocation))
elif isinstance(world.player, carla.Walker):
self._control = carla.WalkerControl()
self._autopilot_enabled = False
self._rotation = world.player.get_transform().rotation
else:
raise NotImplementedError("Actor type not supported")
self._steer_cache = 0.0
world.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
# initialize steering wheel
pygame.joystick.init()
joystick_count = pygame.joystick.get_count()
if joystick_count > 1:
raise ValueError("Please Connect Just One Joystick")
self._joystick = pygame.joystick.Joystick(0)
self._joystick.init()
self._parser = ConfigParser()
self._parser.read('wheel_config.ini')
self._steer_idx = int(
self._parser.get('G29 Racing Wheel', 'steering_wheel'))
self._throttle_idx = int(
self._parser.get('G29 Racing Wheel', 'throttle'))
self._brake_idx = int(self._parser.get('G29 Racing Wheel', 'brake'))
self._reverse_idx = int(self._parser.get('G29 Racing Wheel', 'reverse'))
self._handbrake_idx = int(self._parser.get('G29 Racing Wheel', 'handbrake'))
def parse_events(self, world, clock):
for event in pygame.event.get():
if event.type == pygame.QUIT:
return True
elif event.type == pygame.JOYBUTTONDOWN:
# print('event button?', event.button)
# print('instance of control?', isinstance(self._control, carla.VehicleControl))
if event.button == 0:
world.restart()
elif event.button == 1:
world.hud.toggle_info()
elif event.button == 2:
world.camera_manager.toggle_camera()
elif event.button == 3:
world.next_weather()
elif event.button == self._reverse_idx:
self._control.gear = 1 if self._control.reverse else -1
elif event.button == 23:
world.camera_manager.next_sensor()
# if isinstance(self._control, carla.VehicleControl):
elif event.button == 10: # 10 is R3 on Steering
try:
self._autopilot_enabled = not self._autopilot_enabled
if( not self._autopilot_enabled):
self.cnt = self.cnt + 1;
print("the counter value is", self.cnt);
world.player.set_autopilot(self._autopilot_enabled)
print("It's OK", self._autopilot_enabled)
world.hud.notification('Autopilot %s' % ('On' if self._autopilot_enabled else 'Off'))
except ValueError as ve:
print(ve)
print("An exception occurred")
else:
print("this is not the vehicle control", self._control)
elif event.type == pygame.KEYUP:
if self._is_quit_shortcut(event.key):
return True
elif event.key == K_BACKSPACE:
world.restart()
elif event.key == K_F1:
world.hud.toggle_info()
elif event.key == K_h or (event.key == K_SLASH and pygame.key.get_mods() & KMOD_SHIFT):
world.hud.help.toggle()
elif event.key == K_TAB:
world.camera_manager.toggle_camera()
elif event.key == K_c and pygame.key.get_mods() & KMOD_SHIFT:
world.next_weather(reverse=True)
elif event.key == K_c:
world.next_weather()
elif event.key == K_BACKQUOTE:
world.camera_manager.next_sensor()
elif event.key > K_0 and event.key <= K_9:
world.camera_manager.set_sensor(event.key - 1 - K_0)
elif event.key == K_r:
world.camera_manager.toggle_recording()
if isinstance(self._control, carla.VehicleControl):
if event.key == K_q:
self._control.gear = 1 if self._control.reverse else -1
elif event.key == K_m:
self._control.manual_gear_shift = not self._control.manual_gear_shift
self._control.gear = world.player.get_control().gear
world.hud.notification('%s Transmission' %
('Manual' if self._control.manual_gear_shift else 'Automatic'))
elif self._control.manual_gear_shift and event.key == K_COMMA:
self._control.gear = max(-1, self._control.gear - 1)
elif self._control.manual_gear_shift and event.key == K_PERIOD:
self._control.gear = self._control.gear + 1
# elif event.key == K_p:
# self._autopilot_enabled = not self._autopilot_enabled
# world.player.set_autopilot(self._autopilot_enabled)
# world.hud.notification('Autopilot %s' % ('On' if self._autopilot_enabled else 'Off'))
elif event.key == K_b:
self._b_pressed = True
elif event.key == K_z:
self._z_pressed = True
elif event.key == K_t:
self._t_pressed = True
if not self._autopilot_enabled:
if isinstance(self._control, carla.VehicleControl):
self._parse_vehicle_keys(pygame.key.get_pressed(), clock.get_time())
self._parse_vehicle_wheel()
self._control.reverse = self._control.gear < 0
elif isinstance(self._control, carla.WalkerControl):
self._parse_walker_keys(pygame.key.get_pressed(), clock.get_time())
world.player.apply_control(self._control)
def _parse_vehicle_keys(self, keys, milliseconds):
self._control.throttle = 1.0 if keys[K_UP] or keys[K_w] else 0.0
steer_increment = 5e-4 * milliseconds
if keys[K_LEFT] or keys[K_a]:
self._steer_cache -= steer_increment
elif keys[K_RIGHT] or keys[K_d]:
self._steer_cache += steer_increment
else:
self._steer_cache = 0.0
self._steer_cache = min(0.7, max(-0.7, self._steer_cache))
self._control.steer = round(self._steer_cache, 1)
self._control.brake = 1.0 if keys[K_DOWN] or keys[K_s] else 0.0
self._control.hand_brake = keys[K_SPACE]
def _parse_vehicle_wheel(self):
numAxes = self._joystick.get_numaxes()
jsInputs = [float(self._joystick.get_axis(i)) for i in range(numAxes)]
# print (jsInputs)
jsButtons = [float(self._joystick.get_button(i)) for i in
range(self._joystick.get_numbuttons())]
# Custom function to map range of inputs [1, -1] to outputs [0, 1] i.e 1 from inputs means nothing is pressed
# For the steering, it seems fine as it is
K1 = 10.0 # 0.55
steerCmd = K1 * math.tan(1.1 * jsInputs[self._steer_idx])
# rawSteer = jsInputs[self._steer_idx]
K2 = 1.6 # 1.6
throttleCmd = K2 + (2.05 * math.log10(
-0.7 * jsInputs[self._throttle_idx] + 1.4) - 1.2) / 0.92
if throttleCmd <= 0:
throttleCmd = 0
elif throttleCmd > 1:
throttleCmd = 1
brakeCmd = 1.6 + (2.05 * math.log10(
-0.7 * jsInputs[self._brake_idx] + 1.4) - 1.2) / 0.92
if brakeCmd <= 0:
brakeCmd = 0
elif brakeCmd > 1:
brakeCmd = 1
self._control.steer = steerCmd
self._control.brake = brakeCmd
self._control.throttle = throttleCmd
# toggle = jsButtons[self._reverse_idx]
self._control.hand_brake = bool(jsButtons[self._handbrake_idx])
def _parse_walker_keys(self, keys, milliseconds):
self._control.speed = 0.0
if keys[K_DOWN] or keys[K_s]:
self._control.speed = 0.0
if keys[K_LEFT] or keys[K_a]:
self._control.speed = .01
self._rotation.yaw -= 0.08 * milliseconds
if keys[K_RIGHT] or keys[K_d]:
self._control.speed = .01
self._rotation.yaw += 0.08 * milliseconds
if keys[K_UP] or keys[K_w]:
self._control.speed = 5.556 if pygame.key.get_mods() & KMOD_SHIFT else 2.778
self._control.jump = keys[K_SPACE]
self._rotation.yaw = round(self._rotation.yaw, 1)
self._control.direction = self._rotation.get_forward_vector()
@staticmethod
def _is_quit_shortcut(key):
return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL)
class HUD(object):
def __init__(self, width, height):
self.dim = (width, height)
font = pygame.font.Font(pygame.font.get_default_font(), 20)
font_name = 'courier' if os.name == 'nt' else 'mono'
fonts = [x for x in pygame.font.get_fonts() if font_name in x]
default_font = 'ubuntumono'
mono = default_font if default_font in fonts else fonts[0]
mono = pygame.font.match_font(mono)
self._font_mono = pygame.font.Font(mono, 12 if os.name == 'nt' else 14)
self._notifications = FadingText(font, (width, 40), (0, height - 40))
self.help = HelpText(pygame.font.Font(mono, 24), width, height)
self.server_fps = 0
self.frame = 0
self.simulation_time = 0
self._show_info = True
self._info_text = []
self._server_clock = pygame.time.Clock()
self._crosslinetype = 0
self._crosslinewait = 0
def on_world_tick(self, timestamp):
self._server_clock.tick()
self.server_fps = self._server_clock.get_fps()
self.frame = timestamp.frame
self.simulation_time = timestamp.elapsed_seconds
def tick(self, world, clock, udpsrv, ctrlobj): #cnn,
self._notifications.tick(world, clock)
if not self._show_info:
return
t = world.player.get_transform()
v = world.player.get_velocity()
c = world.player.get_control()
##
global_data['t'] = t #make sure you get the data that you needed such as: t.roatation.yaw
global_data['v'] = v #3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2)
global_data['c'] = c #c.brake
global_data['manual'] = c.manual_gear_shift
###
heading = 'N' if abs(t.rotation.yaw) < 89.5 else ''
heading += 'S' if abs(t.rotation.yaw) > 90.5 else ''
heading += 'E' if 179.5 > t.rotation.yaw > 0.5 else ''
heading += 'W' if -0.5 > t.rotation.yaw > -179.5 else ''
colhist = world.collision_sensor.get_collision_history()
collision = [colhist[x + self.frame - 200] for x in range(0, 200)]
max_col = max(1.0, max(collision))
collision = [x / max_col for x in collision]
vehicles = world.world.get_actors().filter('vehicle.*')
self._info_text = [
'* INFO *',
# 'Server: % 16.0f FPS' % self.server_fps,
# 'Client: % 16.0f FPS' % clock.get_fps(),
# '',
# 'Vehicle: % 20s' % get_actor_display_name(world.player, truncate=20),
# 'Map: % 20s' % world.world.map_name,
# 'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)),
'',
'Speed: % 15.0f km/h' % (3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2)),
# 'Heading:% 16.0f\N{DEGREE SIGN} % 2s' % (t.rotation.yaw, heading),
# 'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (t.location.x, t.location.y)),
# 'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)),
# 'Height: % 18.0f m' % t.location.z,
'']
if isinstance(c, carla.VehicleControl):
self._info_text += [
# ('Throttle:', c.throttle, 0.0, 1.0),
# ('Steer:', c.steer, -1.0, 1.0),
('Brake:', c.brake, 0.0, 1.0),
'',
# ('Reverse:', c.reverse),
# ('Hand brake:', c.hand_brake),
# ('Manual:', c.manual_gear_shift),
'Gear: %s' % {-1: 'R', 0: 'N'}.get(c.gear, c.gear),
'']
elif isinstance(c, carla.WalkerControl):
self._info_text += [
('Speed:', c.speed, 0.0, 5.556),
('Jump:', c.jump)]
# self._info_text += [
# '',
# 'Collision:',
# collision,
# '',
# 'Number of vehicles: % 8d' % len(vehicles)]
if len(vehicles) > 1:
# self._info_text += ['Nearby vehicles:']
distance = lambda l: math.sqrt((l.x - t.location.x)**2 + (l.y - t.location.y)**2 + (l.z - t.location.z)**2)
vehicles = [(distance(x.get_location()), x) for x in vehicles if x.id != world.player.id]
for d, vehicle in sorted(vehicles):
if d > 200.0:
break
vehicle_type = get_actor_display_name(vehicle, truncate=22)
self._info_text.append('% 4dm %s' % (d, vehicle_type))
vehicle_speed = 3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2)
vehicle_throttle = c.throttle
vehicle_gearshift = int(c.manual_gear_shift)
vehicle_brake = c.brake
vehicle_collision = (collision[-1]+collision[-2]) / 2.0
if vehicle_collision > 0:
vehicle_collision = 1
vehicle_numofvehicles = len(vehicles)
vehicle_autopilot = int(ctrlobj._autopilot_enabled)
vehicle_press_b = int(ctrlobj._b_pressed) #For warnings
vehicle_press_z = int(ctrlobj._z_pressed) #For FalseAlarms
vehicle_press_t = int(ctrlobj._t_pressed) #For TakeoverAlarms
vehicle_steering = c.steer
location_x = t.location.x
location_y = t.location.y
vehicle_crosslinetype = self._crosslinetype
self._crosslinewait = self._crosslinewait + 1
if self._crosslinewait > 5:
self._crosslinetype = 0
self._crosslinewait = 0
if len(vehicles) > 0:
self._info_text += [
# '',
# '* ONLINE INFO *',
# "Speed : %.3f"%(vehicle_speed),
# "Throttle : %.2f"%(vehicle_throttle),
# "GearShift : %.2f"%(vehicle_gearshift),
# "Brake : %.2f"%(vehicle_brake),
# "Collision : %.2f"%(vehicle_collision),
# "Number of Vehicles : %d"%(vehicle_numofvehicles),
"Autopilot: %d "%(vehicle_autopilot),
# "CrossLineType : %d"%(vehicle_crosslinetype),
]
# "Throttle", "GearShift", "Speed", "Brake", "Collision", "Number of vehicles", "vec_auto", "vec_cross"
# if not cnn.isAlive:
# cnn.setDnn([vehicle_throttle, vehicle_gearshift, vehicle_speed, vehicle_brake, vehicle_numofvehicles])
# cnn.run()
if not udpsrv.isAlive:
udpsrv.setSendParam([vehicle_throttle, vehicle_speed,
vehicle_brake, vehicle_collision, vehicle_numofvehicles,
vehicle_autopilot, vehicle_crosslinetype,
vehicle_press_b,vehicle_steering,location_x,location_y,vehicle_press_z,vehicle_press_t])
udpsrv.run()
ctrlobj._b_pressed = False
ctrlobj._z_pressed = False
ctrlobj._t_pressed = False
def toggle_info(self):
self._show_info = not self._show_info
def notification(self, text, seconds=2.0):
self._notifications.set_text(text, seconds=seconds)
def error(self, text):
self._notifications.set_text('Error: %s' % text, (255, 0, 0))
def render(self, display):
if self._show_info:
info_surface = pygame.Surface((220, self.dim[1]))
info_surface.set_alpha(100)
display.blit(info_surface, (0, 0))
v_offset = 4
bar_h_offset = 100
bar_width = 106
for item in self._info_text:
if v_offset + 18 > self.dim[1]:
break
if isinstance(item, list):
if len(item) > 1:
points = [(x + 8, v_offset + 8 + (1.0 - y) * 30) for x, y in enumerate(item)]
pygame.draw.lines(display, (255, 136, 0), False, points, 2)
item = None
v_offset += 18
elif isinstance(item, tuple):
if isinstance(item[1], bool):
rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6))
pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1)
else:
rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6))
pygame.draw.rect(display, (255, 255, 255), rect_border, 1)
f = (item[1] - item[2]) / (item[3] - item[2])
if item[2] < 0.0:
rect = pygame.Rect((bar_h_offset + f * (bar_width - 6), v_offset + 8), (6, 6))
else:
rect = pygame.Rect((bar_h_offset, v_offset + 8), (f * bar_width, 6))
pygame.draw.rect(display, (255, 255, 255), rect)
item = item[0]
if item: # At this point has to be a str.
surface = self._font_mono.render(item, True, (255, 255, 255))
display.blit(surface, (8, v_offset))
v_offset += 18
self._notifications.render(display)
self.help.render(display)
class FadingText(object):
def __init__(self, font, dim, pos):
self.font = font
self.dim = dim
self.pos = pos
self.seconds_left = 0
self.surface = pygame.Surface(self.dim)
def set_text(self, text, color=(255, 255, 255), seconds=2.0):
text_texture = self.font.render(text, True, color)
self.surface = pygame.Surface(self.dim)
self.seconds_left = seconds
self.surface.fill((0, 0, 0, 0))
self.surface.blit(text_texture, (10, 11))
def tick(self, _, clock):
delta_seconds = 1e-3 * clock.get_time()
self.seconds_left = max(0.0, self.seconds_left - delta_seconds)
self.surface.set_alpha(500.0 * self.seconds_left)
def render(self, display):
display.blit(self.surface, self.pos)
class HelpText(object):
def __init__(self, font, width, height):
lines = __doc__.split('\n')
self.font = font
self.dim = (680, len(lines) * 22 + 12)
self.pos = (0.5 * width - 0.5 * self.dim[0], 0.5 * height - 0.5 * self.dim[1])
self.seconds_left = 0
self.surface = pygame.Surface(self.dim)
self.surface.fill((0, 0, 0, 0))
for n, line in enumerate(lines):
text_texture = self.font.render(line, True, (255, 255, 255))
self.surface.blit(text_texture, (22, n * 22))
self._render = False
self.surface.set_alpha(220)
def toggle(self):
self._render = not self._render
def render(self, display):
if self._render:
display.blit(self.surface, self.pos)
class CollisionSensor(object):
def __init__(self, parent_actor, hud):
self.sensor = None
self.history = []
self._parent = parent_actor
self.hud = hud
world = self._parent.get_world()
bp = world.get_blueprint_library().find('sensor.other.collision')
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid circular
# reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
def get_collision_history(self):
history = collections.defaultdict(int)
for frame, intensity in self.history:
history[frame] += intensity
return history
@staticmethod
def _on_collision(weak_self, event):
self = weak_self()
if not self:
return
actor_type = get_actor_display_name(event.other_actor)
self.hud.notification('Collision with %r' % actor_type)
impulse = event.normal_impulse
intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2)
self.history.append((event.frame, intensity))
if len(self.history) > 4000:
self.history.pop(0)
class LaneInvasionSensor(object):
def __init__(self, parent_actor, hud):
self.sensor = None
self._parent = parent_actor
self.hud = hud
world = self._parent.get_world()
bp = world.get_blueprint_library().find('sensor.other.lane_invasion')
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid circular
# reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event))
@staticmethod
def _on_invasion(weak_self, event):
self = weak_self()
if not self:
return
lane_types = set(x.type for x in event.crossed_lane_markings)
text = ['%r' % str(x).split()[-1] for x in lane_types]
self.hud.notification('Crossed line %s' % ' and '.join(text))
if text[0] == "'Broken'":
self.hud._crosslinetype = 1
elif text[0] == "'Solid'":
self.hud._crosslinetype = 2
elif text[0] == "'BrokenSolid'" or text[0] == "'SolidBroken'":
self.hud._crosslinetype = 3
elif text[0] == "'SolidSolid'":
self.hud._crosslinetype = 4
else:
self.hud._crosslinetype = 0
class GnssSensor(object):
def __init__(self, parent_actor):
self.sensor = None
self._parent = parent_actor
self.lat = 0.0
self.lon = 0.0
world = self._parent.get_world()
bp = world.get_blueprint_library().find('sensor.other.gnss')
self.sensor = world.spawn_actor(bp, carla.Transform(carla.Location(x=1.0, z=2.8)), attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid circular
# reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event))
@staticmethod
def _on_gnss_event(weak_self, event):
self = weak_self()
if not self:
return
self.lat = event.latitude
self.lon = event.longitude
class CameraManager(object):
def __init__(self, parent_actor, hud):
self.sensor = None
self.surface = None
self._parent = parent_actor
self.hud = hud
self.recording = False
self._camera_transforms = [
carla.Transform(carla.Location(x=0.45, y=-0.4, z=1.2), carla.Rotation(pitch=-1)),
carla.Transform(carla.Location(x=1.6, z=1.7))]
self.transform_index = 1
self.sensors = [
['sensor.camera.rgb', cc.Raw, 'Camera RGB'],
['sensor.camera.depth', cc.Raw, 'Camera Depth (Raw)'],
['sensor.camera.depth', cc.Depth, 'Camera Depth (Gray Scale)'],
['sensor.camera.depth', cc.LogarithmicDepth, 'Camera Depth (Logarithmic Gray Scale)'],
['sensor.camera.semantic_segmentation', cc.Raw, 'Camera Semantic Segmentation (Raw)'],
['sensor.camera.semantic_segmentation', cc.CityScapesPalette,
'Camera Semantic Segmentation (CityScapes Palette)'],
['sensor.lidar.ray_cast', None, 'Lidar (Ray-Cast)']]
world = self._parent.get_world()
bp_library = world.get_blueprint_library()
for item in self.sensors:
bp = bp_library.find(item[0])
if item[0].startswith('sensor.camera'):
bp.set_attribute('image_size_x', str(hud.dim[0]))
bp.set_attribute('image_size_y', str(hud.dim[1]))
elif item[0].startswith('sensor.lidar'):
bp.set_attribute('range', '50')
item.append(bp)
self.index = None
def toggle_camera(self):
self.transform_index = (self.transform_index + 1) % len(self._camera_transforms)
self.sensor.set_transform(self._camera_transforms[self.transform_index])
def set_sensor(self, index, notify=True):
index = index % len(self.sensors)
needs_respawn = True if self.index is None \
else self.sensors[index][0] != self.sensors[self.index][0]
if needs_respawn:
if self.sensor is not None:
self.sensor.destroy()
self.surface = None
self.sensor = self._parent.get_world().spawn_actor(
self.sensors[index][-1],
self._camera_transforms[self.transform_index],
attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid
# circular reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image))
if notify:
self.hud.notification(self.sensors[index][2])
self.index = index
def next_sensor(self):
self.set_sensor(self.index + 1)
def toggle_recording(self):
self.recording = not self.recording
self.hud.notification('Recording %s' % ('On' if self.recording else 'Off'))
def render(self, display):
if self.surface is not None:
display.blit(self.surface, (0, 0))
@staticmethod
def _parse_image(weak_self, image):
self = weak_self()
if not self:
return
if self.sensors[self.index][0].startswith('sensor.lidar'):
points = np.frombuffer(image.raw_data, dtype=np.dtype('f4'))
points = np.reshape(points, (int(points.shape[0] / 4), 4))
lidar_data = np.array(points[:, :2])
lidar_data *= min(self.hud.dim) / 100.0
lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1])
lidar_data = np.fabs(lidar_data) # pylint: disable=E1111
lidar_data = lidar_data.astype(np.int32)
lidar_data = np.reshape(lidar_data, (-1, 2))
lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3)
lidar_img = np.zeros(lidar_img_size)
lidar_img[tuple(lidar_data.T)] = (255, 255, 255)
self.surface = pygame.surfarray.make_surface(lidar_img)
else:
image.convert(self.sensors[self.index][1])
array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
array = np.reshape(array, (image.height, image.width, 4))
array = array[:, :, :3]
array = array[:, :, ::-1]
self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
if self.recording:
image.save_to_disk('_out/%08d' % image.frame)
def game_loop(args):
pygame.init()
pygame.font.init()
world = None
try:
client = carla.Client(args.host, args.port)
client.set_timeout(2.0)
# cnn = ThreadForDNN()
udpsrv = ThreadForUdpServer()
display = pygame.display.set_mode(
(args.width, args.height),
pygame.HWSURFACE | pygame.DOUBLEBUF)
hud = HUD(args.width, args.height)
world = World(client.get_world(), hud, udpsrv, args.filter) # cnn,
controller = DualControl(world, args.autopilot)
clock = pygame.time.Clock()
while True:
clock.tick_busy_loop(30)
if controller.parse_events(world, clock):
return
world.tick(clock, controller)
world.render(display)
pygame.display.flip()
finally:
if world is not None:
world.destroy()
pygame.quit()
class ThreadForUdpServer:
def __init__(self, UDP_IP = "127.0.0.1", UDP_PORT = 8009):
self.UDP_IP = UDP_IP
self.UDP_PORT = UDP_PORT
self.sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM)
self.paramList = None
self.isAlive = False
def setSendParam(self, paramlist):
if paramlist is None:
return
self.paramList = paramlist
def SendData(self):
self.isAlive = True
sndStr = ""
for itm in self.paramList:
sndStr = sndStr + str(itm) + ";"
sndStr = sndStr[:-1]
self.sock.sendto(sndStr.encode('ascii'), (self.UDP_IP, self.UDP_PORT))
time.sleep(0.1)
self.isAlive = False
def run(self):
thr = threading.Thread(target=self.SendData)
thr.start()
def main():
argparser = argparse.ArgumentParser(
description='CARLA Manual Control Client')
argparser.add_argument(
'-v', '--verbose',
action='store_true',
dest='debug',
help='print debug information')
argparser.add_argument(
'--host',
metavar='H',
default='127.0.0.1',
help='IP of the host server (default: 127.0.0.1)')
argparser.add_argument(
'-p', '--port',
metavar='P',
default=2000,
type=int,
help='TCP port to listen to (default: 2000)')
argparser.add_argument(
'-a', '--autopilot',
action='store_true',
help='enable autopilot')
argparser.add_argument(
'--res',
metavar='WIDTHxHEIGHT',
default='1280x720',
help='window resolution (default: 1280x720)')
argparser.add_argument(
'--filter',
metavar='PATTERN',
default='vehicle.*',
help='actor filter (default: "vehicle.*")')
args = argparser.parse_args()
args.width, args.height = [int(x) for x in args.res.split('x')]
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
logging.info('listening to server %s:%s', args.host, args.port)
print(__doc__)
try:
game_loop(args)
except KeyboardInterrupt:
print('\nCancelled by user. Bye!')
if __name__ == '__main__':
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Hi everyone,
I've been trying to make a collaboration between human and autonomous system in a way that human can take and give back the control of the vehicle often. In order to achieve this, I've modified Manual_Control_SteeringWheel script in a way that switching between the autopilot and manual control mode is done by pressing a button on the steeringwheel (added under pygame.JOYBUTTONDOWN instead of pygame.KEYUP).
Switching between manual and autopilot has not caused any problem. However, switching from autopilot to manual control does cause an issue. However, this is a random issue, meaning that once the system sets to Autopilot, switching to manual causes the pygame to quit without a specific pattern. Sometimes it crashes after 3rd time, sometimes after 5th time and sometimes even after 2nd time. Since it neither has a pattern nor make any error (just quit the game), I have not been able to debug and find out what causing the issue. So, I'm hoping you guys (@jackbart94 , @bernatx ) know the solution
In advance, I truly appreciate your help.
CARLA Version: 0.9.10 OS: Windows Processor: Intel i9-9900k 3.6GHz GPU: GeForce RTX2080 Super