-
Notifications
You must be signed in to change notification settings - Fork 1
/
ladspa_server.py
76 lines (63 loc) · 2.46 KB
/
ladspa_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python
# coding: utf8
"""
Ladspa server implementatoin
USAGE: import ladspa_server
"""
import socketserver
import threading
import numpy as np
__email__ = '[email protected]'
__author__ = 'Mohamed Elawadi'
__license__ = 'MIT License'
class LADSPA_TCPServer(socketserver.BaseRequestHandler):
def process(self, channel, sample_rate, sound):
"""Process one chunk of data coming from channel
Parameters
----------
channel : int
Channel Number from 0 to `k`.
sample_rate : int
Sample rate (ex. `44100`).
sound: numpy array of shape (n,) and type float32
Input sound
Returns
------
numpy array of shape (n,) and type float32
The processed sound
"""
raise NotImplementedError("process should be implemented")
def handle(self):
print("got a new request")
while True:
longdt = np.dtype('int64').newbyteorder('<')
floatdt = np.dtype('float32').newbyteorder('<')
channel_str = self.request.recv(8)
if len(channel_str) == 0:
print("request over")
return
channel = np.frombuffer(channel_str, dtype=longdt)[0]
sample_rate = np.frombuffer(self.request.recv(8), dtype=longdt)[0]
size = np.frombuffer(self.request.recv(8), dtype=longdt)[0]
received = self.request.recv(size * 4)
while len(received) < size * 4:
received += self.request.recv(size * 4 - len(received))
data = np.frombuffer(received, dtype=floatdt)
ret = self.process(channel, sample_rate, data)
if ret.shape != data.shape:
print("the shape of the returned sound: %s doesn't match the shape of the received one: %s", ret.shape, data.shape)
ret = data
if ret.dtype != 'float32':
print("the datatype of the returned data `%s` is not float32", ret.dtype)
ret = data
self.request.sendall(ret.tobytes())
@classmethod
def serve_forever(cls, port, host="127.0.0.1"):
tcp_server = socketserver.ThreadingTCPServer((host, port), cls)
tcp_server.serve_forever()
if __name__ == "__main__":
class Demo_TCPServer(LADSPA_TCPServer):
def process(self, channel, sample_rate, sound):
return sound
print("listening on 8083")
Demo_TCPServer.serve_forever(8083)