Skip to content

Commit 2cb5300

Browse files
committed
commit code compelete
1 parent 34b71a8 commit 2cb5300

File tree

8 files changed

+365
-0
lines changed

8 files changed

+365
-0
lines changed

Blog/线程池增加监控.md

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
### jdk拓展线程池增加监控
2+
> 摘自: http://lixiaohui.iteye.com/blog/2330086
3+
4+
> MonitorableThreadPoolExecutor类代码如下:
5+
6+
```
7+
package com.sitech.crm_cmi.util.concurrent;
8+
9+
import java.util.concurrent.BlockingQueue;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.ConcurrentMap;
12+
import java.util.concurrent.RejectedExecutionHandler;
13+
import java.util.concurrent.ThreadFactory;
14+
import java.util.concurrent.ThreadPoolExecutor;
15+
import java.util.concurrent.TimeUnit;
16+
17+
/**
18+
* 摘自: http://lixiaohui.iteye.com/blog/2330086
19+
* 可监控的线程池, 可有多个监控处理器,如果监控的逻辑是比较耗时的话,最好另起个线程或者线程池专门用来跑MonitorHandler的方法.
20+
*/
21+
public class MonitorableThreadPoolExecutor extends ThreadPoolExecutor {
22+
/**
23+
* 可有多个监控处理器
24+
*/
25+
private final ConcurrentMap<String, MonitorHandler> handlerMap = new ConcurrentHashMap<String, MonitorHandler>();
26+
27+
public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
28+
long keepAliveTime, TimeUnit unit,
29+
BlockingQueue<Runnable> workQueue) {
30+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
31+
}
32+
33+
public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
34+
long keepAliveTime, TimeUnit unit,
35+
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
36+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
37+
threadFactory);
38+
}
39+
40+
public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
41+
long keepAliveTime, TimeUnit unit,
42+
BlockingQueue<Runnable> workQueue,
43+
RejectedExecutionHandler handler) {
44+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
45+
handler);
46+
}
47+
48+
public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
49+
long keepAliveTime, TimeUnit unit,
50+
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
51+
RejectedExecutionHandler handler) {
52+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
53+
threadFactory, handler);
54+
}
55+
56+
@Override
57+
protected void beforeExecute(Thread t, Runnable r) {
58+
super.beforeExecute(t, r);
59+
// 依次调用处理器
60+
for (MonitorHandler handler : handlerMap.values())
61+
if (handler.usable())
62+
handler.before(t, r);
63+
}
64+
65+
@Override
66+
protected void afterExecute(Runnable r, Throwable t) {
67+
super.afterExecute(r, t);
68+
// 依次调用处理器
69+
for (MonitorHandler handler : handlerMap.values())
70+
if (handler.usable())
71+
handler.after(r, t);
72+
}
73+
74+
@Override
75+
protected void terminated() {
76+
super.terminated();
77+
for (MonitorHandler handler : handlerMap.values())
78+
if (handler.usable())
79+
handler.terminated(getLargestPoolSize(),
80+
getCompletedTaskCount());
81+
}
82+
83+
public MonitorHandler addMonitorTask(String key, MonitorHandler task,
84+
boolean overrideIfExist) {
85+
if (overrideIfExist)
86+
return handlerMap.put(key, task);
87+
else
88+
return handlerMap.putIfAbsent(key, task);
89+
}
90+
91+
public MonitorHandler addMonitorTask(String key, MonitorHandler task) {
92+
return addMonitorTask(key, task, true);
93+
}
94+
95+
public MonitorHandler removeMonitorTask(String key) {
96+
return handlerMap.remove(key);
97+
}
98+
99+
}
100+
```
101+
102+
> MonitorHandler类代码如下:
103+
104+
```
105+
package com.sitech.crm_cmi.util.concurrent;
106+
107+
/**
108+
* 监控处理器, 目的是把before和after抽象出来, 以便在{@link MonitorableThreadPoolExecutor}
109+
* 中形成一条监控处理器链,观察者模式
110+
*/
111+
public interface MonitorHandler {
112+
113+
/**
114+
* 改监控任务是否可用
115+
*
116+
* @return
117+
*/
118+
boolean usable();
119+
120+
/**
121+
* 任务执行前回调
122+
*
123+
* @param thread
124+
* 即将执行该任务的线程
125+
* @param runnable
126+
* 即将执行的任务
127+
*/
128+
void before(Thread thread, Runnable runnable);
129+
130+
/**
131+
* <pre>
132+
* 任务执行后回调 注意:
133+
* 1.当你往线程池提交的是{@link Runnable} 对象时, 参数runnable就是一个
134+
* {@link Runnable}对象
135+
* 2.当你往线程池提交的是{@link java.util.concurrent.Callable<?>}
136+
* 对象时, 参数runnable实际上就是一个{@link java.util.concurrent.FutureTask<?>}对象
137+
* 这时你可以通过把参数runnable downcast为FutureTask<?>或者Future来获取任务执行结果
138+
* </pre>
139+
*
140+
* @param runnable
141+
* 执行完后的任务
142+
* @param throwable
143+
* 异常信息
144+
*/
145+
void after(Runnable runnable, Throwable throwable);
146+
147+
/**
148+
* 线程池关闭后回调
149+
*
150+
* @param largestPoolSize
151+
* @param completedTaskCount
152+
*/
153+
void terminated(int largestPoolSize, long completedTaskCount);
154+
155+
}
156+
```
157+
158+
> TimeMonitorHandler类代码如下:
159+
160+
```
161+
package com.sitech.crm_cmi.util.concurrent;
162+
163+
import java.util.Map;
164+
import java.util.concurrent.CancellationException;
165+
import java.util.concurrent.ConcurrentHashMap;
166+
import java.util.concurrent.ExecutionException;
167+
import java.util.concurrent.Future;
168+
import java.util.concurrent.FutureTask;
169+
170+
public class TimeMonitorHandler implements MonitorHandler {
171+
// 任务开始时间记录map, 多线程增删, 需用ConcurrentHashMap
172+
private final Map<Runnable, Long> timeRecords = new ConcurrentHashMap<Runnable, Long>();
173+
private final boolean usable;
174+
175+
public TimeMonitorHandler() {
176+
this(true);
177+
}
178+
179+
public TimeMonitorHandler(boolean usable) {
180+
this.usable = usable;
181+
}
182+
183+
@Override
184+
public boolean usable() {
185+
return usable;
186+
}
187+
188+
@Override
189+
public void terminated(int largestPoolSize, long completedTaskCount) {
190+
System.out.println(
191+
String.format("%s:largestPoolSize=%d, completedTaskCount=%s",
192+
time(), largestPoolSize, completedTaskCount));
193+
}
194+
195+
@Override
196+
public void before(Thread thread, Runnable runnable) {
197+
System.out.println(String.format("%s: before[%s -> %s]", time(),
198+
thread.getName(), runnable));
199+
timeRecords.put(runnable, System.currentTimeMillis());
200+
}
201+
202+
@Override
203+
public void after(Runnable runnable, Throwable throwable) {
204+
long costTime = System.currentTimeMillis()
205+
- timeRecords.remove(runnable);
206+
207+
Object result = null;
208+
if (throwable == null && runnable instanceof FutureTask<?>) {
209+
// 有返回值的异步任务,不一定是Callable<?>,也有可能是Runnable
210+
try {
211+
result = ((Future<?>) runnable).get();
212+
} catch (InterruptedException e) {
213+
Thread.currentThread().interrupt(); // reset
214+
} catch (ExecutionException e) {
215+
throwable = e;
216+
} catch (CancellationException e) {
217+
throwable = e;
218+
}
219+
}
220+
if (throwable == null) {
221+
// 任务正常结束
222+
if (result != null)
223+
// 有返回值的异步任务
224+
System.out.println(String.format(
225+
"%s: after[%s -> %s], costs %d millisecond, result: %s",
226+
time(), Thread.currentThread().getName(), runnable,
227+
costTime, result));
228+
else
229+
System.out.println(String.format(
230+
"%s: after[%s -> %s], costs %d millisecond", time(),
231+
Thread.currentThread().getName(), runnable, costTime));
232+
} else
233+
System.err.println(String.format(
234+
"%s: after[%s -> %s], costs %d millisecond, exception: %s",
235+
time(), Thread.currentThread().getName(), runnable,
236+
costTime, throwable.getCause()));
237+
}
238+
239+
private static String time() {
240+
return Long.toString(System.currentTimeMillis());
241+
}
242+
243+
}
244+
```
245+
246+
> Tester类代码如下:
247+
248+
```
249+
package com.sitech.crm_cmi.util.concurrent;
250+
251+
import java.io.IOException;
252+
import java.util.Random;
253+
import java.util.concurrent.Callable;
254+
import java.util.concurrent.LinkedBlockingQueue;
255+
import java.util.concurrent.TimeUnit;
256+
257+
public class Tester {
258+
private static volatile boolean stop = false;
259+
private static final Random random = new Random(47);
260+
261+
public static void main(String[] args)
262+
throws InterruptedException, IOException {
263+
// fixed size 10
264+
final MonitorableThreadPoolExecutor exec = new MonitorableThreadPoolExecutor(
265+
10, 10, 30, TimeUnit.SECONDS,
266+
new LinkedBlockingQueue<Runnable>());
267+
268+
exec.addMonitorTask("TimeMonitorTask", newTimeMonitorHandler());
269+
// 起一个线程不断地往线程池丢任务
270+
Thread t = new Thread(new Runnable() {
271+
@Override
272+
public void run() {
273+
startAddTask(exec);
274+
}
275+
});
276+
t.start();
277+
278+
// 丢任务丢50ms
279+
TimeUnit.MILLISECONDS.sleep(50);
280+
stop = true;
281+
t.join();
282+
exec.shutdown();
283+
// 等线程池任务跑完
284+
exec.awaitTermination(100, TimeUnit.SECONDS);
285+
}
286+
287+
// 随机runnable或者callable<?>, 任务随机抛异常
288+
private static void startAddTask(MonitorableThreadPoolExecutor pool) {
289+
int count = 0;
290+
while (!stop) {
291+
if (random.nextBoolean())
292+
// 丢Callable<?>任务
293+
pool.submit(new Callable<Boolean>() {
294+
@Override
295+
public Boolean call() throws Exception {
296+
// 随机抛异常
297+
boolean bool = random.nextBoolean();
298+
// 随机耗时 0~100 ms
299+
TimeUnit.MILLISECONDS.sleep(random.nextInt(100));
300+
if (bool)
301+
throw new RuntimeException("thrown randomly");
302+
return bool;
303+
}
304+
});
305+
else
306+
// 丢Runnable
307+
pool.submit(new Runnable() {
308+
@Override
309+
public void run() {
310+
// 随机耗时 0~100 ms
311+
try {
312+
TimeUnit.MILLISECONDS.sleep(random.nextInt(100));
313+
} catch (InterruptedException e) {
314+
Thread.currentThread().interrupt();
315+
}
316+
// 随机抛异常
317+
if (random.nextBoolean())
318+
throw new RuntimeException("thrown randomly");
319+
};
320+
});
321+
System.out.println(
322+
String.format("%s:submitted %d task", time(), ++count));
323+
}
324+
}
325+
326+
private static MonitorHandler newTimeMonitorHandler() {
327+
return new TimeMonitorHandler();
328+
}
329+
330+
private static String time() {
331+
return String.valueOf(System.currentTimeMillis());
332+
}
333+
}
334+
```

Blog/线程池增加监控.pdf

488 KB
Binary file not shown.
8.47 MB
Binary file not shown.
26.7 MB
Binary file not shown.
1.75 MB
Binary file not shown.

Other/file.jar

-2 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)