-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
71 lines (56 loc) · 1.88 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from influxdb import InfluxDBClient
from datetime import datetime
import json
import pika
import pytz
def get_config():
    """Load the application settings from ``config.json``.

    The path is relative, so the file is resolved against the current
    working directory of the process.

    Returns:
        The parsed JSON document, exactly as ``json.load`` produces it.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    # Pin the encoding: without it ``open`` uses the locale's preferred
    # encoding, which is not UTF-8 on every platform.
    with open('config.json', 'r', encoding='utf-8') as config_file:
        return json.load(config_file)
def process_event(ch, method, properties, body):
    """Handle one message delivered to the consumer callback.

    The body is decoded as UTF-8 JSON and handed to ``insert_sensor_read``;
    the delivery is acknowledged once that call returns.

    A body that is not valid UTF-8 JSON can never be processed, so it is
    rejected without requeueing rather than letting the decode error escape
    the callback before the delivery has been acknowledged.

    Args:
        ch: channel the message arrived on; used to ack or reject it.
        method: delivery metadata; only ``delivery_tag`` is read.
        properties: message properties (unused).
        body: raw message payload as bytes.
    """
    print(f'Message: {body}')
    try:
        json_reading = json.loads(body.decode('UTF-8'))
    except ValueError as error:
        # UnicodeDecodeError and json.JSONDecodeError both derive from
        # ValueError, so this covers bad bytes as well as bad JSON.
        print(f'Discarding malformed message: {error}')
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    insert_sensor_read(json_reading)
    ch.basic_ack(delivery_tag=method.delivery_tag)
def insert_sensor_read(sensor_reading):
    """Write one sensor reading to InfluxDB as a single data point.

    Args:
        sensor_reading: mapping with the keys ``timestamp``, ``temperature``,
            ``humidity`` and ``heatIndex``. ``timestamp`` is passed to
            ``datetime.utcfromtimestamp`` (presumably epoch seconds sent by
            the sensor -- confirm against the producer).

    Returns:
        None. A reading whose timestamp cannot be converted is dropped
        silently and nothing is written.

    Raises:
        KeyError: if one of the expected keys is missing.
    """
    utc_timestamp = sensor_reading['timestamp']
    try:
        # The conversion itself lives inside the ``try`` so that an
        # out-of-range timestamp is dropped like any other bad one
        # instead of raising out of this function.
        reading_time = datetime.utcfromtimestamp(utc_timestamp)
        # Sometimes the sensor sends an incorrect timestamp (e.g. 3, 5, ...)
        # and the timezone conversion fails for it. The converted value is
        # not needed: the call only serves as a validity probe.
        # NOTE(review): whether this raises for such small values looks
        # platform-dependent -- confirm on the deployment target.
        reading_time.astimezone(timezone)
    except (OSError, OverflowError, ValueError):
        # Unusable timestamp: ignore this data point.
        return

    data_point = {
        "measurement": config["influxdb"]["measurement"],
        "time": reading_time,
        "fields": {
            "temperature": sensor_reading['temperature'],
            "humidity": sensor_reading['humidity'],
            "heatIndex": sensor_reading['heatIndex'],
        },
    }
    influxdb_client.write_points([data_point])
# Module-level wiring: load the settings, build the InfluxDB and RabbitMQ
# clients, register the callback and start consuming.

# ``config``, ``timezone`` and ``influxdb_client`` are read as globals by
# insert_sensor_read, so they must keep these exact names.
config = get_config()
timezone = pytz.timezone(config['general']['timezone'])

influx_settings = config["influxdb"]
influxdb_client = InfluxDBClient(
    host=influx_settings["host"],
    port=influx_settings["port"],
    database=influx_settings["database"],
)

rabbit_settings = config["rabbitmq"]
rabbit_host = rabbit_settings["host"]
rabbit_port = rabbit_settings["port"]
rabbit_credentials = pika.PlainCredentials(
    rabbit_settings["user"],
    rabbit_settings["pass"],
)
connection_parameters = pika.ConnectionParameters(
    host=rabbit_host,
    port=rabbit_port,
    credentials=rabbit_credentials,
)
connection = pika.BlockingConnection(connection_parameters)

channel = connection.channel()
queue_name = config["rabbitmq"]["consume_from_queue"]
channel.basic_consume(queue=queue_name, on_message_callback=process_event)

# Hand control to the consume loop; each delivery goes to process_event.
channel.start_consuming()