Skip to content

Commit

Permalink
install pre-commit now
Browse files Browse the repository at this point in the history
  • Loading branch information
cjjacks committed Oct 23, 2024
1 parent b8eac95 commit f54bb72
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 96 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ logs:
@cd docker; docker compose logs --follow

bash:
@cd docker; docker compose exec ait-server bash
@cd docker; docker compose exec ait-server bash
84 changes: 31 additions & 53 deletions ait/core/server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):

super(UDPOutputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
Expand All @@ -140,15 +139,16 @@ def __init__(
if type(output) is int:
self.addr_spec = ("localhost", output)
elif utils.is_valid_address_spec(output):
protocol,hostname,port = output.split(":")
protocol, hostname, port = output.split(":")
if protocol.lower() != "udp":
raise (ValueError(f"UDPOutputClient: Invalid Specification {output}"))
self.addr_spec = (hostname,int(port))
raise (
ValueError(f"UDPOutputClient: Invalid Specification {output}")
)
self.addr_spec = (hostname, int(port))
else:
raise (ValueError(f"UDPOutputClient: Invalid Specification {output}"))
else:
raise (ValueError(f"UDPOutputClient: Invalid Specification"))

raise (ValueError("UDPOutputClient: Invalid Specification"))

self.context = zmq_context
# override pub to be udp socket
Expand All @@ -158,6 +158,7 @@ def publish(self, msg):
self.pub.sendto(msg, self.addr_spec)
log.debug("Published message from {}".format(self))


class TCPOutputClient(ZMQInputClient):
"""
This is the parent class for all outbound streams which publish
Expand All @@ -172,33 +173,31 @@ def __init__(
zmq_proxy_xpub_url=ait.SERVER_DEFAULT_XPUB_URL,
**kwargs,
):

super(TCPOutputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
if "output" in kwargs:
output = kwargs["output"]
if utils.is_valid_address_spec(output):
protocol,hostname,port = output.split(":")
protocol, hostname, port = output.split(":")
if protocol.lower() != "tcp":
raise (ValueError(f"TCPOutputClient: Invalid Specification {output}"))
self.addr_spec = (hostname,int(port))
raise (
ValueError(f"TCPOutputClient: Invalid Specification {output}")
)
self.addr_spec = (hostname, int(port))
else:
raise (ValueError(f"TCPOutputClient: Invalid Specification {output}"))
else:
raise (ValueError(f"TCPOutputClient: Invalid Specification"))

raise (ValueError("TCPOutputClient: Invalid Specification"))

self.context = zmq_context
self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


def publish(self, msg):
self.pub.connect(self.addr_spec)
self.pub.sendall(msg)



class UDPInputServer(ZMQClient, gs.DatagramServer):
"""
This is the parent class for all inbound streams which receive messages
Expand All @@ -218,7 +217,7 @@ def __init__(
if type(input) is int:
host_spec = input
elif utils.is_valid_address_spec(input):
protocol,hostname,port = input.split(":")
protocol, hostname, port = input.split(":")
if protocol.lower() != "udp":
raise (ValueError(f"UDPInputServer: Invalid Specification {input}"))
if hostname in ["127.0.0.1", "localhost"]:
Expand All @@ -227,7 +226,7 @@ def __init__(
host_spec = f"0.0.0.0:{port}"
else:
raise (ValueError(f"UDPInputServer: Invalid Specification {input}"))

else:
raise (ValueError(f"UDPInputServer: Invalid Specification {input}"))
super(UDPInputServer, self).__init__(
Expand Down Expand Up @@ -266,26 +265,21 @@ def __init__(
if "input" in kwargs:
input = kwargs["input"]
if not utils.is_valid_address_spec(input):
raise (
ValueError(
f"TCPInputServer: Invalid Specification {input}"
)
)
protocol,hostname,port = input.split(":")
if protocol.lower() != "tcp" or hostname not in ["127.0.0.1", "localhost", "server", "0.0.0.0"]:
raise (
ValueError(
f"TCPInputServer: Invalid Specification {input}"
)
)
raise (ValueError(f"TCPInputServer: Invalid Specification {input}"))
protocol, hostname, port = input.split(":")
if protocol.lower() != "tcp" or hostname not in [
"127.0.0.1",
"localhost",
"server",
"0.0.0.0",
]:
raise (ValueError(f"TCPInputServer: Invalid Specification {input}"))

self.sub = gevent.socket.socket(
gevent.socket.AF_INET, gevent.socket.SOCK_STREAM
)
hostname = (
"127.0.0.1"
if hostname in ["127.0.0.1", "localhost"]
else "0.0.0.0"
"127.0.0.1" if hostname in ["127.0.0.1", "localhost"] else "0.0.0.0"
)
super(TCPInputServer, self).__init__(
zmq_context,
Expand All @@ -294,11 +288,7 @@ def __init__(
listener=(hostname, int(port)),
)
else:
raise (
ValueError(
"TCPInputServer: Invalid Specification"
)
)
raise (ValueError("TCPInputServer: Invalid Specification"))

def handle(self, socket, address):
self.cur_socket = socket
Expand Down Expand Up @@ -344,18 +334,10 @@ def __init__(
if "input" in kwargs:
input = kwargs["input"]
if not utils.is_valid_address_spec(input):
raise (
ValueError(
f"TCPInputClient: Invalid Specification {input}"
)
)
protocol,hostname,port = input.split(":")
raise (ValueError(f"TCPInputClient: Invalid Specification {input}"))
protocol, hostname, port = input.split(":")
if protocol.lower() != "tcp":
raise (
ValueError(
f"TCPInputClient: Invalid Specification {input}"
)
)
raise (ValueError(f"TCPInputClient: Invalid Specification {input}"))
super(TCPInputClient, self).__init__(
zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url
)
Expand All @@ -364,14 +346,10 @@ def __init__(

self.hostname = hostname
self.port = int(port)
self.address = (hostname,int(port))
self.address = (hostname, int(port))

else:
raise (
ValueError(
"TCPInputClient: Invalid Specification"
)
)
raise (ValueError("TCPInputClient: Invalid Specification"))

def __exit__(self):
try:
Expand Down
2 changes: 1 addition & 1 deletion ait/core/server/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .ccsds_packet_handler import * # noqa
from .debug_handler import * # noqa
from .packet_handler import * # noqa
from .debug_handler import * # noqa
2 changes: 1 addition & 1 deletion ait/core/server/handlers/debug_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ def __init__(self, input_type=None, output_type=None, **kwargs):

def handle(self, input_data):
ait.core.log.info(f"{self.handler_name} received {len(input_data)} bytes")
return input_data
return input_data
54 changes: 41 additions & 13 deletions ait/core/server/stream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import ait.core.log
from .client import UDPOutputClient, TCPOutputClient, TCPInputClient, TCPInputServer, UDPInputServer, ZMQInputClient
import ait
from .client import TCPInputClient
from .client import TCPInputServer
from .client import TCPOutputClient
from .client import UDPInputServer
from .client import UDPOutputClient
from .client import ZMQInputClient
from .utils import is_valid_address_spec


Expand Down Expand Up @@ -107,16 +111,24 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None):
if type(outputs) is not list or (type(outputs) is list and len(outputs) == 0):
raise ValueError(f"Output stream specification invalid: {outputs}")
# backwards compatability with original UDP spec
if type(outputs) is list and type(outputs[0]) is int and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT:
if (
type(outputs) is list
and type(outputs[0]) is int
and ait.MIN_PORT <= outputs[0] <= ait.MAX_PORT
):
ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args)
elif is_valid_address_spec(outputs[0]):
protocol,hostname,port = outputs[0].split(':')
protocol, hostname, port = outputs[0].split(":")
if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT:
raise ValueError(f"Output stream specification invalid: {outputs}")
if protocol.lower() == "udp":
ostream = UDPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args)
ostream = UDPOutputStream(
name, inputs, outputs[0], handlers, zmq_args=zmq_args
)
elif protocol.lower() == "tcp":
ostream = TCPOutputStream(name, inputs, outputs[0], handlers, zmq_args=zmq_args)
ostream = TCPOutputStream(
name, inputs, outputs[0], handlers, zmq_args=zmq_args
)
else:
raise ValueError(f"Output stream specification invalid: {outputs}")
else:
Expand All @@ -127,7 +139,6 @@ def output_stream_factory(name, inputs, outputs, handlers, zmq_args=None):
return ostream



def input_stream_factory(name, inputs, handlers, zmq_args=None):
"""
This factory preempts the creating of streams directly. It accepts
Expand All @@ -140,22 +151,38 @@ def input_stream_factory(name, inputs, handlers, zmq_args=None):

if type(inputs) is not list or (type(inputs) is list and len(inputs) == 0):
raise ValueError(f"Input stream specification invalid: {inputs}")

# backwards compatability with original UDP server spec
if type(inputs) is list and type(inputs[0]) is int and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT:
if (
type(inputs) is list
and type(inputs[0]) is int
and ait.MIN_PORT <= inputs[0] <= ait.MAX_PORT
):
stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args)
elif is_valid_address_spec(inputs[0]):
protocol,hostname,port = inputs[0].split(':')
protocol, hostname, port = inputs[0].split(":")
if int(port) < ait.MIN_PORT or int(port) > ait.MAX_PORT:
raise ValueError(f"Input stream specification invalid: {inputs}")
if protocol.lower() == "tcp":
if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]:
if hostname.lower() in [
"server",
"localhost",
"127.0.0.1",
"0.0.0.0",
]:
stream = TCPInputServerStream(name, inputs[0], handlers, zmq_args)
else:
stream = TCPInputClientStream(name, inputs[0], handlers, zmq_args)
else:
if hostname.lower() in ["server","localhost","127.0.0.1","0.0.0.0",]:
stream = UDPInputServerStream(name, inputs[0], handlers, zmq_args=zmq_args)
if hostname.lower() in [
"server",
"localhost",
"127.0.0.1",
"0.0.0.0",
]:
stream = UDPInputServerStream(
name, inputs[0], handlers, zmq_args=zmq_args
)
else:
raise ValueError(f"Input stream specification invalid: {inputs}")
elif all(isinstance(item, str) for item in inputs):
Expand Down Expand Up @@ -216,6 +243,7 @@ def __init__(self, name, inputs, output, handlers, zmq_args=None):
name, inputs, handlers, zmq_args, output=output
)


class TCPOutputStream(Stream, TCPOutputClient):
"""
This stream type listens for messages from another stream or plugin and
Expand Down
1 change: 1 addition & 0 deletions ait/core/server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def decode_message(msg):

return (tpc, msg)


def is_valid_address_spec(address):
if type(address) is not str:
return False
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ ENTRYPOINT ["/usr/bin/bash","-c"]
# && echo 'export AIT_CONFIG=${PROJECT_HOME}/docker/network-test-config.yaml' >> ~/.bashrc \
# && echo 'export POETRY_VIRTUALENVS_CREATE=false' >> ~/.bashrc \
# && poetry install --no-interaction --no-ansi
# ENTRYPOINT ["/usr/bin/bash","-c"]
# ENTRYPOINT ["/usr/bin/bash","-c"]
2 changes: 1 addition & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,4 @@ services:

networks:
ait:
name: ait
name: ait
1 change: 0 additions & 1 deletion docker/network-test-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,3 @@ default:
- input_stream_debug_2
output:
- "UDP:udp-server:1238"

Loading

0 comments on commit f54bb72

Please sign in to comment.