Formula Student Autonomous Systems
The code for the main driverless system
Loading...
Searching...
No Matches
power_log_node.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2import rclpy
3from rclpy.node import Node
4import psutil
5from std_msgs.msg import Float32, Float32MultiArray, Int64MultiArray
6from custom_interfaces.msg import ProcessUsage, ProcessUsageArray
7import csv
8import signal
9import datetime
10import sys
11import time
12
13
14class PowerNode(Node):
15 def __init__(self):
16 super().__init__("power_log_node")
17 self.cpu_publisher = self.create_publisher(
18 Float32MultiArray, "cpu_core_usage", 10
19 )
20 self.memory_publisher = self.create_publisher(Float32, "memory_usage", 10)
21 self.temperature_publisher = self.create_publisher(
22 Float32, "cpu_temperature", 10
23 )
24 self.processes_usage_publisher = self.create_publisher(
25 ProcessUsageArray, "processes_usage", 10
26 )
27 self.create_timer(1.0, self.timer_callbacktimer_callback)
28
29 self.metrics_list = []
31
32 signal.signal(signal.SIGINT, self.signal_handlersignal_handler)
33
34 def signal_handler(self, sig, frame):
35 """!
36 Writes metrics to csv and exits when Ctrl+C is pressed.
37
38 Args:
39 sig (int): Signal number.
40 frame (frame): Current stack frame.
41 """
42
43 if self.metrics_list: # Check if the list is not empty
44 finish_time = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
45 datetime_filename = f"power_log_{finish_time}.csv"
46 self.metrics_to_csv(
47 self.metrics_list, "performance/power_log_metrics/" + datetime_filename
48 )
49 sys.exit(0)
50
51 def metrics_to_csv(self, metrics, filename):
52 """!
53 Converts metrics to csv and writes them to a file.
54
55 Args:
56 metrics (list): List of metrics dictionaries.
57 filename (str): Name of the file to write the metrics to.
58 """
59
60 # Add 'time' key to each metric
61 start_time = metrics[0]["timestamp"]
62 for metric in metrics:
63 elapsed_time = (metric["timestamp"] - start_time).total_seconds()
64 metric["time"] = elapsed_time # Add/Update 'time' key with elapsed time
65
66 # Write metrics to csv
67 with open(filename, "w", newline="") as csvfile:
68 fieldnames = metrics[0].keys()
69 writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
70 writer.writeheader()
71 writer.writerows(metrics)
72
73 def get_process_usage(self, top_n=20):
74 # First call to cpu_percent initializes counters
75 for proc in psutil.process_iter(["pid", "name"]):
76 try:
77 proc.cpu_percent(interval=None)
78 except (psutil.NoSuchProcess, psutil.AccessDenied):
79 pass
80
81 processes = []
82
83 for proc in psutil.process_iter(["pid", "name"]):
84 try:
85 cpu = proc.cpu_percent(interval=None)
86 mem = proc.memory_info().rss / (1024 * 1024) # MB
87 processes.append((cpu, mem, proc.info["pid"], proc.info["name"]))
88 except (psutil.NoSuchProcess, psutil.AccessDenied):
89 continue
90
91 # Sort by CPU usage
92 processes.sort(key=lambda x: x[0], reverse=True)
93
94 return processes[:top_n] # Return top N processes
95
96 def pc_stats(self):
97 """!
98 Returns a dictionary with the CPU core usage, memory usage and CPU temperature.
99 """
100 temps = psutil.sensors_temperatures()
101 cpu_temp = None
102 if "k10temp" in temps:
103 for entry in temps["k10temp"]:
104 if entry.label == "Tctl":
105 cpu_temp = entry.current
106 break
107
108 return {
109 "cpu_core_usage": psutil.cpu_percent(interval=0.1, percpu=True),
110 "memory_usage": psutil.virtual_memory().percent,
111 "cpu_temperature": cpu_temp,
112 }
113
114 def timer_callback(self):
115 """!
116 Callback function that publishes CPU core usage, memory usage and CPU temperature.
117 """
118 stats = self.pc_stats()
119 processes = self.get_process_usage()
120
121 cpu_core_usage_msg = Float32MultiArray()
122 cpu_core_usage_msg.data = stats["cpu_core_usage"]
123 self.cpu_publisher.publish(cpu_core_usage_msg)
124
125 processes_usage_msg = ProcessUsageArray()
126 processes_usage_msg.header.stamp = self.get_clock().now().to_msg()
127 processes_usage_msg.process_usages = []
128 for cpu, mem, pid, name in processes:
129 process_usage = ProcessUsage()
130 process_usage.header.stamp = processes_usage_msg.header.stamp
131 process_usage.usage = cpu
132 process_usage.memory = mem
133 process_usage.process_name = name
134 processes_usage_msg.process_usages.append(process_usage)
135
136 self.processes_usage_publisher.publish(processes_usage_msg)
137
138 memory_msg = Float32()
139 memory_msg.data = stats["memory_usage"]
140 self.memory_publisher.publish(memory_msg)
141
142 temperature_msg = Float32()
143 temperature_msg.data = stats["cpu_temperature"]
144 self.temperature_publisher.publish(temperature_msg)
145
146 metrics = {
147 "timestamp": datetime.datetime.now(),
148 "memory_usage": stats["memory_usage"],
149 "cpu_temperature": stats["cpu_temperature"],
150 }
151
152 for i in range(16):
153 metrics[f"cpu_core_{i+1}_usage"] = (
154 stats["cpu_core_usage"][i] if i < len(stats["cpu_core_usage"]) else 0
155 )
156
157 self.metrics_list.append(metrics)
158
159
160def main(args=None):
161 rclpy.init(args=args)
162 node = PowerNode()
163 rclpy.spin(node)
164 rclpy.shutdown()
165
166
167if __name__ == "__main__":
168 main()
metrics_to_csv(self, metrics, filename)
Converts metrics to csv and writes them to a file.
pc_stats(self)
Returns a dictionary with the CPU core usage, memory usage and CPU temperature.
timer_callback(self)
Callback function that publishes CPU core usage, memory usage and CPU temperature.
signal_handler(self, sig, frame)
Writes metrics to csv and exits when Ctrl+C is pressed.
Definition main.py:1