# knihaui.py - 328 lines (270 loc), 8.02 KB
# NOTE: the bare numbers that follow are the line-number gutter left over from
# the web listing this file was copied from; they are not part of the program.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
from gpiozero.pins.mock import MockFactory
from gpiozero.pins.pigpio import PiGPIOFactory
from gpiozero import Button, Device
from time import sleep
from mpd import MPDClient
from contextlib import contextmanager
import alsaaudio
import os
import logging
import sys
import time
import signal
import evdev
# Use the pigpio-backed pin factory by default; setup_buttons() replaces it
# with MockFactory when called with dev=True.
# NOTE(review): this runs at import time, before the --dev switch is
# evaluated - presumably it needs a reachable pigpio daemon even in dev
# mode; confirm.
Device.pin_factory = PiGPIOFactory()

# Directory that setup_player() scans for .mp3 files.
DATA_DIR = "/data"

# Pin numbers of the three push buttons (passed to gpiozero.Button).
BTN_PLAY = 6
BTN_PREV = 13
BTN_NEXT = 5

# The rotary encoder is mapped via a dtoverlay, which works way better, so it
# is read from an input event device instead of from GPIO pins.
ROTARY_INPUT = "/dev/input/event1"
# ROT_A = 19
# ROT_B = 26

# Button timing handed to gpiozero.Button as bounce_time / hold_time.
# HOLD_TIME is also used as a sleep duration by process_direct_command().
BOUNCE_TIME = 0.02
HOLD_TIME = 1.0

# Entries queued by setup_radio(), in this order.
# NOTE(review): the "tmp/.*.mp3" entries look like paths relative to the MPD
# music directory, and the commented-out URLs like alternative addresses of
# the same streams - confirm before removing either.
RADIOS = [
    "tmp/.anziko.mp3",
    "http://live.slovakradio.sk:8000/Slovensko_128.mp3",
    #"https://icecast.stv.livebox.sk/slovensko_128.mp3",
    "http://live.slovakradio.sk:8000/Litera_128.mp3",
    #"https://icecast.stv.livebox.sk/litera_128.mp3",
    "tmp/.zurnal.mp3",
]
class State:
    """Mutable runtime state shared by the button handlers and the event loop."""

    # Mixer handle; assigned by setup_mixer() and handed to change_volume()
    # by eventloop().
    mixer = None
    # Set to True by the matching *_held handler so that the following
    # *_released handler ignores the release that ends a long press.
    held_btn_play = False
    held_btn_prev = False
    held_btn_next = False
    # True while the radio queue is loaded; toggled by play_held().
    is_radio_mode = True
    # Queue position remembered when leaving player mode (see play_held())
    # and passed back to setup_player() when returning to it.
    song_position = 0


# The single shared instance used by every function in this module.
state = State()
@contextmanager
def mpd_client():
    """Yield a connected MPDClient and release the connection afterwards.

    A fresh connection is opened for every use, so callers never hold a
    long-lived (and possibly stale) connection.

    Yields:
        MPDClient: a client connected to host "::", port 6600.

    Raises:
        Whatever ``MPDClient.connect`` raises when MPD cannot be reached, and
        any exception raised inside the ``with`` body.
    """
    client = MPDClient()
    # Connect before entering the try block: a client that never connected
    # must not be sent `close`, which would fail and mask the real
    # connection error.
    client.connect("::", 6600, 1)
    try:
        yield client
    finally:
        try:
            client.close()
        except Exception as e:
            # Closing is best-effort (the link may already be gone); do not
            # let it hide an exception coming from the with-body.
            logging.warning("Closing MPD connection failed: %s", e)
        finally:
            # Always drop the socket, even when the polite close failed.
            client.disconnect()
def set_mpd_volume(dev=False):
    """Set MPD's volume to 100; do nothing when ``dev`` is true."""
    if dev:
        return
    logging.info("Setting volume to 100")
    with mpd_client() as client:
        client.setvol(100)
def prev_held():
    """Handle a long press of the "previous" button: play queue position 0."""
    logging.info("<< held")
    with mpd_client() as client:
        client.play(0)
    # Let prev_released() know that the upcoming release ends a long press.
    state.held_btn_prev = True
def next_held():
    """Handle a long press of the "next" button: skip ten entries ahead.

    Jumps from the current queue position to position + 10, wrapping to the
    first entry when that would run past the end of the queue.
    """
    logging.info(">> held")
    # Record the long press first: the button was physically held, so
    # next_released() must ignore the release even if the MPD call fails.
    state.held_btn_next = True
    with mpd_client() as mpd:
        status = mpd.status()
        # status carries no "song" entry when MPD has no current song;
        # treat that as position 0 instead of raising KeyError.
        pos = int(status.get("song", 0)) + 10
        # Queue positions are zero-based, so a position equal to
        # playlistlength is already out of range.
        if pos >= int(status.get("playlistlength", 0)):
            pos = 0
        mpd.play(pos)
def play_held(btn):
    """Handle a long press of the play button: toggle radio / player mode.

    Args:
        btn: not used by this handler.
    """
    logging.info("|| held")
    if not state.is_radio_mode:
        # Player -> radio: remember the current queue position so player
        # mode can resume from it later.
        with mpd_client() as client:
            state.song_position = int(client.status()["song"])
        state.is_radio_mode = True
        setup_radio()
    else:
        # Radio -> player: reload the files and resume at the saved position.
        state.is_radio_mode = False
        setup_player(state.song_position)
    # Let play_released() know that the upcoming release ends a long press.
    state.held_btn_play = True
def prev_released():
    """Handle release of the "previous" button: go to the previous entry.

    A release that merely ends a long press (already handled by prev_held())
    only clears the held flag.
    """
    if state.held_btn_prev:
        logging.info("<< was previously held")
        state.held_btn_prev = False
        return

    logging.info("<< released")
    with mpd_client() as client:
        current = int(client.status()["song"])
        try:
            client.previous()
        except Exception as err:
            logging.error(err)
            # Going back failed: fall back to playing the entry that was
            # current before the attempt.
            safe_call(client, client.play, current)
def next_released():
    """Handle release of the "next" button: advance to the next entry.

    A release that merely ends a long press (already handled by next_held())
    only clears the held flag.
    """
    if state.held_btn_next:
        logging.info(">> was previously held")
        state.held_btn_next = False
        return

    logging.info(">> released")
    with mpd_client() as client:
        current = int(client.status()["song"])
        try:
            client.next()
        except Exception as err:
            logging.error(err)
            # Advancing failed: fall back to playing the entry that was
            # current before the attempt.
            safe_call(client, client.play, current)
def play_released():
    """Handle release of the play button by sending MPD's pause command.

    A release that merely ends a long press (already handled by play_held())
    only clears the held flag.
    """
    if not state.held_btn_play:
        logging.info("|> released")
        with mpd_client() as client:
            client.pause()
        return

    logging.info("|> was previously held")
    state.held_btn_play = False
def setup_buttons(dev=False):
    """Create the three buttons and attach their release / hold handlers.

    Args:
        dev: when true, switch gpiozero to MockFactory before creating the
            buttons.

    The Button objects are stored in the module globals ``btn_next``,
    ``btn_prev`` and ``btn_play``.
    """
    global btn_next, btn_prev, btn_play

    if dev:
        Device.pin_factory = MockFactory()

    def make_button(pin, on_release, on_hold):
        # All three buttons share the same electrical and timing settings.
        button = Button(
            pin, pull_up=True, bounce_time=BOUNCE_TIME, hold_time=HOLD_TIME
        )
        button.when_released = on_release
        button.when_held = on_hold
        return button

    btn_next = make_button(BTN_NEXT, next_released, next_held)
    btn_prev = make_button(BTN_PREV, prev_released, prev_held)
    btn_play = make_button(BTN_PLAY, play_released, play_held)
def safe_call(mpd, f, *args, **kwargs):
    """Call ``f`` and recover once if MPD reports an error afterwards.

    After calling ``f(*args, **kwargs)`` the MPD status is inspected. If it
    carries an error, the error is logged, the current song (``songid``) is
    removed from the queue and ``f`` is called a second time.

    Args:
        mpd: client object providing ``status()`` and ``deleteid()``.
        f: callable to invoke (typically a bound method of ``mpd``).
        *args, **kwargs: forwarded to ``f`` unchanged.

    Returns:
        None. Exceptions raised by ``f`` or by the client are logged and
        swallowed, so this never raises.
    """
    try:
        f(*args, **kwargs)
        status = mpd.status()
        # MPD reports failures under the lowercase "error" key; the
        # capitalised spelling is still checked for backward compatibility.
        error = status.get("error") or status.get("Error")
        if not error:
            return
        logging.warning(error)
        songid = status.get("songid")
        if songid:
            # Drop the entry that failed, then retry the call once.
            mpd.deleteid(songid)
            f(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
def setup_player(song_position=0, play=True):
    """Load the .mp3 files found in DATA_DIR into the MPD queue.

    Files whose name starts with "." are skipped. The queue is cleared,
    repeat is switched on and single mode off before the files are added in
    sorted order.

    Args:
        song_position: queue position to start playback at; also stored in
            ``state.song_position``.
        play: start playback after loading. When False the queue is only
            prepared and nothing is played.
    """
    # ignore .files.mp3
    files = sorted(
        name
        for name in os.listdir(DATA_DIR)
        if name.endswith(".mp3") and not name.startswith(".")
    )
    if files:
        with mpd_client() as mpd:
            mpd.update()  # Make MPD re-read data dir
            mpd.clear()
            mpd.repeat(1)
            mpd.single(0)
            for f in files:
                logging.info("Adding file %s", f)
                mpd.add(f)
            # Honour the caller's request to only prepare the queue.
            if play:
                safe_call(mpd, mpd.play, song_position)
        logging.info("Added: %d files", len(files))
    state.song_position = song_position
def setup_mpd(dev=False):
    """Set the MPD volume, trigger a database update and wait for it.

    Args:
        dev: forwarded to set_mpd_volume().

    Polls the MPD status every 0.2 s until it no longer contains a truthy
    ``updating_db`` entry.
    """
    set_mpd_volume(dev)
    with mpd_client() as client:
        job = client.update()
        logging.info("Update: %s", job)
        while True:
            status = client.status()
            logging.info("Status %s", status)
            if not status.get("updating_db"):
                break
            logging.info("Waiting for update to finish")
            time.sleep(0.2)
def setup_radio():
    """Replace the MPD queue with the RADIOS entries and play the first one.

    An entry that cannot be added is logged and skipped.
    """

    def enqueue(client, r):
        # Adding is best-effort per entry so one bad entry does not block
        # the others.
        try:
            logging.info(f"Adding radio {r}")
            client.add(r)
        except Exception as err:
            logging.exception(err)

    with mpd_client() as client:
        client.update()  # Make MPD re-read data dir
        client.clear()
        client.repeat(1)  # in an attempt to auto-recover the stream
        client.single(1)
        for r in RADIOS:
            enqueue(client, r)
        safe_call(client, client.play, 0)
def setup_mixer():
    """Open the "PCM" mixer and store it in ``state.mixer``.

    The mixers reported for card index 1 are logged first.
    """
    logging.info("Available mixers: %s", alsaaudio.mixers(cardindex=1))
    pcm = alsaaudio.Mixer("PCM", 0, 1)
    state.mixer = pcm
    logging.info("Mixer: %s", state.mixer)
def process_direct_command(cmd):
    """Simulate a button press from a one-character text command.

    ``<``, ``o`` and ``>`` press the previous / play / next button briefly;
    ``{``, ``O`` and ``}`` hold the same buttons for longer than HOLD_TIME.
    Anything else is reported with a warning and ignored.

    Args:
        cmd: the command string (already stripped by the caller).
    """
    # command -> (pin number, long press?)
    commands = {
        "<": (BTN_PREV, False),
        "{": (BTN_PREV, True),
        "o": (BTN_PLAY, False),
        "O": (BTN_PLAY, True),
        ">": (BTN_NEXT, False),
        "}": (BTN_NEXT, True),
    }
    if cmd not in commands:
        logging.warning(
            "%s unknown Write <o> for press and {O} for long press.", cmd
        )
        return

    pin_number, is_long = commands[cmd]
    # Only ask the pin factory for the one pin that is actually driven.
    pin = Device.pin_factory.pin(pin_number)
    logging.info("Pressing %s", cmd)
    pin.drive_low()
    if is_long:
        # Stay low a little longer than HOLD_TIME so the press counts as held.
        time.sleep(HOLD_TIME * 1.2)
    pin.drive_high()
def change_volume(mixer, diff, step=3, lowest=20, highest=100):
    """Change the mixer volume by ``diff * step``, clamped to a range.

    Args:
        mixer: object providing ``getvolume()`` and ``setvolume()``.
        diff: signed number of steps to move.
        step: volume change per step.
        lowest: lower bound of the resulting volume.
        highest: upper bound of the resulting volume.
    """
    # use only the first channel
    current = mixer.getvolume(alsaaudio.PCM_PLAYBACK)[0]
    target = min(max(current + diff * step, lowest), highest)
    direction = "Increase" if diff > 0 else "Decrease"
    logging.info("%s volume %d -> %d", direction, current, target)
    mixer.setvolume(target)
def eventloop():
    """Read events from ROTARY_INPUT and turn them into volume changes.

    Runs for as long as the device's read loop yields events.
    """
    rotary = evdev.InputDevice(ROTARY_INPUT)
    logging.debug("Input: %s", rotary)
    for ev in rotary.read_loop():
        logging.debug("Event: %s", ev)
        # Only events of type 2 carry a rotation delta.
        # NOTE(review): 2 is presumably evdev's EV_REL - confirm.
        if ev.type != 2:
            continue
        change_volume(state.mixer, int(ev.value))
def main(dev=False):
    """Initialise mixer, MPD and buttons, then process input until it ends.

    Args:
        dev: when true, log at DEBUG level and read button commands from
            stdin instead of running the rotary event loop.
    """
    log_level = logging.DEBUG if dev else logging.INFO
    logging.basicConfig(level=log_level)

    setup_mixer()
    setup_mpd(dev=dev)
    setup_buttons(dev=dev)
    setup_player(play=False)
    setup_radio()

    if not dev:
        eventloop()  # we would do signal.pause() if we didn't have evloop
    else:
        logging.info(
            "Run input stream reader. Write <o> for press and {O} for long press."
        )
        for line in sys.stdin:
            process_direct_command(line.strip())

    logging.info("Wrap up")
if __name__ == "__main__":
    # Dev mode is enabled only when the first argument is exactly "--dev".
    dev = sys.argv[1:2] == ["--dev"]
    main(dev=dev)