diff --git a/Makefile b/Makefile index c336b3af..3137234a 100644 --- a/Makefile +++ b/Makefile @@ -11,4 +11,4 @@ logs: @cd docker; docker compose logs --follow bash: - @cd docker; docker compose exec ait-server bash \ No newline at end of file + @cd docker; docker compose exec ait-server bash diff --git a/ait/core/server/client.py b/ait/core/server/client.py index adff4f3f..60718d2d 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -131,7 +131,6 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - super(UDPOutputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) @@ -140,15 +139,16 @@ def __init__( if type(output) is int: self.addr_spec = ("localhost", output) elif utils.is_valid_address_spec(output): - protocol,hostname,port = output.split(":") + protocol, hostname, port = output.split(":") if protocol.lower() != "udp": - raise (ValueError(f"UDPOutputClient: Invalid Specification {output}")) - self.addr_spec = (hostname,int(port)) + raise ( + ValueError(f"UDPOutputClient: Invalid Specification {output}") + ) + self.addr_spec = (hostname, int(port)) else: raise (ValueError(f"UDPOutputClient: Invalid Specification {output}")) else: - raise (ValueError(f"UDPOutputClient: Invalid Specification")) - + raise (ValueError("UDPOutputClient: Invalid Specification")) self.context = zmq_context # override pub to be udp socket @@ -158,6 +158,7 @@ def publish(self, msg): self.pub.sendto(msg, self.addr_spec) log.debug("Published message from {}".format(self)) + class TCPOutputClient(ZMQInputClient): """ This is the parent class for all outbound streams which publish @@ -172,33 +173,31 @@ def __init__( zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL, **kwargs, ): - super(TCPOutputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) if "output" in kwargs: output = kwargs["output"] if utils.is_valid_address_spec(output): - protocol,hostname,port = output.split(":") + protocol, hostname, port = output.split(":") if protocol.lower() != "tcp": - raise (ValueError(f"TCPOutputClient: Invalid Specification {output}")) - self.addr_spec = (hostname,int(port)) + raise ( + ValueError(f"TCPOutputClient: Invalid Specification {output}") + ) + self.addr_spec = (hostname, int(port)) else: raise (ValueError(f"TCPOutputClient: Invalid Specification {output}")) else: - raise (ValueError(f"TCPOutputClient: Invalid Specification")) - + raise (ValueError("TCPOutputClient: Invalid Specification")) self.context = zmq_context self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - def publish(self, msg): self.pub.connect(self.addr_spec) self.pub.sendall(msg) - class UDPInputServer(ZMQClient, gs.DatagramServer): """ This is the parent class for all inbound streams which receive messages @@ -218,7 +217,7 @@ def __init__( if type(input) is int: host_spec = input elif utils.is_valid_address_spec(input): - protocol,hostname,port = input.split(":") + protocol, hostname, port = input.split(":") if protocol.lower() != "udp": raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) if hostname in ["127.0.0.1", "localhost"]: @@ -227,7 +226,7 @@ def __init__( host_spec = f"0.0.0.0:{port}" else: raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) - + else: raise (ValueError(f"UDPInputServer: Invalid Specification {input}")) super(UDPInputServer, self).__init__( @@ -266,26 +265,21 @@ def __init__( if "input" in kwargs: input = kwargs["input"] if not utils.is_valid_address_spec(input): - raise ( - ValueError( - f"TCPInputServer: Invalid Specification {input}" - ) - ) - protocol,hostname,port = input.split(":") - if protocol.lower() != "tcp" or hostname not in ["127.0.0.1", "localhost", "server", "0.0.0.0"]: - raise ( - ValueError( - f"TCPInputServer: Invalid Specification {input}" - ) - ) + raise (ValueError(f"TCPInputServer: Invalid Specification {input}")) + protocol, hostname, port = input.split(":") + if protocol.lower() != "tcp" or hostname not in [ + "127.0.0.1", + "localhost", + "server", + "0.0.0.0", + ]: + raise (ValueError(f"TCPInputServer: Invalid Specification {input}")) self.sub = gevent.socket.socket( gevent.socket.AF_INET, gevent.socket.SOCK_STREAM ) hostname = ( - "127.0.0.1" - if hostname in ["127.0.0.1", "localhost"] - else "0.0.0.0" + "127.0.0.1" if hostname in ["127.0.0.1", "localhost"] else "0.0.0.0" ) super(TCPInputServer, self).__init__( zmq_context, @@ -294,11 +288,7 @@ def __init__( listener=(hostname, int(port)), ) else: - raise ( - ValueError( - "TCPInputServer: Invalid Specification" - ) - ) + raise (ValueError("TCPInputServer: Invalid Specification")) def handle(self, socket, address): self.cur_socket = socket @@ -344,18 +334,10 @@ def __init__( if "input" in kwargs: input = kwargs["input"] if not utils.is_valid_address_spec(input): - raise ( - ValueError( - f"TCPInputClient: Invalid Specification {input}" - ) - ) - protocol,hostname,port = input.split(":") + raise (ValueError(f"TCPInputClient: Invalid Specification {input}")) + protocol, hostname, port = input.split(":") if protocol.lower() != "tcp": - raise ( - ValueError( - f"TCPInputClient: Invalid Specification {input}" - ) - ) + raise (ValueError(f"TCPInputClient: Invalid Specification {input}")) super(TCPInputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) @@ -364,14 +346,10 @@ def __init__( self.hostname = hostname self.port = int(port) - self.address = (hostname,int(port)) + self.address = (hostname, int(port)) else: - raise ( - ValueError( - "TCPInputClient: Invalid Specification" - ) - ) + raise (ValueError("TCPInputClient: Invalid Specification")) def __exit__(self): try: diff --git a/ait/core/server/handlers/__init__.py b/ait/core/server/handlers/__init__.py index d7f65ebd..cbd5094f 100644 --- a/ait/core/server/handlers/__init__.py +++ b/ait/core/server/handlers/__init__.py @@ -1,3 +1,3 @@ from .ccsds_packet_handler import * # noqa +from .debug_handler import * # noqa from .packet_handler import * # noqa -from .debug_handler import * # noqa diff --git a/ait/core/server/handlers/debug_handler.py b/ait/core/server/handlers/debug_handler.py index 32f19231..aab598b4 100644 --- a/ait/core/server/handlers/debug_handler.py +++ b/ait/core/server/handlers/debug_handler.py @@ -9,4 +9,4 @@ def __init__(self, input_type=None, output_type=None, **kwargs): def handle(self, input_data): ait.core.log.info(f"{self.handler_name} received {len(input_data)} bytes") - return input_data \ No newline at end of file + return input_data diff --git a/ait/core/server/stream.py b/ait/core/server/stream.py index 35b72cc3..93b4af6b 100644 --- a/ait/core/server/stream.py +++ b/ait/core/server/stream.py @@ -1,6 +1,10 @@ import ait.core.log -from .client import UDPOutputClient, TCPOutputClient, TCPInputClient, TCPInputServer, UDPInputServer, ZMQInputClient -import ait +from .client import TCPInputClient +from .client import TCPInputServer +from .client import TCPOutputClient +from .client import UDPInputServer +from .client import UDPOutputClient +from .client import ZMQInputClient from .utils import is_valid_address_spec @@ -107,16 +111,24 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): if type(outputs) is not list or (type(outputs) is list and len(outputs) == 0): raise ValueError(f"Output stream specification invalid: {outputs}") # backwards compatability with original UDP spec - if type(outputs) is list and type(outputs[0]) is int and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT: + if ( + type(outputs) is list + and type(outputs[0]) is int + and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT + ): ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) elif is_valid_address_spec(outputs[0]): - protocol,hostname,port = outputs[0].split(':') + protocol, hostname, port = outputs[0].split(":") if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: raise ValueError(f"Output stream specification invalid: {outputs}") if protocol.lower() == "udp": - ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) + ostream = UDPOutputStream( + name, inputs, outputs[0], handlers, zmq_args=zmq_args + ) elif protocol.lower() == "tcp": - ostream = TCPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args) + ostream = TCPOutputStream( + name, inputs, outputs[0], handlers, zmq_args=zmq_args + ) else: raise ValueError(f"Output stream specification invalid: {outputs}") else: @@ -127,7 +139,6 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None): return ostream - def input_stream_factory(name, inputs, handlers, zmq_args=None): """ This factory preempts the creating of streams directly. It accepts @@ -140,22 +151,38 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None): if type(inputs) is not list or (type(inputs) is list and len(inputs) == 0): raise ValueError(f"Input stream specification invalid: {inputs}") - + # backwards compatability with original UDP server spec - if type(inputs) is list and type(inputs[0]) is int and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT: + if ( + type(inputs) is list + and type(inputs[0]) is int + and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT + ): stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args) elif is_valid_address_spec(inputs[0]): - protocol,hostname,port = inputs[0].split(':') + protocol, hostname, port = inputs[0].split(":") if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT: raise ValueError(f"Input stream specification invalid: {inputs}") if protocol.lower() == "tcp": - if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]: + if hostname.lower() in [ + "server", + "localhost", + "127.0.0.1", + "0.0.0.0", + ]: stream = TCPInputServerStream(name, inputs[0], handlers, zmq_args) else: stream = TCPInputClientStream(name, inputs[0], handlers, zmq_args) else: - if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]: - stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args) + if hostname.lower() in [ + "server", + "localhost", + "127.0.0.1", + "0.0.0.0", + ]: + stream = UDPInputServerStream( + name, inputs[0], handlers, zmq_args=zmq_args + ) else: raise ValueError(f"Input stream specification invalid: {inputs}") elif all(isinstance(item, str) for item in inputs): @@ -216,6 +243,7 @@ def __init__(self, name, inputs, output, handlers, zmq_args=None): name, inputs, handlers, zmq_args, output=output ) + class TCPOutputStream(Stream, TCPOutputClient): """ This stream type listens for messages from another stream or plugin and diff --git a/ait/core/server/utils.py b/ait/core/server/utils.py index b711bb20..7f5c2ff3 100644 --- a/ait/core/server/utils.py +++ b/ait/core/server/utils.py @@ -64,6 +64,7 @@ def decode_message(msg): return (tpc, msg) + def is_valid_address_spec(address): if type(address) is not str: return False diff --git a/docker/Dockerfile b/docker/Dockerfile index cd579644..9a1cebb4 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -100,4 +100,4 @@ ENTRYPOINT ["/usr/bin/bash","-c"] # && echo 'export AIT_CONFIG=${PROJECT_HOME}/docker/network-test-config.yaml' >> ~/.bashrc \ # && echo 'export POETRY_VIRTUALENVS_CREATE=false' >> ~/.bashrc \ # && poetry install --no-interaction --no-ansi -# ENTRYPOINT ["/usr/bin/bash","-c"] \ No newline at end of file +# ENTRYPOINT ["/usr/bin/bash","-c"] diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index a4b7e317..b256b38f 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -88,4 +88,4 @@ services: networks: ait: - name: ait \ No newline at end of file + name: ait diff --git a/docker/network-test-config.yaml b/docker/network-test-config.yaml index 2fa67b04..8cc3cb1c 100644 --- a/docker/network-test-config.yaml +++ b/docker/network-test-config.yaml @@ -59,4 +59,3 @@ default: - input_stream_debug_2 output: - "UDP:udp-server:1238" - \ No newline at end of file diff --git a/tests/ait/core/server/test_client.py b/tests/ait/core/server/test_client.py index 2fc39f3b..8cf64d6e 100644 --- a/tests/ait/core/server/test_client.py +++ b/tests/ait/core/server/test_client.py @@ -1,55 +1,40 @@ # import gevent - # from ait.core.server.broker import Broker # from ait.core.server.client import TCPInputClient # from ait.core.server.client import TCPInputServer - # broker = Broker() # TEST_BYTES = "Howdy".encode() # TEST_PORT = 6666 - - # class SimpleServer(gevent.server.StreamServer): # def handle(self, socket, address): # socket.sendall(TEST_BYTES) - - # class TCPServer(TCPInputServer): # def __init__(self, name, inputs, **kwargs): # super(TCPServer, self).__init__(broker.context, input=inputs) - # def process(self, input_data): # self.cur_socket.sendall(input_data) - - # class TCPClient(TCPInputClient): # def __init__(self, name, inputs, **kwargs): # super(TCPClient, self).__init__( # broker.context, input=inputs, protocol=gevent.socket.SOCK_STREAM # ) # self.input_data = None - # def process(self, input_data): # self.input_data = input_data # self._exit() - - # class TestTCPServer: # def setup_method(self): # self.server = TCPServer("test_tcp_server", inputs=["server", TEST_PORT]) # self.server.start() # self.client = gevent.socket.create_connection(("127.0.0.1", TEST_PORT)) - # def teardown_method(self): # self.server.stop() # self.client.close() - # def test_TCP_server(self): # nbytes = self.client.send(TEST_BYTES) # response = self.client.recv(len(TEST_BYTES)) # assert nbytes == len(TEST_BYTES) # assert response == TEST_BYTES - # def test_null_send(self): # nbytes1 = self.client.send(b"") # nbytes2 = self.client.send(TEST_BYTES) @@ -57,8 +42,6 @@ # assert nbytes1 == 0 # assert nbytes2 == len(TEST_BYTES) # assert response == TEST_BYTES - - # class TestTCPClient: # def setup_method(self): # self.server = SimpleServer(("127.0.0.1", 0)) @@ -66,15 +49,12 @@ # self.client = TCPClient( # "test_tcp_client", inputs=["127.0.0.1", self.server.server_port] # ) - # def teardown_method(self): # self.server.stop() - # def test_TCP_client(self): # self.client.start() # gevent.sleep(1) # assert self.client.input_data == TEST_BYTES - # def test_bad_connection(self): # self.client.port = 1 # self.client.connection_reattempts = 2 diff --git a/tests/ait/core/server/test_server.py b/tests/ait/core/server/test_server.py index 8592f6d3..28ce004b 100644 --- a/tests/ait/core/server/test_server.py +++ b/tests/ait/core/server/test_server.py @@ -552,4 +552,4 @@ def rewrite_and_reload_config(filename, yaml): class FakeStream(object): def __init__(self, name, input_=None, handlers=None, zmq_args=None): - self.name = name \ No newline at end of file + self.name = name diff --git a/tests/ait/core/server/test_stream.py b/tests/ait/core/server/test_stream.py index 8b48bcd0..cf0491a1 100644 --- a/tests/ait/core/server/test_stream.py +++ b/tests/ait/core/server/test_stream.py @@ -59,7 +59,7 @@ class TestStream: "tcp_server", { "name": "some_tcp_stream_server", - "inputs": 'TCP:server:1234', + "inputs": "TCP:server:1234", "handlers_len": 1, "handler_type": PacketHandler, "broker_context": broker.context, @@ -72,7 +72,7 @@ class TestStream: "tcp_client", { "name": "some_tcp_stream_client", - "inputs": 'TCP:127.0.0.1:1234', + "inputs": "TCP:127.0.0.1:1234", "handlers_len": 1, "handler_type": PacketHandler, "broker_context": broker.context, @@ -257,4 +257,4 @@ def test_invalid_input_stream_factory(self, args, expected): # {"zmq_context": broker.context}, # ] # with pytest.raises(expected): - # _ = output_stream_factory(*full_args) \ No newline at end of file + # _ = output_stream_factory(*full_args)