-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathcffex.py
297 lines (277 loc) · 10.7 KB
/
cffex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# -*- coding:utf-8 -*-
# 中金所 (CFFEX, China Financial Futures Exchange)
# IF 2010-04-16
# IC 2015-04-16
# IH 2015-04-16
import os
import time
import datetime
import re
import requests
import pymongo
import xmltodict
import settings
from threading import Thread
from queue import Queue, Empty
from log import Logger
# Module-level logger shared by every class and function in this file (used as `log.logger.<level>(...)`).
# NOTE(review): Logger comes from the project's `log` module; presumably the argument is the
# log file path — confirm against that module.
log = Logger('logs/cffex.log')
class CrawlData(Thread):
    """Crawler thread for CFFEX position-ranking data.

    Starting at the date returned by :meth:`get_last_date`, the thread walks
    forward one day at a time up to today.  For every weekday and every
    product in ``self.goods`` it downloads the ranking XML, converts it to a
    dict with ``xmltodict`` and puts that dict on ``q``.
    """

    # Start date used when a collection holds no CFFEX document yet.
    EARLIEST_DATE = datetime.datetime(2010, 4, 16)
    # A response must be larger than this many bytes to be treated as
    # actually containing ranking data.
    MIN_CONTENT_SIZE = 3000

    def __init__(self, q):
        """Create the crawler.

        Args:
            q: queue that receives one parsed XML dict per successful download.
        """
        super(CrawlData, self).__init__()
        # URL template with ``year_month``, ``day`` and ``goods`` placeholders.
        self.url = settings.API['cffex']
        self.q = q
        self.goods = ['IF', 'IH', 'IC']
        # Number of download attempts per URL before giving up.
        self.retry = 3
        # Seconds to wait for the server on each attempt.  Without a timeout
        # a stalled connection would block forever and never reach the retry
        # logic.
        self.request_timeout = 30

    def run(self):
        """Crawl every weekday from the last stored date up to today.

        The loop stops early as soon as one URL fails on every attempt.
        """
        current = self.get_last_date()
        today = datetime.datetime.today()
        one_day = datetime.timedelta(days=1)
        while current <= today:
            # isoweekday(): 6 is Saturday and 7 is Sunday; skip both.
            if current.isoweekday() not in (6, 7):
                if not self._crawl_day(current):
                    # A download failed on every attempt: stop the thread.
                    break
            current += one_day

    def _crawl_day(self, day):
        """Download and queue the data of every product for one day.

        Returns:
            False when a URL could not be fetched at all (the caller should
            stop crawling), True otherwise.
        """
        year_month = day.strftime('%Y%m')
        day_of_month = day.strftime('%d')
        for goods in self.goods:
            url = self.url.format(
                year_month=year_month,
                day=day_of_month,
                goods=goods,
            )
            response = self._fetch(url)
            if response is None:
                log.logger.error('爬取严重超时 %s' % url)
                return False
            has_data = (
                response.status_code == 200
                and len(response.content) > self.MIN_CONTENT_SIZE
            )
            if has_data:
                xml = response.content.decode('utf-8')
                self.q.put(xmltodict.parse(xml))
        return True

    def _fetch(self, url):
        """GET ``url`` with up to ``self.retry`` attempts.

        Returns:
            The response of the first attempt that did not raise, or None
            when every attempt failed.
        """
        for _ in range(self.retry):
            try:
                return requests.get(url, timeout=self.request_timeout)
            except requests.RequestException as e:
                log.logger.warning('获取数据超时 %s, 错误:%s' % (url, e))
        return None

    def get_last_date(self):
        """Return the date from which crawling has to resume.

        For each collection in ``settings.COLLECTION_NAMES`` the newest
        document with ``exchange == 'cffex'`` is looked up; a collection
        without such a document contributes ``EARLIEST_DATE``.  The smallest
        of these dates is returned so that no collection is left with a gap.
        """
        mongo = settings.MONGODB
        options = {
            'host': mongo['HOST'],
            'port': mongo['PORT'],
            'username': mongo['USERNAME'],
            'password': mongo['PASSWORD'],
            'authSource': mongo['AUTHSOURCE'],
        }
        # Only pass authMechanism when one is configured.
        if mongo['AUTHMECHANISM']:
            options['authMechanism'] = mongo['AUTHMECHANISM']

        client = pymongo.MongoClient(**options)
        try:
            db = client[settings.DB_NAME]
            latest_dates = []
            for collection_name in settings.COLLECTION_NAMES.values():
                newest = db[collection_name].find_one(
                    {'exchange': 'cffex'}, sort=[('date', -1)]
                )
                latest_dates.append(
                    newest['date'] if newest else self.EARLIEST_DATE
                )
        finally:
            # The client is only needed for this lookup; release it.
            client.close()
        # default= covers an empty COLLECTION_NAMES mapping.
        return min(latest_dates, default=self.EARLIEST_DATE)
class ParseData(Thread):
    """Parser thread: turns crawled XML dicts into per-contract documents.

    Items are read from ``q``; the resulting documents are routed to
    ``trade_q``, ``long_q`` or ``short_q`` depending on the ``@Value``
    attribute of each record ('0', '1' and '2' respectively).
    """

    def __init__(self, q, trade_q, short_q, long_q):
        """Store the input queue and the three output queues."""
        super(ParseData, self).__init__()
        # Input queue filled by the crawler.
        self.q = q
        self.trade_q = trade_q
        self.short_q = short_q
        self.long_q = long_q

    def run(self):
        """Consume ``q`` until the module-level EXIT_FLAG_PARSER is set."""
        global EXIT_FLAG_PARSER
        while not EXIT_FLAG_PARSER:
            try:
                # Short timeout so the exit flag is re-checked regularly.
                xml_dict = self.q.get(timeout=1)
            except Empty:
                continue
            try:
                self.parse_data(xml_dict)
            except Exception as e:
                log.logger.error('数据处理线程出错, 时间:%s,错误信息:%s' % (self._trading_day(xml_dict), e), exc_info=True)
            finally:
                # Always acknowledge the item, otherwise q.join() in the
                # caller would block forever after a failure.
                self.q.task_done()

    @staticmethod
    def _records(xml_dict):
        """Return the ``data`` records as a list.

        xmltodict yields a single dict instead of a list when the document
        contains exactly one ``data`` element, so that case is wrapped.
        """
        data = xml_dict['positionRank']['data']
        return data if isinstance(data, list) else [data]

    @staticmethod
    def _trading_day(xml_dict):
        """Best-effort trading day for log messages; never raises."""
        try:
            return ParseData._records(xml_dict)[0]['tradingday']
        except (KeyError, IndexError, TypeError):
            return 'unknown'

    def parse_data(self, xml_dict):
        """Group the records of one XML document by kind and contract.

        Args:
            xml_dict: dict with a ``positionRank`` -> ``data`` entry holding
                the ranking records (a list, or a single dict).
        """
        records = self._records(xml_dict)
        # Every record carries the trading day; the first one is used.
        date = datetime.datetime.strptime(records[0]['tradingday'], '%Y%m%d')

        trade_data = {}
        long_data = {}
        short_data = {}
        by_kind = {'0': trade_data, '1': long_data, '2': short_data}

        for item in records:
            # Field names appear in two spellings (camelCase / lowercase).
            volume_diff = int(item.get('varVolume', item.get('varvolume')))
            instrument_id = item.get('instrumentId', item.get('instrumentid')).strip()
            row = {
                'rank': int(item['rank']),
                'name': item['shortname'],
                'volume': int(item['volume']),
                'volumeDiff': volume_diff,
            }
            target = by_kind.get(item['@Value'])
            # Records with any other @Value are ignored.
            if target is not None:
                target.setdefault(instrument_id, []).append(row)

        if trade_data:
            self.parse2(trade_data, date, self.trade_q)
        if long_data:
            self.parse2(long_data, date, self.long_q)
        if short_data:
            self.parse2(short_data, date, self.short_q)

    def parse2(self, data_dict, date, q):
        """Build one document per contract and put it on ``q``.

        Args:
            data_dict: mapping of contract code to its list of ranking rows.
            date: trading day as a ``datetime``.
            q: destination queue.
        """
        for contract, rows in data_dict.items():
            # Product code = first run of letters in the contract code.
            goods = re.findall(r'[a-zA-Z]+', contract)[0]
            q.put({
                'exchange': 'cffex',
                'goods': goods,
                'symbol': 'cffex_%s' % contract.lower(),
                'date': date,
                'volume': sum(row['volume'] for row in rows),
                'volumeDiff': sum(row['volumeDiff'] for row in rows),
                'data': rows,
            })
class InsertData(Thread):
    """Writer thread: upserts documents from a queue into one collection."""

    def __init__(self, q, collection_name):
        """Connect to MongoDB and make sure the lookup index exists.

        Args:
            q: queue of documents to store; each needs ``date`` and ``symbol``.
            collection_name: name of the target collection in
                ``settings.DB_NAME``.
        """
        super(InsertData, self).__init__()
        self.q = q
        mongo = settings.MONGODB
        options = {
            'host': mongo['HOST'],
            'port': mongo['PORT'],
            'username': mongo['USERNAME'],
            'password': mongo['PASSWORD'],
            'authSource': mongo['AUTHSOURCE'],
        }
        # Only pass authMechanism when one is configured.
        if mongo['AUTHMECHANISM']:
            options['authMechanism'] = mongo['AUTHMECHANISM']
        self.client = pymongo.MongoClient(**options)
        self.db = self.client[settings.DB_NAME]
        self.collection = self.db[collection_name]
        # Index on the same (date, symbol) pair that insert_data filters on.
        self.collection.create_index([('date', 1), ('symbol', 1)])

    def run(self):
        """Consume ``q`` until the module-level EXIT_FLAG_INSERTER is set."""
        global EXIT_FLAG_INSERTER
        try:
            while not EXIT_FLAG_INSERTER:
                try:
                    # Short timeout so the exit flag is re-checked regularly.
                    data = self.q.get(timeout=1)
                except Empty:
                    continue
                try:
                    self.insert_data(data)
                except Exception as e:
                    # Read the date defensively: a missing 'date' key may be
                    # the very reason insert_data failed.
                    when = data.get('date') if isinstance(data, dict) else None
                    log.logger.error('插入数据线程出错, 时间:%s,错误内容:%s' % (when, e), exc_info=True)
                finally:
                    # Always acknowledge the item, otherwise q.join() in the
                    # caller would block forever after a failure.
                    self.q.task_done()
        finally:
            # Nothing uses the connection once the loop has ended.
            self.client.close()

    def insert_data(self, data):
        """Upsert ``data``, keyed by its ``date`` and ``symbol``.

        Database errors are logged (with traceback) and not re-raised.

        Raises:
            KeyError: if ``data`` lacks ``date`` or ``symbol``.
        """
        key = {'date': data['date'], 'symbol': data['symbol']}
        try:
            self.collection.replace_one(key, data, upsert=True)
        except Exception:
            # exc_info records the cause, which the message alone omits.
            log.logger.error('插入数据出错 %s' % data, exc_info=True)
def main():
    """Run the whole crawl -> parse -> insert pipeline once.

    One crawler thread feeds a parser thread through ``q``; the parser feeds
    three inserter threads through ``trade_q``, ``short_q`` and ``long_q``.
    Shutdown is staged: each stage is told to exit only after the queue(s)
    it consumes have been fully processed.
    """
    # Exit signals polled by the parser and inserter threads.
    global EXIT_FLAG_PARSER, EXIT_FLAG_INSERTER

    start = time.time()
    log.logger.info('-'*50+' start '+'-'*50)
    log.logger.info('开始中金所大户持仓爬虫程序')

    EXIT_FLAG_PARSER = False
    EXIT_FLAG_INSERTER = False

    # Raw crawled documents.
    q = Queue()
    # Trading-volume ranking documents.
    trade_q = Queue()
    # Short-position ranking documents.
    short_q = Queue()
    # Long-position ranking documents.
    long_q = Queue()

    # Build every worker before starting any of them: if constructing an
    # inserter fails, no thread has been started that would keep the process
    # alive waiting for an exit flag.
    crawler = CrawlData(q)
    parser = ParseData(q, trade_q, short_q, long_q)
    inserters = [
        InsertData(trade_q, settings.COLLECTION_NAMES['TRADE']),
        InsertData(short_q, settings.COLLECTION_NAMES['SHORT']),
        InsertData(long_q, settings.COLLECTION_NAMES['LONG']),
    ]

    try:
        crawler.start()
        parser.start()
        for inserter in inserters:
            inserter.start()

        # Wait for the crawler to finish, then for its output to be parsed.
        crawler.join()
        q.join()
        # No more raw data will arrive: let the parser exit.
        EXIT_FLAG_PARSER = True
        parser.join()

        # Wait until every parsed document has been written.
        for out_q in (trade_q, short_q, long_q):
            out_q.join()
    finally:
        # Set both flags even on an error or interrupt so the worker
        # threads do not keep polling forever.
        EXIT_FLAG_PARSER = True
        EXIT_FLAG_INSERTER = True

    for inserter in inserters:
        inserter.join()

    log.logger.info('中金所大户持仓数据已更新完成')
    log.logger.info('共耗时%ss' % (time.time()-start))
    log.logger.info('-'*50+' end '+'-'*50)
# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()