Skip to content

Commit

Permalink
add 3 unittests
Browse files Browse the repository at this point in the history
  • Loading branch information
Richardvdketterij committed Jan 16, 2024
1 parent c13e96a commit b12bbda
Showing 1 changed file with 48 additions and 3 deletions.
51 changes: 48 additions & 3 deletions diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,34 @@

import unittest
import time
import rclpy

from rclpy.node import Node
from diagnostic_msgs.msg import DiagnosticStatus
from diagnostic_updater import DiagnosticStatusWrapper
from diagnostic_updater import DiagnosticArray, Updater
from diagnostic_common_diagnostics.cpu_monitor import CpuTask
import time

class TestCPUMonitor(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init(args=None)

@classmethod
def tearDownClass(cls):
if rclpy.ok():
rclpy.shutdown()

def diagnostics_callback(self, msg):
self.message_recieved = True
self.assertEqual(len(msg.status), 1)

def test_ok(self):
# In this case is recommended for accuracy that psutil.cpu_percent()
# function be called with at least 0.1 seconds between calls.
time.sleep(0.1)

warning_percentage = 100
task = CpuTask(warning_percentage)
stat = DiagnosticStatusWrapper()
Expand All @@ -52,7 +74,9 @@ def test_ok(self):
self.assertGreaterEqual(len(stat.values), 2)

def test_warn(self):
# time.sleep(0.1)
# In this case is recommended for accuracy that psutil.cpu_percent()
# function be called with at least 0.1 seconds between calls.
time.sleep(0.1)

warning_percentage2 = 0
task2 = CpuTask(warning_percentage2)
Expand All @@ -64,9 +88,30 @@ def test_warn(self):

# Check for at least 1 CPU Load Average and 1 CPU Load
self.assertGreaterEqual(len(stat2.values), 2)
# time.sleep(0.25)

# def test_updater(self):
def test_updater(self):
# In this case is recommended for accuracy that psutil.cpu_percent()
# function be called with at least 0.1 seconds between calls.
time.sleep(0.1)

self.message_recieved = False

node = Node('cpu_monitor_test')
updater = Updater(node)
updater.setHardwareID("test_id")
updater.add(CpuTask())

node.create_subscription(DiagnosticArray, '/diagnostics', self.diagnostics_callback, 10)

start_time = time.time()
timeout = 5.0 # Timeout in seconds

while not self.message_recieved:
rclpy.spin_once(node)
time.sleep(0.1)
elapsed_time = time.time() - start_time
if elapsed_time >= timeout:
self.fail("No diagnostics received")


if __name__ == "__main__":
Expand Down

0 comments on commit b12bbda

Please sign in to comment.