Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
main.py
Go to the documentation of this file.
1
import
re
2
import
rclpy
3
from
rclpy.node
import
Node
4
from
rclpy.executors
import
SingleThreadedExecutor
5
from
rclpy.qos
import
QoSProfile
6
from
custom_interfaces.msg
import
DataLogInfo1
7
8
import
subprocess
9
import
signal
10
import
os
11
import
time
12
import
glob
13
import
shutil
14
15
class
Supervisor
(Node):
16
def
__init__
(self):
17
super().
__init__
(
'supervisor'
)
18
self.
creation_time
= time.time()
19
20
"""
21
******************** Related to keeping nodes alive ********************
22
"""
23
self.
node_names_to_watch
= [
'ros_can'
,
'xsens_mti_ros2_driver'
]
# or xsens_mti_ros2_driver
24
self.
node_start_cmds
= {
25
'ros_can'
:
'source install/setup.bash && ros2 run ros_can ros_can'
,
26
'xsens_mti_ros2_driver'
:
'cd ~/Xsens_MTi_ROS_Driver_and_Ntrip_Client && source ./install/setup.bash && ros2 launch xsens_mti_ros2_driver xsens_mti_node.launch.py'
27
}
28
self.
nodes_being_initialized
= set()
29
30
"""
31
******************** General purpose checkup timer ********************
32
"""
33
34
self.
timer_period_milliseconds
= 10
35
self.create_timer(self.
timer_period_milliseconds
/ 1000.0, self.
check_up
check_up
)
36
37
"""
38
******************** Related to recording rosbag ********************
39
"""
40
self.
rosbag_process
=
None
41
self.
master_topic
=
'/vehicle/data_log_info_1'
42
self.
bags_dir
= os.path.join(os.getcwd(),
"bags"
)
43
os.makedirs(self.
bags_dir
, exist_ok=
True
)
# Creates if missing, does nothing if exists
44
self.get_logger().info(f
"Dir: {self.bags_dir}"
)
45
self.
record_rosbag_command
=
'source install/setup.bash && ros2 bag record -s mcap -a --exclude /lidar_points'
46
self.
mission
=
"Unknown"
47
self.
consecutive_ts_on_count
= 0
48
self.
consecutive_ts_off_count
= 0
49
self.
last_received_master_msg_time
= 0
50
51
# Subscribe to the master
52
qos_profile = QoSProfile(depth=10)
53
self.
subscription
= self.create_subscription(
54
DataLogInfo1,
55
self.
master_topic
,
56
self.
master_callback
master_callback
,
57
qos_profile
58
)
59
"""
60
******************** General purpose checkup ********************
61
"""
62
def
check_up
(self):
63
# Check if nodes are running and restart if necessary
64
active_nodes_output = subprocess.check_output([
'ros2'
,
'node'
,
'list'
], text=
True
)
65
active_nodes = set(active_nodes_output.splitlines())
66
67
for
node
in
active_nodes:
68
if
node[1:]
in
self.
nodes_being_initialized
:
69
self.
nodes_being_initialized
.remove(node[1:])
70
current_time = time.time()
71
for
name
in
self.
node_names_to_watch
:
72
if
(f
'/{name}'
not
in
active_nodes )
and
(name
not
in
self.
nodes_being_initialized
)
and
(current_time - self.
creation_time
> 10):
73
self.get_logger().warn(f
'Node "{name}" not found! Restarting...'
)
74
self.
restart_node
(name)
75
76
# Check if the rosbag process is running for too long
77
if
self.
rosbag_process
is
not
None
:
78
if
current_time - self.
last_received_master_msg_time
> 2:
79
self.get_logger().warn(
'No master message received for over 2 seconds.'
)
80
self.
stop_rosbag
()
81
82
83
"""
84
******************** Related to keeping nodes alive ********************
85
"""
86
87
def
restart_node
(self, node_name):
88
cmd = self.
node_start_cmds
[node_name]
89
subprocess.Popen(
90
[
'bash'
,
'-c'
, cmd],
91
)
92
self.
nodes_being_initialized
.add(node_name)
93
94
"""
95
******************** Related to recording rosbag ********************
96
"""
97
def
convert_mission
(self,msg):
98
if
msg.mission
is
None
:
99
self.
mission
=
"Unknown"
100
return
101
if
msg.mission == 0:
102
self.
mission
=
"Manual"
103
elif
msg.mission == 1:
104
self.
mission
=
"DV Acceleration"
105
elif
msg.mission == 2:
106
self.
mission
=
"DV Skidpad"
107
elif
msg.mission == 3:
108
self.
mission
=
"DV Autocross"
109
elif
msg.mission == 4:
110
self.
mission
=
"Trackdrive"
111
elif
msg.mission == 5:
112
self.
mission
=
"EBS Test"
113
elif
msg.mission == 6:
114
self.
mission
=
"DV Inspection"
115
elif
msg.mission == 7:
116
self.
mission
=
"None"
117
else
:
118
self.
mission
=
"Unknown"
119
120
def
master_callback
(self, msg):
121
self.
last_received_master_msg_time
= time.time()
122
if
msg.mission
is
not
None
:
123
self.
convert_mission
(msg)
124
if
self.
rosbag_process
is
None
:
125
if
(msg.ts_on):
126
self.
consecutive_ts_on_count
+= 1
127
else
:
128
self.
consecutive_ts_on_count
= 0
129
self.
consecutive_ts_off_count
= 0
130
else
:
131
if
(msg.ts_on):
132
self.
consecutive_ts_off_count
= 0
133
else
:
134
self.
consecutive_ts_off_count
+= 1
135
self.
consecutive_ts_on_count
= 0
136
if
self.
should_start_recording
(msg):
137
self.
start_recording_rosbag
()
138
elif
self.
should_stop_recording
(msg):
139
self.
stop_rosbag
()
140
141
def
should_start_recording
(self, msg):
142
if
self.
rosbag_process
is
not
None
:
143
return
False
144
return
self.
consecutive_ts_on_count
>= 2
145
146
def
should_stop_recording
(self, msg):
147
if
self.
rosbag_process
is
None
:
148
return
False
149
return
self.
consecutive_ts_off_count
>= 6
150
151
def
get_rosbag_naming
(self):
152
rosbag_dir = self.
bags_dir
153
# Regex updated to look for .mcap files instead of directories
154
pattern = re.compile(rf
"^{re.escape(self.mission)}\s+(\d+)\.mcap$"
)
155
156
max_num = 0
157
for
name
in
os.listdir(rosbag_dir):
158
match = pattern.match(name)
159
if
match:
160
num = int(match.group(1))
161
max_num = max(max_num, num)
162
163
next_num = max_num + 1
164
return
f
"{self.mission} {next_num}"
165
166
def
start_recording_rosbag
(self):
167
rosbag_name = self.
get_rosbag_naming
()
168
self.
current_rosbag_name
= rosbag_name
# Save for the stop function
169
self.
temp_bag_dir
= os.path.join(self.
bags_dir
, f
"{rosbag_name}_temp"
)
170
171
print(
"Temp save path:"
, self.
temp_bag_dir
)
172
173
self.get_logger().info(f
'Starting rosbag recording: {self.temp_bag_dir}'
)
174
cmd = f
'{self.record_rosbag_command} -o "{self.temp_bag_dir}"'
175
self.
rosbag_process
= subprocess.Popen([
'bash'
,
'-c'
, cmd])
176
177
def
stop_rosbag
(self):
178
if
self.
rosbag_process
:
179
self.get_logger().info(
'Stopping rosbag recording...'
)
180
self.
rosbag_process
.send_signal(signal.SIGINT)
181
182
# CRITICAL: Wait for ROS 2 to finish closing the file before moving it
183
self.
rosbag_process
.wait()
184
self.
rosbag_process
=
None
185
186
self.get_logger().info(
'Extracting .mcap file from temp folder...'
)
187
188
# Find the generated .mcap file inside the temp folder (e.g. name_0.mcap)
189
mcap_files = glob.glob(os.path.join(self.
temp_bag_dir
,
'*.mcap'
))
190
191
if
mcap_files:
192
original_mcap = mcap_files[0]
193
final_mcap_path = os.path.join(self.
bags_dir
, f
"{self.current_rosbag_name}.mcap"
)
194
195
# Move and rename it to the main bags directory
196
shutil.move(original_mcap, final_mcap_path)
197
self.get_logger().info(f
'Moved clean .mcap to: {final_mcap_path}'
)
198
199
# Delete the temporary ROS 2 folder (and metadata.yaml)
200
shutil.rmtree(self.
temp_bag_dir
)
201
self.get_logger().info(
'Temporary folder cleaned up.'
)
202
else
:
203
self.get_logger().error(
'No .mcap file found in the temp directory!'
)
204
205
206
def
main
(args=None):
207
rclpy.init(args=args)
208
node =
Supervisor
()
209
executor = SingleThreadedExecutor()
210
executor.add_node(node)
211
try
:
212
executor.spin()
213
finally
:
214
node.destroy_node()
215
rclpy.shutdown()
216
217
218
if
__name__ ==
'__main__'
:
219
main
()
supervisor.main.Supervisor
Definition
main.py:15
supervisor.main.Supervisor.consecutive_ts_on_count
consecutive_ts_on_count
Definition
main.py:47
supervisor.main.Supervisor.bags_dir
bags_dir
Definition
main.py:42
supervisor.main.Supervisor.restart_node
restart_node(self, node_name)
Definition
main.py:87
supervisor.main.Supervisor.temp_bag_dir
temp_bag_dir
Definition
main.py:169
supervisor.main.Supervisor.start_recording_rosbag
start_recording_rosbag(self)
Definition
main.py:166
supervisor.main.Supervisor.rosbag_process
rosbag_process
Definition
main.py:40
supervisor.main.Supervisor.master_callback
master_callback
Definition
main.py:56
supervisor.main.Supervisor.consecutive_ts_off_count
consecutive_ts_off_count
Definition
main.py:48
supervisor.main.Supervisor.subscription
subscription
Definition
main.py:53
supervisor.main.Supervisor.should_start_recording
should_start_recording(self, msg)
Definition
main.py:141
supervisor.main.Supervisor.last_received_master_msg_time
last_received_master_msg_time
Definition
main.py:49
supervisor.main.Supervisor.get_rosbag_naming
get_rosbag_naming(self)
Definition
main.py:151
supervisor.main.Supervisor.check_up
check_up(self)
Definition
main.py:62
supervisor.main.Supervisor.should_stop_recording
should_stop_recording(self, msg)
Definition
main.py:146
supervisor.main.Supervisor.timer_period_milliseconds
timer_period_milliseconds
Definition
main.py:34
supervisor.main.Supervisor.mission
mission
Definition
main.py:46
supervisor.main.Supervisor.record_rosbag_command
record_rosbag_command
Definition
main.py:45
supervisor.main.Supervisor.creation_time
creation_time
Definition
main.py:18
supervisor.main.Supervisor.current_rosbag_name
current_rosbag_name
Definition
main.py:168
supervisor.main.Supervisor.stop_rosbag
stop_rosbag(self)
Definition
main.py:177
supervisor.main.Supervisor.check_up
check_up
Definition
main.py:35
supervisor.main.Supervisor.__init__
__init__(self)
Definition
main.py:16
supervisor.main.Supervisor.master_callback
master_callback(self, msg)
Definition
main.py:120
supervisor.main.Supervisor.nodes_being_initialized
nodes_being_initialized
Definition
main.py:28
supervisor.main.Supervisor.node_start_cmds
node_start_cmds
Definition
main.py:24
supervisor.main.Supervisor.master_topic
master_topic
Definition
main.py:41
supervisor.main.Supervisor.node_names_to_watch
node_names_to_watch
Definition
main.py:23
supervisor.main.Supervisor.convert_mission
convert_mission(self, msg)
Definition
main.py:97
main
Definition
main.py:1
src
supervisor
supervisor
main.py
Generated by
1.9.8