Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
main.py
Go to the documentation of this file.
1import re
2import rclpy
3from rclpy.node import Node
4from rclpy.executors import SingleThreadedExecutor
5from rclpy.qos import QoSProfile
6from custom_interfaces.msg import DataLogInfo1
7
8import subprocess
9import signal
10import os
11import time
12import glob
13import shutil
14
15class Supervisor(Node):
16 def __init__(self):
17 super().__init__('supervisor')
18 self.creation_time = time.time()
19
20 """
21 ******************** Related to keeping nodes alive ********************
22 """
23 self.node_names_to_watch = ['ros_can', 'xsens_mti_ros2_driver'] # or xsens_mti_ros2_driver
25 'ros_can': 'source install/setup.bash && ros2 run ros_can ros_can',
26 'xsens_mti_ros2_driver': 'cd ~/Xsens_MTi_ROS_Driver_and_Ntrip_Client && source ./install/setup.bash && ros2 launch xsens_mti_ros2_driver xsens_mti_node.launch.py'
27 }
29
30 """
31 ******************** General purpose checkup timer ********************
32 """
33
35 self.create_timer(self.timer_period_milliseconds / 1000.0, self.check_upcheck_up)
36
37 """
38 ******************** Related to recording rosbag ********************
39 """
40 self.rosbag_process = None
41 self.master_topic = '/vehicle/data_log_info_1'
42 self.bags_dir = os.path.join(os.getcwd(), "bags")
43 os.makedirs(self.bags_dir, exist_ok=True) # Creates if missing, does nothing if exists
44 self.get_logger().info(f"Dir: {self.bags_dir}")
45 self.record_rosbag_command = 'source install/setup.bash && ros2 bag record -s mcap -a --exclude /lidar_points'
46 self.mission = "Unknown"
50
51 # Subscribe to the master
52 qos_profile = QoSProfile(depth=10)
53 self.subscription = self.create_subscription(
54 DataLogInfo1,
55 self.master_topic,
57 qos_profile
58 )
59 """
60 ******************** General purpose checkup ********************
61 """
62 def check_up(self):
63 # Check if nodes are running and restart if necessary
64 active_nodes_output = subprocess.check_output(['ros2', 'node', 'list'], text=True)
65 active_nodes = set(active_nodes_output.splitlines())
66
67 for node in active_nodes:
68 if node[1:] in self.nodes_being_initialized:
69 self.nodes_being_initialized.remove(node[1:])
70 current_time = time.time()
71 for name in self.node_names_to_watch:
72 if (f'/{name}' not in active_nodes ) and (name not in self.nodes_being_initialized) and (current_time - self.creation_time > 10):
73 self.get_logger().warn(f'Node "{name}" not found! Restarting...')
74 self.restart_node(name)
75
76 # Check if the rosbag process is running for too long
77 if self.rosbag_process is not None:
78 if current_time - self.last_received_master_msg_time > 2:
79 self.get_logger().warn('No master message received for over 2 seconds.')
80 self.stop_rosbag()
81
82
83 """
84 ******************** Related to keeping nodes alive ********************
85 """
86
87 def restart_node(self, node_name):
88 cmd = self.node_start_cmds[node_name]
89 subprocess.Popen(
90 ['bash', '-c', cmd],
91 )
92 self.nodes_being_initialized.add(node_name)
93
94 """
95 ******************** Related to recording rosbag ********************
96 """
97 def convert_mission(self,msg):
98 if msg.mission is None:
99 self.mission = "Unknown"
100 return
101 if msg.mission == 0:
102 self.mission = "Manual"
103 elif msg.mission == 1:
104 self.mission = "DV Acceleration"
105 elif msg.mission == 2:
106 self.mission = "DV Skidpad"
107 elif msg.mission == 3:
108 self.mission = "DV Autocross"
109 elif msg.mission == 4:
110 self.mission = "Trackdrive"
111 elif msg.mission == 5:
112 self.mission = "EBS Test"
113 elif msg.mission == 6:
114 self.mission = "DV Inspection"
115 elif msg.mission == 7:
116 self.mission = "None"
117 else:
118 self.mission = "Unknown"
119
120 def master_callback(self, msg):
121 self.last_received_master_msg_time = time.time()
122 if msg.mission is not None:
123 self.convert_mission(msg)
124 if self.rosbag_process is None:
125 if (msg.ts_on):
127 else:
130 else:
131 if (msg.ts_on):
133 else:
136 if self.should_start_recording(msg):
138 elif self.should_stop_recording(msg):
139 self.stop_rosbag()
140
142 if self.rosbag_process is not None:
143 return False
144 return self.consecutive_ts_on_count >= 2
145
146 def should_stop_recording(self, msg):
147 if self.rosbag_process is None:
148 return False
149 return self.consecutive_ts_off_count >= 6
150
152 rosbag_dir = self.bags_dir
153 # Regex updated to look for .mcap files instead of directories
154 pattern = re.compile(rf"^{re.escape(self.mission)}\s+(\d+)\.mcap$")
155
156 max_num = 0
157 for name in os.listdir(rosbag_dir):
158 match = pattern.match(name)
159 if match:
160 num = int(match.group(1))
161 max_num = max(max_num, num)
162
163 next_num = max_num + 1
164 return f"{self.mission} {next_num}"
165
167 rosbag_name = self.get_rosbag_naming()
168 self.current_rosbag_name = rosbag_name # Save for the stop function
169 self.temp_bag_dir = os.path.join(self.bags_dir, f"{rosbag_name}_temp")
170
171 print("Temp save path:", self.temp_bag_dir)
172
173 self.get_logger().info(f'Starting rosbag recording: {self.temp_bag_dir}')
174 cmd = f'{self.record_rosbag_command} -o "{self.temp_bag_dir}"'
175 self.rosbag_process = subprocess.Popen(['bash', '-c', cmd])
176
177 def stop_rosbag(self):
178 if self.rosbag_process:
179 self.get_logger().info('Stopping rosbag recording...')
180 self.rosbag_process.send_signal(signal.SIGINT)
181
182 # CRITICAL: Wait for ROS 2 to finish closing the file before moving it
183 self.rosbag_process.wait()
184 self.rosbag_process = None
185
186 self.get_logger().info('Extracting .mcap file from temp folder...')
187
188 # Find the generated .mcap file inside the temp folder (e.g. name_0.mcap)
189 mcap_files = glob.glob(os.path.join(self.temp_bag_dir, '*.mcap'))
190
191 if mcap_files:
192 original_mcap = mcap_files[0]
193 final_mcap_path = os.path.join(self.bags_dir, f"{self.current_rosbag_name}.mcap")
194
195 # Move and rename it to the main bags directory
196 shutil.move(original_mcap, final_mcap_path)
197 self.get_logger().info(f'Moved clean .mcap to: {final_mcap_path}')
198
199 # Delete the temporary ROS 2 folder (and metadata.yaml)
200 shutil.rmtree(self.temp_bag_dir)
201 self.get_logger().info('Temporary folder cleaned up.')
202 else:
203 self.get_logger().error('No .mcap file found in the temp directory!')
204
205
206def main(args=None):
207 rclpy.init(args=args)
208 node = Supervisor()
209 executor = SingleThreadedExecutor()
210 executor.add_node(node)
211 try:
212 executor.spin()
213 finally:
214 node.destroy_node()
215 rclpy.shutdown()
216
217
218if __name__ == '__main__':
219 main()
restart_node(self, node_name)
Definition main.py:87
should_start_recording(self, msg)
Definition main.py:141
should_stop_recording(self, msg)
Definition main.py:146
master_callback(self, msg)
Definition main.py:120
convert_mission(self, msg)
Definition main.py:97
Definition main.py:1