import logging
import os
import tempfile
from datetime import datetime, timedelta
import boto


def collapse(s3bucket, inPrefix, outFile, outKey, outMaxSize=5*1024*1024*1024, outRRS=False, rmOutFile=True):
    """
    concatenate all (small) files passed to the function into one larger one

    downloads keys (files) that match inPrefix, concatenates all of them
    into outFile, uploads outFile to outKey, then deletes the downloaded keys

    Arguments:
    s3bucket
        s3 bucket object. must already be initiated
    inPrefix (str)
        prefix used to list keys, like a wildcard (i.e. path/to/files*)
    outFile (str)
        local file name that the others will be collapsed into
    outKey (str)
        S3 key where the contents of outFile will be written to (i.e. the uploaded file)
    outMaxSize (int)
        if the outFile size grows over this value, raise an exception and cancel.
        defaults to 5GB, set to 0 to disable
    outRRS
        if True the new key will be set to use Reduced Redundancy Storage
    rmOutFile
        if False the concatenated file will not be removed from local storage
    """
    # DOWNLOADING AND CONCATENATING
    # open outFile in binary mode because get_contents_* returns bytes
    with open(outFile, mode='wb') as outFD:
        inKeys = list(s3bucket.list(inPrefix))
        if len(inKeys) == 0:
            logging.info(' No files for this prefix')
            outFD.close()
            os.remove(outFile)
            return
        inSize = 0
        logging.info(' Downloading {:d} keys'.format(len(inKeys)))
        # True if the last file loaded was .gz, False otherwise
        wasGzip = None
        for inKey in inKeys:
            # keep track of total size to check at the end
            inSize = inSize + inKey.size
            # create a temporary file that will reside in memory until it reaches
            # 16MB, at which point it gets written to disk. default is binary mode
            with tempfile.SpooledTemporaryFile(max_size=16*1024*1024) as tempFD:
                inKey.get_contents_to_file(tempFD)
                # move back to start of file
                tempFD.flush()
                tempFD.seek(0)
                if wasGzip is None:
                    wasGzip = isGzip(tempFD, inKey.name)
                if isGzip(tempFD, inKey.name) != wasGzip:
                    # raise an exception, we don't want to combine gzip and non-gzip files
                    if wasGzip:
                        fileType = 'not gzip'
                        filesType = 'gzip'
                    else:
                        fileType = 'gzip'
                        filesType = 'not gzip'
                    raise Exception(
                        'File {} is {}, but the other files so far were {}'.format(inKey.name, fileType, filesType))
                # read the tempfile 256KB at a time, write to outFile
                inChunk = tempFD.read(256*1024)
                while len(inChunk):
                    outFD.write(inChunk)
                    inChunk = tempFD.read(256*1024)
            if outMaxSize > 0 and outFD.tell() > outMaxSize:
                os.remove(outFile)
                raise RuntimeError("Output file size bigger than the maximum of " + str(outMaxSize) + "B. Removed "
                                   + outFile + ", canceling operation")
        # any problems should have already interrupted execution,
        # but this double-check is cheap
        outSize = outFD.tell()
    if inSize != outSize:
        raise RuntimeError("Collapsed file size of " + str(outSize) + " bytes is different than the expected "
                           + str(inSize) + " bytes!")
    # UPLOADING CONCATENATED FILE
    # files downloaded and concatenated, uploading outFile
    # could maybe go with multipart uploads here
    logging.info(' Uploading {} ({:d} Bytes) to {}'.format(outFile, outSize, outKey))
    outKeyD = s3bucket.new_key(outKey)
    outKeySize = outKeyD.set_contents_from_filename(outFile, replace=False, reduced_redundancy=outRRS,
                                                    cb=s3_progress, num_cb=3)
    if outKeySize is None:
        outKeySize = 0
    if outKeySize != outSize:
        raise RuntimeError("Uploaded file size (" + str(outKeySize) + "B) differs from local file size (" +
                           str(outSize) + "B)")
    # DELETE COLLAPSED KEYS
    logging.info(' Removing {:d} keys'.format(len(inKeys)))
    for inKey in inKeys:
        inKey.delete()
    # DELETE LOCAL CONCATENATED FILE
    if rmOutFile:
        os.remove(outFile)
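
# Usage sketch (hypothetical bucket and paths; assumes boto 2.x credentials are
# already configured in the environment):
#
#     conn = boto.connect_s3()
#     bucket = conn.get_bucket('my-log-bucket')
#     collapse(bucket, 'logs/2014-12-31-', '/tmp/2014-12-31_collapsed',
#              'logs_collapsed/2014-12-31_collapsed', outRRS=True)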


def s3_progress(current, total):
    '''
    called by S3 upload/download functions to display progress
    '''
    logging.info(' Transferred {:d}%'.format(int(current * 100 / total)))


def dtm_to_s3_log(dtm, increment):
    '''
    convert a datetime to a string like 2014-12-31

    converts dtm to the appropriate prefix string, depending on the
    selected increment, for S3 logs, which have names like
    2014-12-31-17-25-36-XXXXXXXXXXXXXXXX

    Arguments:
    dtm (datetime)
        object to be converted
    increment (str)
        what kind of stepping will be used
    '''
    # full format would be {:%Y-%m-%d-%H-%M-%S-}
    if increment == "d":    # day
        dtstr = '{:%Y-%m-%d}'.format(dtm)
    elif increment == "H":  # hour
        dtstr = '{:%Y-%m-%d-%H}'.format(dtm)
    elif increment == "M":  # minute
        dtstr = '{:%Y-%m-%d-%H-%M}'.format(dtm)
    elif increment == "S":  # second
        dtstr = '{:%Y-%m-%d-%H-%M-%S}'.format(dtm)
    elif increment == "m":  # month
        dtstr = '{:%Y-%m}'.format(dtm)
    elif increment == "Y":  # year
        dtstr = '{:%Y}'.format(dtm)
    else:
        raise RuntimeError('increment "' + increment + '" undefined, aborting')
    return dtstr
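
# For example, dtm_to_s3_log(datetime(2014, 12, 31, 17, 25, 36), "H") yields
# '2014-12-31-17', which matches every S3 access log key written during that hour.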


def isGzip(fd, fn=None):
    '''
    determines if a file is a gzip archive or not

    checks the magic number (first two bytes) and, if a file name is passed,
    the file extension in order to determine if the file is a gzip archive.
    returns True or False.
    if a file name is also passed to the function, it returns True if and only
    if the file extension is '.gz' AND the first two bytes are '1f 8b'. if only
    the magic number should be checked, don't give it a file name

    Arguments:
    fd (file descriptor)
        file descriptor. implies the file is already open
    fn (str)
        file name
    '''
    fdPosition = fd.tell()
    fd.seek(0)
    byte1 = fd.read(1)
    byte2 = fd.read(1)
    fd.seek(fdPosition)
    # check magic number
    if byte1 == b'\x1f' and byte2 == b'\x8b':
        hasBytes = True
    else:
        hasBytes = False
    # check extension
    if type(fn) is str:
        hasExtension = fn.endswith('.gz')
    else:
        hasExtension = True
    return hasBytes and hasExtension


def collapse_s3(s3bucket, s3inDir, s3outDir=None, dateStart=None, dateEnd=None, increment="d", outRRS=False):
    '''
    mass collapse S3 logs generated in the specified time period

    start date and end date both default to yesterday unless specified,
    so if only dateStart is specified it will group logs from
    dateStart to yesterday
    Ex.: collapse_s3(myBucket, 'logs/s3logs/', dateStart=datetime(2014, 7, 29))

    Arguments:
    s3bucket
        bucket object. must already be initiated
    s3inDir (str)
        s3 'path' where the logs reside
    s3outDir (str)
        directory where the resulting file will be stored on S3.
        if unspecified it will be the same as s3inDir
    dateStart (datetime)
        collapse logs starting this day (default yesterday 00:00:00)...
    dateEnd (datetime)
        ...up to this day, inclusive (default yesterday 23:59:59)
    increment (str)
        how to group logs: (d)aily / (H)ourly / per (M)inute
    outRRS (boolean)
        use Reduced Redundancy Storage for the new file or not.
        passed on to collapse()
    '''
    if increment == "d":
        timeStep = timedelta(days=1)
    elif increment == "H":
        timeStep = timedelta(hours=1)
    elif increment == "M":
        timeStep = timedelta(minutes=1)
    else:
        raise RuntimeError('increment "' + increment + '" undefined, aborting')
    if dateEnd is None:
        # yesterday, one second before midnight
        dateEnd = datetime(datetime.today().year, datetime.today().month, datetime.today().day) - timedelta(seconds=1)
    if dateStart is None:
        # yesterday, midnight
        dateStart = datetime(dateEnd.year, dateEnd.month, dateEnd.day)
    if not isinstance(dateStart, datetime) or not isinstance(dateEnd, datetime) or dateEnd < dateStart:
        raise Exception("Start/End dates ({}/{}) are wrong somehow".format(str(dateStart), str(dateEnd)))
    s3inDir = os.path.join(s3inDir, '')
    if s3outDir is None:
        s3outDir = s3inDir
    else:
        s3outDir = os.path.join(s3outDir, '')
    outDir = os.path.join(tempfile.mkdtemp(prefix="collapse_s3_"), '')
    dateCurrentLogs = dateStart
    while dateCurrentLogs <= dateEnd:
        prefix = dtm_to_s3_log(dateCurrentLogs, increment)
        inPrefix = s3inDir + prefix + "-"
        outFile = outDir + prefix + '_collapsed'
        outKey = s3outDir + prefix + '_collapsed'
        logging.info('{0}/{1}'.format(s3bucket.name, inPrefix))
        collapse(s3bucket, inPrefix, outFile, outKey, outRRS=outRRS)
        dateCurrentLogs = dateCurrentLogs + timeStep
    os.rmdir(outDir)  # should be empty. if it isn't, we might want the files anyway


def collapse_ctrail(s3bucket, region=None, account=None, s3outDir=None, dateStart=None, dateEnd=None, increment="d",
                    outRRS=False):
    '''
    mass collapse CloudTrail logs generated in the specified time period

    start date and end date both default to yesterday unless specified.
    if only dateStart is specified it will group logs from
    dateStart to yesterday
    Ex.: collapse_ctrail(myBucket, 'us-east-1', '12345678900', dateStart=datetime(2014, 7, 29), outRRS=True)

    Arguments:
    s3bucket
        bucket object. must already be initiated
    region (str)
        if specified it will be used along with account to construct
        s3inDir and s3outDir
    account (str)
        AWS account id, used along with region to construct
        s3inDir and s3outDir
    s3outDir (str)
        directory where the resulting file will be stored on S3.
        if unspecified it is built from region and account.
        takes precedence over region/account
    dateStart (datetime)
        collapse logs starting this day (default yesterday 00:00:00)...
    dateEnd (datetime)
        ...up to this day, inclusive (default yesterday 23:59:59)
    increment (str)
        how to group logs. currently unused, grouping is always daily
    outRRS (boolean)
        use Reduced Redundancy Storage for the new file or not.
        passed on to collapse()
    '''
    if dateEnd is None:
        # yesterday, one second before midnight
        dateEnd = datetime(datetime.today().year, datetime.today().month, datetime.today().day) - timedelta(seconds=1)
    if dateStart is None:
        # yesterday, midnight
        dateStart = datetime(dateEnd.year, dateEnd.month, dateEnd.day)
    outDir = os.path.join(tempfile.mkdtemp(prefix="collapse_ctrail_"), '')
    dt = dateStart
    while dt <= dateEnd:
        if type(region) is str and type(account) is str:
            s3inDir = 'AWSLogs/{}/CloudTrail/{}/{:%Y/%m/%d}/'.format(account, region, dt)
        else:
            raise Exception("Region and/or account not specified")
        s3inFile = '{}_CloudTrail_{}_{:%Y%m%d}T'.format(account, region, dt)
        s3inPrefix = s3inDir + s3inFile
        if type(s3outDir) is str:
            # defined, all good
            pass  # Python doesn't like empty blocks
        elif type(region) is str and type(account) is str:
            s3outDir = 'AWSLogs_collapsed/{}/CloudTrail/{}/'.format(account, region)
        else:
            raise Exception("s3outDir could not be defined")
        fileName = '{}_CloudTrail_{}_{:%Y%m%d}_collapsed'.format(account, region, dt)
        outFile = '{}{}'.format(outDir, fileName)
        s3outFile = '{}{}'.format(s3outDir, fileName)
        logging.info('{}: {} -> {}'.format(s3bucket.name, s3inPrefix, s3outFile))
        collapse(s3bucket, s3inPrefix, outFile, s3outFile, outRRS=outRRS)
        dt = dt + timedelta(days=1)
    os.rmdir(outDir)  # should be empty. if it isn't, we might want the files anyway


def collapse_redshift(s3bucket, region=None, account=None, cluster=None,
                      s3outDir=None, dateStart=None, dateEnd=None, increment="d",
                      outRRS=False):
    '''
    mass collapse Redshift logs generated in the specified time period

    start date and end date both default to yesterday unless specified.
    if only dateStart is specified it will group logs from
    dateStart to yesterday
    Ex.: collapse_redshift(myBucket, 'us-east-1', '12345678900', dateStart=datetime(2014, 7, 29), outRRS=True)

    Arguments:
    s3bucket
        bucket object. must already be initiated
    region (str)
        if specified it will be used along with account to construct
        s3inDir and s3outDir
    account (str)
        AWS account id, used along with region to construct
        s3inDir and s3outDir
    cluster (str)
        cluster name, part of the Redshift log filename
    s3outDir (str)
        directory where the resulting file will be stored on S3.
        if unspecified it is built from region and account.
        takes precedence over region/account
    dateStart (datetime)
        collapse logs starting this day (default yesterday 00:00:00)...
    dateEnd (datetime)
        ...up to this day, inclusive (default yesterday 23:59:59)
    increment (str)
        NOT IMPLEMENTED, ALWAYS DAILY. how to group logs
    outRRS (boolean)
        use Reduced Redundancy Storage for the new file or not.
        passed on to collapse()
    '''
    if dateEnd is None:
        # yesterday, one second before midnight
        dateEnd = datetime(datetime.today().year, datetime.today().month, datetime.today().day) - timedelta(seconds=1)
    if dateStart is None:
        # yesterday, midnight
        dateStart = datetime(dateEnd.year, dateEnd.month, dateEnd.day)
    outDir = os.path.join(tempfile.mkdtemp(prefix="collapse_redshift_"), '')
    dt = dateStart
    logtypes = ['connectionlog', 'userlog', 'useractivitylog']
    while dt <= dateEnd:
        for logtype in logtypes:
            if type(region) is str and type(account) is str:
                s3inDir = 'AWSLogs/{}/redshift/{}/{:%Y/%m/%d}/'.format(account, region, dt)
            else:
                raise Exception('Region and/or account not specified')
            if type(cluster) is not str:
                raise Exception('Need a cluster name')
            s3inFile = '{}_redshift_{}_{}_{}_{:%Y-%m-%d}T'.format(account, region, cluster, logtype, dt)
            s3inPrefix = s3inDir + s3inFile
            if type(s3outDir) is str:
                # defined, all good
                pass  # Python doesn't like empty blocks
            elif type(region) is str and type(account) is str:
                s3outDir = 'AWSLogs_collapsed/{}/redshift/{}/'.format(account, region)
            else:
                raise Exception("s3outDir could not be defined")
            fileName = '{}_redshift_{}_{}_{}_{:%Y-%m-%d}T_collapsed'.format(account, region, cluster, logtype, dt)
            outFile = '{}{}'.format(outDir, fileName)
            s3outFile = '{}{}'.format(s3outDir, fileName)
            logging.info('{}: {} -> {}'.format(s3bucket.name, s3inPrefix, s3outFile))
            collapse(s3bucket, s3inPrefix, outFile, s3outFile, outRRS=outRRS)
        dt = dt + timedelta(days=1)
    os.rmdir(outDir)  # should be empty. if it isn't, we might want the files anyway
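

if __name__ == '__main__':
    # Minimal, hypothetical usage sketch: the bucket name and log prefix below
    # are placeholders, and boto 2.x credentials are assumed to be configured
    # in the environment (e.g. via ~/.boto or an instance role).
    logging.basicConfig(level=logging.INFO)
    conn = boto.connect_s3()
    bucket = conn.get_bucket('my-log-bucket')  # placeholder bucket name
    # collapse yesterday's S3 access logs into one file per day
    collapse_s3(bucket, 'logs/s3logs/', increment='d', outRRS=True)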