Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
eufs_adapter.py
Go to the documentation of this file.
1from evaluator.adapter import Adapter
2from eufs_msgs.msg import ConeArrayWithCovariance, CarState
3from nav_msgs.msg import Odometry
4from custom_interfaces.msg import ConeArray, VehicleState, PathPointArray
5import rclpy
6import datetime
7import message_filters
8import numpy as np
9from evaluator.formats import (
10 format_cone_array_msg,
11 format_vehicle_state_msg,
12 format_eufs_cone_array_with_covariance_msg,
13 format_nav_odometry_msg,
14 format_car_state_msg,
15 format_path_point_array_msg,
16 get_blue_and_yellow_cones_after_msg_treatment,
17)
18from math import sqrt
19
20
22 """!
23 Adapter class for subscribing to EUFS topics
24 """
25
26 def __init__(self, node: rclpy.node.Node):
27 """!
28 Initializes the EUFS Adapter.
29
30 Args:
31 node (Node): ROS2 node instance.
32 """
33
34 super().__init__(node)
36 self.groundtruth_map_ = None
38 self.node.groundtruth_map_subscription_ = self.node.create_subscription(
39 ConeArrayWithCovariance,
40 "/ground_truth/track",
42 10,
43 )
44
45 self.node.groundtruth_state_subscription_ = self.node.create_subscription(
46 Odometry,
47 "/ground_truth/odom",
49 10,
50 )
51
52 self.node.simulated_state_subscription = self.node.create_subscription(
53 CarState,
54 "/odometry_integration/car_state",
56 10,
57 )
58
59 self._se_time_sync_ = message_filters.ApproximateTimeSynchronizer(
60 [
61 self.node.vehicle_state_subscription_,
62 self.node.map_subscription_,
63 ],
64 10,
65 0.5,
66 )
67
69
70 self.node.groundtruth_perception_subscription_ = message_filters.Subscriber(
71 self.node, ConeArrayWithCovariance, "/ground_truth/cones"
72 )
73 self._perception_time_sync_ = message_filters.ApproximateTimeSynchronizer(
74 [
75 self.node.perception_subscription_,
76 self.node.groundtruth_perception_subscription_,
77 ],
78 10,
79 0.1,
80 )
81
83
84 self._planning_time_sync_ = message_filters.ApproximateTimeSynchronizer(
85 [
86 self.node.planning_subscription_,
87 self.node.planning_gt_subscription_,
88 ],
89 10,
90 0.5,
91 )
92
94
95 def simulated_vehicle_state_callback(self, msg: CarState):
96 """!
97 Callback function to mark the initial timestamp of the control execution
98
99 Args:
100 msg (CarState): Car state coming from EUFS simulator
101 """
102 self.simulated_vehicle_state_ = msg
103
105 self,
106 vehicle_state: VehicleState,
107 map: ConeArray,
108 ):
109 """!
110 Callback function to process synchronized messages and compute state estimation metrics.
111
112 Args:
113 vehicle_state (VehicleState): Vehicle state estimation message.
114 map (ConeArray): Cone array message.
115 """
116 if (
117 self.groundtruth_vehicle_state_ is None
118 or self.groundtruth_map_ is None
119 or self.simulated_vehicle_state_ is None
120 ):
121 return
122 pose_treated, velocities_treated = format_vehicle_state_msg(vehicle_state)
123 map_treated: np.ndarray = format_cone_array_msg(map)
124 groundtruth_pose_treated, groundtruth_velocity_treated = (
125 format_nav_odometry_msg(self.groundtruth_vehicle_state_)
126 )
127 groundtruth_map_treated: np.ndarray = (
128 format_eufs_cone_array_with_covariance_msg(self.groundtruth_map_)
129 )
130 self.node.compute_and_publish_state_estimation(
131 pose_treated,
132 groundtruth_pose_treated,
133 velocities_treated,
134 groundtruth_velocity_treated,
135 map_treated,
136 groundtruth_map_treated,
137 )
138
140 self, perception_output: ConeArray, ground_truth: ConeArrayWithCovariance
141 ):
142 """!
143 Callback function to process synchronized messages and compute perception metrics.
144
145 Args:
146 perception_output (ConeArray): Perception Output.
147 """
148
149 perception_treated: np.ndarray = format_cone_array_msg(perception_output)
150
151 groundtruth_perception_treated: np.ndarray = (
152 format_eufs_cone_array_with_covariance_msg(ground_truth)
153 )
154
155 self.node.compute_and_publish_perception(
156 perception_treated, groundtruth_perception_treated
157 )
158
160 self,
161 planning_output: PathPointArray,
162 path_ground_truth: PathPointArray,
163 ):
164 """!
165 Callback function to process synchronized messages and compute planning metrics.
166
167 Args:
168 map_ground_truth (ConeArrayWithCovariance): Groundtruth map message.
169 planning_output (PathPointArray): Planning output.
170 path_ground_truth (PathPointArray): Groundtruth path message.
171 """
172 if self.groundtruth_map_ is None:
173 return
174 map_ground_truth_treated: np.ndarray = (
175 format_eufs_cone_array_with_covariance_msg(self.groundtruth_map_)
176 )
177 self.node.get_logger().debug(
178 "treated gt with size: %d" % len(map_ground_truth_treated)
179 )
180 blue_cones, yellow_cones = get_blue_and_yellow_cones_after_msg_treatment(
181 map_ground_truth_treated
182 )
183 self.node.get_logger().debug(
184 "n blue cones: %d; n yellow cones : %d"
185 % (len(blue_cones), len(yellow_cones))
186 )
187 planning_output_treated: np.ndarray = format_path_point_array_msg(
188 planning_output
189 )
190 path_ground_truth_treated: np.ndarray = format_path_point_array_msg(
191 path_ground_truth
192 )
193 self.node.compute_and_publish_planning(
194 planning_output_treated, path_ground_truth_treated, blue_cones, yellow_cones
195 )
196
197 def groundtruth_map_callback(self, track: ConeArrayWithCovariance):
198 """!
199 Callback function to process groundtruth map messages.
200
201 Args:
202 track (ConeArrayWithCovariance): Groundtruth track data.
203 """
204 self.node.get_logger().debug(
205 "Received groundtruth map with size: %d" % len(track.blue_cones)
206 )
207 self.groundtruth_map_ = track
208
209 def groundtruth_vehicle_state_callback(self, vehicle_state: Odometry):
210 """!
211 Callback function to process groundtruth vehicle_state messages.
212
213 Args:
214 vehicle_state (Odometry): Groundtruth vehicle_state data.
215 """
216 self.node.get_logger().debug("Received groundtruth vehicle state")
217 self.groundtruth_vehicle_state_ = vehicle_state
218 if self.node.use_simulated_se_:
219 self.node.position = [
220 vehicle_state.pose.pose.position.x,
221 vehicle_state.pose.pose.position.y,
222 ]
223
224 if self.node.use_simulated_velocities_:
225 self.node.absolute_velocity = sqrt(
226 vehicle_state.twist.twist.linear.x**2
227 + vehicle_state.twist.twist.linear.y**2
228 )
Class for subscribing to a point cloud topic and synchronizing messages with a ROS2 node.
Definition adapter.py:7
Adapter class for subscribing to EUFS topics.
perception_callback(self, ConeArray perception_output, ConeArrayWithCovariance ground_truth)
Callback function to process synchronized messages and compute perception metrics.
groundtruth_map_callback(self, ConeArrayWithCovariance track)
Callback function to process groundtruth map messages.
simulated_vehicle_state_callback(self, CarState msg)
Callback function to mark the initial timestamp of the control execution.
state_estimation_callback(self, VehicleState vehicle_state, ConeArray map)
Callback function to process synchronized messages and compute state estimation metrics.
groundtruth_vehicle_state_callback(self, Odometry vehicle_state)
Callback function to process groundtruth vehicle_state messages.
planning_callback(self, PathPointArray planning_output, PathPointArray path_ground_truth)
Callback function to process synchronized messages and compute planning metrics.
__init__(self, rclpy.node.Node node)
Initializes the EUFS Adapter.