-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path15283864664932.html
1300 lines (937 loc) · 56 KB
/
15283864664932.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!doctype html>
<html class="no-js" lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>
手把手教你搭建一个基于 Java 的分布式爬虫系统 - Junkman
</title>
<link href="atom.xml" rel="alternate" title="Junkman" type="application/atom+xml">
<link rel="stylesheet" href="asset/css/foundation.min.css" />
<link rel="stylesheet" href="asset/css/docs.css" />
<script src="asset/js/vendor/modernizr.js"></script>
<script src="asset/js/vendor/jquery.js"></script>
<script src="asset/highlightjs/highlight.pack.js"></script>
<link href="asset/highlightjs/styles/github.css" media="screen, projection" rel="stylesheet" type="text/css">
<script>hljs.initHighlightingOnLoad();</script>
<script type="text/javascript">
function before_search(){
var searchVal = 'site:panlw.github.io ' + document.getElementById('search_input').value;
document.getElementById('search_q').value = searchVal;
return true;
}
</script>
</head>
<body class="antialiased hide-extras">
<div class="marketing off-canvas-wrap" data-offcanvas>
<div class="inner-wrap">
<nav class="top-bar docs-bar hide-for-small" data-topbar>
<section class="top-bar-section">
<div class="row">
<div style="position: relative;width:100%;"><div style="position: absolute; width:100%;">
<ul id="main-menu" class="left">
<li id=""><a target="self" href="index.html">Home</a></li>
<li id=""><a target="_self" href="archives.html">Archives</a></li>
</ul>
<ul class="right" id="search-wrap">
<li>
<form target="_blank" onsubmit="return before_search();" action="http://google.com/search" method="get">
<input type="hidden" id="search_q" name="q" value="" />
<input tabindex="1" type="search" id="search_input" placeholder="Search"/>
</form>
</li>
</ul>
</div></div>
</div>
</section>
</nav>
<nav class="tab-bar show-for-small">
<a href="javascript:void(0)" class="left-off-canvas-toggle menu-icon">
<span> Junkman</span>
</a>
</nav>
<aside class="left-off-canvas-menu">
<ul class="off-canvas-list">
<li><a href="index.html">HOME</a></li>
<li><a href="archives.html">Archives</a></li>
<li><a href="about.html">ABOUT</a></li>
<li><label>Categories</label></li>
<li><a href="Infra.html">Infra</a></li>
<li><a href="Coding.html">Coding</a></li>
<li><a href="Modeling.html">Modeling</a></li>
<li><a href="Archtecting.html">Archtecting</a></li>
</ul>
</aside>
<a class="exit-off-canvas" href="#"></a>
<section id="main-content" role="main" class="scroll-container">
<script type="text/javascript">
$(function(){
$('#menu_item_index').addClass('is_active');
});
</script>
<div class="row">
<div class="large-8 medium-8 columns">
<div class="markdown-body article-wrap">
<div class="article">
<h1>手把手教你搭建一个基于 Java 的分布式爬虫系统</h1>
<div class="read-more clearfix">
<span class="date">2018/6/7</span>
<span>posted in </span>
<span class="posted-in"><a href='Modeling.html'>Modeling</a></span>
<span class="comments">
</span>
</div>
</div><!-- article -->
<div class="article-content">
<blockquote>
<p>原创: 叶泳豪 51CTO技术栈 5月8日<br/>
<a href="http://developer.51cto.com/art/201805/572935.htm">http://developer.51cto.com/art/201805/572935.htm</a></p>
</blockquote>
<p>在不用爬虫框架的情况下,我经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似 MySQL、HBase 等。</p>
<p>【51CTO.com 原创稿件】在不用爬虫框架的情况下,我经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似 MySQL、HBase 等。</p>
<p>因为此系统基于面向接口的编码思想来开发,所以具有一定的扩展性,有兴趣的朋友直接看一下代码,就能理解其设计思想。</p>
<p>虽然代码目前来说很多地方还是比较紧耦合,但只要花些时间和精力,很多都是可抽取出来并且可配置化的。</p>
<p>因为时间的关系,我只写了京东和苏宁易购两个网站的爬虫,但是完全可以实现不同网站爬虫的随机调度,基于其代码结构,再写国美、天猫等的商品爬取,难度不大,但是估计需要花些时间和精力。</p>
<p>因为在解析网页的数据时,比如我在爬取苏宁易购商品的价格时,价格是异步获取的,并且其 API 是一长串的数字组合,我花了几个小时的时间才发现其规律,当然也承认,我的经验不足。</p>
<p>这个系统的设计,除了基本的数据爬取以外,更关注以下几个方面的问题:</p>
<ul>
<li> 如何实现分布式?同一个程序打包后分发到不同的节点运行时,不影响整体的数据爬取。</li>
<li> 如何实现 URL 随机循环调度?核心是针对不同的顶级域名做随机。</li>
<li> 如何定时向 URL 仓库中添加种子 URL?达到不让爬虫系统停下来的目的。</li>
<li> 如何实现对爬虫节点程序的监控,并能够发邮件报警?</li>
<li> 如何实现一个随机 IP 代理库?目的跟第 2 点有点类似,都是为了反反爬虫。</li>
</ul>
<p>下面会针对这个系统来做一个整体的基本介绍,我在代码中都有非常详细的注释,有兴趣的朋友可以参考一下代码,最后我会给出一些我爬虫时的数据分析。</p>
<p>另外需要注意的是,这个爬虫系统是基于 Java 实现的,但是语言本身仍然不是最重要的,有兴趣的朋友可以尝试用 Python 实现。</p>
<p><strong>分布式爬虫系统架构</strong></p>
<p>整体系统架构如下:</p>
<p><a href="http://s3.51cto.com/oss/201805/09/1ec03f13dff96ba2a1cb42795b7a1ba1.jpg"><img src="http://s3.51cto.com/oss/201805/09/1ec03f13dff96ba2a1cb42795b7a1ba1.jpg" alt=""/></a></p>
<p>从上面的架构可以看出,整个系统主要分为三个部分:</p>
<ul>
<li> 爬虫系统</li>
<li> URL 调度系统</li>
<li> 监控报警系统</li>
</ul>
<p>爬虫系统是用来爬取数据的,因为系统设计为分布式,因此,爬虫程序本身可以运行在不同的服务器节点上。</p>
<p>URL 调度系统核心在于 URL 仓库,所谓的 URL 仓库其实就是用 Redis 保存了需要爬取的 URL 列表,并且在我们的 URL 调度器中根据一定的策略来消费其中的 URL。从这个角度考虑,URL 仓库其实也是一个 URL 队列。</p>
<p>监控报警系统主要是对爬虫节点进行监控,虽然并行执行的爬虫节点中的某一个挂掉了对整体数据爬取本身没有影响(只是降低了爬虫的速度),但是我们还是希望能够主动接收到节点挂掉的通知,而不是被动地发现。</p>
<p>下面将针对以上三个方面并结合部分代码片段来对整个系统的设计思路做一些基本的介绍。</p>
<p><strong>爬虫系统</strong></p>
<p><a href="http://s3.51cto.com/oss/201805/09/17cd163d986b85ed00354f60c90e8ca6.jpg"><img src="http://s3.51cto.com/oss/201805/09/17cd163d986b85ed00354f60c90e8ca6.jpg" alt=""/></a></p>
<p>爬虫系统是一个独立运行的进程,我们把我们的爬虫系统打包成 jar 包,然后分发到不同的节点上执行,这样并行爬取数据可以提高爬虫的效率。(说明:ZooKeeper 监控属于监控报警系统,URL 调度器属于 URL 调度系统)</p>
<p><strong>随机 IP 代理器</strong></p>
<p>加入随机 IP 代理主要是为了反反爬虫,因此如果有一个 IP 代理库,并且可以在构建 http 客户端时随机地使用不同的代理,那么对我们进行反反爬虫会有很大的帮助。</p>
<p>在系统中使用 IP 代理库,需要先在文本文件中添加可用的代理地址信息:</p>
<pre><code class="language-txt"># IPProxyRepository.txt
58.60.255.104:8118
219.135.164.245:3128
27.44.171.27:9999
219.135.164.245:3128
58.60.255.104:8118
58.252.6.165:9000
......
</code></pre>
<p>需要注意的是,上面的代理 IP 是我在西刺代理上拿到的一些代理 IP,不一定可用,建议是自己花钱购买一批代理 IP,这样可以节省很多时间和精力去寻找代理 IP。</p>
<p>然后在构建 http 客户端的工具类中,当第一次使用工具类时,会把这些代理 IP 加载进内存中,加载到 Java 的一个 HashMap:</p>
<pre><code class="language-java">// IP地址代理库Map
private static Map<String, Integer> IPProxyRepository = new HashMap<>();
private static String[] keysArray = null; // keysArray是为了方便生成随机的代理对象
/**
* 初次使用时使用静态代码块将IP代理库加载进set中
*/
static {
InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream("IPProxyRepository.txt"); // 加载包含代理IP的文本
// 构建缓冲流对象
InputStreamReader isr = new InputStreamReader(in);
BufferedReader bfr = new BufferedReader(isr);
String line = null;
try {
// 循环读每一行,添加进map中
while ((line = bfr.readLine()) != null) {
String[] split = line.split(":"); // 以:作为分隔符,即文本中的数据格式应为192.168.1.1:4893
String host = split[0];
int port = Integer.valueOf(split[1]);
IPProxyRepository.put(host, port);
}
Set<String> keys = IPProxyRepository.keySet();
keysArray = keys.toArray(new String[keys.size()]); // keysArray是为了方便生成随机的代理对象
} catch (IOException e) {
e.printStackTrace();
}
}
</code></pre>
<p>之后,在每次构建 http 客户端时,都会先到 map 中看是否有代理 IP,有则使用,没有则不使用代理:</p>
<pre><code class="language-java">/**
* 随机返回一个代理对象
*
* @return
*/
public static HttpHost getRandomProxy() {
// 随机获取host:port,并构建代理对象
Random random = new Random();
String host = keysArray[random.nextInt(keysArray.length)];
int port = IPProxyRepository.get(host);
HttpHost proxy = new HttpHost(host, port); // 设置http代理
return proxy;
}
</code></pre>
<p>随机代理对象则通过下面的方法生成:</p>
<pre><code class="language-java">/**
* 随机返回一个代理对象
*
* @return
*/
public static HttpHost getRandomProxy() {
// 随机获取host:port,并构建代理对象
Random random = new Random();
String host = keysArray[random.nextInt(keysArray.length)];
int port = IPProxyRepository.get(host);
HttpHost proxy = new HttpHost(host, port); // 设置http代理
return proxy;
}
</code></pre>
<p>这样,通过上面的设计,基本就实现了随机 IP 代理器的功能,当然,其中还有很多可以完善的地方。</p>
<p>比如,当使用这个 IP 代理而请求失败时,是否可以把这一情况记录下来;当超过一定次数时,再将其从代理库中删除,同时生成日志供开发人员或运维人员参考,这是完全可以实现的,不过我就不做这一步功能了。</p>
<p><strong>网页下载器</strong></p>
<p>网页下载器就是用来下载网页中的数据,主要基于下面的接口开发:</p>
<pre><code class="language-java">/**
* 网页数据下载
*/
public interface IDownload {
/**
* 下载给定url的网页数据
* @param url
* @return
*/
public Page download(String url);
}
</code></pre>
<p>基于此,在系统中只实现了一个 http get 的下载器,但是也可以完成我们所需要的功能了:</p>
<pre><code class="language-java">/**
* 数据下载实现类
*/
public class HttpGetDownloadImpl implements IDownload {
@Override
public Page download(String url) {
Page page = new Page();
String content = HttpUtil.getHttpContent(url); // 获取网页数据
page.setUrl(url);
page.setContent(content);
return page;
}
}
</code></pre>
<p><strong>网页解析器</strong></p>
<p>网页解析器就是把下载的网页中我们感兴趣的数据解析出来,并保存到某个对象中,供数据存储器进一步处理以保存到不同的持久化仓库中,其基于下面的接口进行开发:</p>
<pre><code class="language-java">/**
* 网页数据解析
*/
public interface IParser {
public void parser(Page page);
}
</code></pre>
<p>网页解析器在整个系统的开发中也算是比较重头戏的一个组件,功能不复杂,主要是代码比较多,针对不同的商城不同的商品,对应的解析器可能就不一样了。</p>
<p>因此需要针对特别的商城的商品进行开发,因为很显然,京东用的网页模板跟苏宁易购的肯定不一样,天猫用的跟京东用的也肯定不一样。</p>
<p>所以这个完全是看自己的需要来进行开发了,只是说,在解析器开发的过程当中会发现有部分重复代码,这时就可以把这些代码抽象出来开发一个工具类了。</p>
<p>目前在系统中爬取的是京东和苏宁易购的手机商品数据,因此就写了这两个实现类:</p>
<pre><code class="language-java">/**
* 解析京东商品的实现类
*/
public class JDHtmlParserImpl implements IParser {
......
}
/**
* 苏宁易购网页解析
*/
public class SNHtmlParserImpl implements IParser {
......
}
</code></pre>
<p><strong>数据存储器</strong></p>
<p><a href="http://s4.51cto.com/oss/201805/09/2979ac425e0fbcfcf2f078e6f97842f5.jpg-wh_600x-s_1985411508.jpg"><img src="http://s4.51cto.com/oss/201805/09/2979ac425e0fbcfcf2f078e6f97842f5.jpg-wh_600x-s_1985411508.jpg" alt=""/></a></p>
<p>数据存储器主要是将网页解析器解析出来的数据对象保存到不同的表格,而对于本次爬取的手机商品,数据对象是下面一个 Page 对象:</p>
<pre><code class="language-java">/**
* 网页对象,主要包含网页内容和商品数据
*/
public class Page {
private String content; // 网页内容
private String id; // 商品Id
private String source; // 商品来源
private String brand; // 商品品牌
private String title; // 商品标题
private float price; // 商品价格
private int commentCount; // 商品评论数
private String url; // 商品地址
private String imgUrl; // 商品图片地址
private String params; // 商品规格参数
private List<String> urls = new ArrayList<>(); // 解析列表页面时用来保存解析的商品url的容器
}
</code></pre>
<p>对应的,在 MySQL 中,表数据结构如下:</p>
<pre><code class="language-sql">-- ----------------------------
-- Table structure for phone
-- ----------------------------
DROP TABLE IF EXISTS `phone`;
CREATE TABLE `phone` (
`id` varchar(30) CHARACTER SET armscii8 NOT NULL COMMENT '商品id',
`source` varchar(30) NOT NULL COMMENT '商品来源,如jd suning gome等',
`brand` varchar(30) DEFAULT NULL COMMENT '手机品牌',
`title` varchar(255) DEFAULT NULL COMMENT '商品页面的手机标题',
`price` float(10,2) DEFAULT NULL COMMENT '手机价格',
`comment_count` varchar(30) DEFAULT NULL COMMENT '手机评论',
`url` varchar(500) DEFAULT NULL COMMENT '手机详细信息地址',
`img_url` varchar(500) DEFAULT NULL COMMENT '图片地址',
`params` text COMMENT '手机参数,json格式存储',
PRIMARY KEY (`id`,`source`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
</code></pre>
<p>而在 HBase 中的表结构则为如下:</p>
<pre><code class="language-log">## cf1 存储 id source price comment brand url
## cf2 存储 title params imgUrl
create 'phone', 'cf1', 'cf2'
## 在HBase shell中查看创建的表
hbase(main):135:0> desc 'phone'
Table phone is ENABLED
phone
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
{NAME => 'cf2', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK
_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0350 seconds
</code></pre>
<p>即在 HBase 中建立了两个列族,分别为 cf1、cf2,其中 cf1 用来保存 id source price comment brand url 字段信息;cf2 用来保存 title params imgUrl 字段信息。</p>
<p>不同的数据存储用的是不同的实现类,但是其都是基于下面同一个接口开发的:</p>
<pre><code class="language-java">/**
* 商品数据的存储
*/
public interface IStore {
public void store(Page page);
}
</code></pre>
<p>然后基于此开发了 MySQL 的存储实现类、HBase 的存储实现类还有控制台的输出实现类,如 MySQL 的存储实现类,其实就是简单的数据插入语句:</p>
<pre><code class="language-java">/**
* 使用dbc数据库连接池将数据写入mysql表中
*/
public class MySQLStoreImpl implements IStore {
private QueryRunner queryRunner = new QueryRunner(DBCPUtil.getDataSource());
@Override
public void store(Page page) {
String sql = "insert into phone(id, source, brand, title, price, comment_count, url, img_url, params) values(?, ?, ?, ?, ?, ?, ?, ?, ?)";
try {
queryRunner.update(sql, page.getId(),
page.getSource(),
page.getBrand(),
page.getTitle(),
page.getPrice(),
page.getCommentCount(),
page.getUrl(),
page.getImgUrl(),
page.getParams());
} catch (SQLException e) {
e.printStackTrace();
}
}
}
</code></pre>
<p>而 HBase 的存储实现类,则是 HBase Java API 的常用插入语句代码:</p>
<pre><code class="language-java">......
// cf1:price
Put pricePut = new Put(rowKey);
// 必须要做是否为null判断,否则会有空指针异常
pricePut.addColumn(cf1, "price".getBytes(), page.getPrice() != null ? String.valueOf(page.getPrice()).getBytes() : "".getBytes());
puts.add(pricePut);
// cf1:comment
Put commentPut = new Put(rowKey);
commentPut.addColumn(cf1, "comment".getBytes(), page.getCommentCount() != null ? String.valueOf(page.getCommentCount()).getBytes() : "".getBytes());
puts.add(commentPut);
// cf1:brand
Put brandPut = new Put(rowKey);
brandPut.addColumn(cf1, "brand".getBytes(), page.getBrand() != null ? page.getBrand().getBytes() : "".getBytes());
puts.add(brandPut);
......
</code></pre>
<p>当然,至于要将数据存储在哪个地方,在初始化爬虫程序时,是可以手动选择的:</p>
<pre><code class="language-java">// 3.注入存储器
iSpider.setStore(new HBaseStoreImpl());
</code></pre>
<p>目前还没有把代码写成可以同时存储在多个地方,按照目前代码的架构,要实现这一点也比较简单,修改一下相应代码就好了。</p>
<p>实际上,是可以先把数据保存到 MySQL 中,然后通过 Sqoop 导入到 HBase 中,详细操作可以参考我写的 Sqoop 文章。</p>
<p>仍然需要注意的是,如果确定需要将数据保存到 HBase 中,请保证你有可用的集群环境,并且需要将如下配置文档添加到 classpath 下:</p>
<pre><code class="language-txt">core-site.xml
hbase-site.xml
hdfs-site.xml
</code></pre>
<p>对大数据感兴趣的同学可以折腾一下这一点,如果之前没有接触过的,直接使用 MySQL 存储就好了,只需要在初始化爬虫程序时注入 MySQL 存储器即可:</p>
<pre><code class="language-java">// 3.注入存储器
iSpider.setStore(new MySQLStoreImpl());
</code></pre>
<p>URL 调度系统</p>
<p><a href="http://s1.51cto.com/oss/201805/09/ae7addb90d2c16385a177bac153cfcf5.jpg"><img src="http://s1.51cto.com/oss/201805/09/ae7addb90d2c16385a177bac153cfcf5.jpg" alt=""/></a></p>
<p>URL 调度系统是实现整个爬虫系统分布式的桥梁与关键,正是通过 URL 调度系统的使用,才使得整个爬虫系统可以较为高效(Redis 作为存储)随机地获取 URL,并实现整个系统的分布式。</p>
<p><strong>URL 仓库</strong></p>
<p>通过架构图可以看出,所谓的 URL 仓库不过是 Redis 仓库,即在我们的系统中使用 Redis 来保存 URL 地址列表。</p>
<p>正是这样,才能保证我们的程序实现分布式,只要保存了 URL 是唯一的,这样不管我们的爬虫程序有多少个,最终保存下来的数据都是只有唯一一份的,而不会重复。</p>
<p>同时 URL 仓库中的 URL 地址在获取时的策略是通过队列的方式来实现的,待会通过 URL 调度器的实现即可知道。</p>
<p>另外,在我们的 URL 仓库中,主要保存了下面的数据:</p>
<p><strong>种子 URL 列表,Redis 的数据类型为 list</strong></p>
<p>种子 URL 是持久化存储的,一定时间后,由 URL 定时器通过种子 URL 获取 URL,并将其注入到我们的爬虫程序需要使用的高优先级 URL 队列中。</p>
<p>这样就可以保证我们的爬虫程序可以源源不断地爬取数据而不需要中止程序的执行。</p>
<p><strong>高优先级 URL 队列,Redis 的数据类型为 set</strong></p>
<p>什么是高优先级 URL 队列?其实它就是用来保存列表 URL 的。那么什么是列表 URL 呢?</p>
<p>说白了就是一个列表中含有多个商品,以京东为例,我们打开一个手机列表:</p>
<p>该地址中包含的不是一个具体商品的 URL,而是包含了多个我们需要爬取的数据(手机商品)的列表。</p>
<p><a href="http://s4.51cto.com/oss/201805/09/797107def9ba8263c95d8d7d632fce1c.jpg"><img src="http://s4.51cto.com/oss/201805/09/797107def9ba8263c95d8d7d632fce1c.jpg" alt=""/></a></p>
<p>通过对每个高级 URL 的解析,我们可以获取到非常多的具体商品 URL,而具体的商品 URL,就是低优先 URL,其会保存到低优先级 URL 队列中。</p>
<p>那么以这个系统为例,保存的数据类似如下:</p>
<pre><code class="language-log">jd.com.higher
--https://list.jd.com/list.html?cat=9987,653,655&page=1
...
suning.com.higher
--https://list.suning.com/0-20006-0.html
...
</code></pre>
<p><strong>低优先级 URL 队列,Redis 的数据类型为 set</strong></p>
<p>低优先级 URL 其实就是具体某个商品的 URL,如下面一个手机商品:</p>
<p><a href="http://s2.51cto.com/oss/201805/09/71ae2263e5ba4b9caa567fd704d659dc.jpg"><img src="http://s2.51cto.com/oss/201805/09/71ae2263e5ba4b9caa567fd704d659dc.jpg" alt=""/></a></p>
<p>通过下载该 URL 的数据,并对其进行解析,就能够获取到我们想要的数据。</p>
<p>那么以这个系统为例,保存的数据类似如下:</p>
<pre><code class="language-log">jd.com.lower
--https://item.jd.com/23545806622.html
...
suning.com.lower
--https://product.suning.com/0000000000/690128156.html
...
</code></pre>
<p><strong>URL 调度器</strong></p>
<p>所谓 URL 调度器,就是 URL 仓库 Java 代码的调度策略,不过因为其核心在于调度,所以将其放到 URL 调度器中来进行说明,目前其调度基于以下接口开发:</p>
<pre><code class="language-java">/**
* url 仓库
* 主要功能:
* 向仓库中添加url(高优先级的列表,低优先级的商品url)
* 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)
*
*/
public interface IRepository {
/**
* 获取url的方法
* 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)
* @return
*/
public String poll();
/**
* 向高优先级列表中添加商品列表url
* @param highUrl
*/
public void offerHigher(String highUrl);
/**
* 向低优先级列表中添加商品url
* @param lowUrl
*/
public void offerLower(String lowUrl);
}
</code></pre>
<p>其基于 Redis 作为 URL 仓库的实现如下:</p>
<pre><code class="language-java">/**
* 基于Redis的全网爬虫,随机获取爬虫url:
*
* Redis中用来保存url的数据结构如下:
* 1.需要爬取的域名集合(存储数据类型为set,这个需要先在Redis中添加)
* key
* spider.website.domains
* value(set)
* jd.com suning.com gome.com
* key由常量对象SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY 获得
* 2.各个域名所对应的高低优先url队列(存储数据类型为list,这个由爬虫程序解析种子url后动态添加)
* key
* jd.com.higher
* jd.com.lower
* suning.com.higher
* suning.com.lower
* gome.com.higher
* gome.come.lower
* value(list)
* 相对应需要解析的url列表
* key由随机的域名 + 常量 SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX或者SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX获得
* 3.种子url列表
* key
* spider.seed.urls
* value(list)
* 需要爬取的数据的种子url
* key由常量SpiderConstants.SPIDER_SEED_URLS_KEY获得
*
* 种子url列表中的url会由url调度器定时向高低优先url队列中
*/
public class RandomRedisRepositoryImpl implements IRepository {
/**
* 构造方法
*/
public RandomRedisRepositoryImpl() {
init();
}
/**
* 初始化方法,初始化时,先将redis中存在的高低优先级url队列全部删除
* 否则上一次url队列中的url没有消耗完时,再停止启动跑下一次,就会导致url仓库中有重复的url
*/
public void init() {
Jedis jedis = JedisUtil.getJedis();
Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);
String higherUrlKey;
String lowerUrlKey;
for(String domain : domains) {
higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
jedis.del(higherUrlKey, lowerUrlKey);
}
JedisUtil.returnJedis(jedis);
}
/**
* 从队列中获取url,目前的策略是:
* 1.先从高优先级url队列中获取
* 2.再从低优先级url队列中获取
* 对应我们的实际场景,应该是先解析完列表url再解析商品url
* 但是需要注意的是,在分布式多线程的环境下,肯定是不能完全保证的,因为在某个时刻高优先级url队列中
* 的url消耗完了,但实际上程序还在解析下一个高优先级url,此时,其它线程去获取高优先级队列url肯定获取不到
* 这时就会去获取低优先级队列中的url,在实际考虑分析时,这点尤其需要注意
* @return
*/
@Override
public String poll() {
// 从set中随机获取一个顶级域名
Jedis jedis = JedisUtil.getJedis();
String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); // jd.com
String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; // jd.com.higher
String url = jedis.lpop(key);
if(url == null) { // 如果为null,则从低优先级中获取
key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; // jd.com.lower
url = jedis.lpop(key);
}
JedisUtil.returnJedis(jedis);
return url;
}
/**
* 向高优先级url队列中添加url
* @param highUrl
*/
@Override
public void offerHigher(String highUrl) {
offerUrl(highUrl, SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
}
/**
* 向低优先url队列中添加url
* @param lowUrl
*/
@Override
public void offerLower(String lowUrl) {
offerUrl(lowUrl, SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
}
/**
* 添加url的通用方法,通过offerHigher和offerLower抽象而来
* @param url 需要添加的url
* @param urlTypeSuffix url类型后缀.higher或.lower
*/
public void offerUrl(String url, String urlTypeSuffix) {
Jedis jedis = JedisUtil.getJedis();
String domain = SpiderUtil.getTopDomain(url); // 获取url对应的顶级域名,如jd.com
String key = domain + urlTypeSuffix; // 拼接url队列的key,如jd.com.higher
jedis.lpush(key, url); // 向url队列中添加url
JedisUtil.returnJedis(jedis);
}
}
</code></pre>
<p>通过代码分析也可以知道,其核心就在如何调度 URL 仓库(Redis)中的 URL。</p>
<p><strong>URL 定时器</strong></p>
<p>一段时间后,高优先级 URL 队列和低优先 URL 队列中的 URL 都会被消费完。</p>
<p>为了让程序可以继续爬取数据,同时减少人为的干预,可以预先在 Redis 中插入种子 URL,之后定时让 URL 定时器从种子 URL 中取出 URL 存放到高优先级 URL 队列中,以此达到程序定时不间断爬取数据的目的。</p>
<p>URL 消费完毕后,是否需要循环不断爬取数据根据个人业务需求而不同,因此这一步不是必需的,只是也提供了这样的操作。</p>
<p>因为事实上,我们需要爬取的数据也是每隔一段时间就会更新的,如果希望我们爬取的数据也跟着定时更新,那么这时定时器就有非常重要的作用了。</p>
<p>不过需要注意的是,一旦决定需要循环重复爬取数据,则在设计存储器实现时需要考虑重复数据的问题,即重复数据应该是更新操作。</p>
<p>目前在我设计的存储器不包括这个功能,有兴趣的朋友可以自己实现,只需要在插入数据前判断数据库中是否存在该数据即可。</p>
<p>另外需要注意的一点是,URL 定时器是一个独立的进程,需要单独启动。</p>
<p>定时器基于 Quartz 实现,下面是其 job 的代码:</p>
<pre><code class="language-java">/**
* 每天定时从url仓库中获取种子url,添加进高优先级列表
*/
public class UrlJob implements Job {
// log4j日志记录
private Logger logger = LoggerFactory.getLogger(UrlJob.class);
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
/**
* 1.从指定url种子仓库获取种子url
* 2.将种子url添加进高优先级列表
*/
Jedis jedis = JedisUtil.getJedis();
Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); // spider.seed.urls Redis数据类型为set,防止重复添加种子url
for(String seedUrl : seedUrls) {
String domain = SpiderUtil.getTopDomain(seedUrl); // 种子url的顶级域名
jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX, seedUrl);
logger.info("获取种子:{}", seedUrl);
}
JedisUtil.returnJedis(jedis);
// System.out.println("Scheduler Job Test...");
}
}
</code></pre>
<p>调度器的实现如下:</p>
<pre><code class="language-java">/**
* url定时调度器,定时向url对应仓库中存放种子url
*
* 业务规定:每天凌晨1点10分向仓库中存放种子url
*/
public class UrlJobScheduler {
public UrlJobScheduler() {
init();
}
/**
* 初始化调度器
*/
public void init() {
try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 如果没有以下start方法的执行,则是不会开启任务的调度
scheduler.start();
String name = "URL_SCHEDULER_JOB";
String group = "URL_SCHEDULER_JOB_GROUP";
JobDetail jobDetail = new JobDetail(name, group, UrlJob.class);
String cronExpression = "0 10 1 * * ?";
Trigger trigger = new CronTrigger(name, group, cronExpression);
// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
UrlJobScheduler urlJobScheduler = new UrlJobScheduler();
urlJobScheduler.start();
}
/**
* 定时调度任务
* 因为我们每天要定时从指定的仓库中获取种子url,并存放到高优先级的url列表中
* 所以是一个不间断的程序,所以不能停止
*/
private void start() {
while (true) {
}
}
}
</code></pre>
<p><strong>监控报警系统</strong></p>
<p><a href="http://s1.51cto.com/oss/201805/09/b88be51ee744baefd70f4fdb7b4b6891.jpg"><img src="http://s1.51cto.com/oss/201805/09/b88be51ee744baefd70f4fdb7b4b6891.jpg" alt=""/></a></p>
<p>监控报警系统的加入主要是为了让使用者可以主动发现节点宕机,而不是被动地发现,因为实际中爬虫程序可能是持续不断运行的。</p>
<p>并且我们会在多个节点上部署我们的爬虫程序,因此很有必要对节点进行监控,并且在节点出现问题时可以及时发现并修正,需要注意的是,监控报警系统是一个独立的进程,需要单独启动。</p>
<p><strong>基本原理</strong></p>
<p>首先需要先在 ZooKeeper 中创建一个 /ispider 节点:</p>
<pre><code class="language-log">[zk: localhost:2181(CONNECTED) 1] create /ispider ispider
Created /ispider
</code></pre>
<p>监控报警系统的开发主要依赖于 ZooKeeper 实现,监控程序对 ZooKeeper 下面的这个节点目录进行监听:</p>
<pre><code class="language-log">[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
</code></pre>
<p>爬虫程序启动时会在该节点目录下注册一个临时节点目录:</p>
<pre><code class="language-log">[zk: localhost:2181(CONNECTED) 0] ls /ispider
[192.168.43.166]
</code></pre>
<p>当节点出现宕机时,该临时节点目录就会被 ZooKeeper 删除。</p>
<pre><code class="language-log">[zk: localhost:2181(CONNECTED) 0] ls /ispider
[]
</code></pre>
<p>同时因为我们监听了节点目录 /ispider,所以当 ZooKeeper 删除其下的节点目录时(或增加一个节点目录),ZooKeeper 会给我们的监控程序发送通知。</p>
<p>即我们的监控程序会得到回调,这样便可以在回调程序中执行报警的系统动作,从而完成监控报警的功能。</p>
<p><strong>ZooKeeper Java API 使用说明</strong></p>
<p>可以使用 ZooKeeper 原生的 Java API,我在另外写的一个 RPC 框架(底层基于 Netty 实现远程通信)中就是使用原生的 API。</p>
<p>不过显然代码会复杂很多,并且本身需要对 ZooKeeper 有更多的学习和了解,这样用起来才会容易一些。</p>
<p>所以为了降低开发的难度,这里使用第三方封装的 API,即 curator,来进行 ZooKeeper 客户端程序的开发。</p>
<p><strong>爬虫系统 ZooKeeper 注册</strong></p>
<p>在启动爬虫系统时,我们的程序都会启动一个 ZooKeeper 客户端来向 ZooKeeper 来注册自身的节点信息,主要是 IP 地址。</p>
<p>并在 /ispider 节点目录创建一个以该爬虫程序所在的节点 IP 地址命名的节点,如 /ispider/192.168.43.116,实现的代码如下:</p>
<pre><code class="language-java">/**
* 注册zk
*/
private void registerZK() {
String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
CuratorFramework curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
curator.start();
String ip = null;
try {
// 向zk的具体目录注册 写节点 创建节点
ip = InetAddress.getLocalHost().getHostAddress();
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/ispider/" + ip, ip.getBytes());
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
</code></pre>
<p>应该注意到的是,我们创建的节点为临时节点,要想实现监控报警功能,必须要为临时节点。</p>
<p><strong>监控程序</strong></p>
<p>首先需要先监听 ZooKeeper 中的一个节点目录,在我们的系统中,设计是监听 /ispider 这个节点目录:</p>
<pre><code class="language-java">public SpiderMonitorTask() {
String zkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
curator = CuratorFrameworkFactory.newClient(zkStr, retryPolicy);
curator.start();
try {
previousNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
} catch (Exception e) {
e.printStackTrace();
}
}
</code></pre>
<p>在上面注册了 ZooKeeper 中的 watcher,也就是接收通知的回调程序,在该程序中,执行我们报警的逻辑:</p>
<pre><code class="language-java">/**
* 这个方法,当监控的zk对应的目录一旦有变动,就会被调用
* 得到当前最新的节点状态,将最新的节点状态和初始或者上一次的节点状态作比较,那我们就知道了是由谁引起的节点变化
* @param event
*/
@Override
public void process(WatchedEvent event) {
try {
List<String> currentNodes = curator.getChildren().usingWatcher(this).forPath("/ispider");
// HashSet<String> previousNodesSet = new HashSet<>(previousNodes);
if(currentNodes.size() > previousNodes.size()) { // 最新的节点服务,超过之前的节点服务个数,有新的节点增加进来
for(String node : currentNodes) {
if(!previousNodes.contains(node)) {
// 当前节点就是新增节点
logger.info("----有新的爬虫节点{}新增进来", node);
}
}
} else if(currentNodes.size() < previousNodes.size()) { // 有节点挂了 发送告警邮件或者短信
for(String node : previousNodes) {
if(!currentNodes.contains(node)) {
// 当前节点挂掉了 得需要发邮件
logger.info("----有爬虫节点{}挂掉了", node);
MailUtil.sendMail("有爬虫节点挂掉了,请人工查看爬虫节点的情况,节点信息为:", node);
}
}
} // 挂掉和新增的数目一模一样,上面是不包括这种情况的,有兴趣的朋友可以直接实现包括这种特殊情况的监控
previousNodes = currentNodes; // 更新上一次的节点列表,成为最新的节点列表
} catch (Exception e) {
e.printStackTrace();
}
// 在原生的API需要再做一次监控,因为每一次监控只会生效一次,所以当上面发现变化后,需要再监听一次,这样下一次才能监听到
// 但是在使用curator的API时则不需要这样做
}
</code></pre>
<p>当然,判断节点是否挂掉,上面的逻辑还是存在一定的问题的,按照上面的逻辑,假如某一时刻新增节点和删除节点事件同时发生,那么其就不能判断出来,所以如果需要更精准的话,可以将上面的程序代码修改一下。</p>
<p><strong>邮件发送模块</strong></p>
<p>使用模板代码就可以了,不过需要注意的是,在使用时,发件人的信息请使用自己的邮箱。</p>
<p><a href="http://s2.51cto.com/oss/201805/09/b712e7d28dd46deef7ca2d44c853d548.jpg"><img src="http://s2.51cto.com/oss/201805/09/b712e7d28dd46deef7ca2d44c853d548.jpg" alt=""/></a></p>
<p>下面是爬虫节点挂掉时接收到的邮件:</p>
<p>实际上,如果购买了短信服务,那么通过短信 API 也可以向我们的手机发送短信。</p>
<p><strong>实战:爬取京东、苏宁易购全网手机商品数据</strong></p>
<p>因为前面在介绍这个系统的时候也提到了,我只写了京东和苏宁易购的网页解析器,所以接下来也就是爬取其全网的手机商品数据。</p>
<p><strong>环境说明</strong></p>
<p>需要确保 Redis、ZooKeeper 服务可用,另外如果需要使用 HBase 来存储数据,需要确保 Hadoop 集群中的 HBase 可用,并且相关配置文件已经加入到爬虫程序的 classpath 中。</p>
<p>还有一点需要注意的是,URL 定时器和监控报警系统是作为单独的进程来运行的,并且也是可选的。</p>
<p><strong>爬虫结果</strong></p>
<p>进行了两次爬取,分别尝试将数据保存到 MySQL 和 HBase 中,给出如下数据情况。</p>
<p><strong>保存到 MySQL</strong></p>
<pre><code class="language-log">mysql> select count(*) from phone;
+----------+
| count(*) |
+----------+
| 12052 |
+----------+
1 row in set
mysql> select count(*) from phone where source='jd.com';
+----------+
| count(*) |
+----------+
| 9578 |
+----------+
1 row in set
mysql> select count(*) from phone where source='suning
.com';
+----------+
| count(*) |
+----------+
| 2474 |
+----------+
1 row in set
</code></pre>
<p>在可视化工具中查看数据情况:</p>
<p><a href="http://s4.51cto.com/oss/201805/09/a8e4c845fd594da5c2b4683c11bcc6b9.jpg"><img src="http://s4.51cto.com/oss/201805/09/a8e4c845fd594da5c2b4683c11bcc6b9.jpg" alt=""/></a></p>