-
Notifications
You must be signed in to change notification settings - Fork 0
/
bda_filters.py
322 lines (289 loc) · 13.2 KB
/
bda_filters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import sys
import plotly.graph_objects as go
import itertools
import copy
from settings import filterWidth, channelWidth, filterCountMax
from gui import drawSplit, drawUncoveredChannel, drawConflict, renderGUI
channelWidthHalf = int(channelWidth/2)
filterWidthHalf = int(filterWidth/2)
searchGranularity = int(channelWidthHalf/1) #TODO find minimum value
class Channel:
def __init__(self, freqCenter, width=channelWidth):
self.freqCenter = freqCenter
self.freqLow = freqCenter - int(width/2)
self.freqHigh = freqCenter + int(width/2)
def __lt__(self, other):
return self.freqCenter < other.freqCenter
def __str__(self):
return str(self.freqCenter)
class Filter:
def __init__(self, freqCenter, width=filterWidth):
self.freqCenter = freqCenter
self.freqLow = freqCenter - int(width/2)
self.freqHigh = freqCenter + int(width/2)
self.width = width
self.channels = []
self.channelScore = 0
self.centerScore = 0
def __str__(self):
return str(self.freqCenter)
def calcFilterChannelScore(self):
self.channelScore = -1 * (len(self.channels) * len(self.channels))
return
def calcFilterCenterScore(self):
freqSum = 0
for channel in self.channels:
freqSum += channel.freqCenter
freqAverage = int(freqSum / len(self.channels))
delta = (self.freqCenter - freqAverage) / 1000000
self.centerScore = -1 * (delta * delta)
return
class SubSolution:
def __init__(self, filters, channels):
self.filters = filters
self.filterCount = len(filters)
self.channels = channels
self.channelScore = sum(f.channelScore for f in filters) / self.filterCount
self.centerScore = sum(f.centerScore for f in filters) / self.filterCount
class Solution:
def __init__(self, subSolutions):
self.subSolutions = subSolutions
self.filterCount = sum(s.filterCount for s in subSolutions)
self.channelScore = sum(s.channelScore for s in subSolutions) / len(subSolutions)
self.centerScore = sum(s.centerScore for s in subSolutions) / len(subSolutions)
# Given channels and a filter, return all filters passed by the channel
def getChannelsInFilter(filter, channels):
channelsInFilter = []
for channel in channels:
if(checkChannelInFilter(filter, channel)):
channelsInFilter.append(channel)
return channelsInFilter
# Given a potential filter, existing filters, and channels, check if new filter is valid.
# If filter is valid, return True.
# If not valid, return False.
def validateFilter(newFilter, filters, channels):
filterValid = True
# Filter can't split a channel. Channel must be fully in or out of filter.
for channel in channels:
filterValid = filterValid and checkFilterSplit(newFilter, channel)
# Filter can't overlap any other filter.
for filter in filters:
filterValid = filterValid and not checkFilterOverlap(newFilter, filter)
# Filter must fully pass at least one channel.
filterContainsChannel = False
for channel in channels:
filterContainsChannel = filterContainsChannel or checkChannelInFilter(newFilter, channel)
filterValid = filterValid and filterContainsChannel
return filterValid
# Given a filter and channel, check if channel is within filter
# Given a filter and channel, check if the channel is fully passed by filter.
# If channel fully passes, return True.
# If not, return False.
def checkChannelInFilter(filter, channel):
return (filter.freqLow <= channel.freqLow <= filter.freqHigh)\
and (filter.freqLow <= channel.freqHigh <= filter.freqHigh)
# Given a filter and channel, check if the filter edge falls within the channel
def checkFilterSplit(filter, channel):
return not ((channel.freqLow < filter.freqLow < channel.freqHigh)\
or (channel.freqLow < filter.freqHigh < channel.freqHigh))
# Given two filters of identical width, check if filters overlap.
# If filters overlap (invalid), return True.
# If filters don't overlap, return False.
def checkFilterOverlap(filter1, filter2):
return (filter1.freqLow < filter2.freqHigh) and (filter2.freqLow < filter1.freqHigh)
# Check if channel is fully passed by only one filter in filter list
def checkChannel(fig, channel, filters):
filterCount = 0
for filter in filters:
if (filter.freqLow <= channel.freqLow <= filter.freqHigh)\
and (filter.freqLow <= channel.freqHigh <= filter.freqHigh):
filterCount += 1
if filterCount == 0:
print("Channel", channel.freqCenter, "not passed!")
drawUncoveredChannel(fig, channel)
elif filterCount > 1:
print("Channel", channel.freqCenter, "covered by multiple filters!")
drawUncoveredChannel(fig, channel)
return filterCount == 1
# Check if set of channels and filters are valid.
# Solution is valid if no filters overlap and if all channels are fully passed.
# Return True if valid.
def checkSolution(fig, channels, filters):
validSolution = True
filterCombos = list(itertools.combinations(filters, 2))
for filterPair in filterCombos:
if checkFilterOverlap(filterPair[0], filterPair[1]):
# Channels overlap
validSolution = False
print("Filter overlap between ", filterPair[0], filterPair[1])
drawConflict(fig, filterPair[0])
drawConflict(fig, filterPair[1])
for channel in channels:
if checkChannel(fig, channel, filters) == False:
validSolution = False
print("Valid solution:", validSolution)
return validSolution
# Check if channels are far enough apart such that no valid filter on one could affect the other.
# If indpendent, return True.
# If not, return False.
def channelsAreIndependent(channel1, channel2):
return (channel2.freqCenter-channel1.freqCenter) + channelWidth >= 2 * filterWidth
# Given a list of channels, split channels into independent groups.
# Return list of list of channels.
def splitChannels(fig, channels):
channels.sort()
channelRanges = []
lowIndex = 0
for i in range(0, len(channels)+1):
if i == len(channels):
channelRanges.append(channels[lowIndex:i])
elif i != lowIndex:
if channelsAreIndependent(channels[i-1], channels[i]):
channelRanges.append(channels[lowIndex:i])
drawSplit(fig, 0.5*(channels[i].freqCenter+channels[i-1].freqCenter))
lowIndex = i
return channelRanges
# Recursive portion of solveChannels.
def solveChannelsRec(subChannels, solutions, channelsCopy, newFilters):
# If no channels remain to be covered, we're done.
if len(channelsCopy) == 0:
solution = SubSolution(newFilters, subChannels)
# Reduce solutions to best solution per number of filters
if solution.filterCount in solutions:
# If we already have a solution for this number of filters, only save best.
solutionOptions = [solution, solutions[solution.filterCount]]
solutionOptions.sort(key=lambda i: (i.filterCount, i.channelScore, i.centerScore))
solutions[solution.filterCount] = solutionOptions[-1]
else:
solutions[solution.filterCount] = solution
return
# Find range of possible filter frequencies.
fLow = min(channelsCopy).freqCenter + channelWidthHalf - filterWidthHalf
fHigh = (max(channelsCopy).freqCenter - channelWidthHalf) + filterWidthHalf
# Create filter and check if valid.
for grid in range(fLow, fHigh + channelWidthHalf, searchGranularity):
newFiltersCopy = copy.copy(newFilters)
channelsCopyRec = copy.copy(channelsCopy)
newFilter = Filter(grid)
if(validateFilter(newFilter, newFiltersCopy, channelsCopy)):
channelsInFilter = getChannelsInFilter(newFilter, channelsCopyRec)
newFilter.channels = channelsInFilter
newFilter.calcFilterCenterScore()
newFilter.calcFilterChannelScore()
# Add filter to existing filter list.
newFiltersCopy.append(newFilter)
# Remove channels covered by new filter to create new channel subset.
for channel in channelsInFilter:
channelsCopyRec.remove(channel)
# Solve remaining channels recurrsively.
solveChannelsRec(channelsCopy, solutions, channelsCopyRec, newFiltersCopy)
return
# Initial recursive driver to find solutions for a set of channels.
# Return dict mapping filterCount to SubSolution.
def solveChannels(channels):
# Find range of possible filter frequencies.
fLow = min(channels).freqCenter + channelWidthHalf - filterWidthHalf
fHigh = (max(channels).freqCenter - channelWidthHalf) + filterWidthHalf
solutions = {}
# Create filter and check if valid.
for grid in range(fLow, fHigh + searchGranularity, searchGranularity):
channelsCopy = copy.copy(channels)
newFilters = []
# Pick filter
newFilter = Filter(grid)
if(validateFilter(newFilter, newFilters, channelsCopy)):
channelsInFilter = getChannelsInFilter(newFilter, channelsCopy)
newFilter.channels = channelsInFilter
newFilter.calcFilterCenterScore()
newFilter.calcFilterChannelScore()
# Add filter to existing filter list.
newFilters.append(newFilter)
# Remove channels covered by new filter to create new channel subset.
for channel in channelsInFilter:
channelsCopy.remove(channel)
# Solve remaining channels recurrsively.
solveChannelsRec(channels, solutions, channelsCopy, newFilters)
return solutions
def getChannelsFromFile(filePath, channelsAll):
f = open(filePath,"r")
for line in f.read().splitlines():
freq = float(line.replace(",",""))
# If input is in MHz, convert to Hz.
if freq < 1000:
freq = freq * 1000000
# Check if input is within US Dl ranges.
if not((758000000 <= freq <= 775000000) or (851000000 <= freq <= 869000000)):
print("Error:", freq, "is outside of US DL ranges.")
exit(1)
freq = int(freq)
# Check for duplicate channels
if any(channel.freqCenter == freq for channel in channelsAll):
print("Error:", freq, "appears in input twice.")
exit(1)
# Add channel to list
channelsAll.append(Channel(freq))
return
def main():
if(len(sys.argv)==1):
print("Error: Please provide path to channel lists (csv/txt) as argument.")
exit(1)
# Get channels from file arguments
channelsAll = []
for path in sys.argv[1:]:
getChannelsFromFile(path, channelsAll)
print("Channels:", len(channelsAll))
fig = go.Figure()
# Split channels into independent subgroups.
# For each subgroup, find best solution per filter count.
channelSubgroups = splitChannels(fig, channelsAll)
subgroupSolutions = [None] * len(channelSubgroups)
for i in range(0, len(channelSubgroups)):
channelSubgroup = channelSubgroups[i]
print("Solving subset of size", len(channelSubgroup))
if len(channelSubgroup) == 1:
# Add filters 1:1 to single isolated channels
newFilter = Filter(channelSubgroup[0].freqCenter)
newFilter.channels = [channelSubgroup[0]]
newFilter.calcFilterChannelScore()
newFilter.calcFilterCenterScore()
subgroupSolutions[i] = {1: SubSolution([newFilter], channelSubgroup)}
else:
subgroupSolutions[i] = solveChannels(channelSubgroup)
# Get all combinations of subgroupSolutions.
# Create Solution for each combination.
solutionCombos = list(itertools.product(*subgroupSolutions))
solutions = []
for s in solutionCombos:
subSolutionList = []
for i in range(0, len(s)):
subSolutionList.append(subgroupSolutions[i][s[i]])
solutions.append(Solution(subSolutionList))
# Sort solutions from worst to best.
solutions.sort(key=lambda i: (i.filterCount, i.channelScore, i.centerScore))
print("Max filter solution uses", solutions[-1].filterCount, "filters.")
print("Min filter solution uses", solutions[0].filterCount, "filters.")
# Traverse solution list until filter count is within limit or until end of list.
filters = []
for i in reversed(range(0, len(solutions))):
if solutions[i].filterCount <= filterCountMax:
# Solution within limits has been found.
# Print results and write to file.
# Render GUI.
print("Best solution within limits:", solutions[i].filterCount,\
solutions[i].channelScore, solutions[i].centerScore)
for s in solutions[i].subSolutions:
filters += s.filters
checkSolution(fig, channelsAll, filters)
# Print filters and write to file.
print("Filters used:",len(filters))
f = open("filters.txt", "w")
for filter in filters:
print(filter)
f.write(str(filter)+",\n")
f.close()
renderGUI(fig, channelsAll, filters)
exit(0)
print("Error: No solution for", filterCountMax, "filters.")
exit(0)
if __name__ == "__main__":
main()