forked from djax666/mystrombutton2mqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqttlib.py
75 lines (59 loc) · 2.8 KB
/
mqttlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# MQTT
import ssl
import paho.mqtt.client as mqtt
import logging
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.DEBUG)
class MqttConnection(object):
def __init__(self, settings, callback=None):
self._ip = settings["brokeraddress"]
self._port = settings["brokerport"]
self._connected = False
self._subscribed_topics = settings["subscribed_topics"]
self._callback = callback
self._mqttc = mqtt.Client()
self._mqttc.on_connect = self._onconnect
self._mqttc.on_disconnect = self._ondisconnect
self._mqttc.on_message = self._onmessage
if "brokerusername" in settings:
self._mqttc.username_pw_set(settings["brokerusername"], settings["brokerpassword"])
if settings["brokerssl"]:
self._mqttc.tls_set(ca_certs=settings["cafilepath"], certfile=settings["certfilepath"],
keyfile=settings["keyfilepath"], cert_reqs=ssl.CERT_NONE)
self._mqttc.tls_insecure_set(True)
logging.debug('MQTTClient initialized')
def _onconnect(self, mqttc, userdata, flags, rc):
self._connected = True
logging.debug('MqttConnection.onConnected()')
for t in self._subscribed_topics:
self.subscribe(t)
def _ondisconnect(self, mqttc, userdata, rc):
self._connected = False
if rc != 0:
logging.debug('MqttConnection.onDisconnected() -> error! Reconnecting...')
# self.connect() # Done automatically!
def _onmessage(self, mqttc, obj, msg):
logging.debug('MqttConnection.onMessage() %s %s' % (str(msg.topic), str(msg.payload)))
if self._callback:
self._callback(msg.topic, msg.payload)
def isConnected(self):
return self._connected
def connect(self):
self._mqttc.connect(self._ip, self._port)
self._mqttc.loop_start()
logging.debug('MQTTClient connected')
def disconnect(self):
self._mqttc.loop_stop(True)
logging.debug('MQTTClient disconnected')
def publish(self, topic, payload=None,retain=False):
logging.info('MqttConnection.publish(%s, %s) Retain: %s' % (str(topic), str(payload), str(retain)))
a, b = self._mqttc.publish(topic=topic, payload=payload,retain=retain)
return a == mqtt.MQTT_ERR_SUCCESS
def subscribe(self, topic):
logging.info('MqttConnection.subscribe(%s)' % str(topic))
a,b = self._mqttc.subscribe(topic)
return a == mqtt.MQTT_ERR_SUCCESS
def unsubscribe(self, topic):
logging.info('MqttConnection.unsubscribe(%s)' % str(topic))
a,b = self._mqttc.unsubscribe(topic)
return a == mqtt.MQTT_ERR_SUCCESS
# ------------------------------------------------------------------------------------------------------------------