-
Notifications
You must be signed in to change notification settings - Fork 0
/
code.py
230 lines (180 loc) · 6.67 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# SPDX-FileCopyrightText: 2024 Eric Z. Ayers
#
# SPDX-License-Identifier: CC0-1.0
"""Collect training data from a color sensor and publish it to Adafruit IO """
import ipaddress
import os
import time

import board
import busio
import microcontroller
import socketpool
import wifi

import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_tcs34725
from adafruit_io.adafruit_io import IO_MQTT
##################
# *EDIT*
# User-configurable values: adjust the settings below for your setup.
# Name of the Adafruit IO feed that samples are published to (and subscribed to)
default_topic = "colorsensor-training-data"
# Color sensor integration time, in milliseconds, used to gather color data
sensor_integration_time = 150
# Manual override for the color sensor gain
sensor_gain = 4
# Number of samples collected each time the user is prompted for a color
num_samples = 5
# Maximum number of data values to send per minute; 30/min is the free Adafruit IO limit.
# When this rate would be exceeded, the code adds a delay to slow down the rate.
max_send_rate = 30
#
# End of editable config values
##################
# WiFi and Adafruit IO credentials are loaded from a separate secrets.py file
try:
    from secrets import secrets
except ImportError:
    # Tell the user how to fix the problem, then let the original error propagate.
    print("WiFi secrets are kept in secrets.py, please add them there and don't commit them to git!")
    raise
# Create the sensor object on an I2C bus built from pins GP1 and GP0
# (presumably SCL=GP1 and SDA=GP0 -- confirm against the board wiring)
i2c = busio.I2C(board.GP1, board.GP0) # uses board's native I2C port
sensor = adafruit_tcs34725.TCS34725(i2c)
# Apply the configured gain; the sensor gain can be 1, 4, 16, or 60
sensor.gain = sensor_gain
# Apply the configured integration time; values between 2.4 and 614.4 milliseconds
sensor.integration_time = sensor_integration_time
def connect_to_wifi():
    """Join the WiFi network named in ``secrets`` and report connectivity.

    Connects using ``secrets["wifi_ssid"]`` and ``secrets["wifi_password"]``,
    prints the assigned IPv4 address, then pings 8.8.4.4 and prints the
    round-trip time in milliseconds (or a warning when the ping gets no reply).
    """
    print()
    print("Connecting to WiFi...", end="")
    wifi.radio.connect(secrets["wifi_ssid"], secrets["wifi_password"])
    # Interpolate the address into the message; passing it as a second
    # print() argument would output a literal "%s".
    print("Connected: IP address %s" % (wifi.radio.ipv4_address,), end="")
    # Ping a Google DNS server to test connectivity
    ipv4 = ipaddress.ip_address("8.8.4.4")
    ping_seconds = wifi.radio.ping(ipv4)
    if ping_seconds is None:
        # No echo reply came back, so there is no round-trip time to report.
        print(" Ping to 8.8.4.4 got no reply; internet may be unreachable")
    else:
        print(
            " Internet up. Ping to google.com in: %f ms"
            % (ping_seconds * 1000)
        )
###########
# Define MQTT callback methods which are called when events occur
def connected(client, userdata, flags, rc):
    """MQTT callback for a successful connection to the broker.

    Announces the connection and subscribes ``client`` to every change on
    the ``default_topic`` feed.

    :param client: the client that connected; used to subscribe
    :param userdata: unused
    :param flags: unused
    :param rc: unused
    """
    print(f"Connected to MQTT broker! Listening for topic changes on {default_topic}")
    client.subscribe(default_topic)
def disconnected(client, userdata, rc):
    """MQTT callback for a disconnect: report it on the console.

    :param client: unused
    :param userdata: unused
    :param rc: unused
    """
    print("Disconnected from MQTT Broker!")
def message(client, topic, message):
    """MQTT callback invoked when a subscribed feed has a new value.

    The new value is only echoed to the console.

    :param client: unused
    :param str topic: The topic of the feed with a new value.
    :param str message: The new value
    """
    print(f"New message on topic {topic}: {message}")
# end MQTT callbacks
###########
def read_color_input():
    """Ask the human to select the color, re-prompting until the answer is valid.

    Accepts a single letter (case-insensitive, surrounding whitespace ignored).

    :return: one of ``"red"``, ``"purple"``, ``"orange"``, ``"yellow"``, ``"green"``
    :rtype: str
    """
    # Single-letter answer -> color label attached to the published samples
    color_by_key = {
        "R": "red",
        "P": "purple",
        "O": "orange",
        "Y": "yellow",
        "G": "green",
    }
    while True:
        print()
        print("What color? (R)ed (P)urple (O)range (Y)ellow (G)reen [R/P/O/Y/G]: ", end="")
        response = input().strip().upper()
        # Look the answer up by value; comparing strings with `is` tests
        # object identity and only works by accident of string interning.
        if response in color_by_key:
            return color_by_key[response]
def add_delay(elapsed_time):
    """Pause, when needed, so batches of data stay within the Adafruit IO data rate.

    :param elapsed_time: seconds that have passed since the previous batch
    """
    # Minimum spacing between batches: seconds per sample at the maximum rate
    # (30 samples per minute on the free plan) times the samples in one batch.
    min_interval = (60 / max_send_rate) * num_samples
    if elapsed_time >= min_interval:
        # Enough time has already passed; no throttling required.
        return
    remaining = min_interval - elapsed_time
    print(
        " >>>Throttling data collection. Delaying %d seconds until next sample<<<"
        % remaining
    )
    time.sleep(remaining)
    print()
def read_samples(train_color):
    """Read from the color sensor ``num_samples`` times and publish the datapoints.

    Each reading is printed and then published to the ``default_topic`` feed
    through the module-level ``io`` client, labelled with ``train_color``.
    If publishing raises ValueError, RuntimeError or OSError, the error is
    printed and the microcontroller is reset.

    :param str train_color: color label stored with every published sample
    """
    print(
        "Reading %d samples for %s from the sensor and publishing them to AdafruitIO"
        % (num_samples, train_color)
    )
    for i in range(num_samples):
        # Read every sensor property once per sample so the values shown on
        # the console are exactly the values that get published.
        rgb = sensor.color_rgb_bytes
        temperature = sensor.color_temperature
        lux = sensor.lux
        print(
            " Sample {0}: Read RGB color: {1} Temperature {2}".format(
                i, rgb, temperature
            )
        )
        if temperature is None or lux is None:
            # NOTE(review): the driver appears to report None for these values
            # when a reading is invalid (e.g. sensor saturation) -- confirm.
            # "%d" cannot format None, so skip the sample instead of crashing.
            print(" Sensor returned no temperature/lux value, skipping this sample")
            continue
        # Send a new message
        print(" Sending value to feed '%s'" % (default_topic))
        # The payload is a dict-style string with single quotes (not strict
        # JSON), in the format anticipated for the training machine.
        try:
            io.publish(
                default_topic,
                "{'temperature': %d, 'r' : %d, 'g': %d, 'b': %d, 'lux' : %d, 'color': '%s'}"
                % (
                    temperature,
                    rgb[0],
                    rgb[1],
                    rgb[2],
                    lux,
                    train_color,
                ),
            )
        except (ValueError, RuntimeError, OSError) as e:
            print("Failed to send data, giving up and restarting microcontroller\n", e)
            # Resetting restarts the board, so nothing after this call runs.
            microcontroller.reset()
# Initialize WiFi
connect_to_wifi()

# Set up the MQTT client for use with Adafruit IO, authenticating with the
# Adafruit IO username and key from secrets.py.
# NOTE(review): 1883 is conventionally the non-TLS MQTT port, so the key is
# presumably sent unencrypted -- confirm, and consider a TLS connection.
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    port=1883,
    username=secrets["aio_username"],
    password=secrets["aio_key"],
    socket_pool=socketpool.SocketPool(wifi.radio),
)

# Register the callback functions defined above on the MQTT client.
# NOTE(review): the IO_MQTT wrapper created below may install its own handlers
# on the wrapped client, replacing these -- confirm the callbacks actually fire.
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Initialize an Adafruit IO MQTT Client wrapper
io = IO_MQTT(mqtt_client)
print("Connecting to Adafruit IO MQTT server")
io.connect()
print("Connected!")
print()
# Build the link from default_topic so it stays correct when the feed name
# in the config section is edited.
print(
    "View data at https://io.adafruit.com/%s/feeds/%s"
    % (secrets["aio_username"], default_topic)
)
# Time the last batch of data was sent, used to make sure we don't send too
# many samples to Adafruit IO. Starting at 0 means the first batch is never delayed.
last_data_send_time = 0

# Main loop: service MQTT, ask the user for a color label, throttle if needed,
# then read and publish one batch of samples with that label.
while True:
    # Poll for incoming messages. Incoming messages are not used right now,
    # but polling is thought to keep the library happy.
    try:
        io.loop()
    except (ValueError, RuntimeError, OSError) as e:
        print("Failed to get data, giving up and restarting microcontroller\n", e)
        # Relies on the file-level `import microcontroller`. Resetting restarts
        # the board, so nothing after this call runs.
        microcontroller.reset()

    train_color = read_color_input()

    # Wait out the remainder of the minimum interval since the last batch.
    elapsed_time = time.time() - last_data_send_time
    add_delay(elapsed_time)

    # Record the send time after any delay, just before the batch goes out.
    last_data_send_time = time.time()
    read_samples(train_color)