Skip to content

Commit 9971b74

Browse files
committed
device_message_listaner mqtt python mysql
1 parent aa78e0d commit 9971b74

File tree

3 files changed

+126
-5
lines changed

3 files changed

+126
-5
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ A simple example of using mqtt to receive and display telemetry data from device
4343

4444
* [HTML+JS+CSS](mqtt-telemetry-dashboard) with [mqtt.js](https://github.com/mqttjs/MQTT.js)
4545

46-
## MQTT channel message handler
46+
## MQTT device message handler
4747

4848
* [Python + MySQL](mqtt-message-handler/python)
4949

mqtt-message-handler/python/README.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
# flespi.io MQTT channel message listener
1+
# flespi.io MQTT device message listener
22

33
MySQL database, [gmqtt](https://github.com/wialon/gmqtt) library, python 3.5+
44

5-
**Example:** [channel_message_listener.py](channel_message_listener.py)
5+
**Example:** [device_message_listener.py](device_message_listener.py)
66

77
First of all you should **create a new table** in your database to handle messages.
88

@@ -27,7 +27,7 @@ First of all you should **create a new table** in your database to handle messag
2727

2828
**Also you should configure the listener by changing these configuration lines:**
2929

30-
channel_id = '12345'
30+
device_id = '12345'
3131
flespi_mqtt_host = 'mqtt.flespi.io'
3232
flespi_token = 'YOUR FLESPI TOKEN'
3333
mysql_host = 'localhost'
@@ -41,4 +41,4 @@ First of all you should **create a new table** in your database to handle messag
4141

4242
**Then start the example:**
4343

44-
python3 channel_message_listener.py
44+
python3 device_message_listener.py
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
device_id = '12345' # you can use + to subscrube to all devices or list device IDs separated by commas (123,124,135)
2+
flespi_mqtt_host = 'mqtt.flespi.io'
3+
flespi_token = 'YOUR FLESPI TOKEN'
4+
mysql_host = 'localhost'
5+
mysql_user = 'root'
6+
mysql_passwd = 'password'
7+
mysql_db = 'message_receiver'
8+
9+
# First of all you should create a new table in your database to handle messages.
10+
# You can change columns as you wish. But then you should change the respective insert query in "on_message" function.
11+
#
12+
# CREATE TABLE message_receiver.python_message_listener (
13+
# ident varchar(100) NOT NULL,
14+
# `server.timestamp` DOUBLE NOT NULL,
15+
# `position.longitude` DOUBLE NULL,
16+
# `position.latitude` DOUBLE NULL,
17+
# `timestamp` DOUBLE NULL,
18+
# `position.altitude` DOUBLE NULL,
19+
# `position.direction` DOUBLE NULL,
20+
# `position.speed` DOUBLE NULL,
21+
# `position.satellites` INT NULL,
22+
# CONSTRAINT python_message_listener_PK PRIMARY KEY (ident,`server.timestamp`,`timestamp`)
23+
# )
24+
# ENGINE=MyISAM
25+
# DEFAULT CHARSET=utf8
26+
# COLLATE=utf8_general_ci;
27+
28+
29+
import asyncio
30+
import os
31+
import signal
32+
import time
33+
import json
34+
import pymysql
35+
import uvloop
36+
from gmqtt import Client as MQTTClient
37+
38+
39+
40+
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
41+
STOP = asyncio.Event()
42+
con = None
43+
counter = 0
44+
45+
def on_connect(client, flags, rc, properties):
46+
print('Connected')
47+
client.subscribe('flespi/message/gw/devcies/' + device_id, qos=0)
48+
49+
50+
def on_message(client, topic, payload, qos, properties):
51+
global counter, con
52+
data = json.loads(payload)
53+
if con is not None:
54+
cursor = con.cursor()
55+
cursor.execute(
56+
"""
57+
INSERT INTO python_message_listener (
58+
`ident`,
59+
`server.timestamp`,
60+
`position.longitude`,
61+
`position.latitude`,
62+
`timestamp`,
63+
`position.altitude`,
64+
`position.direction`,
65+
`position.speed`,
66+
`position.satellites`
67+
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
68+
""",
69+
(
70+
data["ident"],
71+
data["server.timestamp"],
72+
data["position.longitude"],
73+
data["position.latitude"],
74+
data["timestamp"],
75+
data["position.altitude"],
76+
data["position.direction"],
77+
data["position.speed"],
78+
data["position.satellites"]
79+
)
80+
)
81+
counter+=1
82+
con.commit()
83+
else:
84+
print('Not inserted', json.dumps(data, indent=4))
85+
86+
87+
def on_disconnect(client, packet, exc=None):
88+
print('Disconnected')
89+
if con is not None:
90+
con.close()
91+
92+
def on_subscribe(client, mid, qos, properties):
93+
print('SUBSCRIBED')
94+
95+
def ask_exit(*args):
96+
print('Inserted:', counter)
97+
STOP.set()
98+
99+
async def main(broker_host, token):
100+
global con
101+
con = pymysql.connect(host = mysql_host, user = mysql_user, passwd = mysql_passwd, db = mysql_db, autocommit=True)
102+
client = MQTTClient('message_listener')
103+
104+
client.on_connect = on_connect
105+
client.on_message = on_message
106+
client.on_disconnect = on_disconnect
107+
client.on_subscribe = on_subscribe
108+
109+
client.set_auth_credentials(token, None)
110+
await client.connect(broker_host)
111+
112+
await STOP.wait()
113+
await client.disconnect()
114+
115+
116+
if __name__ == '__main__':
117+
loop = asyncio.get_event_loop()
118+
loop.add_signal_handler(signal.SIGINT, ask_exit)
119+
loop.add_signal_handler(signal.SIGTERM, ask_exit)
120+
121+
loop.run_until_complete(main(flespi_mqtt_host, flespi_token))

0 commit comments

Comments
 (0)