-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path15293277939094.html
671 lines (450 loc) · 24.4 KB
/
15293277939094.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
<!doctype html>
<html class="no-js" lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>
Java 并发编程:阻塞队列 - Junkman
</title>
<link href="atom.xml" rel="alternate" title="Junkman" type="application/atom+xml">
<link rel="stylesheet" href="asset/css/foundation.min.css" />
<link rel="stylesheet" href="asset/css/docs.css" />
<script src="asset/js/vendor/modernizr.js"></script>
<script src="asset/js/vendor/jquery.js"></script>
<script src="asset/highlightjs/highlight.pack.js"></script>
<link href="asset/highlightjs/styles/github.css" media="screen, projection" rel="stylesheet" type="text/css">
<script>hljs.initHighlightingOnLoad();</script>
<script type="text/javascript">
function before_search(){
var searchVal = 'site:panlw.github.io ' + document.getElementById('search_input').value;
document.getElementById('search_q').value = searchVal;
return true;
}
</script>
</head>
<body class="antialiased hide-extras">
<div class="marketing off-canvas-wrap" data-offcanvas>
<div class="inner-wrap">
<nav class="top-bar docs-bar hide-for-small" data-topbar>
<section class="top-bar-section">
<div class="row">
<div style="position: relative;width:100%;"><div style="position: absolute; width:100%;">
<ul id="main-menu" class="left">
<li id=""><a target="self" href="index.html">Home</a></li>
<li id=""><a target="_self" href="archives.html">Archives</a></li>
</ul>
<ul class="right" id="search-wrap">
<li>
<form target="_blank" onsubmit="return before_search();" action="http://google.com/search" method="get">
<input type="hidden" id="search_q" name="q" value="" />
<input tabindex="1" type="search" id="search_input" placeholder="Search"/>
</form>
</li>
</ul>
</div></div>
</div>
</section>
</nav>
<nav class="tab-bar show-for-small">
<a href="javascript:void(0)" class="left-off-canvas-toggle menu-icon">
<span> Junkman</span>
</a>
</nav>
<aside class="left-off-canvas-menu">
<ul class="off-canvas-list">
<li><a href="index.html">HOME</a></li>
<li><a href="archives.html">Archives</a></li>
<li><a href="about.html">ABOUT</a></li>
<li><label>Categories</label></li>
<li><a href="Infra.html">Infra</a></li>
<li><a href="Coding.html">Coding</a></li>
<li><a href="Modeling.html">Modeling</a></li>
<li><a href="Archtecting.html">Archtecting</a></li>
</ul>
</aside>
<a class="exit-off-canvas" href="#"></a>
<section id="main-content" role="main" class="scroll-container">
<script type="text/javascript">
$(function(){
$('#menu_item_index').addClass('is_active');
});
</script>
<div class="row">
<div class="large-8 medium-8 columns">
<div class="markdown-body article-wrap">
<div class="article">
<h1>Java 并发编程:阻塞队列</h1>
<div class="read-more clearfix">
<span class="date">2018/6/18</span>
<span>posted in </span>
<span class="posted-in"><a href='Cocurrency.html'>Cocurrency</a></span>
<span class="comments">
</span>
</div>
</div><!-- article -->
<div class="article-content">
<blockquote>
<p>作者:海子<br/>
原文:<a href="http://www.cnblogs.com/dolphin0520/p/3932906.html">http://www.cnblogs.com/dolphin0520/p/3932906.html</a></p>
</blockquote>
<p>使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者 - 生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。</p>
<p>本文先讲述一下 java.util.concurrent 包下提供主要的几种阻塞队列,然后分析了阻塞队列和非阻塞队列的中的各个方法,接着分析了阻塞队列的实现原理,最后给出了一个实际例子和几个使用场景。</p>
<p> 一. 几种主要的阻塞队列</p>
<p> 二. 阻塞队列中的方法 VS 非阻塞队列中的方法</p>
<p> 三. 阻塞队列的实现原理</p>
<p> 四. 示例和使用场景</p>
<p>若有不正之处请多多谅解,并欢迎批评指正。</p>
<h2 id="toc_0">一. 几种主要的阻塞队列</h2>
<p>自从 Java 1.5 之后,在 java.util.concurrent 包下提供了若干个阻塞队列,主要有以下几个:</p>
<ul>
<li><p>ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建 ArrayBlockingQueue 对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。</p></li>
<li><p>LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建 LinkedBlockingQueue 对象时如果不指定容量大小,则默认大小为 Integer.MAX_VALUE。</p></li>
<li><p>PriorityBlockingQueue:以上 2 种队列都是先进先出队列,而 PriorityBlockingQueue 却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面 2 种都是有界队列。</p></li>
<li><p>DelayQueue:基于 PriorityQueue,一种延时阻塞队列,DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue 也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。</p></li>
</ul>
<h2 id="toc_1">二. 阻塞队列中的方法 VS 非阻塞队列中的方法</h2>
<p><strong>1. 非阻塞队列中的几个主要方法:</strong></p>
<ul>
<li><p><strong>add(E e):</strong>将元素 e 插入到队列末尾,如果插入成功,则返回 true;如果插入失败(即队列已满),则会抛出异常;</p></li>
<li><p><strong>remove():</strong>移除队首元素,若移除成功,则返回 true;如果移除失败(队列为空),则会抛出异常;</p></li>
<li><p><strong>offer(E e):</strong>将元素 e 插入到队列末尾,如果插入成功,则返回 true;如果插入失败(即队列已满),则返回 false;</p></li>
<li><p><strong>poll():</strong>移除并获取队首元素,若成功,则返回队首元素;否则返回 null;</p></li>
<li><p><strong>peek():</strong>获取队首元素,若成功,则返回队首元素;否则返回 null</p></li>
</ul>
<p>对于非阻塞队列,一般情况下建议使用 offer、poll 和 peek 三个方法,不建议使用 add 和 remove 方法。因为使用 offer、poll 和 peek 三个方法可以通过返回值判断操作成功与否,而使用 add 和 remove 方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。</p>
<p><strong>2. 阻塞队列中的几个主要方法:</strong></p>
<p>阻塞队列包括了非阻塞队列中的大部分方法,上面列举的 5 个方法在阻塞队列中都存在,但是要注意这 5 个方法在阻塞队列中都进行了同步措施。</p>
<p>除此之外,阻塞队列提供了另外 4 个非常有用的方法:</p>
<p> put(E e)</p>
<p> take()</p>
<p> offer(E e,long timeout, TimeUnit unit)</p>
<p> poll(long timeout, TimeUnit unit)</p>
<p>这四个方法的理解:</p>
<p> put 方法用来向队尾存入元素,如果队列满,则等待;</p>
<p> take 方法用来从队首取元素,如果队列为空,则等待;</p>
<p> offer 方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回 false;否则返回 true;</p>
<p> poll 方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回 null;否则返回取得的元素;</p>
<h2 id="toc_2">三. 阻塞队列的实现原理</h2>
<p>前面谈到了非阻塞队列和阻塞队列中常用的方法,下面来探讨阻塞队列的实现原理,本文以 ArrayBlockingQueue 为例,其他阻塞队列实现原理可能和 ArrayBlockingQueue 有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。</p>
<p>首先看一下 ArrayBlockingQueue 类中的几个成员变量:</p>
<pre><code class="language-java">public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}
</code></pre>
<p>可以看出,ArrayBlockingQueue 中用来存储元素的实际上是一个数组,takeIndex 和 putIndex 分别表示队首元素和队尾元素的下标,count 表示队列中元素的个数。</p>
<p>lock 是一个可重入锁,notEmpty 和 notFull 是等待条件。</p>
<p>下面看一下 ArrayBlockingQueue 的构造器,构造器有三个重载版本:</p>
<pre><code class="language-java">public ArrayBlockingQueue(int capacity) {
}
public ArrayBlockingQueue(int capacity, boolean fair) {
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
}
</code></pre>
<p>第一个构造器只有一个参数用来指定容量,第二个构造器可以指定容量和公平性,第三个构造器可以指定容量、公平性以及用另外一个集合进行初始化。</p>
<p>然后看它的两个关键方法的实现:put() 和 take():</p>
<pre><code class="language-java">public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
</code></pre>
<p>从 put 方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用 notFull.await() 进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。</p>
<p>当被其他线程唤醒时,通过 insert(e) 方法插入元素,最后解锁。</p>
<p>我们看一下 insert 方法的实现:</p>
<pre><code class="language-java">private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
</code></pre>
<p>它是一个 private 方法,插入成功后,通过 notEmpty 唤醒正在等待取元素的线程。</p>
<p>下面是 take() 方法的实现:</p>
<pre><code class="language-java">public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal();
// propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
</code></pre>
<p>跟 put 方法实现很类似,只不过 put 方法等待的是 notFull 信号,而 take 方法等待的是 notEmpty 信号。</p>
<p>在 take 方法中,如果可以取元素,则通过 extract 方法取得元素,下面是 extract 方法的实现:</p>
<pre><code class="language-java">private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
</code></pre>
<p>跟 insert 方法也很类似。</p>
<p>其实从这里大家应该明白了阻塞队列的实现原理,事实它和我们用 Object.wait()、Object.notify() 和非阻塞队列实现生产者 - 消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。</p>
<h2 id="toc_3">四. 示例和使用场景</h2>
<p>下面先使用 Object.wait() 和 Object.notify()、非阻塞队列实现生产者 - 消费者模式:</p>
<pre><code class="language-java">public class Test {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
while(true){
synchronized (queue) {
while(queue.size() == 0){
try {
System.out.println("队列空,等待数据");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.poll(); //每次移走队首元素
queue.notify();
System.out.println("从队列取走一个元素,队列剩余"+
queue.size()+"个元素");
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
while(true){
synchronized (queue) {
while(queue.size() == queueSize){
try {
System.out.println("队列满,等待有空余空间");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
queue.notify();
}
}
queue.offer(1); //每次插入一个元素
queue.notify();
System.out.println("向队列取中插入一个元素,队列剩余空间:"+
(queueSize-queue.size()));
}
}
}
}
}
</code></pre>
<p>这个是经典的生产者 - 消费者模式,通过阻塞队列和 Object.wait() 和 Object.notify() 实现,wait() 和 notify() 主要用来实现线程间通信。</p>
<p>具体的线程间通信方式(wait 和 notify 的使用)在后续问章中会讲述到。</p>
<p>下面是使用阻塞队列实现的生产者 - 消费者模式:</p>
<pre><code class="language-java">public class Test {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue =
new ArrayBlockingQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
while(true){
try {
queue.take();
System.out.println("从队列取走一个元素,队列剩余"+
queue.size()+"个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
while(true){
try {
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:"+
(queueSize-queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
</code></pre>
<p>有没有发现,使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。</p>
<p>在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。</p>
<p>阻塞队列使用最经典的场景就是 socket 客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者 - 消费者模型的都可以使用阻塞队列。</p>
</div>
<div class="row">
<div class="large-6 columns">
<p class="text-left" style="padding:15px 0px;">
<a href="15293283879436.html"
title="Previous Post: 精讲 Redis 内存模型">« 精讲 Redis 内存模型</a>
</p>
</div>
<div class="large-6 columns">
<p class="text-right" style="padding:15px 0px;">
<a href="15293276617956.html"
title="Next Post: Java如何实现任务超时处理">Java如何实现任务超时处理 »</a>
</p>
</div>
</div>
<div class="comments-wrap">
<div class="share-comments">
<script type="text/javascript" src="//s7.addthis.com/js/300/addthis_widget.js#pubid=ra-5ae58078c0d7b2ab"></script>
</div>
</div>
</div><!-- article-wrap -->
</div><!-- large 8 -->
<div class="large-4 medium-4 columns">
<div class="hide-for-small">
<div id="sidebar" class="sidebar">
<div id="site-info" class="site-info">
<div class="site-a-logo"><img src="./asset/img/logo.jpg" /></div>
<h1>Junkman</h1>
<div class="site-des">“拾荒者”一词来自凯文・凯利的《失控》中关于机器学习的故事(“收集癖好机”如何完成他的收集工作)。</div>
<div class="social">
<a target="_blank" class="github" target="_blank" href="https://github.com/panlw/" title="GitHub">GitHub</a>
<a target="_blank" class="rss" href="atom.xml" title="RSS">RSS</a>
</div>
</div>
<div id="site-categories" class="side-item ">
<div class="side-header">
<h2>Categories</h2>
</div>
<div class="side-content">
<p class="cat-list">
<a href="Infra.html"><strong>Infra</strong></a>
<a href="Coding.html"><strong>Coding</strong></a>
<a href="Modeling.html"><strong>Modeling</strong></a>
<a href="Archtecting.html"><strong>Archtecting</strong></a>
</p>
</div>
</div>
<div id="site-categories" class="side-item">
<div class="side-header">
<h2>Recent Posts</h2>
</div>
<div class="side-content">
<ul class="posts-list">
<li class="post">
<a href="15517999043443.html">The Art of Crafting Architectural Diagrams</a>
</li>
<li class="post">
<a href="15517997955971.html">为什么说我们需要软件架构图?</a>
</li>
<li class="post">
<a href="15516128677869.html">DNS Servers That Offer Privacy and Filtering</a>
</li>
<li class="post">
<a href="15516123108194.html">Airbnb's Migration from Monolith to Services</a>
</li>
<li class="post">
<a href="15516097487470.html">Events As First-Class Citizens</a>
</li>
</ul>
</div>
</div>
</div><!-- sidebar -->
</div><!-- hide for small -->
</div><!-- large 4 -->
</div><!-- row -->
<div class="page-bottom clearfix">
<div class="row">
<p class="copyright">Copyright © 2015
Powered by <a target="_blank" href="http://www.mweb.im">MWeb</a>,
Theme used <a target="_blank" href="http://github.com">GitHub CSS</a>.</p>
</div>
</div>
</section>
</div>
</div>
<script src="asset/js/foundation.min.js"></script>
<script>
$(document).foundation();
function fixSidebarHeight(){
var w1 = $('.markdown-body').height();
var w2 = $('#sidebar').height();
if (w1 > w2) { $('#sidebar').height(w1); };
}
$(function(){
fixSidebarHeight();
})
$(window).load(function(){
fixSidebarHeight();
});
</script>
<script src="asset/chart/all-min.js"></script><script type="text/javascript">$(function(){ var mwebii=0; var mwebChartEleId = 'mweb-chart-ele-'; $('pre>code').each(function(){ mwebii++; var eleiid = mwebChartEleId+mwebii; if($(this).hasClass('language-sequence')){ var ele = $(this).addClass('nohighlight').parent(); $('<div id="'+eleiid+'"></div>').insertAfter(ele); ele.hide(); var diagram = Diagram.parse($(this).text()); diagram.drawSVG(eleiid,{theme: 'simple'}); }else if($(this).hasClass('language-flow')){ var ele = $(this).addClass('nohighlight').parent(); $('<div id="'+eleiid+'"></div>').insertAfter(ele); ele.hide(); var diagram = flowchart.parse($(this).text()); diagram.drawSVG(eleiid); } });});</script>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script><script type="text/x-mathjax-config">MathJax.Hub.Config({TeX: { equationNumbers: { autoNumber: "AMS" } }});</script>
</body>
</html>