-
Notifications
You must be signed in to change notification settings - Fork 45
/
ecoflow_exporter.py
367 lines (301 loc) · 15 KB
/
ecoflow_exporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
import logging as log
import sys
import os
import signal
import ssl
import time
import json
import re
import base64
import uuid
from queue import Queue
from threading import Timer
from multiprocessing import Process
import requests
import paho.mqtt.client as mqtt
from prometheus_client import start_http_server, REGISTRY, Gauge, Counter
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
class EcoflowMetricException(Exception):
pass
class EcoflowAuthentication:
def __init__(self, ecoflow_username, ecoflow_password, ecoflow_api_host):
self.ecoflow_username = ecoflow_username
self.ecoflow_password = ecoflow_password
self.ecoflow_api_host = ecoflow_api_host
self.mqtt_url = "mqtt.ecoflow.com"
self.mqtt_port = 8883
self.mqtt_username = None
self.mqtt_password = None
self.mqtt_client_id = None
self.authorize()
def authorize(self):
url = f"https://{self.ecoflow_api_host}/auth/login"
headers = {"lang": "en_US", "content-type": "application/json"}
data = {"email": self.ecoflow_username,
"password": base64.b64encode(self.ecoflow_password.encode()).decode(),
"scene": "IOT_APP",
"userType": "ECOFLOW"}
log.info(f"Login to EcoFlow API {url}")
request = requests.post(url, json=data, headers=headers)
response = self.get_json_response(request)
try:
token = response["data"]["token"]
user_id = response["data"]["user"]["userId"]
user_name = response["data"]["user"]["name"]
except KeyError as key:
raise Exception(f"Failed to extract key {key} from response: {response}")
log.info(f"Successfully logged in: {user_name}")
url = f"https://{self.ecoflow_api_host}/iot-auth/app/certification"
headers = {"lang": "en_US", "authorization": f"Bearer {token}"}
data = {"userId": user_id}
log.info(f"Requesting IoT MQTT credentials {url}")
request = requests.get(url, data=data, headers=headers)
response = self.get_json_response(request)
try:
self.mqtt_url = response["data"]["url"]
self.mqtt_port = int(response["data"]["port"])
self.mqtt_username = response["data"]["certificateAccount"]
self.mqtt_password = response["data"]["certificatePassword"]
self.mqtt_client_id = f"ANDROID_{str(uuid.uuid4()).upper()}_{user_id}"
except KeyError as key:
raise Exception(f"Failed to extract key {key} from {response}")
log.info(f"Successfully extracted account: {self.mqtt_username}")
def get_json_response(self, request):
if request.status_code != 200:
raise Exception(f"Got HTTP status code {request.status_code}: {request.text}")
try:
response = json.loads(request.text)
response_message = response["message"]
except KeyError as key:
raise Exception(f"Failed to extract key {key} from {response}")
except Exception as error:
raise Exception(f"Failed to parse response: {request.text} Error: {error}")
if response_message.lower() != "success":
raise Exception(f"{response_message}")
return response
class EcoflowMQTT():
def __init__(self, message_queue, device_sn, username, password, addr, port, client_id, timeout_seconds):
self.message_queue = message_queue
self.addr = addr
self.port = port
self.username = username
self.password = password
self.client_id = client_id
self.topic = f"/app/device/property/{device_sn}"
self.timeout_seconds = timeout_seconds
self.last_message_time = None
self.client = None
self.connect()
self.idle_timer = RepeatTimer(10, self.idle_reconnect)
self.idle_timer.daemon = True
self.idle_timer.start()
def connect(self):
if self.client:
self.client.loop_stop()
self.client.disconnect()
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, self.client_id)
self.client.username_pw_set(self.username, self.password)
self.client.tls_set(certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED)
self.client.tls_insecure_set(False)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
log.info(f"Connecting to MQTT Broker {self.addr}:{self.port} using client id {self.client_id}")
self.client.connect(self.addr, self.port)
self.client.loop_start()
def idle_reconnect(self):
if self.last_message_time and time.time() - self.last_message_time > self.timeout_seconds:
log.error(f"No messages received for {self.timeout_seconds} seconds. Reconnecting to MQTT")
# We pull the following into a separate process because there are actually quite a few things that can go
# wrong inside the connection code, including it just timing out and never returning. So this gives us a
# measure of safety around reconnection
while True:
connect_process = Process(target=self.connect)
connect_process.start()
connect_process.join(timeout=60)
connect_process.terminate()
if connect_process.exitcode == 0:
log.info("Reconnection successful, continuing")
# Reset last_message_time here to avoid a race condition between idle_reconnect getting called again
# before on_connect() or on_message() are called
self.last_message_time = None
break
else:
log.error("Reconnection errored out, or timed out, attempted to reconnect...")
def on_connect(self, client, userdata, flags, reason_code, properties):
# Initialize the time of last message at least once upon connection so that other things that rely on that to be
# set (like idle_reconnect) work
self.last_message_time = time.time()
match reason_code:
case "Success":
self.client.subscribe(self.topic)
log.info(f"Subscribed to MQTT topic {self.topic}")
case "Keep alive timeout":
log.error("Failed to connect to MQTT: connection timed out")
case "Unsupported protocol version":
log.error("Failed to connect to MQTT: unsupported protocol version")
case "Client identifier not valid":
log.error("Failed to connect to MQTT: invalid client identifier")
case "Server unavailable":
log.error("Failed to connect to MQTT: server unavailable")
case "Bad user name or password":
log.error("Failed to connect to MQTT: bad username or password")
case "Not authorized":
log.error("Failed to connect to MQTT: not authorised")
case _:
log.error(f"Failed to connect to MQTT: another error occured: {reason_code}")
return client
def on_disconnect(self, client, userdata, flags, reason_code, properties):
if reason_code > 0:
log.error(f"Unexpected MQTT disconnection: {reason_code}. Will auto-reconnect")
time.sleep(5)
def on_message(self, client, userdata, message):
self.message_queue.put(message.payload.decode("utf-8"))
self.last_message_time = time.time()
class EcoflowMetric:
def __init__(self, ecoflow_payload_key, device_name):
self.ecoflow_payload_key = ecoflow_payload_key
self.device_name = device_name
self.name = f"ecoflow_{self.convert_ecoflow_key_to_prometheus_name()}"
self.metric = Gauge(self.name, f"value from MQTT object key {ecoflow_payload_key}", labelnames=["device"])
def convert_ecoflow_key_to_prometheus_name(self):
# bms_bmsStatus.maxCellTemp -> bms_bms_status_max_cell_temp
# pd.ext4p8Port -> pd_ext4p8_port
key = self.ecoflow_payload_key.replace('.', '_')
new = key[0].lower()
for character in key[1:]:
if character.isupper() and not new[-1] == '_':
new += '_'
new += character.lower()
# Check that metric name complies with the data model for valid characters
# https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
if not re.match("[a-zA-Z_:][a-zA-Z0-9_:]*", new):
raise EcoflowMetricException(f"Cannot convert payload key {self.ecoflow_payload_key} to comply with the Prometheus data model. Please, raise an issue!")
return new
def set(self, value):
# According to best practices for naming metrics and labels, the voltage should be in volts and the current in amperes
# WARNING! This will ruin all Prometheus historical data and backward compatibility of Grafana dashboard
# value = value / 1000 if value.endswith("_vol") or value.endswith("_amp") else value
log.debug(f"Set {self.name} = {value}")
self.metric.labels(device=self.device_name).set(value)
def clear(self):
log.debug(f"Clear {self.name}")
self.metric.clear()
class Worker:
def __init__(self, message_queue, device_name, collecting_interval_seconds=10):
self.message_queue = message_queue
self.device_name = device_name
self.collecting_interval_seconds = collecting_interval_seconds
self.metrics_collector = []
self.online = Gauge("ecoflow_online", "1 if device is online", labelnames=["device"])
self.mqtt_messages_receive_total = Counter("ecoflow_mqtt_messages_receive_total", "total MQTT messages", labelnames=["device"])
def loop(self):
time.sleep(self.collecting_interval_seconds)
while True:
queue_size = self.message_queue.qsize()
if queue_size > 0:
log.info(f"Processing {queue_size} event(s) from the message queue")
self.online.labels(device=self.device_name).set(1)
self.mqtt_messages_receive_total.labels(device=self.device_name).inc(queue_size)
else:
log.info("Message queue is empty. Assuming that the device is offline")
self.online.labels(device=self.device_name).set(0)
# Clear metrics for NaN (No data) instead of last value
for metric in self.metrics_collector:
metric.clear()
while not self.message_queue.empty():
payload = self.message_queue.get()
log.debug(f"Recived payload: {payload}")
if payload is None:
continue
try:
payload = json.loads(payload)
params = payload['params']
except KeyError as key:
log.error(f"Failed to extract key {key} from payload: {payload}")
except Exception as error:
log.error(f"Failed to parse MQTT payload: {payload} Error: {error}")
continue
self.process_payload(params)
time.sleep(self.collecting_interval_seconds)
def get_metric_by_ecoflow_payload_key(self, ecoflow_payload_key):
for metric in self.metrics_collector:
if metric.ecoflow_payload_key == ecoflow_payload_key:
log.debug(f"Found metric {metric.name} linked to {ecoflow_payload_key}")
return metric
log.debug(f"Cannot find metric linked to {ecoflow_payload_key}")
return False
def process_payload(self, params):
log.debug(f"Processing params: {params}")
for ecoflow_payload_key in params.keys():
ecoflow_payload_value = params[ecoflow_payload_key]
if not isinstance(ecoflow_payload_value, (int, float)):
log.warning(f"Skipping unsupported metric {ecoflow_payload_key}: {ecoflow_payload_value}")
continue
metric = self.get_metric_by_ecoflow_payload_key(ecoflow_payload_key)
if not metric:
try:
metric = EcoflowMetric(ecoflow_payload_key, self.device_name)
except EcoflowMetricException as error:
log.error(error)
continue
log.info(f"Created new metric from payload key {metric.ecoflow_payload_key} -> {metric.name}")
self.metrics_collector.append(metric)
metric.set(ecoflow_payload_value)
if ecoflow_payload_key == 'inv.acInVol' and ecoflow_payload_value == 0:
ac_in_current = self.get_metric_by_ecoflow_payload_key('inv.acInAmp')
if ac_in_current:
log.debug("Set AC inverter input current to zero because of zero inverter voltage")
ac_in_current.set(0)
def signal_handler(signum, frame):
log.info(f"Received signal {signum}. Exiting...")
sys.exit(0)
def main():
# Register the signal handler for SIGTERM
signal.signal(signal.SIGTERM, signal_handler)
# Disable Process and Platform collectors
for coll in list(REGISTRY._collector_to_names.keys()):
REGISTRY.unregister(coll)
log_level = os.getenv("LOG_LEVEL", "INFO")
match log_level:
case "DEBUG":
log_level = log.DEBUG
case "INFO":
log_level = log.INFO
case "WARNING":
log_level = log.WARNING
case "ERROR":
log_level = log.ERROR
case _:
log_level = log.INFO
log.basicConfig(stream=sys.stdout, level=log_level, format='%(asctime)s %(levelname)-7s %(message)s')
device_sn = os.getenv("DEVICE_SN")
device_name = os.getenv("DEVICE_NAME") or device_sn
ecoflow_username = os.getenv("ECOFLOW_USERNAME")
ecoflow_password = os.getenv("ECOFLOW_PASSWORD")
ecoflow_api_host = os.getenv("ECOFLOW_API_HOST", "api.ecoflow.com")
exporter_port = int(os.getenv("EXPORTER_PORT", "9090"))
collecting_interval_seconds = int(os.getenv("COLLECTING_INTERVAL", "10"))
timeout_seconds = int(os.getenv("MQTT_TIMEOUT", "60"))
if (not device_sn or not ecoflow_username or not ecoflow_password):
log.error("Please, provide all required environment variables: DEVICE_SN, ECOFLOW_USERNAME, ECOFLOW_PASSWORD")
sys.exit(1)
try:
auth = EcoflowAuthentication(ecoflow_username, ecoflow_password, ecoflow_api_host)
except Exception as error:
log.error(error)
sys.exit(1)
message_queue = Queue()
EcoflowMQTT(message_queue, device_sn, auth.mqtt_username, auth.mqtt_password, auth.mqtt_url, auth.mqtt_port, auth.mqtt_client_id, timeout_seconds)
metrics = Worker(message_queue, device_name, collecting_interval_seconds)
start_http_server(exporter_port)
try:
metrics.loop()
except KeyboardInterrupt:
log.info("Received KeyboardInterrupt. Exiting...")
sys.exit(0)
if __name__ == '__main__':
main()