1
1
from Interfaces import AzureOMSInterface , SqlInterface , GraylogInterface , PRTGInterface , FileInterface , \
2
2
AzureTableInterface , AzureBlobInterface , FluentdInterface
3
+ import alc # Rust based log collector Engine
3
4
import AuditLogSubscriber
4
5
import ApiConnection
5
6
import os
@@ -45,6 +46,7 @@ def __init__(self, config_path, **kwargs):
45
46
self .run_started = None
46
47
self .logs_retrieved = 0
47
48
self .errors_retrieving = 0
49
+ self .retries = 0
48
50
49
51
def force_stop (self , * args ):
50
52
@@ -65,13 +67,46 @@ def run_once(self):
65
67
"""
66
68
self ._prepare_to_run ()
67
69
logging .log (level = logging .INFO , msg = 'Starting run @ {}. Content: {}.' .format (
68
- datetime .datetime .now (), self ._remaining_content_types ))
69
- self ._start_monitoring ()
70
- self ._get_all_available_content ()
71
- while self .monitor_thread .is_alive ():
72
- self .monitor_thread .join (1 )
70
+ datetime .datetime .now (), self .config ['collect' , 'contentTypes' ]))
71
+ if not self .config ['collect' , 'rustEngine' ] is False :
72
+ self ._start_interfaces ()
73
+ self .receive_results_from_rust_engine ()
74
+ self ._stop_interfaces (force = False )
75
+ else :
76
+ self ._start_monitoring ()
77
+ self ._get_all_available_content ()
78
+ while self .monitor_thread .is_alive ():
79
+ self .monitor_thread .join (1 )
73
80
self ._finish_run ()
74
81
82
def receive_results_from_rust_engine(self):
    """
    Run the Rust-based collector engine (alc) and consume its results until all
    content has been retrieved or no result has arrived for 60 seconds.

    Each result is a tuple of (content_json, content_id, content_expiration,
    content_type) and is forwarded to self._handle_retrieved_content. On engine
    shutdown, the engine's retry/error counters are folded into this collector's
    statistics.
    """
    runs = self._get_needed_runs(content_types=self.config['collect', 'contentTypes'].copy())
    engine = alc.RustEngine(self.tenant_id, self.client_key, self.secret_key,
                            self.publisher_id or self.tenant_id,
                            self.config['collect', 'contentTypes'], runs,
                            self.config['collect', 'maxThreads'] or 50,
                            self.config['collect', 'retries'] or 3)
    engine.run_once()
    last_received = datetime.datetime.now()
    while True:
        try:
            result = engine.get_result()
        except ValueError:  # RustEngine throws this error when no logs are in the results recv queue
            now = datetime.datetime.now()
            if now - last_received > datetime.timedelta(seconds=60):
                logging.error("Timed out waiting for results from engine")
                break
            # BUGFIX: previously `last_received` was reset here on every empty
            # poll, so the 60s timeout could never trigger and a hung engine
            # spun this loop forever. It is now only updated when a result is
            # actually received (in the else branch below).
            time.sleep(0.1)  # avoid busy-waiting while the result queue is empty
        except EOFError:  # RustEngine throws this error when all content has been retrieved
            logging.info("Rust engine finished receiving all content")
            break
        else:
            content_json, content_id, content_expiration, content_type = result
            self._handle_retrieved_content(content_id=content_id, content_expiration=content_expiration,
                                           content_type=content_type, results=json.loads(content_json))
            self.logs_retrieved += 1
            last_received = datetime.datetime.now()
    # Engine reports (successful, skipped, retries, errors); keep the last two.
    # NOTE(review): relies on `time` being imported at file level — it is used
    # by _retrieve_content elsewhere in this module.
    _, _, self.retries, self.errors_retrieving = engine.stop()
109
+
75
110
def run_scheduled (self ):
76
111
"""
77
112
Run according to the schedule set in the config file. Collector will not exit unless manually stopped.
@@ -155,8 +190,8 @@ def _log_statistics(self):
155
190
"""
156
191
Write run statistics to log file / console.
157
192
"""
158
- logging .info ("Finished. Total logs retrieved: {}. Total logs with errors: {}. Run time: {}." . format (
159
- self .logs_retrieved , self .errors_retrieving , datetime .datetime .now () - self .run_started ))
193
+ logging .info ("Finished. Total logs retrieved: {}. Total retries: {}. Total logs with errors: {}. Run time: {}."
194
+ . format ( self .logs_retrieved , self . retries , self .errors_retrieving , datetime .datetime .now () - self .run_started ))
160
195
for interface in self ._all_enabled_interfaces :
161
196
logging .info ("{} reports: {} successfully sent, {} errors" .format (
162
197
interface .__class__ .__name__ , interface .successfully_sent , interface .unsuccessfully_sent ))
@@ -224,12 +259,15 @@ def _auto_subscribe(self):
224
259
logging .info ("Auto subscribing to: {}" .format (content_type ))
225
260
subscriber .set_sub_status (content_type = content_type , action = 'start' )
226
261
227
- def _get_all_available_content (self ):
262
+ def _get_needed_runs (self , content_types ):
228
263
"""
229
- Start a thread to retrieve available content blobs for each content type to be collected.
264
+ Return the start- and end times needed to retrieve content for each content type. If the timespan to retrieve
265
+ logs for exceeds 24 hours, we need to split it up into 24 hour runs (limit by Office API).
230
266
"""
267
+ runs = {}
231
268
end_time = datetime .datetime .now (datetime .timezone .utc )
232
- for content_type in self ._remaining_content_types .copy ():
269
+ for content_type in content_types :
270
+ runs [content_type ] = []
233
271
if self .config ['collect' , 'resume' ] and content_type in self ._last_run_times .keys ():
234
272
start_time = self ._last_run_times [content_type ]
235
273
logging .info ("{} - resuming from: {}" .format (content_type , start_time ))
@@ -244,15 +282,29 @@ def _get_all_available_content(self):
244
282
if end_time - start_time > datetime .timedelta (hours = 24 ):
245
283
split_start_time = start_time
246
284
split_end_time = start_time + datetime .timedelta (hours = 24 )
247
- self ._start_get_available_content_thread (
248
- content_type = content_type , start_time = split_start_time , end_time = split_end_time )
285
+ formatted_start_time = str (split_start_time ).replace (' ' , 'T' ).rsplit ('.' , maxsplit = 1 )[0 ]
286
+ formatted_end_time = str (split_end_time ).replace (' ' , 'T' ).rsplit ('.' , maxsplit = 1 )[0 ]
287
+ runs [content_type ].append ((formatted_start_time , formatted_end_time ))
249
288
start_time = split_end_time
250
289
self ._remaining_content_types .append (content_type )
251
290
else :
252
- self ._start_get_available_content_thread (
253
- content_type = content_type , start_time = start_time , end_time = end_time )
291
+ formatted_start_time = str (start_time ).replace (' ' , 'T' ).rsplit ('.' , maxsplit = 1 )[0 ]
292
+ formatted_end_time = str (end_time ).replace (' ' , 'T' ).rsplit ('.' , maxsplit = 1 )[0 ]
293
+ runs [content_type ].append ((formatted_start_time , formatted_end_time ))
254
294
break
255
295
self ._last_run_times [content_type ] = end_time .strftime ("%Y-%m-%dT%H:%M:%SZ" )
296
+ return runs
297
+
298
def _get_all_available_content(self):
    """
    Spawn one retrieval thread per (content type, time window) pair.

    The windows come from _get_needed_runs, which splits the requested
    collection timespan into chunks the Office API will accept.
    """
    needed = self._get_needed_runs(content_types=self._remaining_content_types.copy())
    for content_type, windows in needed.items():
        for window_start, window_end in windows:
            self._start_get_available_content_thread(
                content_type=content_type, start_time=window_start, end_time=window_end)
256
308
257
309
def _start_get_available_content_thread (self , content_type , start_time , end_time ):
258
310
@@ -268,12 +320,10 @@ def _get_available_content(self, content_type, start_time, end_time):
268
320
"""
269
321
try :
270
322
logging .log (level = logging .DEBUG , msg = 'Getting available content for type: "{}"' .format (content_type ))
271
- formatted_end_time = str (end_time ).replace (' ' , 'T' ).rsplit ('.' , maxsplit = 1 )[0 ]
272
- formatted_start_time = str (start_time ).replace (' ' , 'T' ).rsplit ('.' , maxsplit = 1 )[0 ]
273
323
logging .info ("Retrieving {}. Start time: {}. End time: {}." .format (
274
- content_type , formatted_start_time , formatted_end_time ))
324
+ content_type , start_time , end_time ))
275
325
response = self .make_api_request (url = 'subscriptions/content?contentType={0}&startTime={1}&endTime={2}' .
276
- format (content_type , formatted_start_time , formatted_end_time ))
326
+ format (content_type , start_time , end_time ))
277
327
self .blobs_to_collect [content_type ] += response .json ()
278
328
while 'NextPageUri' in response .headers .keys () and response .headers ['NextPageUri' ]:
279
329
logging .log (level = logging .DEBUG , msg = 'Getting next page of content for type: "{0}"' .
@@ -351,24 +401,28 @@ def _retrieve_content(self, content_json, content_type, retries):
351
401
return
352
402
except Exception as e :
353
403
if retries :
404
+ self .retries += 1
354
405
time .sleep (self .config ['collect' , 'retryCooldown' ] or 3 )
355
406
return self ._retrieve_content (content_json = content_json , content_type = content_type , retries = retries - 1 )
356
407
else :
357
408
self .errors_retrieving += 1
358
409
logging .error ("Error retrieving content: {}" .format (e ))
359
410
return
360
411
else :
361
- self ._handle_retrieved_content (content_json = content_json , content_type = content_type , results = results )
412
+ self ._handle_retrieved_content (
413
+ content_id = content_json ['contentId' ], content_expiration = content_json ['contentExpiration' ],
414
+ content_type = content_type , results = results )
362
415
363
- def _handle_retrieved_content (self , content_json , content_type , results ):
416
+ def _handle_retrieved_content (self , content_id , content_expiration , content_type , results ):
364
417
"""
365
418
Check known logs, filter results and output what remains.
366
- :param content_json: JSON dict of the content blob as retrieved from the API (dict)
419
+ :param content_id: ID of content blob from API (str)
420
+ :param content_expiration: date string of expiration of content blob from API (str)
367
421
:param content_type: Type of API being retrieved for, e.g. 'Audit.Exchange' (str)
368
422
:param results: list of JSON
369
423
"""
370
424
if self .config ['collect' , 'skipKnownLogs' ]:
371
- self ._known_content [content_json [ 'contentId' ]] = content_json [ 'contentExpiration' ]
425
+ self ._known_content [content_id ] = content_expiration
372
426
for log in results .copy ():
373
427
if self .config ['collect' , 'skipKnownLogs' ]:
374
428
if log ['Id' ] in self .known_logs :
0 commit comments