-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathshfe.py
296 lines (277 loc) · 11.3 KB
/
shfe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# -*- encode:utf-8 -*-
# 上期所
import time
import re
import datetime
import requests
import pymongo
import settings
import threading
import pandas as pd
from queue import Queue, Empty
from log import Logger
log = Logger('logs/shfe.log')
class CrawlData(threading.Thread):
"""爬取数据类"""
def __init__(self, q):
super(CrawlData, self).__init__()
# 数据存储队列
self.q = q
# url
self.url = settings.API['shfe']
# 请求头
self.headers = {
'Host': 'www.shfe.com.cn',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
'Referer': 'http://www.shfe.com.cn/statements/dataview.html?paramid=delaymarket_all',
}
# 连接失败重试次数
self.retry = 3
def run(self):
# 获取数据库最新的一条时间
last_time = self.get_last_time()
last_time = last_time.date()
today = datetime.date.today()
while last_time <= today:
# 排除周六日情况
if last_time.isoweekday != 6 and last_time.isoweekday != 7:
# 请求url
url = self.url % last_time.strftime('%Y%m%d')
# 超时次数
time_out = 0
while time_out < self.retry:
try:
# log.logger.debug('开始爬取 %s' % url)
response = requests.get(url)
except Exception as e:
log.logger.warning('连接失败, 错误内容: %s, url: %s' % (e, url), exc_info=True)
time_out += 1
continue
# 如果是404,说明当天没有数据
if response.status_code == 404:
break
try:
# 把数据转换成json
data = response.json()
except Exception as e:
log.logger.error('数据转成json失败, 错误内容: %s, url: %s' % (e, url), exc_info=True)
time_out += 1
continue
# 如果数据没有report_date,则添加一个时间
data.setdefault('report_date', last_time.strftime('%Y%m%d'))
# 如果是有数据的
if data['o_cursor']:
# 把数据放进队列里
self.q.put(data)
break
# 如果超时次数等于设定的重试次数,说明没有成功爬取到数据
if time_out == self.retry:
log.logger.error('获取数据失败 %s' % url)
# 停止爬取
break
# 日期加1天
last_time += datetime.timedelta(days=1)
def get_last_time(self):
"""查询数据库最后一条的时间"""
# 连接数据库
if not settings.MONGODB['AUTHMECHANISM']:
client = pymongo.MongoClient(
host=settings.MONGODB['HOST'],
port=settings.MONGODB['PORT'],
username=settings.MONGODB['USERNAME'],
password=settings.MONGODB['PASSWORD'],
authSource=settings.MONGODB['AUTHSOURCE']
)
else:
client = pymongo.MongoClient(
host=settings.MONGODB['HOST'],
port=settings.MONGODB['PORT'],
username=settings.MONGODB['USERNAME'],
password=settings.MONGODB['PASSWORD'],
authSource=settings.MONGODB['AUTHSOURCE'],
authMechanism=settings.MONGODB['AUTHMECHANISM']
)
db = client[settings.DB_NAME]
date_list = []
# 查询每个表中最小的日期
for collection_name in settings.COLLECTION_NAMES.values():
collection = db[collection_name]
data = collection.find_one({'exchange': 'shfe'}, sort=[('date', -1)])
# 上期所期货最早时间是2002年1月7日
if data:
date_list.append(data['date'])
else:
date_list.append(datetime.datetime(2002, 1, 7))
return min(date_list)
class ParseData(threading.Thread):
"""处理数据类"""
def __init__(self, q, trade_q, short_q, long_q):
super(ParseData, self).__init__()
# 数据队列
self.q = q
self.trade_q = trade_q
self.short_q = short_q
self.long_q = long_q
def run(self):
global EXIT_FLAG_PARSER
while not EXIT_FLAG_PARSER:
# 采用非堵塞获取队列数据
try:
data = self.q.get(timeout=1)
self.parse_data(data)
self.q.task_done()
except Empty:
pass
except Exception as e:
log.logger.error('数据处理线程出错,时间:%s,错误信息:%s' % (data['report_date'], e), exc_info=True)
self.q.task_done()
def parse_data(self, data):
"""处理数据"""
# 日期
date = datetime.datetime.strptime(data['report_date'], '%Y%m%d')
# log.logger.debug('正在处理 %s' % date)
# 把数据转换成DataFrame
df = pd.DataFrame(data['o_cursor'])
# 如果排名是-1或0,是品种的一个总情况,忽略跳过
df = df[(df['RANK']!=-1) & (df['RANK']!=0)]
# 把数据中的空格去掉
df['INSTRUMENTID'] = df['INSTRUMENTID'].str.strip()
df['PARTICIPANTABBR1'] = df['PARTICIPANTABBR1'].str.strip()
df['PARTICIPANTABBR2'] = df['PARTICIPANTABBR2'].str.strip()
df['PARTICIPANTABBR3'] = df['PARTICIPANTABBR3'].str.strip()
# 数据类型转换
df['RANK'] = df['RANK'].astype('int32')
# 按照合约进行分组
for contract, son_df in df.groupby(df['INSTRUMENTID']):
# RANK=999的是对合约的小结
new_df = son_df[son_df['RANK']!=999]
for i in range(1, 4):
# 如果合约小结没有数据,则跳过
if int(son_df.loc[son_df['RANK']==999, 'CJ%s' % i] == ''):
continue
# 构造新的DataFrame
temp_df = pd.DataFrame({'rank': new_df['RANK'], 'name': new_df['PARTICIPANTABBR%s' % i], 'volume': new_df['CJ%s' % i], 'volumeDiff': new_df['CJ%s_CHG' % i]}).reset_index(drop=True).sort_values('rank')
# 把空的数据删除,并转换数据类型
temp_df = temp_df[temp_df['name'] != '']
temp_df[['volume', 'volumeDiff']] = temp_df[['volume', 'volumeDiff']].astype('int32')
# 整理全部数据
temp_dict = {
'exchange': 'shfe',
'goods': re.match(r'[^\d]+', contract).group(),
'symbol': 'shfe_%s' % contract.lower(),
'date': date,
'volume': int(son_df.loc[son_df['RANK']==999, 'CJ%s' % i]),
'volumeDiff': int(son_df.loc[son_df['RANK']==999, 'CJ%s_CHG' % i]),
'data': temp_df.to_dict('records'),
}
# 把数据放进队列
if i == 1:
self.trade_q.put(temp_dict)
elif i == 3:
self.short_q.put(temp_dict)
elif i == 2:
self.long_q.put(temp_dict)
class InsertData(threading.Thread):
"""插入数据类"""
def __init__(self, q, collection_name):
super(InsertData, self).__init__()
self.q = q
if not settings.MONGODB['AUTHMECHANISM']:
self.client = pymongo.MongoClient(
host=settings.MONGODB['HOST'],
port=settings.MONGODB['PORT'],
username=settings.MONGODB['USERNAME'],
password=settings.MONGODB['PASSWORD'],
authSource=settings.MONGODB['AUTHSOURCE']
)
else:
self.client = pymongo.MongoClient(
host=settings.MONGODB['HOST'],
port=settings.MONGODB['PORT'],
username=settings.MONGODB['USERNAME'],
password=settings.MONGODB['PASSWORD'],
authSource=settings.MONGODB['AUTHSOURCE'],
authMechanism=settings.MONGODB['AUTHMECHANISM']
)
self.db = self.client[settings.DB_NAME]
self.collection = self.db[collection_name]
self.collection.create_index([('date', 1), ('symbol', 1)])
def run(self):
global EXIT_FLAG_INSERTER
while not EXIT_FLAG_INSERTER:
# 采用非堵塞获取队列数据
try:
data = self.q.get(timeout=1)
self.insert_data(data)
self.q.task_done()
except Empty:
pass
except Exception as e:
log.logger.error('插入数据线程出错, 时间:%s,错误内容:%s' % (data['date'], e), exc_info=True)
self.q.task_done()
self.client.close()
def insert_data(self, data):
date = data['date']
symbol = data['symbol']
try:
# log.logger.debug('正在插入 %s %s' % (symbol, date))
self.collection.replace_one({'date': date, 'symbol': symbol}, data, True)
except Exception as e:
log.logger.error('插入数据出错 %s' % data)
def main():
start = time.time()
log.logger.info('-'*50+' start '+'-'*50)
log.logger.info('开始上期所大户持仓爬虫程序')
# 数据处理线程退出信号
global EXIT_FLAG_PARSER
EXIT_FLAG_PARSER = False
# 数据插入线程退出信号
global EXIT_FLAG_INSERTER
EXIT_FLAG_INSERTER = False
# 爬虫数据队列
q = Queue()
# 成交量排名数据队列
trade_q = Queue()
# 持卖单量排名数据队列
short_q = Queue()
# 持买单量排名数据队列
long_q = Queue()
# 开启爬虫
crawler = CrawlData(q)
crawler.start()
# 开启数据处理
parser1 = ParseData(q, trade_q, short_q, long_q)
parser2 = ParseData(q, trade_q, short_q, long_q)
parser1.start()
parser2.start()
# 开启数据插入
insert1 = InsertData(trade_q, settings.COLLECTION_NAMES['TRADE'])
insert2 = InsertData(short_q, settings.COLLECTION_NAMES['SHORT'])
insert3 = InsertData(long_q, settings.COLLECTION_NAMES['LONG'])
insert1.start()
insert2.start()
insert3.start()
# 等待爬虫线程结束
crawler.join()
# 等待数据处理完成
q.join()
# 通知数据处理线程可以结束了
EXIT_FLAG_PARSER = True
# 等待处理线程结束
parser1.join()
parser2.join()
# 等待其他数据队列完成
trade_q.join()
short_q.join()
long_q.join()
# 通知数据插入线程已经没有其他数据了
EXIT_FLAG_INSERTER = True
# 等待数据插入线程结束
insert1.join()
insert2.join()
insert3.join()
log.logger.info('上期所大户持仓数据已更新完成')
log.logger.info('共耗时%ss' % (time.time()-start))
log.logger.info('-'*50+' end '+'-'*50)
if __name__ == "__main__":
main()