Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
fsds_adapter.py
Go to the documentation of this file.
1from evaluator.adapter import Adapter
2import message_filters
3from nav_msgs.msg import Odometry
4from geometry_msgs.msg import Quaternion
5import numpy as np
6import datetime
7from sensor_msgs.msg import PointCloud2
8from custom_interfaces.msg import ConeArray, VehicleState, PathPointArray
9from evaluator.formats import (
10 format_vehicle_state_msg,
11 format_cone_array_msg,
12 format_nav_odometry_msg,
13 format_path_point_array_msg,
14 get_blue_and_yellow_cones_after_msg_treatment,
15)
16import rclpy
17
18
20 """!
21 Adapter class for adapting/synchronizing computations' data with ground truth when using the FSDS simulator.
22 """
23
24 def __init__(self, node: rclpy.node.Node):
25 """!
26 Initializes the FSDSAdapter.
27
28 Args:
29 node (Node): ROS2 node instance.
30 point_cloud_topic (str): Topic for point cloud data.
31 """
32 super().__init__(node)
33
34 # Subscription to odometry messages
35 self.node.odometry_subscription_ = message_filters.Subscriber(
36 self.node,
37 Odometry,
38 "/testing_only/odom",
39 )
40
41 self.node.odometry_ground_truth_subscription = self.node.create_subscription(
42 Odometry,
43 "/testing_only/odom",
45 10,
46 )
47
48 # ApproximateTimeSynchronizer for synchronizing perception, point cloud, and odometry messages
49 self._perception_sync_ = message_filters.ApproximateTimeSynchronizer(
50 [
51 self.node.perception_subscription_,
52 self.node.point_cloud_subscription_,
53 self.node.odometry_subscription_,
54 ],
55 10,
56 0.1,
57 )
58 self._state_estimation_sync_ = message_filters.ApproximateTimeSynchronizer(
59 [
60 self.node.vehicle_state_subscription_,
61 self.node.map_subscription_,
62 self.node.odometry_subscription_,
63 ],
64 10,
65 0.1,
66 )
67 self._planning_time_sync_ = message_filters.ApproximateTimeSynchronizer(
68 [
69 self.node.planning_subscription_,
70 self.node.planning_gt_subscription_,
71 ],
72 10,
73 0.5,
74 )
75
79
80 # List to store track data
81 self.track = []
82
83 # Read track data from a CSV file
84 self.read_track("src/evaluator/evaluator/tracks/track_droneport.csv")
85
87 self, perception: ConeArray, point_cloud: PointCloud2, odometry: Odometry
88 ):
89 """!
90 Callback function to process synchronized messages and compute perception metrics.
91
92 Args:
93 perception (ConeArray): Cone array perception data.
94 point_cloud (PointCloud2): Point cloud data.
95 odometry (Odometry): Odometry data.
96 """
97
98 perception_ground_truth: np.ndarray = self.create_perception_ground_truth(
99 odometry
100 )
101 perception_output: np.ndarray = format_cone_array_msg(perception)
102 self.node.compute_and_publish_perception(
103 perception_output, perception_ground_truth
104 )
105
106 def odometry_callback(self, odometry: Odometry):
107 """!
108 Callback function to mark the planning's initial timestamp
109
110 Args:
111 odometry (Odometry): Behicle's odometry information.
112 """
113
114 if self.node.use_simulated_se_:
115 self.node.map_receive_time_ = datetime.datetime.now()
116
118 self, vehicle_state: VehicleState, map: ConeArray, odometry: Odometry
119 ):
120 """!
121 Callback function to process synchronized messages and compute perception metrics.
122
123 Args:
124 vehicle_state (VehicleState): Vehicle state estimation message.
125 map (ConeArray): Cone array message.
126 odometry (Odometry): Odometry message.
127 """
128 pose_treated, velociies_treated = format_vehicle_state_msg(vehicle_state)
129 map_treated: np.ndarray = format_cone_array_msg(map)
130 groundtruth_pose_treated: np.ndarray = format_nav_odometry_msg(odometry)
131 groundtruth_map_treated: np.ndarray = np.array(self.track)
132 empty_groundtruth_velocity_treated = np.array([0, 0, 0])
133 self.node.compute_and_publish_state_estimation(
134 pose_treated,
135 groundtruth_pose_treated,
136 velociies_treated,
137 empty_groundtruth_velocity_treated,
138 map_treated,
139 groundtruth_map_treated,
140 )
141
143 self,
144 planning_output: PathPointArray,
145 path_ground_truth: PathPointArray,
146 ):
147 """!
148 Callback function to process synchronized messages and compute planning metrics.
149
150 Args:
151 planning_output (PathPointArray): Planning output.
152 path_ground_truth (PathPointArray): Groundtruth path message.
153 """
154 map_ground_truth_treated: np.ndarray = format_nav_odometry_msg(self.track)
155 blue_cones, yellow_cones = get_blue_and_yellow_cones_after_msg_treatment(
156 map_ground_truth_treated
157 )
158 planning_output_treated: np.ndarray = format_path_point_array_msg(
159 planning_output
160 )
161 path_ground_truth_treated: np.ndarray = format_path_point_array_msg(
162 path_ground_truth
163 )
164 self.node.compute_and_publish_planning(
165 planning_output_treated,
166 path_ground_truth_treated,
167 blue_cones,
168 yellow_cones,
169 )
170
171 def create_perception_ground_truth(self, odometry: Odometry) -> list:
172 """!
173 Creates ground truth from odometry data and a predefined track.
174
175 Args:
176 odometry (Odometry): Odometry data.
177
178 Returns:
179 list: List of ground truth cone positions (relative positions to the car).
180 """
181 rotation_matrix = FSDSAdapter.quaternion_to_rotation_matrix(
182 odometry.pose.pose.orientation
183 )
184 perception_ground_truth = []
185
186 for cone in self.track:
187 cone_position = np.array([cone[0], cone[1], 0])
188 transformed_position = np.dot(rotation_matrix, cone_position) + np.array(
189 [odometry.pose.pose.position.x, odometry.pose.pose.position.y, 0]
190 )
191
192 perception_ground_truth.append(np.append(transformed_position[:2], 0))
193
194 return np.array(perception_ground_truth)
195
196 def read_track(self, filename: str) -> None:
197 """!
198 Reads track data from a CSV file.
199
200 Args:
201 filename (str): Path to the CSV file containing track data.
202 """
203 with open(filename, "r") as file:
204 for line in file:
205 self.track.append(self.parse_track_cone(line.strip()))
206
207 @staticmethod
208 def parse_track_cone(line: str) -> np.ndarray:
209 """!
210 Parses a line from the track CSV file to extract cone data.
211
212 Args:
213 line (str): A line from the track CSV file.
214
215 Returns:
216 numpy.ndarray: Array containing cone position data.
217 """
218 cone_color_dictionary: dict[str, int] = {
219 "blue": 0,
220 "yellow": 1,
221 "orange": 2,
222 "big_orange": 3,
223 "unknown": 4,
224 }
225 words = line.split(",")
226 y = float(words[1])
227 x = float(words[2])
228 color = cone_color_dictionary[words[0]]
229 return np.array([x, y, color, 1]) # 4 for unknown color
230
231 @staticmethod
232 def quaternion_to_rotation_matrix(quaternion: Quaternion) -> np.ndarray:
233 """!
234 Converts quaternion to a rotation matrix.
235
236 Args:
237 quaternion (Quaternion): Quaternion representing rotation.
238
239 Returns:
240 numpy.ndarray: Rotation matrix.
241 """
242 w, x, y, z = quaternion.w, quaternion.x, quaternion.y, quaternion.z
243
244 # First row of the rotation matrix
245 r00 = 1 - 2 * (y**2 + z**2)
246 r01 = 2 * (x * y - z * w)
247 r02 = 2 * (x * z + y * w)
248
249 # Second row of the rotation matrix
250 r10 = 2 * (x * y + z * w)
251 r11 = 1 - 2 * (x**2 + z**2)
252 r12 = 2 * (y * z - x * w)
253
254 # Third row of the rotation matrix
255 r20 = 2 * (x * z - z * w)
256 r21 = 2 * (y * z + x * w)
257 r22 = 1 - 2 * (x**2 + y**2)
258
259 rot_matrix = np.array([[r00, r01, r02], [r10, r11, r12], [r20, r21, r22]])
260 return rot_matrix
Class for subscribing to a point cloud topic and synchronizing messages with a ROS2 node.
Definition adapter.py:7
Adapter class for adapting/synchronizing computations' data with ground truth when using the FSDS sim...
__init__(self, rclpy.node.Node node)
Initializes the FSDSAdapter.
perception_callback(self, ConeArray perception, PointCloud2 point_cloud, Odometry odometry)
Callback function to process synchronized messages and compute perception metrics.
state_estimation_callback(self, VehicleState vehicle_state, ConeArray map, Odometry odometry)
Callback function to process synchronized messages and compute perception metrics.
np.ndarray parse_track_cone(str line)
Parses a line from the track CSV file to extract cone data.
planning_callback(self, PathPointArray planning_output, PathPointArray path_ground_truth)
Callback function to process synchronized messages and compute planning metrics.
list create_perception_ground_truth(self, Odometry odometry)
Creates ground truth from odometry data and a predefined track.
np.ndarray quaternion_to_rotation_matrix(Quaternion quaternion)
Converts quaternion to a rotation matrix.
None read_track(self, str filename)
Reads track data from a CSV file.
odometry_callback(self, Odometry odometry)
Callback function to mark the planning's initial timestamp.