Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
pacsim_adapter.py
Go to the documentation of this file.
1from evaluator.adapter import Adapter
2import numpy as np
3import datetime
4import rclpy
5import message_filters
6from custom_interfaces.msg import ConeArray, PathPointArray, Pose, Velocities
7from visualization_msgs.msg import MarkerArray
8from pacsim.msg import PerceptionDetections
9from geometry_msgs.msg import TwistWithCovarianceStamped, TransformStamped
10from evaluator.formats import (
11 format_vehicle_pose_msg,
12 format_velocities_msg,
13 format_cone_array_msg,
14 format_transform_stamped_msg,
15 format_twist_with_covariance_stamped_msg,
16 format_marker_array_msg,
17 format_path_point_array_msg,
18 get_blue_and_yellow_cones_after_msg_treatment,
19)
20
21
23 """!
24 Adapter class to manage the use of Pacsim.
25 """
26
27 def __init__(self, node: rclpy.node.Node):
28 """!
29 Initializes the PacSim Adapter.
30
31 Args:
32 node (Node): ROS2 node instance.
33 """
34 super().__init__(node)
36 self.node.groundtruth_map_subscription_ = self.node.create_subscription(
37 MarkerArray,
38 "/pacsim/map",
40 10,
41 ) # because the map gets published only once at the beginning
42
43 self.node.velocities_subscription_ = self.node.create_subscription(
44 Velocities,
45 "/state_estimation/velocities",
47 10,
48 )
49
51 self.node.velocities_groundtruth_subscription_ = self.node.create_subscription(
52 TwistWithCovarianceStamped,
53 "/pacsim/velocity",
55 10,
56 )
57
59 self.node.pose_groundtruth_subscription_ = self.node.create_subscription(
60 TwistWithCovarianceStamped,
61 "/pacsim/pose",
63 10,
64 )
65
66 # self._time_sync_slam_ = message_filters.ApproximateTimeSynchronizer(
67 # [
68 # self.node.vehicle_pose_subscription_,
69 # self.node.map_subscription_,
70 # ],
71 # 10,
72 # 0.1,
73 # )
74 self.node.vehicle_pose_subscription_.registerCallback(self.pose_callbackpose_callback)
75 self.node.map_subscription_.registerCallback(self.map_callbackmap_callback)
76
77 # self._time_sync_velocities_.registerCallback(self.velocities_callback)
78
79 self._planning_time_sync_ = message_filters.ApproximateTimeSynchronizer(
80 [
81 self.node.planning_subscription_,
82 self.node.planning_gt_subscription_,
83 ],
84 10,
85 0.5,
86 )
87
89
90 def map_callback(self, map: ConeArray):
91 """!
92 Callback function to process map messages.
93
94 Args:
95 map (ConeArray): Map data.
96 """
97 if self._groundtruth_map_ is None:
98 self.node.get_logger().warn("Groundtruth map not received")
99 return
100 map_treated: np.ndarray = format_cone_array_msg(map)
101 groundtruth_map_treated: np.ndarray = format_marker_array_msg(
103 )
104 self.node.compute_and_publish_map(
105 map_treated,
106 groundtruth_map_treated,
107 )
108
109 def pose_callback(self, vehicle_pose: Pose):
110 """!
111 Callback function to process vehicle pose messages.
112
113 Args:
114 vehicle_pose (Pose): Vehicle pose data.
115 """
116 if self._groundtruth_pose_ is None:
117 self.node.get_logger().warn("Groundtruth pose not received")
118 return
119
120 groundtruth_pose_treated: np.ndarray = format_twist_with_covariance_stamped_msg(
122 )
123 pose_treated: np.ndarray = format_vehicle_pose_msg(vehicle_pose)
124 self.node.compute_and_publish_pose(
125 pose_treated,
126 groundtruth_pose_treated,
127 )
128
130 self,
131 velocities: Velocities,
132 ):
133 """!
134 Callback function to process synchronized messages
135 and compute state estimation metrics.
136
137 Args:
138 velocities (Velocities): Vehicle velocities estimation data.
139 """
140
141 if self._groundtruth_velocity_ is None:
142 self.node.get_logger().warn("Groundtruth velocity not received")
143 return
144
145 groundtruth_velocities_treated: np.ndarray = (
146 format_twist_with_covariance_stamped_msg(self._groundtruth_velocity_)
147 ) # [vx, vy, w]
148 velocities_treated = format_velocities_msg(velocities) # [vx, vy, w]
149 self.node.compute_and_publish_velocities(
150 velocities_treated,
151 groundtruth_velocities_treated,
152 )
153
155 self,
156 planning_output: PathPointArray,
157 path_ground_truth: PathPointArray,
158 ):
159 """!
160 Callback function to process synchronized messages and compute planning metrics.
161
162 Args:
163 planning_output (PathPointArray): Planning output.
164 path_ground_truth (PathPointArray): Groundtruth path message.
165 """
166 if self._groundtruth_map_ is None:
167 return
168 map_ground_truth_treated: np.ndarray = format_marker_array_msg(
170 )
171 blue_cones, yellow_cones = get_blue_and_yellow_cones_after_msg_treatment(
172 map_ground_truth_treated
173 )
174 planning_output_treated: np.ndarray = format_path_point_array_msg(
175 planning_output
176 )
177 path_ground_truth_treated: np.ndarray = format_path_point_array_msg(
178 path_ground_truth
179 )
180 self.node.compute_and_publish_planning(
181 planning_output_treated,
182 path_ground_truth_treated,
183 blue_cones,
184 yellow_cones,
185 )
186
187 def groundtruth_map_callback(self, groundtruth_map: MarkerArray):
188 """!
189 Callback function to process ground truth map messages.
190 It also marks the planning's initial timestamp
191
192 Args:
193 groundtruth_map (MarkerArray): Ground truth map data.
194 """
195 self._groundtruth_map_: MarkerArray = groundtruth_map
196
198 self, groundtruth_velocity: TwistWithCovarianceStamped
199 ):
200 """!
201 Callback function to process ground truth velocity messages.
202
203 Args:
204 groundtruth_velocity (TwistWithCovarianceStamped): Ground truth velocity data.
205 """
206 self._groundtruth_velocity_: TwistWithCovarianceStamped = groundtruth_velocity
207 if self.node.use_simulated_velocities_:
208 self.node.absolute_velocity = (
209 self._groundtruth_velocity_.twist.twist.linear.x**2
210 + self._groundtruth_velocity_.twist.twist.linear.y**2
211 ) ** 0.5
212
213 def groundtruth_pose_callback(self, groundtruth_pose: TwistWithCovarianceStamped):
214 """!
215 Callback function to process ground truth pose messages.
216
217 Args:
218 groundtruth_pose (TwistWithCovarianceStamped): Ground truth pose data.
219 """
220 self._groundtruth_pose_: TwistWithCovarianceStamped = groundtruth_pose
221 if self.node.use_simulated_se_:
222 self.node.position = [
223 groundtruth_pose.twist.twist.linear.x,
224 groundtruth_pose.twist.twist.linear.y,
225 ]
Class for subscribing to a point cloud topic and synchronizing messages with a ROS2 node.
Definition adapter.py:7
Adapter class to manage the use of Pacsim.
groundtruth_velocity_callback(self, TwistWithCovarianceStamped groundtruth_velocity)
Callback function to process ground truth velocity messages.
groundtruth_pose_callback(self, TwistWithCovarianceStamped groundtruth_pose)
Callback function to process ground truth pose messages.
planning_callback(self, PathPointArray planning_output, PathPointArray path_ground_truth)
Callback function to process synchronized messages and compute planning metrics.
groundtruth_map_callback(self, MarkerArray groundtruth_map)
Callback function to process ground truth map messages.
__init__(self, rclpy.node.Node node)
Initializes the PacSim Adapter.
map_callback(self, ConeArray map)
Callback function to process map messages.
velocities_callback(self, Velocities velocities)
Callback function to process synchronized messages and compute state estimation metrics.
pose_callback(self, Pose vehicle_pose)
Callback function to process vehicle pose messages.