Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
main.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3from __future__ import print_function
4import threading
5import sys
6from select import select
7import termios
8import tty
9import math
10import time
11
12from rclpy.node import Node
13import rclpy
14from pacsim.msg import StampedScalar
15from pacsim.msg import Wheels
16
17lateral_command_msg = StampedScalar()
18longitudinal_command_msg = Wheels()
19
20help_msg = """
21w/s: increase/decrease acceleration
22a/d: increase/decrease steering angle
23q: quit
24CTRL-C to force quit
25"""
26
27
28class PublishThread(threading.Thread):
29 """!
30 @brief Thread to publish control commands to the robot
31 """
32
33 def __init__(self, rate):
34 """!
35 @brief Constructor for the PublishThread class
36
37 @param rate: Rate at which to publish control commands
38 """
39 super(PublishThread, self).__init__()
40 self.node = Node("pacsim_keys")
41
42 self.steering_publisher = self.node.create_publisher(
43 StampedScalar, "/pacsim/steering_setpoint", 10
44 )
45 self.acceleration_publisher = self.node.create_publisher(
46 Wheels, "/pacsim/throttle_setpoint", 10
47 )
48
49 self.acceleration: float = 0.0
50 self.steering_angle: float = 0.0
51
52 self.condition = threading.Condition()
53 self.done = False
54 self.timeout = 1.0 / rate
55 self.start()
56
57 def update(self, acceleration: float, steering_angle: float):
58 """!
59 @brief Update the control commands to be published
60
61 @param acceleration: Acceleration value
62 @param steering_angle: Steering angle value
63 """
64 with self.condition:
65 self.acceleration = acceleration
66 self.steering_angle = steering_angle
67
68 def stop(self):
69 self.done = True
70 self.join()
71
72 def run(self):
73 while not self.done:
74 with self.condition:
75 acc = self.acceleration
76 steer = self.steering_angle
77 # Set all wheels to 0 except rear for throttle
78 lateral_command_msg.value = steer
79 longitudinal_command_msg.fl = 0.0
80 longitudinal_command_msg.fr = 0.0
81 longitudinal_command_msg.rl = acc
82 longitudinal_command_msg.rr = acc
83 self.acceleration_publisher.publish(longitudinal_command_msg)
84 self.steering_publisher.publish(lateral_command_msg)
85 time.sleep(self.timeout)
86 # Publish stop message when thread exits
87 lateral_command_msg.value = 0.0
88 longitudinal_command_msg.fl = 0.0
89 longitudinal_command_msg.fr = 0.0
90 longitudinal_command_msg.rl = 0.0
91 longitudinal_command_msg.rr = 0.0
92 self.acceleration_publisher.publish(longitudinal_command_msg)
93 self.steering_publisher.publish(lateral_command_msg)
94
95
96def get_key(settings, timeout):
97 tty.setraw(sys.stdin.fileno())
98 rlist, _, _ = select([sys.stdin], [], [], timeout)
99 key = sys.stdin.read(1) if rlist else ""
100 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
101 return key
102
103
104def main():
105 rclpy.init()
106 settings = termios.tcgetattr(sys.stdin)
107
108 print(help_msg)
109
110 pub_thread = PublishThread(20) # 20 Hz update rate
111
112 acceleration: float = 0.0
113 steering: float = 0.0
114
115 update_needed = True
116
117 try:
118 while 1:
119 key = get_key(settings, 0.1) # Faster timeout for smoother feel
120
121 if key == "w":
122 acceleration += 0.1
123 acceleration = min(acceleration, 1.0)
124 update_needed = True
125 elif key == "s":
126 acceleration -= 0.1
127 acceleration = max(acceleration, -1.0)
128 update_needed = True
129 elif key == "a":
130 steering += math.pi / 32
131 steering = min(steering, math.pi / 8)
132 update_needed = True
133 elif key == "d":
134 steering += -math.pi / 32
135 steering = max(steering, -math.pi / 8)
136 update_needed = True
137 elif key == "q" or key == "\x03":
138 break
139
140 if update_needed:
141 pub_thread.update(acceleration, steering)
142 print(f"\rAccel: {acceleration:.2f} | Steer: {steering:.2f}", end="\n")
143 update_needed = False
144
145 except Exception as e:
146 print(e)
147
148 finally:
149 pub_thread.stop()
150 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
151 rclpy.shutdown()
152
153
154if __name__ == "__main__":
155 main()
Thread to publish control commands to the robot.
Definition main.py:28
__init__(self, rate)
Constructor for the PublishThread class.
Definition main.py:33
update(self, float acceleration, float steering_angle)
Update the control commands to be published.
Definition main.py:57
Definition main.py:1
get_key(settings, timeout)
Definition main.py:96