-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path15274396845490.html
581 lines (345 loc) · 35.6 KB
/
15274396845490.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
<!doctype html>
<html class="no-js" lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>
面试-线程池的成长之路 - Junkman
</title>
<link href="atom.xml" rel="alternate" title="Junkman" type="application/atom+xml">
<link rel="stylesheet" href="asset/css/foundation.min.css" />
<link rel="stylesheet" href="asset/css/docs.css" />
<script src="asset/js/vendor/modernizr.js"></script>
<script src="asset/js/vendor/jquery.js"></script>
<script src="asset/highlightjs/highlight.pack.js"></script>
<link href="asset/highlightjs/styles/github.css" media="screen, projection" rel="stylesheet" type="text/css">
<script>hljs.initHighlightingOnLoad();</script>
<script type="text/javascript">
function before_search(){
var searchVal = 'site:panlw.github.io ' + document.getElementById('search_input').value;
document.getElementById('search_q').value = searchVal;
return true;
}
</script>
</head>
<body class="antialiased hide-extras">
<div class="marketing off-canvas-wrap" data-offcanvas>
<div class="inner-wrap">
<nav class="top-bar docs-bar hide-for-small" data-topbar>
<section class="top-bar-section">
<div class="row">
<div style="position: relative;width:100%;"><div style="position: absolute; width:100%;">
<ul id="main-menu" class="left">
<li id=""><a target="self" href="index.html">Home</a></li>
<li id=""><a target="_self" href="archives.html">Archives</a></li>
</ul>
<ul class="right" id="search-wrap">
<li>
<form target="_blank" onsubmit="return before_search();" action="http://google.com/search" method="get">
<input type="hidden" id="search_q" name="q" value="" />
<input tabindex="1" type="search" id="search_input" placeholder="Search"/>
</form>
</li>
</ul>
</div></div>
</div>
</section>
</nav>
<nav class="tab-bar show-for-small">
<a href="javascript:void(0)" class="left-off-canvas-toggle menu-icon">
<span> Junkman</span>
</a>
</nav>
<aside class="left-off-canvas-menu">
<ul class="off-canvas-list">
<li><a href="index.html">HOME</a></li>
<li><a href="archives.html">Archives</a></li>
<li><a href="about.html">ABOUT</a></li>
<li><label>Categories</label></li>
<li><a href="Infra.html">Infra</a></li>
<li><a href="Coding.html">Coding</a></li>
<li><a href="Modeling.html">Modeling</a></li>
<li><a href="Archtecting.html">Archtecting</a></li>
</ul>
</aside>
<a class="exit-off-canvas" href="#"></a>
<section id="main-content" role="main" class="scroll-container">
<script type="text/javascript">
$(function(){
$('#menu_item_index').addClass('is_active');
});
</script>
<div class="row">
<div class="large-8 medium-8 columns">
<div class="markdown-body article-wrap">
<div class="article">
<h1>面试-线程池的成长之路</h1>
<div class="read-more clearfix">
<span class="date">2018/5/28</span>
<span>posted in </span>
<span class="posted-in"><a href='Microservice.html'>Microservice</a></span>
<span class="comments">
</span>
</div>
</div><!-- article -->
<div class="article-content">
<p>尹吉欢 投稿 纯洁的微笑 3天前</p>
<blockquote>
<p><a href="https://mp.weixin.qq.com/s/5dexEENTqJWXN_17c6Lz6A">原文地址</a></p>
</blockquote>
<h1 id="toc_0"><strong>背景</strong></h1>
<p>相信大家在面试过程中遇到面试官问线程的很多,线程过后就是线程池了。从易到难,都是这么个过程,还有就是确实很多人在工作中接触线程池比较少,最多的也就是创建一个然后往里面提交线程,对于一些经验很丰富的面试官来说,一下就可以问出很多线程池相关的问题,与其被问的晕头转向,还不如好好学习。此时不努力更待何时。</p>
<h1 id="toc_1">什么是线程池?</h1>
<p>线程池是一种多线程处理形式,处理过程中将任务提交到线程池,任务的执行交由线程池来管理。</p>
<p>如果每个请求都创建一个线程去处理,那么服务器的资源很快就会被耗尽,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。</p>
<p>如果用生活中的列子来说明,我们可以把线程池当做一个客服团队,如果同时有 1000 个人打电话进行咨询,按照正常的逻辑那就是需要 1000 个客服接听电话,服务客户。现实往往需要考虑到很多层面的东西,比如:资源够不够,招这么多人需要费用比较多。正常的做法就是招 100 个人成立一个客服中心,当有电话进来后分配没有接听的客服进行服务,如果超出了 100 个人同时咨询的话,提示客户等待,稍后处理,等有客服空出来就可以继续服务下一个客户,这样才能达到一个资源的合理利用,实现效益的最大化。</p>
<h1 id="toc_2">Java 中的线程池种类</h1>
<p><strong>1. newSingleThreadExecutor</strong></p>
<p>创建方式:</p>
<pre><code>ExecutorService pool = Executors.newSingleThreadExecutor();
</code></pre>
<p>一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。</p>
<p>使用方式:</p>
<pre><code>import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPool { public static void main(String[] args) { ExecutorService pool = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { pool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t开始发车啦...."); }); } }}
</code></pre>
<p>输出结果如下:</p>
<pre><code>pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....pool-1-thread-1 开始发车啦....
</code></pre>
<p>从输出的结果我们可以看出,一直只有一个线程在运行。</p>
<p>2.<strong>newFixedThreadPool</strong></p>
<p>创建方式:</p>
<pre><code>ExecutorService pool = Executors.newFixedThreadPool(10);
</code></pre>
<p>创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。</p>
<p>使用方式:</p>
<pre><code>import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPool { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { pool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t开始发车啦...."); }); } }}
</code></pre>
<p>输出结果如下:</p>
<pre><code>pool-1-thread-1 开始发车啦....pool-1-thread-4 开始发车啦....pool-1-thread-3 开始发车啦....pool-1-thread-2 开始发车啦....pool-1-thread-6 开始发车啦....pool-1-thread-7 开始发车啦....pool-1-thread-5 开始发车啦....pool-1-thread-8 开始发车啦....pool-1-thread-9 开始发车啦....pool-1-thread-10 开始发车啦....
</code></pre>
<p><strong>3. newCachedThreadPool</strong></p>
<p>创建方式:</p>
<pre><code>ExecutorService pool = Executors.newCachedThreadPool();
</code></pre>
<p>创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务。</p>
<p>使用方式如上 2 所示。</p>
<p>4.<strong>newScheduledThreadPool</strong></p>
<p>创建方式:</p>
<pre><code>ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
</code></pre>
<p>此线程池支持定时以及周期性执行任务的需求。</p>
<p>使用方式:</p>
<pre><code>import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class ThreadPool { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(10); for (int i = 0; i < 10; i++) { pool.schedule(() -> { System.out.println(Thread.currentThread().getName() + "\t开始发车啦...."); }, 10, TimeUnit.SECONDS); } }}
</code></pre>
<p>上面演示的是延迟 10 秒执行任务, 如果想要执行周期性的任务可以用下面的方式,每秒执行一次</p>
<pre><code>//pool.scheduleWithFixedDelay也可以pool.scheduleAtFixedRate(() -> { System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");}, 1, 1, TimeUnit.SECONDS);
</code></pre>
<p>5.<strong>newWorkStealingPool</strong></p>
<p>newWorkStealingPool 是 jdk1.8 才有的,会根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层用的 ForkJoinPool 来实现的。ForkJoinPool 的优势在于,可以充分利用多 cpu,多核 cpu 的优势,把一个任务拆分成多个 “小任务”,把多个“小任务” 放到多个处理器核心上并行执行;当多个 “小任务” 执行完成之后,再将这些执行结果合并起来即可。</p>
<h1 id="toc_3">说说线程池的拒绝策略</h1>
<p>当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务。RejectedExecutionHandler 接口提供了拒绝任务处理的自定义方法的机会。在 ThreadPoolExecutor 中已经包含四种处理策略。</p>
<ul>
<li> AbortPolicy 策略:该策略会直接抛出异常,阻止系统正常工作。</li>
</ul>
<pre><code>public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }}
</code></pre>
<ul>
<li> CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。</li>
</ul>
<pre><code>public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }}
</code></pre>
<ul>
<li> DiscardOleddestPolicy 策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。</li>
</ul>
<pre><code>public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }}
</code></pre>
<ul>
<li> DiscardPolicy 策略:该策略默默的丢弃无法处理的任务,不予任何处理。</li>
</ul>
<pre><code>public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }}
</code></pre>
<p>除了 JDK 默认为什么提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义的方式很简单,直接实现 RejectedExecutionHandler 接口即可</p>
<p>比如 Spring integration 中就有一个自定义的拒绝策略 CallerBlocksPolicy,将任务插入到队列中,直到队列中有空闲并插入成功的时候,否则将根据最大等待时间一直阻塞,直到超时。</p>
<pre><code>package org.springframework.integration.util;import java.util.concurrent.BlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;public class CallerBlocksPolicy implements RejectedExecutionHandler { private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class); private final long maxWait; /** * @param maxWait The maximum time to wait for a queue slot to be * available, in milliseconds. */ public CallerBlocksPolicy(long maxWait) { this.maxWait = maxWait; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { BlockingQueue<Runnable> queue = executor.getQueue(); if (logger.isDebugEnabled()) { logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds"); } if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) { throw new RejectedExecutionException("Max wait time expired to queue task"); } if (logger.isDebugEnabled()) { logger.debug("Task execution queued"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Interrupted", e); } } else { throw new RejectedExecutionException("Executor has been shut down"); } }}
</code></pre>
<p>定义好之后如何使用呢?光定义没用的呀,一定要用到线程池中呀,可以通过下面的方式自定义线程池,指定拒绝策略。</p>
<pre><code>BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());
</code></pre>
<h1 id="toc_4">execute 和 submit 的区别?</h1>
<p>在前面的讲解中,我们执行任务是用的 execute 方法,除了 execute 方法,还有一个 submit 方法也可以执行我们提交的任务。</p>
<p>这两个方法有什么区别呢?分别适用于在什么场景下呢?我们来做一个简单的分析。</p>
<p>execute 适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了</p>
<pre><code>public class ThreadPool { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(10); pool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t开始发车啦...."); }); }}
</code></pre>
<p>submit 方法适用于需要关注返回值的场景,submit 方法的定义如下:</p>
<pre><code>public interface ExecutorService extends Executor { ... <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); ...}
</code></pre>
<p>其子类 AbstractExecutorService 实现了 submit 方法, 可以看到无论参数是 Callable 还是 Runnable,最终都会被封装成 RunnableFuture,然后再调用 execute 执行。</p>
<pre><code> /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
</code></pre>
<p>下面我们来看看这三个方法分别如何去使用:</p>
<p><strong>submit(Callable<t style="max-width: 100%;box-sizing: border-box;font-size: inherit;color: inherit;line-height: inherit;word-wrap: break-word !important;"> task);</t></strong></p>
<pre><code>public class ThreadPool { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(10); Future<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { return "Hello"; } }); String result = future.get(); System.out.println(result); }}
</code></pre>
<p><strong>submit(Runnable task, T result);</strong></p>
<pre><code>public class ThreadPool { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(10); Data data = new Data(); Future<Data> future = pool.submit(new MyRunnable(data), data); String result = future.get().getName(); System.out.println(result); }}class Data { String name; public String getName() { return name; } public void setName(String name) { this.name = name; }}class MyRunnable implements Runnable { private Data data; public MyRunnable(Data data) { this.data = data; } @Override public void run() { data.setName("yinjihuan"); }}
</code></pre>
<p><strong>Future submit(Runnable task);</strong><br/>
直接 submit 一个 Runnable 是拿不到返回值的,返回值就是 null.</p>
<h1 id="toc_5">五种线程池的使用场景</h1>
<ul>
<li><p>newSingleThreadExecutor:一个单线程的线程池,可以用于需要保证顺序执行的场景,并且只有一个线程在执行。</p></li>
<li><p>newFixedThreadPool:一个固定大小的线程池,可以用于已知并发压力的情况下,对线程数做限制。</p></li>
<li><p>newCachedThreadPool:一个可以无限扩大的线程池,比较适合处理执行时间比较小的任务。</p></li>
<li><p>newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。</p></li>
<li><p>newWorkStealingPool:一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用 cpu 数量的线程来并行执行。</p></li>
</ul>
<h1 id="toc_6">线程池的关闭</h1>
<p>关闭线程池可以调用 shutdownNow 和 shutdown 两个方法来实现</p>
<p><strong>shutdownNow:对正在执行的任务全部发出 interrupt(),停止执行,对还未开始执行的任务全部取消,并且返回还没开始的任务列表</strong></p>
<pre><code>public class ThreadPool { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(1); for (int i = 0; i < 5; i++) { System.err.println(i); pool.execute(() -> { try { Thread.sleep(30000); System.out.println("--"); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(1000); List<Runnable> runs = pool.shutdownNow(); }}
</code></pre>
<p>上面的代码模拟了立即取消的场景,往线程池里添加 5 个线程任务,然后 sleep 一段时间,线程池只有一个线程,如果此时调用 shutdownNow 后应该需要中断一个正在执行的任务和返回 4 个还未执行的任务,控制台输出下面的内容:</p>
<pre><code>01234[fs.ThreadPool$$Lambda$1/990368553@682a0b20, fs.ThreadPool$$Lambda$1/990368553@682a0b20, fs.ThreadPool$$Lambda$1/990368553@682a0b20, fs.ThreadPool$$Lambda$1/990368553@682a0b20]java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at fs.ThreadPool.lambda$0(ThreadPool.java:15) at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
</code></pre>
<p><strong>shutdown:当我们调用 shutdown 后,线程池将不再接受新的任务,但也不会去强制终止已经提交或者正在执行中的任务</strong></p>
<pre><code>public class ThreadPool { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(1); for (int i = 0; i < 5; i++) { System.err.println(i); pool.execute(() -> { try { Thread.sleep(30000); System.out.println("--"); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(1000); pool.shutdown(); pool.execute(() -> { try { Thread.sleep(30000); System.out.println("--"); } catch (Exception e) { e.printStackTrace(); } }); }}
</code></pre>
<p>上面的代码模拟了正在运行的状态,然后调用 shutdown,接着再往里面添加任务,肯定是拒绝添加的,请看输出结果:</p>
<pre><code>01234Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/1747585824@3d075dc0 rejected from java.util.concurrent.ThreadPoolExecutor@214c265e[Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at fs.ThreadPool.main(ThreadPool.java:24)
</code></pre>
<p>还有一些业务场景下需要知道线程池中的任务是否全部执行完成,当我们关闭线程池之后,可以用 isTerminated 来判断所有的线程是否执行完成,千万不要用 isShutdown,isShutdown 只是返回你是否调用过 shutdown 的结果。</p>
<pre><code>public class ThreadPool { public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(1); for (int i = 0; i < 5; i++) { System.err.println(i); pool.execute(() -> { try { Thread.sleep(3000); System.out.println("--"); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(1000); pool.shutdown(); while(true){ if(pool.isTerminated()){ System.out.println("所有的子线程都结束了!"); break; } Thread.sleep(1000); } }}
</code></pre>
<h1 id="toc_7">自定义线程池</h1>
<p>在实际的使用过程中,大部分我们都是用 Executors 去创建线程池直接使用,如果有一些其他的需求,比如指定线程池的拒绝策略,阻塞队列的类型,线程名称的前缀等等,我们可以采用自定义线程池的方式来解决。</p>
<p>如果只是简单的想要改变线程名称的前缀的话可以自定义 ThreadFactory 来实现,在 Executors.new… 中有一个 ThreadFactory 的参数,如果没有指定则用的是 DefaultThreadFactory。</p>
<p>自定义线程池核心在于创建一个 ThreadPoolExecutor 对象,指定参数</p>
<p>下面我们看下 ThreadPoolExecutor 构造函数的定义:</p>
<pre><code>public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ;
</code></pre>
<ul>
<li><p>corePoolSize<br/>
线程池大小,决定着新提交的任务是新开线程去执行还是放到任务队列中,也是线程池的最最核心的参数。一般线程池开始时是没有线程的,只有当任务来了并且线程数量小于 corePoolSize 才会创建线程。</p></li>
<li><p>maximumPoolSize<br/>
最大线程数,线程池能创建的最大线程数量。</p></li>
<li><p>keepAliveTime<br/>
在线程数量超过 corePoolSize 后,多余空闲线程的最大存活时间。</p></li>
<li><p>unit<br/>
时间单位</p></li>
<li><p>workQueue<br/>
存放来不及处理的任务的队列,是一个 BlockingQueue。</p></li>
<li><p>threadFactory<br/>
生产线程的工厂类,可以定义线程名,优先级等。</p></li>
<li><p>handler<br/>
拒绝策略,当任务来不及处理的时候,如何处理, 前面有讲解。</p></li>
</ul>
<p>了解上面的参数信息后我们就可以定义自己的线程池了,我这边用 ArrayBlockingQueue 替换了 LinkedBlockingQueue,指定了队列的大小,当任务超出队列大小之后使用 CallerRunsPolicy 拒绝策略处理。</p>
<p>这样做的好处是严格控制了队列的大小,不会出现一直往里面添加任务的情况,有的时候任务处理的比较慢,任务数量过多会占用大量内存,导致内存溢出。</p>
<p>当然你也可以在提交到线程池的入口进行控制,比如用 CountDownLatch, Semaphore 等。</p>
<pre><code>/** * 自定义线程池<br> * 默认的newFixedThreadPool里的LinkedBlockingQueue是一个无边界队列,如果不断的往里加任务,最终会导致内存的不可控<br> * 增加了有边界的队列,使用了CallerRunsPolicy拒绝策略 * @author yinjihuan * */public class FangjiaThreadPoolExecutor { private static ExecutorService executorService = newFixedThreadPool(50); private static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new CallerRunsPolicy()); } public static void execute(Runnable command) { executorService.execute(command); } public static void shutdown() { executorService.shutdown(); } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "FSH-pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }}
</code></pre>
</div>
<div class="row">
<div class="large-6 columns">
<p class="text-left" style="padding:15px 0px;">
<a href="15274398936380.html"
title="Previous Post: 电商平台备战促销季的运维秘诀——高可用服务层">« 电商平台备战促销季的运维秘诀——高可用服务层</a>
</p>
</div>
<div class="large-6 columns">
<p class="text-right" style="padding:15px 0px;">
<a href="15274395979385.html"
title="Next Post: 不理解Zookeeper一致性原理,谈何异地多活改造">不理解Zookeeper一致性原理,谈何异地多活改造 »</a>
</p>
</div>
</div>
<div class="comments-wrap">
<div class="share-comments">
<script type="text/javascript" src="//s7.addthis.com/js/300/addthis_widget.js#pubid=ra-5ae58078c0d7b2ab"></script>
</div>
</div>
</div><!-- article-wrap -->
</div><!-- large 8 -->
<div class="large-4 medium-4 columns">
<div class="hide-for-small">
<div id="sidebar" class="sidebar">
<div id="site-info" class="site-info">
<div class="site-a-logo"><img src="./asset/img/logo.jpg" /></div>
<h1>Junkman</h1>
<div class="site-des">“拾荒者”一词来自凯文・凯利的《失控》中关于机器学习的故事(“收集癖好机”如何完成他的收集工作)。</div>
<div class="social">
<a target="_blank" class="github" target="_blank" href="https://github.com/panlw/" title="GitHub">GitHub</a>
<a target="_blank" class="rss" href="atom.xml" title="RSS">RSS</a>
</div>
</div>
<div id="site-categories" class="side-item ">
<div class="side-header">
<h2>Categories</h2>
</div>
<div class="side-content">
<p class="cat-list">
<a href="Infra.html"><strong>Infra</strong></a>
<a href="Coding.html"><strong>Coding</strong></a>
<a href="Modeling.html"><strong>Modeling</strong></a>
<a href="Archtecting.html"><strong>Archtecting</strong></a>
</p>
</div>
</div>
<div id="site-categories" class="side-item">
<div class="side-header">
<h2>Recent Posts</h2>
</div>
<div class="side-content">
<ul class="posts-list">
<li class="post">
<a href="15517999043443.html">The Art of Crafting Architectural Diagrams</a>
</li>
<li class="post">
<a href="15517997955971.html">为什么说我们需要软件架构图?</a>
</li>
<li class="post">
<a href="15516128677869.html">DNS Servers That Offer Privacy and Filtering</a>
</li>
<li class="post">
<a href="15516123108194.html">Airbnb's Migration from Monolith to Services</a>
</li>
<li class="post">
<a href="15516097487470.html">Events As First-Class Citizens</a>
</li>
</ul>
</div>
</div>
</div><!-- sidebar -->
</div><!-- hide for small -->
</div><!-- large 4 -->
</div><!-- row -->
<div class="page-bottom clearfix">
<div class="row">
<p class="copyright">Copyright © 2015
Powered by <a target="_blank" href="http://www.mweb.im">MWeb</a>,
Theme used <a target="_blank" href="http://github.com">GitHub CSS</a>.</p>
</div>
</div>
</section>
</div>
</div>
<script src="asset/js/foundation.min.js"></script>
<script>
$(document).foundation();
function fixSidebarHeight(){
var w1 = $('.markdown-body').height();
var w2 = $('#sidebar').height();
if (w1 > w2) { $('#sidebar').height(w1); };
}
$(function(){
fixSidebarHeight();
})
$(window).load(function(){
fixSidebarHeight();
});
</script>
<script src="asset/chart/all-min.js"></script><script type="text/javascript">$(function(){ var mwebii=0; var mwebChartEleId = 'mweb-chart-ele-'; $('pre>code').each(function(){ mwebii++; var eleiid = mwebChartEleId+mwebii; if($(this).hasClass('language-sequence')){ var ele = $(this).addClass('nohighlight').parent(); $('<div id="'+eleiid+'"></div>').insertAfter(ele); ele.hide(); var diagram = Diagram.parse($(this).text()); diagram.drawSVG(eleiid,{theme: 'simple'}); }else if($(this).hasClass('language-flow')){ var ele = $(this).addClass('nohighlight').parent(); $('<div id="'+eleiid+'"></div>').insertAfter(ele); ele.hide(); var diagram = flowchart.parse($(this).text()); diagram.drawSVG(eleiid); } });});</script>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script><script type="text/x-mathjax-config">MathJax.Hub.Config({TeX: { equationNumbers: { autoNumber: "AMS" } }});</script>
</body>
</html>