-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadpool.js
77 lines (75 loc) · 2.07 KB
/
threadpool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Number of Thread workers each ThreadPool creates; the pool monitor also uses
// it as the upper bound on how many tasks may be running at the same time.
const MAX_THREADS = 7;
/**
 * Fixed-size pool of `Thread` workers fed from a first-in, first-out task queue.
 *
 * `start(...args)` enqueues a task and returns a promise. A 1 ms interval
 * ("monitor") hands queued tasks to idle workers, one task per tick, and the
 * returned promise settles with whatever the worker's own promise settles with.
 *
 * NOTE(review): `Thread` is defined outside this file. This class assumes each
 * worker exposes a truthy `running` flag while busy and a `start(...args)`
 * method that returns a promise — confirm against the Thread implementation.
 */
class ThreadPool {
  /**
   * @param {*} delegate - Passed unchanged to the constructor of every worker.
   */
  constructor(delegate) {
    this.uniqueId = 0;
    // Not used by this class; kept so the instance shape stays unchanged.
    this.promises = [];
    this.pool = [];
    this.queue = [];
    this.poolIntervalId = null;
    // Settle callbacks of pending task promises, keyed by task id.
    this.promises_resolve = {};
    this.promises_reject = {};
    this.createThreads(delegate);
  }

  /**
   * Fills the pool with MAX_THREADS workers.
   * @param {*} delegate - Passed unchanged to each `Thread` constructor.
   */
  createThreads(delegate) {
    for (let i = 0; i < MAX_THREADS; ++i) {
      this.pool.push(new Thread(delegate));
    }
  }

  /**
   * Starts the dispatch interval if it is not already active. The interval
   * stops itself once the queue is empty and no worker is running.
   */
  startMonitor() {
    if (this.poolIntervalId != null) {
      return;
    }
    this.poolIntervalId = setInterval(() => {
      if (this.queued === 0) {
        if (this.running === 0) {
          this.stopMonitor();
        }
        return;
      }
      // Pick the worker BEFORE dequeuing so a task is never removed from the
      // queue without somewhere to run.
      const idle = this.pool.find((thread) => !thread.running);
      if (idle === undefined) {
        return;
      }
      // shift() keeps dispatch first-in, first-out so old tasks cannot starve.
      const meta = this.queue.shift();
      let threadPromise;
      try {
        threadPromise = idle.start(...meta.arguments);
      } catch (error) {
        // A synchronous failure must still settle the caller's promise.
        threadPromise = Promise.reject(error);
      }
      this.wrapThreadPromise(meta.id, threadPromise);
    }, 1);
  }

  /** Stops the dispatch interval if it is active. */
  stopMonitor() {
    if (this.poolIntervalId != null) {
      clearInterval(this.poolIntervalId);
      this.poolIntervalId = null;
    }
  }

  /**
   * Forwards the outcome of a worker promise to the pool promise registered
   * under `id`, then forgets the stored callbacks.
   * @param {number} id - Task id issued by `start`.
   * @param {*} threadPromise - Value returned by the worker's `start`;
   *   non-promise values are treated as an already fulfilled result.
   */
  wrapThreadPromise(id, threadPromise) {
    const settle = (callbacks, value) => {
      const callback = callbacks[id];
      this.destroyThreadpoolPromise(id);
      if (typeof callback === 'function') {
        callback(value);
      }
    };
    // Two-argument then(): the rejection handler only sees worker failures,
    // never an error raised while delivering a successful result.
    Promise.resolve(threadPromise).then(
      (result) => settle(this.promises_resolve, result),
      (error) => settle(this.promises_reject, error)
    );
  }

  /**
   * Drops the stored resolve/reject callbacks of a task.
   * @param {number} id - Task id issued by `start`.
   */
  destroyThreadpoolPromise(id) {
    delete this.promises_resolve[id];
    delete this.promises_reject[id];
  }

  /**
   * Creates the promise handed back to the caller of `start` and records its
   * settle callbacks under `id`.
   * @param {number} id - Task id issued by `start`.
   * @returns {Promise<*>} Promise settled later by `wrapThreadPromise`.
   */
  createThreadPoolPromise(id) {
    return new Promise((resolve, reject) => {
      this.promises_resolve[id] = resolve;
      this.promises_reject[id] = reject;
    });
  }

  /**
   * Queues a task and makes sure the monitor is running.
   * @param {...*} args - Forwarded to the worker's `start` method.
   * @returns {Promise<*>} Settles with the outcome of the worker's promise.
   */
  start(...args) {
    const meta = { id: this.uniqueId++, arguments: args };
    // Register the callbacks before the task becomes visible to the monitor.
    const promise = this.createThreadPoolPromise(meta.id);
    this.queue.push(meta);
    this.startMonitor();
    return promise;
  }

  /** @returns {number} Count of workers whose `running` flag is truthy. */
  get running() {
    return this.pool.filter((thread) => thread.running).length;
  }

  /** @returns {number} Count of tasks waiting in the queue. */
  get queued() {
    return this.queue.length;
  }
}