Skip to content
This repository was archived by the owner on Jul 3, 2019. It is now read-only.

Commit 2e11c56

Browse files
committed
Begin implementation of service discovery/state
Issue #2: implement mechanism for sharing state over broadcast address Issue #3: all servers broadcast on the same network so they will be in sync
1 parent 0967253 commit 2e11c56

File tree

1 file changed

+254
-0
lines changed

1 file changed

+254
-0
lines changed

sync.py

+254
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
"""
2+
Sync module for robocluster process manager.
3+
4+
Implements a context managers for the process manager server and for services.
5+
"""
6+
7+
__author__ = 'Jarrod Pas <[email protected]>'
8+
9+
import asyncio
10+
import json
11+
import socket
12+
import sys
13+
from argparse import ArgumentParser
14+
from ipaddress import ip_network
15+
from time import time
16+
17+
18+
class BaseSocketLoop:
19+
"""Base context for event loops using sockets."""
20+
21+
def __init__(self):
22+
"""Initialize the event loop."""
23+
self.loop = asyncio.new_event_loop()
24+
25+
def __enter__(self):
26+
"""Enter context manager."""
27+
return self
28+
29+
def __exit__(self, *exc):
30+
"""Exit context manager."""
31+
self.close()
32+
return False
33+
34+
def run(self):
35+
"""Run the event loop forever."""
36+
self.loop.run_forever()
37+
38+
def stop(self):
39+
"""Stop the event loop."""
40+
self.loop.stop()
41+
42+
def close(self):
43+
"""Close the event loop in a clean manner."""
44+
self.stop()
45+
for task in asyncio.Task.all_tasks(loop=self.loop):
46+
task.cancel()
47+
self.loop.run_forever() # wait for all tasks to be cancelled
48+
self.loop.close()
49+
50+
@staticmethod
51+
def socket(*args, **kwargs):
52+
"""Create a socket."""
53+
sock = socket.socket(*args, **kwargs)
54+
sock.setblocking(False)
55+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
56+
return sock
57+
58+
def accept(self, sock):
59+
"""Accept a connection."""
60+
return self.loop.sock_accept(sock)
61+
62+
def connect(self, sock, addressess):
63+
"""Connect to a remote socket at addressess."""
64+
return self.loop.sock_connect(sock, addressess)
65+
66+
async def recv(self, sock):
67+
"""Receive data from the socket."""
68+
packet = await self.loop.sock_recv(sock, 4096)
69+
return self.decode(packet)
70+
71+
def recvfrom(self, sock, future=None):
72+
"""Receive data from the socket."""
73+
fileno = sock.fileno()
74+
if future is None:
75+
future = self.loop.create_future()
76+
else:
77+
self.loop.remove_reader(fileno)
78+
79+
try:
80+
packet, address = sock.recvfrom(4096)
81+
except (BlockingIOError, InterruptedError):
82+
self.loop.add_reader(fileno,
83+
self.recvfrom, sock, future)
84+
else:
85+
data = self.decode(packet)
86+
future.set_result((data, address))
87+
88+
return future
89+
90+
def send(self, sock, data):
91+
"""Send data to the socket."""
92+
packet = json.dumps(data).encode('utf-8')
93+
return self.loop.sock_sendall(sock, packet)
94+
95+
def sendto(self, sock, data, address, future=None):
96+
"""Send data to the socket."""
97+
fileno = sock.fileno()
98+
if future is None:
99+
future = self.loop.create_future()
100+
else:
101+
self.loop.remove_writer(fileno)
102+
103+
if not data:
104+
return
105+
106+
try:
107+
packet = self.encode(data)
108+
nbytes = sock.sendto(packet, address)
109+
except (BlockingIOError, InterruptedError):
110+
self.loop.add_writer(fileno,
111+
self.sendto, sock, data, address, future)
112+
else:
113+
future.set_result(nbytes)
114+
115+
return future
116+
117+
@staticmethod
118+
def decode(packet):
119+
"""Decode a packet."""
120+
return packet
121+
122+
@staticmethod
123+
def encode(packet):
124+
"""Encode a packet."""
125+
return packet
126+
127+
128+
class JSONSocketLoop(BaseSocketLoop):
129+
"""Socket loop with JSON as transport."""
130+
131+
@staticmethod
132+
def decode(packet):
133+
"""Decode a packet from JSON."""
134+
try:
135+
return json.loads(packet)
136+
except json.JSONDecodeError:
137+
# "Doesn't look like anything to me." - Dolores
138+
return None
139+
140+
@staticmethod
141+
def encode(packet):
142+
"""Encode a packet to JSON."""
143+
return json.dumps(packet).encode('utf-8')
144+
145+
146+
class Server(JSONSocketLoop):
147+
"""Server to keep track of services."""
148+
149+
def __init__(self, subnet, port):
150+
"""Initialize server."""
151+
super().__init__()
152+
153+
self.broadcast = ip_network(subnet).broadcast_address.compressed
154+
self.port = port
155+
156+
def run(self):
157+
"""Run event loop forever."""
158+
self.loop.create_task(self.discover())
159+
self.loop.create_task(self.server())
160+
super().run()
161+
162+
async def discover(self):
163+
"""Loop for discovering services."""
164+
with self.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
165+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
166+
sock.bind((self.broadcast, self.port))
167+
while True:
168+
data, address = await self.recvfrom(sock)
169+
if data:
170+
print(f'discover: {address} > {data}')
171+
172+
async def server(self):
173+
"""Loop for serving requests for where services reside."""
174+
with self.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
175+
sock.bind(('localhost', self.port))
176+
sock.listen()
177+
while True:
178+
conn, address = await self.accept(sock)
179+
self.loop.create_task(self.handler(conn, address))
180+
181+
async def handler(self, conn, address):
182+
"""Handle request for services."""
183+
with conn:
184+
data = await self.recv(conn)
185+
if data:
186+
print(f'handle: {address} > {data}')
187+
188+
189+
class Service(JSONSocketLoop):
190+
"""Service for the server to track."""
191+
192+
def __init__(self, subnet, port):
193+
"""Initialize service."""
194+
super().__init__()
195+
196+
self.broadcast = ip_network(subnet).broadcast_address.compressed
197+
self.port = port
198+
199+
def run(self):
200+
"""Run the event loop forever."""
201+
self.loop.create_task(self.advertise())
202+
super().run()
203+
204+
async def advertise(self):
205+
"""Advertise service to server."""
206+
sock = self.socket(socket.AF_INET, socket.SOCK_DGRAM)
207+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
208+
while True:
209+
address = self.broadcast, self.port
210+
data = {'time': time()}
211+
await self.sendto(sock, data, address)
212+
await asyncio.sleep(1)
213+
214+
215+
def server(args):
216+
"""Run a server."""
217+
with Server(args.subnet, args.port) as server_:
218+
try:
219+
server_.run()
220+
except KeyboardInterrupt:
221+
pass
222+
223+
224+
def service(args):
225+
"""Run a service."""
226+
with Service(args.subnet, args.port) as service_:
227+
try:
228+
service_.run()
229+
except KeyboardInterrupt:
230+
pass
231+
232+
233+
def main(args):
234+
"""Parse arguments and run main routine."""
235+
parser = ArgumentParser()
236+
subparsers = parser.add_subparsers(dest='command')
237+
subparsers.required = True
238+
239+
server_parser = subparsers.add_parser('server')
240+
server_parser.add_argument('subnet')
241+
server_parser.add_argument('port', type=int)
242+
server_parser.set_defaults(func=server)
243+
244+
service_parser = subparsers.add_parser('service')
245+
service_parser.add_argument('subnet')
246+
service_parser.add_argument('port', type=int)
247+
service_parser.set_defaults(func=service)
248+
249+
args = parser.parse_args(args)
250+
return args.func(args)
251+
252+
253+
if __name__ == '__main__':
254+
exit(main(sys.argv[1:]))

0 commit comments

Comments
 (0)