diff --git a/ait/core/server/broker.py b/ait/core/server/broker.py index 72c4e308..407a8c25 100644 --- a/ait/core/server/broker.py +++ b/ait/core/server/broker.py @@ -1,8 +1,8 @@ -import zmq.green as zmq import gevent -import gevent.monkey +from gevent import monkey +monkey.patch_all() -gevent.monkey.patch_all() +import zmq.green as zmq from typing import List, Any diff --git a/ait/core/server/handlers/ccsds_packet_handler.py b/ait/core/server/handlers/ccsds_packet_handler.py index f9335e89..73c02288 100644 --- a/ait/core/server/handlers/ccsds_packet_handler.py +++ b/ait/core/server/handlers/ccsds_packet_handler.py @@ -42,10 +42,8 @@ def __init__(self, input_type=None, output_type=None, **kwargs): tlm_dict = tlm.getDefaultDict() for packet_name in self.packet_types.values(): if packet_name not in tlm_dict.keys(): - msg = "CCSDSPacketHandler: Packet name {} not present in telemetry dictionary.".format( - packet_name - ) - msg += " Available packet types are {}".format(tlm_dict.keys()) + msg = f"CCSDSPacketHandler: Packet name {packet_name} not present in telemetry dictionary." + msg += f" Available packet types are {tlm_dict.keys()}" raise ValueError(msg) def handle(self, input_data): @@ -59,24 +57,20 @@ def handle(self, input_data): # Check if packet length is at least 7 bytes primary_header_length = 6 if len(input_data) < primary_header_length + 1: - ait.core.log.info( + ait.core.log.error( "CCSDSPacketHandler: Received packet length is less than minimum of 7 bytes." ) return # Extract APID from packet - packet_apid = str(bin(int(binascii.hexlify(input_data[0:2]), 16) & 0x07FF))[ - 2: - ].zfill(11) + packet_apid = str(bin(int(binascii.hexlify(input_data[0:2]), 16) & 0x07FF))[2:].zfill(11) # Check if packet_apid matches with an APID in the config config_apid = self.comp_apid(packet_apid) if not config_apid: - msg = "CCSDSPacketHandler: Packet APID {} not present in config.".format( - packet_apid - ) - msg += " Available packet APIDs are {}".format(self.packet_types.keys()) - ait.core.log.info(msg) + msg = f"CCSDSPacketHandler: Packet APID {packet_apid} not present in config." + msg += f" Available packet APIDs are {self.packet_types.keys()}" + ait.core.log.error(msg) return # Map APID to packet name in config to get UID from telemetry dictionary @@ -87,7 +81,7 @@ def handle(self, input_data): # Extract user data field from packet packet_data_length = int(binascii.hexlify(input_data[4:6]), 16) + 1 if len(input_data) < primary_header_length + packet_data_length: - ait.core.log.info( + ait.core.log.error( "CCSDSPacketHandler: Packet data length is less than stated length in packet primary header." ) return diff --git a/ait/core/server/handlers/packet_handler.py b/ait/core/server/handlers/packet_handler.py index 1ec845dc..b1d88cad 100644 --- a/ait/core/server/handlers/packet_handler.py +++ b/ait/core/server/handlers/packet_handler.py @@ -1,45 +1,63 @@ import pickle from ait.core.server.handler import Handler -from ait.core import tlm +from ait.core import tlm, log class PacketHandler(Handler): def __init__(self, input_type=None, output_type=None, **kwargs): """ + This handler provides a way to accept multiple packet types + (e.g. '1553_HS_Packet' and 'Ethernet_HS_Packet') single stream + and have them be processed. This handler takes a string of + raw binary data containing the packet data. It gets the UID from + the telemetry dictionary. A tuple of the UID and user data + field is returned. + Params: input_type: (optional) Specifies expected input type, used to validate handler workflow. Defaults to None. output_type: (optional) Specifies expected output type, used to validate handler workflow. Defaults to None **kwargs: - packet: (required) Name of packet, present in default tlm dict. + packet: (required) Type of packet (e.g. '1553_HS_Packet', 'Ethernet_HS_Packet') + Present in default tlm dict. Raises: - ValueError: If packet is not present in kwargs. + ValueError: If packet type is not present in kwargs. If packet is specified but not present in default tlm dict. - """ + """ super(PacketHandler, self).__init__(input_type, output_type) - self.packet = kwargs.get("packet", None) + self.packet_name = kwargs.get("packet", None) + self.tlm_dict = tlm.getDefaultDict() - if not self.packet: - msg = 'PacketHandler: No packet name provided in handler config as key "packet"' + if not self.packet_name: + msg = f'PacketHandler: No packet type provided in handler config as key {self.packet_name}' raise ValueError(msg) - tlm_dict = tlm.getDefaultDict() - if self.packet not in tlm_dict: - msg = "PacketHandler: Packet name {} not present in telemetry dictionary".format( - self.packet - ) - msg += " Available packet types are {}".format(tlm_dict.keys()) + if self.packet_name not in self.tlm_dict: + msg = f"PacketHandler: Packet name '{self.packet_name}' not present in telemetry dictionary." + msg += f" Available packet types are {self.tlm_dict.keys()}" raise ValueError(msg) - self._pkt_defn = tlm_dict[self.packet] + self._pkt_defn = self.tlm_dict[self.packet_name] def handle(self, input_data): """ + Test the input_data length against the length in the telemetry + Params: - input_data: message received by stream + input_data : byteArray + message received by stream (raw data) Returns: tuple of packet UID and message received by stream """ + + if self._pkt_defn.nbytes != len(input_data): + log.error( + f"PacketHandler: Packet data length does not match packet definition." + ) + return 0 + else: + return pickle.dumps((self._pkt_defn.uid, input_data), 2) + return pickle.dumps((self._pkt_defn.uid, input_data), 2) diff --git a/ait/core/server/server.py b/ait/core/server/server.py index 29bb8e6d..55fd3b35 100644 --- a/ait/core/server/server.py +++ b/ait/core/server/server.py @@ -54,15 +54,15 @@ def __init__(self): def wait(self): """ - Starts all greenlets and plugin-pocesses for concurrent processing. + Starts all greenlets and plugin-processes for concurrent processing. Joins over all greenlets that are not servers. """ - # Start all of the greenlets managed by this process + # Start all greenlets managed by this process for greenlet in self.greenlets + self.servers: log.info(f"Starting {greenlet} greenlet...") greenlet.start() - # Start all of the separate plugin processes + # Start all separate plugin processes for plugin_process in self.plugin_processes: log.info(f"Spawning {plugin_process} process...") plugin_process.spawn_process() diff --git a/ait/core/tlm.py b/ait/core/tlm.py index 45e0fa74..38f5c054 100644 --- a/ait/core/tlm.py +++ b/ait/core/tlm.py @@ -55,7 +55,6 @@ def __getitem__(self, key): """Returns the words in this wordarray at the given Python slice or word at the given integer index.""" length = len(self) - if isinstance(key, slice): return [self[n] for n in range(*key.indices(length))] @@ -371,6 +370,7 @@ def validate(self, value, messages=None): Validation error messages are appended to an optional messages array. """ + valid = True primitive = value @@ -520,6 +520,7 @@ def validate(self, messages=None): Validation error messages are appended to an optional messages array. """ + return self._defn.validate(self, messages) diff --git a/config/leapseconds.dat b/config/leapseconds.dat index 01a9b653..815ff2d1 100644 Binary files a/config/leapseconds.dat and b/config/leapseconds.dat differ diff --git a/tests/ait/core/server/test_handler.py b/tests/ait/core/server/test_handler.py index f4d32252..3fa1f5fe 100644 --- a/tests/ait/core/server/test_handler.py +++ b/tests/ait/core/server/test_handler.py @@ -1,4 +1,6 @@ import pickle +import _pickle +import pytest import unittest from unittest import mock @@ -8,13 +10,12 @@ class TestCCSDSPacketCheck(unittest.TestCase): - # Check if packet length is at least 7 bytes def test_ccsds_packet_length(self): handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"}) data = bytearray(b"\x02\xE7\x40\x00\x00\x00") with self.assertLogs("ait", level="INFO") as cm: - result = handler.handle(data) + handler.handle(data) self.assertIn("less than minimum of 7 bytes", cm.output[0]) # Check if APID match between config and packet @@ -22,7 +23,7 @@ def test_ccsds_packet_apid(self): handler = CCSDSPacketHandler(packet_types={"00000000000": "CCSDS_HEADER"}) data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01") with self.assertLogs("ait", level="INFO") as cm: - result = handler.handle(data) + handler.handle(data) self.assertIn("not present in config", cm.output[0]) # Check packet length vs header reported length @@ -30,12 +31,16 @@ def test_ccsds_packet_header(self): handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"}) data = bytearray(b"\x02\xE7\x40\x00\x00\x0F\x01") with self.assertLogs("ait", level="INFO") as cm: - result = handler.handle(data) + handler.handle(data) self.assertIn( "Packet data length is less than stated length in packet primary header", cm.output[0], ) + def test_packet_name_not_present(self): + with pytest.raises(ValueError): + CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_"}) + # Check if dumped uid match expected tlm dictionary uid def test_ccsds_packet_uid(self): handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"}) @@ -47,23 +52,110 @@ def test_ccsds_packet_uid(self): self.assertEqual(packet_uid, pickle.loads(result)[0]) -class TestHandlerClassWithInputOutputTypes(object): - handler = PacketHandler(packet="CCSDS_HEADER", input_type="int", output_type="str") +class TestCCSDSHandlerClassWithInputOutputTypes(object): + handler = CCSDSPacketHandler( + packet_types={"01011100111": "CCSDS_HEADER"}, + input_type="int", + output_type="str", + ) def test_handler_creation(self): assert self.handler.input_type is "int" assert self.handler.output_type is "str" @mock.patch( - "ait.core.server.handlers.PacketHandler.handle", return_value="SpecialReturn" + "ait.core.server.handlers.CCSDSPacketHandler.handle", + return_value="SpecialReturn", ) def test_execute_handler_returns_handle_return_on_input(self, handle_mock): - returned = self.handler.handle("2") + data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01") + returned = self.handler.handle(data) assert returned == "SpecialReturn" +class TestCCSDSHandlerClassWithoutInputOutputTypes(object): + handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"}) + + def test_ccsds_handler_default_params(self): + assert self.handler.input_type is None + assert self.handler.output_type is None + + @mock.patch( + "ait.core.server.handlers.CCSDSPacketHandler.handle", + return_value="SpecialReturn", + ) + def test_execute_handler_returns_handle_return_on_input(self, handle_mock): + data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01") + returned = self.handler.handle(data) + assert returned == "SpecialReturn" + + def test_handler_repr(self): + assert self.handler.__repr__() == "" + + +class TestHandlerClassWith1553HSPacket(unittest.TestCase): + tlm_dict = tlm.getDefaultDict() + pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x02\x03\x04") + pkt_1553 = tlm_dict['1553_HS_Packet'] + handler = PacketHandler(packet="1553_HS_Packet") + + def test_word_array(self): + packet = tlm.Packet(self.pkt_1553, self.pkt_data) + assert packet.words.__len__() == self.pkt_1553.nbytes/2 + + def test_1553_uid(self): + packet_uid = self.tlm_dict["1553_HS_Packet"].uid + result = self.handler.handle(self.pkt_data) + self.assertEqual(packet_uid, pickle.loads(result)[0]) + + # Send only 5 bytes 1553 Packet expects 10 bytes + def test_bad_packet_length(self): + pkt_data = bytearray(b"\x02\xE7\x40\x00\x00") + with self.assertLogs("ait", level="INFO") as cm: + self.handler.handle(pkt_data) + self.assertIn( + "Packet data length does not match packet definition.", + cm.output[0], + ) + + def test_packet_name_error_and_no_packet_type(self): + with pytest.raises(ValueError): + PacketHandler(packet="") + + def test_handler_class_with_bad_packet_type(self): + with pytest.raises(ValueError): + PacketHandler(packet="Ethernet_Packet") + + +class TestHandlerClassWithEthernetHSPacket(unittest.TestCase): + tlm_dict = tlm.getDefaultDict() + pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x40\x00\x03\x02\xE7\x40\x00\x00\x00\x01\x40\x00\x03" + b"\x02\xE7\x40\x00\x00\x00\x01\x40\x00\x03\x02\xE7\x40\x00\x00\x00\x01") + ethernet_pkt_def = tlm_dict['Ethernet_HS_Packet'] + handler = PacketHandler(packet="Ethernet_HS_Packet") + + def test_word_array(self): + e_packet = tlm.Packet(self.ethernet_pkt_def, self.pkt_data) + assert e_packet.words.__len__() == self.ethernet_pkt_def.nbytes/2.0 + + def test_1553_uid(self): + packet_uid = self.tlm_dict["Ethernet_HS_Packet"].uid + result = self.handler.handle(self.pkt_data) + self.assertEqual(packet_uid, pickle.loads(result)[0]) + + # Ethernet packet expects 37 bytes 1552 expects 10 + def test_bad_packet_length(self): + pkt_data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x07\x08\x0a") + with self.assertLogs("ait", level="INFO") as cm: + self.handler.handle(pkt_data) + self.assertIn( + "Packet data length does not match packet definition.", + cm.output[0], + ) + + class TestHandlerClassWithoutInputOutputTypes(object): - handler = PacketHandler(packet="CCSDS_HEADER") + handler = PacketHandler(packet="Ethernet_HS_Packet") def test_handler_default_params(self): assert self.handler.input_type is None @@ -80,9 +172,9 @@ def test_handler_repr(self): assert self.handler.__repr__() == "" -class TestCCSDSHandlerClassWithInputOutputTypes(object): - handler = CCSDSPacketHandler( - packet_types={"01011100111": "CCSDS_HEADER"}, +class TestHandlerClassWithInputOutputTypes(object): + handler = PacketHandler( + packet='1553_HS_Packet', input_type="int", output_type="str", ) @@ -92,30 +184,31 @@ def test_handler_creation(self): assert self.handler.output_type is "str" @mock.patch( - "ait.core.server.handlers.CCSDSPacketHandler.handle", + "ait.core.server.handlers.PacketHandler.handle", return_value="SpecialReturn", ) def test_execute_handler_returns_handle_return_on_input(self, handle_mock): - data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01") + data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x02\x03\x04") returned = self.handler.handle(data) assert returned == "SpecialReturn" -class TestCCSDSHandlerClassWithoutInputOutputTypes(object): - handler = CCSDSPacketHandler(packet_types={"01011100111": "CCSDS_HEADER"}) +class TestHandlerClassWithoutInputOutputTypes(object): + handler = PacketHandler(packet="Ethernet_HS_Packet") def test_ccsds_handler_default_params(self): assert self.handler.input_type is None assert self.handler.output_type is None @mock.patch( - "ait.core.server.handlers.CCSDSPacketHandler.handle", + "ait.core.server.handlers.PacketHandler.handle", return_value="SpecialReturn", ) def test_execute_handler_returns_handle_return_on_input(self, handle_mock): - data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01") + # Note: Using 'mock' handler, the data will not be tested for length. + data = bytearray(b"\x02\xE7\x40\x00\x00\x00\x01\x02\x03\x04") returned = self.handler.handle(data) assert returned == "SpecialReturn" def test_handler_repr(self): - assert self.handler.__repr__() == "" + assert self.handler.__repr__() == ""