-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_python.py
313 lines (269 loc) · 11.4 KB
/
test_python.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
from dataclasses import dataclass
from datetime import datetime, time, timezone
import pytz
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
import os
import json
INFLUXDB_CONFIG = {
"url": "https://influxdb.endide.com", # URL de l'instanceù InfluxDB
"token": "QFKdKWJHe9ir4doaKlPBFKpxl7JUGR14YMDa-wjcKQ18aw_0b2hZaRDypBoXKjHvKpU9eWzXuZf9eCnbupklyw==", # Jeton d'accès (InfluxDB v2+)
"org": "sae", # Organisation (InfluxDB v2+)
"bucket": "sensors", # Bucket cible
}
@dataclass
class ListCapteur:
capteur : list
start_time : datetime
end_time : datetime
tri_value : str
tri_parametre : str
@dataclass(frozen=True)
class CapteurResult:
time : datetime
time_fr : str
value : float
field : str
room_id : str
sensor_id : str
sensor_type : str
def afficher(self):
texte = f"time: {self.time_fr}\nvalue: {self.value}\nfield: {self.field}\nroom_id: {self.room_id}\nsensor_type: {self.sensor_type}\n----------------"
print(texte)
def compare_to_other(self, other) -> bool:
"""
Compare cette instance avec une autre instance de CapteurResult,
en ignorant les champs 'time' et 'time_fr'.
:param other: Une autre instance de CapteurResult
:return: True si les champs (sauf time et time_fr) sont identiques, False sinon
"""
if not isinstance(other, CapteurResult):
raise TypeError("La comparaison est uniquement possible avec une autre instance de CapteurResult.")
return (
self.value == other.value and
self.field == other.field and
self.room_id == other.room_id and
self.sensor_id == other.sensor_id and
self.sensor_type == other.sensor_type
)
def compare_to_list(self, other_list) -> bool:
"""
Compare cette instance avec une liste d'autre instance de CapteurResult,
en ignorant les champs 'time' et 'time_fr'.
:param other_list: Une liste d'autre instance de CapteurResult
:return: True si les champs (sauf time et time_fr) sont identiques, False sinon
"""
result = False
for other in other_list:
result = self.compare_to_other(other=other)
if result: return result
return result
def get_same(self, other_list):
"""
Compare cette instance avec une liste d'autre instance de CapteurResult,
en ignorant les champs 'time' et 'time_fr'.
:param other_list: Une liste d'autre instance de CapteurResult
:return: True si les champs (sauf time et time_fr) sont identiques, False sinon
"""
result = False
for other in other_list:
result = self.compare_to_other(other=other)
if result: return other
return self
class InfluxDB:
def __init__(self):
self.client = None
self.config = {
"url":INFLUXDB_CONFIG["url"],
"token":INFLUXDB_CONFIG["token"],
"org":INFLUXDB_CONFIG["org"],
"bucket":INFLUXDB_CONFIG["bucket"]
}
self.connexion(
self.config["url"],
self.config["token"],
self.config["org"],
self.config["bucket"]
)
self._last_result = None
def __call__(self, flux_query):
"""
flux_query : String, code Flux
Exécute une requête Flux et retourne les résultats.
Retourne None si une erreur survient.
"""
try:
self.reconnect() # S'assure que la connexion est active
result = self.query_api.query(org=self.config["org"], query=flux_query)
data = [{"time": record.get_time(), "value": record.get_value(), "fields": record.values} for table in result for record in table.records]
return data
except Exception as error:
print("Requête échouée : ", error)
return None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def connexion(self, url, token, org, bucket):
"""
Connecte l'instance au serveur InfluxDB.
"""
self.config = {
"url": url,
"token": token,
"org": org,
"bucket": bucket
}
try:
self.client = InfluxDBClient(
url=url,
token=token,
org=org
)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
print("Connexion réussie")
except Exception as error:
print("Erreur lors de la connexion :", error)
def reconnect(self):
"""
Réouvre la connexion si elle est fermée.
"""
if self.client is None:
try:
self.connexion(
self.config["url"],
self.config["token"],
self.config["org"],
self.config["bucket"]
)
print("Connexion rétablie")
except Exception as error:
print("Erreur lors de la reconnexion :", error)
def write_data(self, measurement, data):
"""
Écrit des données dans le bucket InfluxDB.
:param measurement: Nom de la mesure
:param data: Données au format dictionnaire
"""
try:
point = {
"measurement": measurement,
"tags": data.get("tags", {}),
"fields": data["fields"],
"time": data.get("time") # Optionnel
}
self.write_api.write(bucket=self.config["bucket"], org=self.config["org"], record=point)
print("Données écrites avec succès")
except Exception as error:
print("Erreur lors de l'écriture des données :", error)
def is_true(self, flux_query):
"""
Retourne True si la requête Flux ne renvoie pas de résultats, False sinon.
"""
data = self(flux_query)
return not data # True si la liste est vide
def close(self):
"""
Ferme la connexion au client InfluxDB.
"""
if self.client is not None:
self.client.close()
print("Connexion fermée")
def format_list(self, values):
"""
Convertit une liste Python en une liste Flux avec des guillemets doubles.
"""
return "[" + ", ".join([f'"{value}"' for value in values]) + "]"
def is_instance_filter_link(self, pf_key, pf_value, pf_all_query):
if isinstance(pf_value, str):
pf_value = [pf_value]
res_filter = f'|> filter(fn: (r) => contains(value: r["{pf_key}"], set: {self.format_list(pf_value)}))'
pf_all_query += f"\n{res_filter}"
return pf_all_query
def get(self, room_id=[], sensor_id=[], sensor_type=[], field=[], start_time=None, end_time=None, last=False, return_object=False) -> dict:
"""
Récupère les données depuis InfluxDB avec des filtres personnalisés et permet de choisir les champs à retourner.
"""
all_query = """from(bucket: "sensors")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "sensor_data")"""
if room_id:
all_query = self.is_instance_filter_link("room_id", room_id, all_query)
if sensor_id:
all_query = self.is_instance_filter_link("sensor_id", sensor_id, all_query)
if sensor_type:
all_query = self.is_instance_filter_link("sensor_type", sensor_type, all_query)
if field:
all_query = self.is_instance_filter_link("_field", field, all_query)
if start_time and end_time:
range_filter = f'|> range(start: {start_time}, stop: {end_time})'
all_query = all_query.replace('|> range(start: 0)', range_filter)
if last:
all_query += '\n|> last()'
if return_object:
result = self(all_query)
self._last_result = self.transform_json_to_dataclass(result)
return self._last_result
return self(all_query)
def transform_json_to_dataclass(self, data_entry):
contenu = json.load(data_entry) if isinstance(data_entry, str) else data_entry
dataclass_tab = []
for item in contenu:
time = item['time']
paris_tz = pytz.timezone("Europe/Paris")
time_in_paris = time.astimezone(paris_tz)
try:
data = CapteurResult(
time=item['time'], # Assurez-vous que 'time' est déjà au format datetime
time_fr=time_in_paris.strftime("%Y-%m-%d %H:%M:%S"),
value=item['fields'].get('_value', 0), # Valeur par défaut si absent
field=item['fields'].get('_field', 'unknown'),
room_id=item['fields'].get('room_id', 'unknown'),
sensor_id=item['fields'].get('sensor_id', 'unknown'),
sensor_type=item['fields'].get('sensor_type', 'unknown')
)
except KeyError as e:
print(f"Erreur dans les données : champ manquant {e}")
continue
dataclass_tab.append(data)
return dataclass_tab
def get_all_last(self, resultat: list[CapteurResult]=None) -> list[CapteurResult]:
"""
Conserve uniquement les éléments uniques dans la liste basée sur les champs
autres que 'time' et 'time_fr', en gardant la version la plus récente (basée sur 'time').
:param resultat: Une liste d'instances de CapteurResult.
:return: Une liste d'instances uniques de CapteurResult avec le plus récent pour chaque cas.
"""
all_differents = []
if resultat is None:
resultat = self._last_result
for item in resultat:
existing = item.get_same(all_differents) # Trouve un élément similaire dans la liste
if existing == item: # Si aucun élément similaire n'existe
all_differents.append(item)
else: # Si un élément similaire existe, compare les dates
if item.time > existing.time: # Si l'élément actuel est plus récent
all_differents.remove(existing) # Retirer l'ancien élément
all_differents.append(item) # Ajouter le plus récent
return all_differents
def get_capteur_by_list(self, parametre_tri : str, liste_capteur, start_time=None, end_time=None):
dictionnaire_capteur = {}
for capteur in liste_capteur:
param_attribut = getattr(capteur, parametre_tri)
if param_attribut not in dictionnaire_capteur:
dictionnaire_capteur[param_attribut] = listCapteur(
capteur=[capteur],
start_time=start_time,
end_time=end_time,
tri_value=param_attribut,
tri_parametre=parametre_tri
)
else:
dictionnaire_capteur[param_attribut].capteur += [capteur]
return dictionnaire_capteur
client = InfluxDB()
result = client.get(room_id="C104", field=['temperature', 'humidity'], start_time="2025-01-13T21:15:12.224Z", end_time="2025-01-13T21:36:26.004Z", return_object=True)
print(len(result))
for r in result:
print(r.afficher())