diff --git a/interoperability/mqtt/brokers/bridges/TCPBridges.py b/interoperability/mqtt/brokers/bridges/TCPBridges.py index 6efa9f1..93e440b 100644 --- a/interoperability/mqtt/brokers/bridges/TCPBridges.py +++ b/interoperability/mqtt/brokers/bridges/TCPBridges.py @@ -46,41 +46,46 @@ def clear(self): self.__init__() def disconnected(self, reasoncode, properties): - logging.info("disconnected %s %s", str(reasoncode), str(properties)) + logger.info("Bridge: disconnected %s %s", str(reasoncode), str(properties)) self.disconnects.append({"reasonCode" : reasoncode, "properties" : properties}) def connectionLost(self, cause): - logging.info("connectionLost %s" % str(cause)) + logger.info("Bridge: connectionLost %s" % str(cause)) def publishArrived(self, topicName, payload, qos, retained, msgid, properties=None): - logging.info("publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties)) + logger.info("Bridge: publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties)) self.messages.append((topicName, payload, qos, retained, msgid, properties)) self.messagedicts.append({"topicname" : topicName, "payload" : payload, "qos" : qos, "retained" : retained, "msgid" : msgid, "properties" : properties}) # add to local broker - self.broker.broker.publish(aClientid, topic, message, qos, properties, receivedTime, retained) + self.broker.broker.publish("bridge", topicName, payload, qos, retained, properties, time.monotonic()) return True def published(self, msgid): - logging.info("published %d", msgid) + logger.info("Bridge: published %d", msgid) self.publisheds.append(msgid) def subscribed(self, msgid, data): - logging.info("subscribed %d", msgid) + logger.info("Bridge: subscribed %d", msgid) self.subscribeds.append((msgid, data)) def unsubscribed(self, msgid): - logging.info("unsubscribed %d", msgid) + logger.info("Bridge: unsubscribed %d", msgid) self.unsubscribeds.append(msgid) class Bridges: - def __init__(self, host, port): + def __init__(self, name="local", host="localhost", port=1883, topic="+", direction="both", localprefix="", remoteprefix=""): + self.name = name self.host = host - self.port = port - self.client = mqtt.clients.V5.Client("local") + self.port = int(port) + self.topic = topic + self.direction = direction + self.localprefix = localprefix.strip('\"') + self.remoteprefix = remoteprefix.strip('\"') + self.client = mqtt.clients.V5.Client(name) self.callback = Callbacks(broker5) self.client.registerCallback(self.callback) self.local_connect() @@ -88,20 +93,42 @@ def __init__(self, host, port): def local_connect(self): # connect locally with V5, so we get noLocal and retainAsPublished connect = MQTTV5.Connects() - connect.ClientIdentifier = "local" + connect.ClientIdentifier = self.name + logger.debug("Bridge: local_connect:"+connect.ClientIdentifier) broker5.connect(self, connect) subscribe = MQTTV5.Subscribes() options = MQTTV5.SubscribeOptions() options.noLocal = options.retainAsPublished = True - subscribe.data = [('+', options)] + subscribe.data = [(self.topic, options)] broker5.subscribe(self, subscribe) def connect(self): - self.client.connect(host=self.host, port=self.port, cleanstart=True) + logger.info("Bridge: connect: connecting to %s:%d"%(self.host, self.port)) + connected = False + retry=2 + while not connected: + try: + self.client.connect(host=self.host, port=self.port, cleanstart=True) + connected = True + except OSError as e: + #try again with a small amount of backoff, could ake this configurable + logger.debug("Bridge: failed to connect to remote end due to an OS Error (%s), retrying...",str(e)) + time.sleep(retry) + retry *= 2 + except MQTTV5.MQTTException as e: + #I think we'll retry this one too, the other end probably wasn't ready + logger.debug("Bridge: failed to connect to remote end due to an MQTT Error (%s), retrying...",str(e)) + time.sleep(retry) + retry *= 2 + + # subscribe if necessary options = MQTTV5.SubscribeOptions() options.noLocal = options.retainAsPublished = True - self.client.subscribe(["+"], [options]) + if self.direction == "both" or self.direction == "in": + self.client.subscribe([self.topic], [options]) + else: + logger.info("Bridge: not subscribing to remote") def getPacket(self): # get packet from remote @@ -109,12 +136,14 @@ def getPacket(self): def handlePacket(self, packet): # response from local broker - logger.info("from local broker %s", str(packet)) + logger.info("Bridge: from local broker %s", str(packet)) if packet.fh.PacketType == MQTTV5.PacketTypes.PUBLISH: - self.client.publish(packet.topicName, packet.data, packet.fh.QoS) #retained=False, properties=None) + logger.info("Bridge: sending on %s", self.remoteprefix+packet.topicName) + self.client.publish(self.remoteprefix+packet.topicName, packet.data, packet.fh.QoS) #retained=False, properties=None) def run(self): while True: + logger.info("Bridge: initiating connect logic") self.connect() time.sleep(300) self.shutdown() @@ -123,14 +152,14 @@ def setBroker5(aBroker5): global broker5 broker5 = aBroker5 -def create(port, host="", TLS=False, +def create(name="local", host="localhost", port=1883, topic="+", direction="both", localprefix="", remoteprefix="", TLS=False, cert_reqs=ssl.CERT_REQUIRED, ca_certs=None, certfile=None, keyfile=None): if host == "": host = "localhost" - logger.info("Starting TCP bridge for address '%s' port %d %s", host, port, "with TLS support" if TLS else "") - bridge = Bridges(host, port) + logger.info("Bridge: Starting TCP bridge '%s' for address '%s' port %d %s", name, host, int(port), "with TLS support" if TLS else "") + bridge = Bridges(name, host, port, topic, direction, localprefix, remoteprefix) thread = threading.Thread(target=bridge.run) thread.start() return bridge diff --git a/interoperability/mqtt/brokers/start.py b/interoperability/mqtt/brokers/start.py index 36070e7..d973e97 100644 --- a/interoperability/mqtt/brokers/start.py +++ b/interoperability/mqtt/brokers/start.py @@ -53,6 +53,7 @@ def setup_persistence(persistence_filename): def process_config(config): options = {} servers_to_create = [] + bridges_to_create = [] lineno = 0 while lineno < len(config): curline = config[lineno].strip() @@ -81,7 +82,8 @@ def process_config(config): if len(words) >= 4: if words[3] in ["mqttsn", "http"]: protocol = words[3] - while lineno < len(config) and not config[lineno].strip().startswith("listener"): + while lineno < len(config) and not (config[lineno].strip().startswith("listener") or + config[lineno].strip().startswith("connection") ): curline = config[lineno].strip() lineno += 1 if curline.startswith('#') or len(curline) == 0: @@ -104,8 +106,54 @@ def process_config(config): elif protocol == "http": servers_to_create.append((HTTPListeners, {"host":bind_address, "port":port, "TLS":TLS, "cert_reqs":cert_reqs, "ca_certs":ca_certs, "certfile":certfile, "keyfile":keyfile})) + elif words[0] == "connection": + # Bridge connection, pull out address, protocol and topic lines. + bridgename="local" + address = "localhost" + host = "localhost" + port = "1883" + protocol = "mqtt" + topic = "+" + direction = "both" + localprefix = "" + remoteprefix = "" + if len(words) > 1: + bridgename = words[1] + while lineno < len(config) and not (config[lineno].strip().startswith("listener") or + config[lineno].strip().startswith("connection")) : + curline = config[lineno].strip() + lineno+=1 + if curline.startswith('#') or len(curline) == 0: + continue + words = curline.split() + if words[0] == "protocol": + protocol = words[1] + elif words[0] == "address": + address = words[1] + parts = address.split(":") + host = parts[0] + if len(parts)>1: + port = int(parts[1]) + elif words[0] == "topic": + if len(words) > 1: + topic = words[1] + if len(words) > 2: + direction = words[2] + if len(words) > 3: + localprefix = words[3] + if len(words) > 4: + remoteprefix = words[4] + if protocol == "mqtt": + bridges_to_create.append((TCPBridges, {"name":bridgename, + "host":host, + "port":port, + "topic":topic, + "direction":direction, + "localprefix":localprefix, + "remoteprefix":remoteprefix})) + servers_to_create[-1][1]["serve_forever"] = True - return servers_to_create, options + return servers_to_create, options, bridges_to_create def run(config=None): global logger, broker3, broker5, brokerSN, server @@ -122,7 +170,7 @@ def run(config=None): options = {} if config != None: - servers_to_create, options = process_config(config) + servers_to_create, options, bridges_to_create = process_config(config) broker3 = MQTTV3Brokers(options=options, lock=lock, sharedData=sharedData) @@ -139,6 +187,7 @@ def run(config=None): brokerSN.setBroker5(broker5) servers = [] + bridges = [] UDPListeners.setBroker(brokerSN) TCPListeners.setBrokers(broker3, broker5) HTTPListeners.setBrokers(broker3, broker5, brokerSN) @@ -146,10 +195,15 @@ def run(config=None): try: if config == None: - #TCPBridges.setBroker5(broker5) - #TCPBridges.create(1886) +# TCPBridges.setBroker5(broker5) +# TCPBridges.create("bridge",1883,"172.16.0.4") servers.append(TCPListeners.create(1883, serve_forever=True)) else: + logger.debug("Starting bridges") + for bridge in bridges_to_create: + bridge[0].setBroker5(broker5) + bridges.append(bridge[0].create(**bridge[1])) + logger.debug("Starting servers") for server in servers_to_create: servers.append(server[0].create(**server[1])) except KeyboardInterrupt: diff --git a/interoperability/mqtt/clients/V5/main.py b/interoperability/mqtt/clients/V5/main.py index 36e33aa..595d3d1 100755 --- a/interoperability/mqtt/clients/V5/main.py +++ b/interoperability/mqtt/clients/V5/main.py @@ -35,7 +35,7 @@ logger.addHandler(ch) def sendtosocket(mysocket, data): - logger.debug("out: %s", str(data)) + logger.debug("MQTTv5 Client: sendtosocket: %s", str(data)) sent = 0 length = len(data) try: @@ -48,23 +48,23 @@ def sendtosocket(mysocket, data): class Callback: def connectionLost(self, cause): - logger.debug("default connectionLost %s", str(cause)) + logger.debug("MQTTv5 Client: default connectionLost %s", str(cause)) def publishArrived(self, topicName, payload, qos, retained, msgid): - logger.debug("default publishArrived %s %s %d %d %d", topicName, payload, qos, retained, msgid) + logger.debug("MQTTv5 Client: default publishArrived %s %s %d %d %d", topicName, payload, qos, retained, msgid) return True def published(self, msgid): - logger.debug("default published %d", msgid) + logger.debug("MQTTv5 Client: default published %d", msgid) def subscribed(self, msgid): - logger.debug("default subscribed %d", msgid) + logger.debug("MQTTv5 Client: default subscribed %d", msgid) def unsubscribed(self, msgid): - logger.debug("default unsubscribed %d", msgid) + logger.debug("MQTTv5 Client: default unsubscribed %d", msgid) def disconnected(self, reasoncode, properties): - logger.debug("default disconnected") + logger.debug("MQTTv5 Client: default disconnected") @@ -74,6 +74,7 @@ def getReceiver(self): return self.__receiver def __init__(self, clientid): + logger.debug("MQTTv5 Client: init(%s)", clientid) self.clientid = clientid self.msgid = 1 self.callback = None @@ -105,6 +106,7 @@ def registerCallback(self, callback): def connect(self, host="localhost", port=1883, cleanstart=True, keepalive=0, newsocket=True, protocolName=None, willFlag=False, willTopic=None, willMessage=None, willQoS=2, willRetain=False, username=None, password=None, properties=None, willProperties=None): + logger.debug("MQTTv5 Client: connect(host=%s, newsocket=%s)", host, newsocket) if newsocket: try: self.sock.close() @@ -181,6 +183,7 @@ def unsubscribe(self, topics): def publish(self, topic, payload, qos=0, retained=False, properties=None): + logger.debug("MQTTv5 Client: publish on topic '%s'", topic) publish = MQTTV5.Publishes() publish.fh.QoS = qos publish.fh.RETAIN = retained @@ -200,11 +203,12 @@ def publish(self, topic, payload, qos=0, retained=False, properties=None): def disconnect(self, properties=None): + logger.debug("MQTTv5 Client: disconnecting") if self.__receiver: self.__receiver.stopping = True count = 0 while (len(self.__receiver.inMsgs) > 0 or len(self.__receiver.outMsgs) > 0) and self.__receiver.paused == False: - logger.debug("disconnecting %s %s", self.__receiver.inMsgs, self.__receiver.outMsgs) + logger.debug("MQTTv5 Client: disconnecting %s %s", self.__receiver.inMsgs, self.__receiver.outMsgs) time.sleep(.2) count += 1 if count == 20: @@ -228,6 +232,7 @@ def disconnect(self, properties=None): self.__receiver.stopping = False def terminate(self): + logger.debug("MQTTv5 Client: terminating") if self.__receiver: self.__receiver.stopping = True self.sock.shutdown(socket.SHUT_RDWR) diff --git a/interoperability/udp.conf b/interoperability/udp.conf index 7929a5d..44ce9e5 100644 --- a/interoperability/udp.conf +++ b/interoperability/udp.conf @@ -3,3 +3,9 @@ loglevel debug listener 1883 listener 1883 INADDR_ANY mqttsn + +#connection loopback +# protocol mqtt +# address localhost +# topic + both "" "" +