Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
integrator_node.py
Go to the documentation of this file.
1import math
2import sys
3from select import select
4import termios
5import tty
6
7import rclpy
8from rclpy.node import Node
9from custom_interfaces.msg import Velocities
10from geometry_msgs.msg import Vector3Stamped
11
12
13def get_key(input_stream, settings, timeout):
14 tty.setraw(input_stream.fileno())
15 rlist, _, _ = select([input_stream], [], [], timeout)
16 key = input_stream.read(1) if rlist else ""
17 termios.tcsetattr(input_stream, termios.TCSADRAIN, settings)
18 return key
19
20
21def rotation_matrix_from_euler(roll, pitch, yaw):
22 cr = math.cos(roll)
23 sr = math.sin(roll)
24 cp = math.cos(pitch)
25 sp = math.sin(pitch)
26 cy = math.cos(yaw)
27 sy = math.sin(yaw)
28
29 return [
30 [cy * cp, cy * sp * sr - sy * cr, cy * sp * cr + sy * sr],
31 [sy * cp, sy * sp * sr + cy * cr, sy * sp * cr - cy * sr],
32 [-sp, cp * sr, cp * cr],
33 ]
34
35
36def mat_transpose(matrix):
37 return [
38 [matrix[0][0], matrix[1][0], matrix[2][0]],
39 [matrix[0][1], matrix[1][1], matrix[2][1]],
40 [matrix[0][2], matrix[1][2], matrix[2][2]],
41 ]
42
43
44def mat_mul(left, right):
45 return [
46 [
47 left[0][0] * right[0][0]
48 + left[0][1] * right[1][0]
49 + left[0][2] * right[2][0],
50 left[0][0] * right[0][1]
51 + left[0][1] * right[1][1]
52 + left[0][2] * right[2][1],
53 left[0][0] * right[0][2]
54 + left[0][1] * right[1][2]
55 + left[0][2] * right[2][2],
56 ],
57 [
58 left[1][0] * right[0][0]
59 + left[1][1] * right[1][0]
60 + left[1][2] * right[2][0],
61 left[1][0] * right[0][1]
62 + left[1][1] * right[1][1]
63 + left[1][2] * right[2][1],
64 left[1][0] * right[0][2]
65 + left[1][1] * right[1][2]
66 + left[1][2] * right[2][2],
67 ],
68 [
69 left[2][0] * right[0][0]
70 + left[2][1] * right[1][0]
71 + left[2][2] * right[2][0],
72 left[2][0] * right[0][1]
73 + left[2][1] * right[1][1]
74 + left[2][2] * right[2][1],
75 left[2][0] * right[0][2]
76 + left[2][1] * right[1][2]
77 + left[2][2] * right[2][2],
78 ],
79 ]
80
81
82def mat_vec_mul(matrix, vector):
83 return (
84 matrix[0][0] * vector[0] + matrix[0][1] * vector[1] + matrix[0][2] * vector[2],
85 matrix[1][0] * vector[0] + matrix[1][1] * vector[1] + matrix[1][2] * vector[2],
86 matrix[2][0] * vector[0] + matrix[2][1] * vector[1] + matrix[2][2] * vector[2],
87 )
88
89
90def gravity_from_euler(roll, pitch, gravity):
91 sin_roll = math.sin(roll)
92 cos_roll = math.cos(roll)
93 sin_pitch = math.sin(pitch)
94 cos_pitch = math.cos(pitch)
95
96 return (
97 -gravity * sin_pitch,
98 gravity * sin_roll * cos_pitch,
99 gravity * cos_roll * cos_pitch,
100 )
101
102
103class Integrator(Node):
104 def __init__(self):
105 super().__init__("integrator")
106 self.declare_parameter("imu_acceleration_topic", "/imu/acceleration")
107 self.declare_parameter("imu_euler_topic", "/filter/euler")
108 self.declare_parameter("velocity_topic", "/imu/integrated_velocity")
109 self.declare_parameter("calibration_samples", 1000)
110 self.declare_parameter("gravity_magnitude", 9.80920)
111 self.declare_parameter("euler_in_degrees", True)
112 self.declare_parameter("output_in_body_frame", True)
113 self.declare_parameter("output_frame_id", "imu_corrected")
114
115 imu_topic = self.get_parameter("imu_acceleration_topic").get_parameter_value().string_value
116 euler_topic = self.get_parameter("imu_euler_topic").get_parameter_value().string_value
117 velocity_topic = self.get_parameter("velocity_topic").get_parameter_value().string_value
118
120 1, int(self.get_parameter("calibration_samples").get_parameter_value().integer_value)
121 )
123 self.euler_in_degrees = bool(
124 self.get_parameter("euler_in_degrees").get_parameter_value().bool_value
125 )
127 self.get_parameter("output_in_body_frame").get_parameter_value().bool_value
128 )
130 self.get_parameter("output_frame_id").get_parameter_value().string_value
131 )
132
133 self.velocity_publisher = self.create_publisher(Velocities, velocity_topic, 10)
134 self.imu_subscription = self.create_subscription(
135 Vector3Stamped,
136 imu_topic,
138 10,
139 )
140 self.euler_subscription = self.create_subscription(
141 Vector3Stamped,
142 euler_topic,
144 10,
145 )
146
147 self.velocity_world = [0.0, 0.0, 0.0]
148 self.last_time = None
149
150 self.latest_euler = None
152
154 self._roll_sin_sum = 0.0
155 self._roll_cos_sum = 0.0
156 self._pitch_sin_sum = 0.0
157 self._pitch_cos_sum = 0.0
158 self._yaw_sin_sum = 0.0
159 self._yaw_cos_sum = 0.0
160 self._accel_sum = [0.0, 0.0, 0.0]
161 self._g_unit_sum = [0.0, 0.0, 0.0]
163
165 self.accel_bias = None
167
168 self.get_logger().info(
169 "Integrator started. IMU topic: "
170 f"{imu_topic} | Euler topic: {euler_topic} | velocity topic: {velocity_topic}"
171 )
172 self.get_logger().info(
173 "Calibration samples: "
174 f"{self.calibration_samples} | gravity estimated from IMU data"
175 )
176 self.get_logger().info(
177 "Velocity output frame: "
178 f"{'body' if self.output_in_body_frame else 'reference'}"
179 )
180
181 def euler_callback(self, msg: Vector3Stamped):
182 roll = msg.vector.x
183 pitch = msg.vector.y
184 yaw = msg.vector.z
185 if self.euler_in_degrees:
186 roll = math.radians(roll)
187 pitch = math.radians(pitch)
188 yaw = math.radians(yaw)
189 self.latest_euler = (roll, pitch, yaw)
190
191 def _accumulate_calibration(self, acceleration, euler):
192 roll, pitch, yaw = euler
193 accel_norm = math.sqrt(
194 acceleration[0] ** 2 + acceleration[1] ** 2 + acceleration[2] ** 2
195 )
196 g_unit = gravity_from_euler(roll, pitch, 1.0)
197 self._gravity_norm_sum += accel_norm
198 self._accel_sum[0] += acceleration[0]
199 self._accel_sum[1] += acceleration[1]
200 self._accel_sum[2] += acceleration[2]
201 self._g_unit_sum[0] += g_unit[0]
202 self._g_unit_sum[1] += g_unit[1]
203 self._g_unit_sum[2] += g_unit[2]
204
205 self._roll_sin_sum += math.sin(roll)
206 self._roll_cos_sum += math.cos(roll)
207 self._pitch_sin_sum += math.sin(pitch)
208 self._pitch_cos_sum += math.cos(pitch)
209 self._yaw_sin_sum += math.sin(yaw)
210 self._yaw_cos_sum += math.cos(yaw)
211
212 self._calibration_count += 1
214 return False
215
216 roll_avg = math.atan2(self._roll_sin_sum, self._roll_cos_sum)
217 pitch_avg = math.atan2(self._pitch_sin_sum, self._pitch_cos_sum)
218 yaw_avg = math.atan2(self._yaw_sin_sum, self._yaw_cos_sum)
219 self.initial_rotation = rotation_matrix_from_euler(roll_avg, pitch_avg, yaw_avg)
220
223 accel_avg = [value / self._calibration_count for value in self._accel_sum]
224 g_unit_avg = [value / self._calibration_count for value in self._g_unit_sum]
225 self.accel_bias = [
226 accel_avg[0] - self.gravity_magnitude * g_unit_avg[0],
227 accel_avg[1] - self.gravity_magnitude * g_unit_avg[1],
228 accel_avg[2] - self.gravity_magnitude * g_unit_avg[2],
229 ]
230
231 self.get_logger().info(
232 "IMU calibration complete. Estimated gravity: "
233 f"{self.estimated_gravity:.5f} m/s^2 | Accel bias: "
234 f"[{self.accel_bias[0]:.3f}, {self.accel_bias[1]:.3f}, "
235 f"{self.accel_bias[2]:.3f}]"
236 )
237 return True
238
239 def imu_callback(self, msg: Vector3Stamped):
240 current_time = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
241 if current_time == 0.0:
242 current_time = self.get_clock().now().nanoseconds * 1e-9
243
244 if self.latest_euler is None:
245 if not self._warned_no_orientation:
246 self.get_logger().warn(
247 "No IMU Euler data received yet; waiting before integrating."
248 )
249 self._warned_no_orientation = True
250 self.last_time = current_time
251 return
252
253 acceleration = (msg.vector.x, msg.vector.y, msg.vector.z)
254 if self.accel_bias is None:
255 calibrated = self._accumulate_calibration(acceleration, self.latest_euler)
256 if calibrated:
257 self.last_time = current_time
258 return
259
260 if self.last_time is None:
261 self.last_time = current_time
262 return
263
264 dt = current_time - self.last_time
265 if dt <= 0.0:
266 self.get_logger().warn("Received non-positive dt from IMU message; skipping update.")
267 self.last_time = current_time
268 return
269
270 roll, pitch, yaw = self.latest_euler
271 rotation = rotation_matrix_from_euler(roll, pitch, yaw)
272 gravity_imu = gravity_from_euler(roll, pitch, self.gravity_magnitude)
273 linear_acc = (
274 acceleration[0] - gravity_imu[0] - self.accel_bias[0],
275 acceleration[1] - gravity_imu[1] - self.accel_bias[1],
276 acceleration[2] - gravity_imu[2] - self.accel_bias[2],
277 )
278
279 if self.initial_rotation is not None:
280 rotation_rel = mat_mul(mat_transpose(self.initial_rotation), rotation)
281 else:
282 rotation_rel = rotation
283
284 linear_acc_world = mat_vec_mul(rotation_rel, linear_acc)
285
286 self.velocity_world[0] += linear_acc_world[0] * dt
287 self.velocity_world[1] += linear_acc_world[1] * dt
288 self.velocity_world[2] += linear_acc_world[2] * dt
289 self.last_time = current_time
290
291 self.publish_velocity(msg.header.stamp, msg.header.frame_id, rotation_rel)
292
293 def publish_velocity(self, stamp, frame_id, rotation_rel=None):
294 velocity_msg = Velocities()
295 velocity_msg.header.stamp = stamp
296 velocity_msg.header.frame_id = self.output_frame_id or frame_id
297 if self.output_in_body_frame and rotation_rel is not None:
298 velocity_body = mat_vec_mul(mat_transpose(rotation_rel), self.velocity_world)
299 else:
300 velocity_body = self.velocity_world
301 velocity_out = (-velocity_body[1], velocity_body[0], velocity_body[2])
302 velocity_msg.velocity_x = float(velocity_out[0])
303 velocity_msg.velocity_y = float(velocity_out[1])
304 velocity_msg.angular_velocity = 0.0
305 velocity_msg.covariance = [0.0] * 9
306 self.velocity_publisher.publish(velocity_msg)
307
308 def reset(self):
309 self.velocity_world = [0.0, 0.0, 0.0]
310 self.last_time = None
311 now = self.get_clock().now().to_msg()
312 self.publish_velocity(now, "", None)
313 self.get_logger().info("Integrator reset: velocity set to 0 and count reset.")
314
315
316def main(args=None):
317 rclpy.init(args=args)
318 node = Integrator()
319
320 input_stream = sys.stdin
321 input_stream_owned = False
322 settings = None
323 if not input_stream.isatty():
324 try:
325 input_stream = open("/dev/tty")
326 input_stream_owned = True
327 node.get_logger().info("Using /dev/tty for keyboard reset input.")
328 except OSError:
329 input_stream = None
330
331 if input_stream is not None and input_stream.isatty():
332 settings = termios.tcgetattr(input_stream)
333 else:
334 node.get_logger().warn("No TTY detected; keyboard reset (R) is disabled.")
335
336 try:
337 while rclpy.ok():
338 rclpy.spin_once(node, timeout_sec=0.05)
339 if settings is None:
340 continue
341
342 key = get_key(input_stream, settings, 0.0)
343 if key in ("r", "R"):
344 node.reset()
345 elif key == "q" or key == "\x03":
346 break
347 finally:
348 if settings is not None and input_stream is not None:
349 termios.tcsetattr(input_stream, termios.TCSADRAIN, settings)
350 if input_stream_owned and input_stream is not None:
351 input_stream.close()
352 node.destroy_node()
353 rclpy.shutdown()
354
355
356if __name__ == "__main__":
357 main()
publish_velocity(self, stamp, frame_id, rotation_rel=None)
Definition main.py:1
get_key(input_stream, settings, timeout)