-
Notifications
You must be signed in to change notification settings - Fork 0
/
rx_message_handler.py
127 lines (102 loc) · 4.28 KB
/
rx_message_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from meshtastic import mqtt_pb2, portnums_pb2, mesh_pb2, telemetry_pb2
from meshtastic import protocols
from encryption import decrypt_packet
# from load_config import config
from load_config import ConfigLoader
def on_message(client, userdata, msg):
    """Decode one received MQTT message and print its contents.

    The payload is parsed as a ``ServiceEnvelope``; its packet is decrypted
    with the configured channel key when it carries only an ``encrypted``
    field, and the decoded payload is then printed according to its port
    number. Every failure is reported with a ``*** <label>: <error>`` line
    instead of raising, so one bad packet does not propagate an exception
    out of this handler.

    Args:
        client: Unused here; kept so the signature matches the caller.
        userdata: Unused here; kept so the signature matches the caller.
        msg: Received message; only ``msg.payload`` (the serialized
            ``ServiceEnvelope`` bytes) is read.

    Returns:
        None. All results are written to stdout.
    """
    # NOTE(review): presumably registered as a paho-mqtt ``on_message``
    # callback -- confirm against the code that wires up the client.
    config = ConfigLoader.get_config()

    se = mqtt_pb2.ServiceEnvelope()
    try:
        se.ParseFromString(msg.payload)
        print("")
        print("Service Envelope:")
        print(se)
        mp = se.packet
    except Exception as e:
        print(f"*** ServiceEnvelope: {str(e)}")
        return

    if mp.HasField("encrypted") and not mp.HasField("decoded"):
        # Guard the decrypt step like every parse step below: an exception
        # from decrypt_packet, or a return value CopyFrom cannot accept,
        # is reported and the packet is dropped.
        try:
            decoded_data = decrypt_packet(mp, config.channel.key)
            mp.decoded.CopyFrom(decoded_data)
        except Exception as e:
            print(f"*** Decryption: {str(e)}")
            return

    print("")
    print("Message Packet:")
    print(mp)

    port_num = mp.decoded.portnum

    if port_num == portnums_pb2.TEXT_MESSAGE_APP:
        try:
            # from_id / text_payload are only needed by the optional
            # one-line summary below; decoding still runs so that a
            # non-UTF-8 payload is reported.
            from_str = getattr(mp, "from")
            from_id = '!' + hex(from_str)[2:]
            text_payload = mp.decoded.payload.decode("utf-8")
            print("")
            print("Message Payload:")
            print(mp.decoded)
            # print(f"{from_id}: {text_payload}")
        except Exception as e:
            print(f"*** TEXT_MESSAGE_APP: {str(e)}")

    elif port_num == portnums_pb2.NODEINFO_APP:
        info = mesh_pb2.User()
        try:
            info.ParseFromString(mp.decoded.payload)
            print("")
            print("NodeInfo:")
            print(info)
        except Exception as e:
            print(f"*** NODEINFO_APP: {str(e)}")

    elif port_num == portnums_pb2.POSITION_APP:
        pos = mesh_pb2.Position()
        try:
            pos.ParseFromString(mp.decoded.payload)
            print("")
            print("Position:")
            print(pos)
        except Exception as e:
            print(f"*** POSITION_APP: {str(e)}")

    elif port_num == portnums_pb2.TELEMETRY_APP:
        telem = telemetry_pb2.Telemetry()
        try:
            # Parse the payload into the main telemetry message, then print
            # whichever metric groups are present, in a fixed order.
            telem.ParseFromString(mp.decoded.payload)
            metric_groups = (
                ("device_metrics", "Device Metrics:"),
                ("environment_metrics", "Environment Metrics:"),
                ("power_metrics", "Power Metrics:"),
            )
            for field_name, title in metric_groups:
                if telem.HasField(field_name):
                    print(title)
                    print(getattr(telem, field_name))
        except Exception as e:
            print(f"*** TELEMETRY_APP: {str(e)}")

    else:
        # Any other port: look up a handler for it and, if that handler
        # supplies a protobuf factory, replace the raw payload with a
        # single-line text rendering of the parsed message before printing.
        known_port = mp.decoded.portnum if mp.HasField("decoded") else None
        handler = protocols.get(known_port) if known_port else None

        pb = None
        if handler is not None and handler.protobufFactory is not None:
            try:
                pb = handler.protobufFactory()
                pb.ParseFromString(mp.decoded.payload)
            except Exception as e:
                # Keep the raw payload when it cannot be parsed.
                print(f"*** portnum {known_port}: {str(e)}")
                pb = None

        if pb is not None:
            # Clean and update the payload
            pb_str = str(pb).replace('\n', ' ').replace('\r', ' ').strip()
            mp.decoded.payload = pb_str.encode("utf-8")
        print(mp)
# rssi = getattr(mp, "rx_rssi")
# # Device Metrics
# device_metrics_dict = {
# 'Battery Level': telem.device_metrics.battery_level,
# 'Voltage': round(telem.device_metrics.voltage, 2),
# 'Channel Utilization': round(telem.device_metrics.channel_utilization, 1),
# 'Air Utilization': round(telem.device_metrics.air_util_tx, 1)
# }
# # Environment Metrics
# environment_metrics_dict = {
# 'Temp': round(telem.environment_metrics.temperature, 2),
# 'Humidity': round(telem.environment_metrics.relative_humidity, 0),
# 'Pressure': round(telem.environment_metrics.barometric_pressure, 2),
# 'Gas Resistance': round(telem.environment_metrics.gas_resistance, 2)
# }