`
import argparse
from txsim_module import *
from location_pb2 import Location
LOCATION_TOPIC = 'LOCATION'
class MyModule(SimModule):
def init(self):
super().init()
self.max_step_time = None
def on_init(self, helper):
print("========== my module on_init called. ==========")
v = helper.get_parameter('max_step')
self.max_step_time = 0
if len(v) > 0:
self.max_step_time = int(v) * 1000
print("max step time = {}s".format(v))
helper.subscribe(LOCATION_TOPIC)
def on_reset(self, helper):
print("========== my module on_reset called. ==========")
def on_step(self, helper):
location_msg = helper.get_subscribed_message(LOCATION_TOPIC)
if len(location_msg) > 0:
location = Location()
location.ParseFromString(location_msg)
print('received {} msg: {}'.format(
LOCATION_TOPIC, location))
t = helper.timestamp()
if 0 < self.max_step_time < t:
helper.stop_scenario("max step time reached!")
def on_stop(self, helper):
print("========== my module on_stop called. ==========")
if name == 'main':
parser = argparse.ArgumentParser()
parser.add_argument('--name', required=True)
args = parser.parse_args()
m = MyModule()
s = SimModuleService()
s.serve(args.name, m)
print("python sim module service exit.")
我开启了动力学模块,然后发送CONTROL_V2消息,接收LOCATION和VEHICLE_STATE消息,但是消息都是空的,为什么呢?
补充1:控制指令是生效的,可以从仿真观察出来。 补充2:试了同样的方法来获取Traffic消息,可以正常获取到 补充3:关闭动力学模块,用Perfect_Control,订阅LOCATION,也收不到任何消息
接收消息的代码如下:
` import argparse from txsim_module import * from location_pb2 import Location
LOCATION_TOPIC = 'LOCATION'
class MyModule(SimModule): def init(self): super().init() self.max_step_time = None
if name == 'main': parser = argparse.ArgumentParser() parser.add_argument('--name', required=True) args = parser.parse_args() m = MyModule() s = SimModuleService() s.serve(args.name, m) print("python sim module service exit.")
`