Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
main.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3from __future__ import print_function
4import threading
5import sys
6from select import select
7import termios
8import tty
9import math
10import time
11import custom_interfaces.msg._control_command
12
13from rclpy.node import Node
14import rclpy
15from pacsim.msg import StampedScalar
16
17lateral_command_msg = StampedScalar()
18longitudinal_command_msg = StampedScalar()
19
20help_msg = """
21w/s: increase/decrease acceleration
22a/d: increase/decrease steering angle
23q: quit
24CTRL-C to force quit
25"""
26
27
28class PublishThread(threading.Thread):
29 """!
30 @brief Thread to publish control commands to the robot
31 """
32
33 def __init__(self, rate):
34 """!
35 @brief Constructor for the PublishThread class
36
37 @param rate: Rate at which to publish control commands
38 """
39 super(PublishThread, self).__init__()
40 self.node = Node("car_keys")
41 self.control_publisher = self.node.create_publisher(
42 custom_interfaces.msg._control_command.ControlCommand,
43 "/control/command",
44 10,
45 )
46 self.acceleration: float = 0.0
47 self.steering_angle: float = 0.0
48 self.rate = rate
49
50 # Condition to lock the thread waiting to be notified
51 self.condition = threading.Condition()
52 self.new_command = False
53 self.done = False
54
55 self.start()
56
57 def update(self, acceleration: float, steering_angle: float):
58 """!
59 @brief Update the control commands to be published
60
61 @param acceleration: Acceleration value
62 @param steering_angle: Steering angle value
63 """
64 self.condition.acquire()
65 self.acceleration = acceleration
66 self.steering_angle = steering_angle
67 self.new_command = True
68 self.condition.notify()
69 self.condition.release()
70
71 def stop(self):
72 """!
73 @brief Stop the thread
74 """
75 self.done = True
76 self.update(0, 0)
77 self.join()
78
79 def run(self):
80 """!
81 @brief Run the thread to publish control commands
82 """
83 while not self.done:
84 # Lock the condition and wait for notification
85 self.condition.acquire()
86
87 # Wait for new command or timeout
88 if not self.new_command:
89 self.condition.wait(timeout=1.0 / self.rate)
90
91 if self.done:
92 self.condition.release()
93 break
94
95 # Create and publish message
96 message = custom_interfaces.msg._control_command.ControlCommand()
97 message.throttle_rl = self.acceleration
98 message.throttle_rr = self.acceleration
99 message.steering = self.steering_angle
100
101 # Release the condition and publish
102 self.condition.release()
103
104 self.control_publisher.publish(message)
105 self.new_command = False
106
107 # Sleep to maintain rate
108 time.sleep(1.0 / self.rate)
109
110 # Publish stop message when thread exits
111 message = custom_interfaces.msg._control_command.ControlCommand()
112 message.throttle_rl = 0.0
113 message.throttle_rr = 0.0
114 message.steering = 0.0
115 self.control_publisher.publish(message)
116
117
118def get_key(settings, timeout):
119 """!
120 @brief Get a single key from the user.
121 """
122 tty.setraw(sys.stdin.fileno())
123 rlist, _, _ = select([sys.stdin], [], [], timeout)
124 key = sys.stdin.read(1) if rlist else ""
125 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
126 return key
127
128
129def print_status(acceleration, steering):
130 """!
131 @brief Print current status without clearing the screen
132 """
133 # Move cursor to beginning of line and clear it
134 sys.stdout.write("\r")
135 sys.stdout.write(" " * 80) # Clear the line
136 sys.stdout.write("\r")
137
138 # Print current status
139 accel_bar = create_bar(acceleration, -1.0, 1.0, 20)
140 steer_bar = create_bar(steering, -0.34, 0.34, 20)
141
142 status = f"Throttle: {acceleration:+.2f} {accel_bar} | Steering: {steering:+.2f} {steer_bar}"
143 sys.stdout.write(status)
144 sys.stdout.flush()
145
146
147def create_bar(value, min_val, max_val, length):
148 """!
149 @brief Create a visual bar representation of a value
150 """
151 # Normalize value to 0-1 range
152 normalized = (value - min_val) / (max_val - min_val)
153 normalized = max(0, min(1, normalized)) # Clamp to 0-1
154
155 # Create bar
156 filled = int(normalized * length)
157 bar = "█" * filled + "░" * (length - filled)
158
159 return f"[{bar}]"
160
161
162def main():
163 """!
164 @brief Main function to control the pacsim robot using the keyboard
165
166 @details This function reads the keyboard inputs and updates
167 the control commands accordingly, for the other thread to publish them.
168 """
169
170 rclpy.init()
171 settings = termios.tcgetattr(sys.stdin)
172
173 key_timeout = 0.1
174 pub_thread = PublishThread(50) # 50 Hz publishing rate
175 acceleration: float = 0.0
176 steering: float = 0.0
177
178 # Print initial help and status
179 print(help_msg)
180 print("=" * 80)
181 print_status(acceleration, steering)
182
183 try:
184 pub_thread.update(acceleration, steering)
185 while True:
186 key: str = str(get_key(settings, key_timeout))
187
188 command_changed = False
189
190 if key == "w":
191 acceleration += 0.05
192 acceleration = min(acceleration, 1.0)
193 command_changed = True
194 elif key == "s":
195 acceleration -= 0.05
196 acceleration = max(acceleration, -1.0)
197 command_changed = True
198 elif key == "a":
199 steering += 0.05
200 steering = min(steering, 0.34)
201 command_changed = True
202 elif key == "d":
203 steering -= 0.05
204 steering = max(steering, -0.34)
205 command_changed = True
206 elif key == "q":
207 break
208 elif key == "\x03": # Ctrl+C
209 break
210
211 if command_changed:
212 pub_thread.update(acceleration, steering)
213 print_status(acceleration, steering)
214
215 except Exception as e:
216 print(f"\nError: {e}")
217
218 finally:
219 print("\nShutting down...")
220 pub_thread.stop()
221 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
222 rclpy.shutdown()
223
224
225if __name__ == "__main__":
226 main()
Thread to publish control commands to the robot.
Definition main.py:28
run(self)
Run the thread to publish control commands.
Definition main.py:79
update(self, float acceleration, float steering_angle)
Update the control commands to be published.
Definition main.py:57
__init__(self, rate)
Constructor for the PublishThread class.
Definition main.py:33
stop(self)
Stop the thread.
Definition main.py:71
create_bar(value, min_val, max_val, length)
Create a visual bar representation of a value.
Definition main.py:147
get_key(settings, timeout)
Get a single key from the user.
Definition main.py:118
main()
Main function to control the pacsim robot using the keyboard.
Definition main.py:162
print_status(acceleration, steering)
Print current status without clearing the screen.
Definition main.py:129
Definition main.py:1