Skip to content

Commit

Permalink
Add SQL output
Browse files Browse the repository at this point in the history
  • Loading branch information
ddbnl committed Apr 15, 2022
1 parent d5185a6 commit db8e04b
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ collect:
output:
file:
enabled: True
path: 'output.txt'
path: 'output'
separateByContentType: True
separator: ';'
File renamed without changes.
4 changes: 4 additions & 0 deletions ConfigExamples/fullConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ output:
enabled: False
workspaceId:
sharedKey:
sql:
enabled: False
cacheSize: 500000 # Amount of logs to cache until each SQL commit, larger=faster but eats more memory
chunkSize: 2000 # Amount of rows to write simultaneously to SQL, in most cases just set it as high as your DB allows. COUNT errors = too high
graylog:
enabled: False
address:
Expand Down
12 changes: 12 additions & 0 deletions ConfigExamples/sql.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
collect:
contentTypes:
Audit.General: True
Audit.AzureActiveDirectory: True
Audit.Exchange: True
Audit.SharePoint: True
DLP.All: True
output:
sql:
enabled: True
cacheSize: 500000
chunkSize: 2000
24 changes: 21 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ If you have any issues or questions, feel free to create an issue in this repo.
- The following outputs are supported:
- Azure Analytics Workspace (OMS)
- PRTG Network Monitor
- ( Azure ) SQL server
- Graylog (or any other source that accepts a simple socket connection)
- Local file
- CSV Local file
- Power BI (indirectly through SQL or CSV)

Simply download the executable you need from the Windows or Linux folder and copy a config file from the ConfigExamples folder that suits your need:
- Windows:
Expand All @@ -40,11 +42,14 @@ See the following link for more info on the management APIs: https://msdn.micros

## Roadmap:

- Add AzureBlob and AzureTable outputs
- Automate onboarding as much as possible to make it easier to use
- Make a container that runs this script
- Create a tutorial for automatic onboarding + docker container for the easiest way to run this

## Latest changes:
- Added SQL output for Power BI
- Changed file to CSV output
- Added PRTG output
- Added filters
- Added YAML config file
Expand All @@ -63,14 +68,15 @@ See the following link for more info on the management APIs: https://msdn.micros
## Use cases:

- Ad-lib log retrieval;
- Scheduling regular execution to retrieve the full audit trail.
- Scheduling regular execution to retrieve the full audit trail in SIEM
- Output to PRTG for alerts on audit logs
- Output to (Azure) SQL / CSV for Power BI

## Features:

- Subscribe to the audit logs of your choice through the --interactive-subscriber switch, or automatically when collecting logs;
- Collect General, Exchange, Sharepoint, Azure active directory and/or DLP audit logs through the collector script;
- Output to file, PRTG, Azure Log Analytics or to a Graylog input (i.e. send the logs over a network socket).
- Output to CSV, PRTG, Azure Log Analytics, SQL or to a Graylog input (i.e. send the logs over a network socket).

## Requirements:
- Office365 tenant;
Expand Down Expand Up @@ -141,6 +147,18 @@ Probably at least 300 seconds. Run the script manually first to check how long i
- Match the interval of the sensor to the amount of hours of logs to retrieve. If your interval is 1 hour, hoursToCollect
in the config file should also be set to one hour.

### (optional) Using ( Azure ) SQL

If you are running this script to get audit events in an SQL database you will need an ODBC driver and a connection string
- The collector uses PYODBC, which needs an ODBC driver, examples on how to install this:
- On windows: pip install pywin; pywin install pyodbc
- On Linux: apt get install unixodbc-dev
- Connection string might look like this: "Driver={ODBC Driver 17 for SQL Server};Server=tcp:mydatabase.com,1433;Database=mydatabase;Uid=myuser;Pwd=mypassword;Encrypt
=yes;TrustServerCertificate=no;Connection Timeout=30;"
- Use SQL example config and pass --sql-string parameter when running the collector with your connection string



### (optional) Creating a Graylog input

If you are running this script to get audit events in Graylog you will need to create a Graylog input.
Expand Down
87 changes: 46 additions & 41 deletions Source/AuditLogCollector.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from Interfaces import AzureOMSInterface, GraylogInterface, PRTGInterface, FileInterface
from Interfaces import AzureOMSInterface, SqlInterface, GraylogInterface, PRTGInterface, FileInterface
import AuditLogSubscriber
import ApiConnection
import os
Expand All @@ -17,7 +17,7 @@ class AuditLogCollector(ApiConnection.ApiConnection):

def __init__(self, content_types=None, resume=True, fallback_time=None, skip_known_logs=True,
log_path='collector.log', debug=False, auto_subscribe=False, max_threads=20, retries=3,
retry_cooldown=3, file_output=False, output_path=None, graylog_output=False, azure_oms_output=False,
retry_cooldown=3, file_output=False, sql_output=False, graylog_output=False, azure_oms_output=False,
prtg_output=False, **kwargs):
"""
Object that can retrieve all available content blobs for a list of content types and then retrieve those
Expand All @@ -32,7 +32,6 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno
:param log_path: path of file to log to (str)
:param debug: enable debug logging (Bool)
:param auto_subscribe: automatically subscribe to audit log feeds for which content is retrieved (Bool)
:param output_path: path to output retrieved logs to (None=no file output) (string)
:param graylog_output: Enable graylog Interface (Bool)
:param azure_oms_output: Enable Azure workspace analytics OMS Interface (Bool)
:param prtg_output: Enable PRTG output (Bool)
Expand All @@ -44,6 +43,7 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno
hours=23)
self.retries = retries
self.retry_cooldown = retry_cooldown
self.max_threads = max_threads
self.skip_known_logs = skip_known_logs
self.log_path = log_path
self.debug = debug
Expand All @@ -54,11 +54,12 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno
self.file_interface = FileInterface.FileInterface(**kwargs)
self.azure_oms_output = azure_oms_output
self.azure_oms_interface = AzureOMSInterface.AzureOMSInterface(**kwargs)
self.sql_output = sql_output
self.sql_interface = SqlInterface.SqlInterface(**kwargs)
self.graylog_output = graylog_output
self.graylog_interface = GraylogInterface.GraylogInterface(**kwargs)
self.prtg_output = prtg_output
self.prtg_interface = PRTGInterface.PRTGInterface(**kwargs)
self.max_threads = max_threads

self._last_run_times = {}
self._known_content = {}
Expand All @@ -72,6 +73,18 @@ def __init__(self, content_types=None, resume=True, fallback_time=None, skip_kno
self.logs_retrieved = 0
self.errors_retrieving = 0

@property
def all_interfaces(self):

return {self.file_interface: self.file_output, self.azure_oms_interface: self.azure_oms_output,
self.sql_interface: self.sql_output, self.graylog_interface: self.graylog_output,
self.prtg_interface: self.prtg_output}

@property
def all_enabled_interfaces(self):

return [interface for interface, enabled in self.all_interfaces.items() if enabled]

@property
def all_content_types(self):
"""
Expand Down Expand Up @@ -140,6 +153,7 @@ def _load_output_config(self, config):
if 'output' in config:
self._load_file_output_config(config=config)
self._load_azure_log_analytics_output_config(config=config)
self._load_sql_output_config(config=config)
self._load_graylog_output_config(config=config)
self._load_prtg_output_config(config=config)

Expand Down Expand Up @@ -169,6 +183,18 @@ def _load_azure_log_analytics_output_config(self, config):
if 'sharedKey' in config['output']['azureLogAnalytics']:
self.azure_oms_interface.shared_key = config['output']['azureLogAnalytics']['sharedKey']

def _load_sql_output_config(self, config):
"""
:param config: str
"""
if 'sql' in config['output']:
if 'enabled' in config['output']['sql']:
self.sql_output = config['output']['sql']['enabled']
if 'cacheSize' in config['output']['sql']:
self.sql_interface.cache_size = config['output']['sql']['cacheSize']
if 'chunkSize' in config['output']['sql']:
self.sql_interface.chunk_size = config['output']['sql']['chunkSize']

def _load_graylog_output_config(self, config):
"""
:param config: str
Expand Down Expand Up @@ -217,10 +243,9 @@ def _prepare_to_run(self):
self._clean_known_content()
self._clean_known_logs()
self.logs_retrieved = 0
self.graylog_interface.successfully_sent = 0
self.graylog_interface.unsuccessfully_sent = 0
self.azure_oms_interface.successfully_sent = 0
self.azure_oms_interface.unsuccessfully_sent = 0
for interface in self.all_enabled_interfaces:
interface.successfully_sent = 0
interface.unsuccessfully_sent = 0
self.run_started = datetime.datetime.now()

def run_once(self, start_time=None):
Expand All @@ -246,8 +271,6 @@ def _finish_run(self):
if self.resume and self._last_run_times:
with open('last_run_times', 'w') as ofile:
json.dump(fp=ofile, obj=self._last_run_times)
if self.prtg_output:
self.prtg_interface.output()
self._log_statistics()

def _log_statistics(self):
Expand All @@ -256,12 +279,9 @@ def _log_statistics(self):
"""
logging.info("Finished. Total logs retrieved: {}. Total logs with errors: {}. Run time: {}.".format(
self.logs_retrieved, self.errors_retrieving, datetime.datetime.now() - self.run_started))
if self.azure_oms_output:
logging.info("Azure OMS output report: {} successfully sent, {} errors".format(
self.azure_oms_interface.successfully_sent, self.azure_oms_interface.unsuccessfully_sent))
if self.graylog_output:
logging.info("Graylog output report: {} successfully sent, {} errors".format(
self.graylog_interface.successfully_sent, self.graylog_interface.unsuccessfully_sent))
for interface in self.all_enabled_interfaces:
logging.info("{} reports: {} successfully sent, {} errors".format(
interface.__class__.__name__, interface.successfully_sent, interface.unsuccessfully_sent))

def _get_last_run_times(self):
"""
Expand Down Expand Up @@ -371,25 +391,13 @@ def _get_available_content(self, content_type, start_time):

def _start_interfaces(self):

if self.file_output:
self.file_interface.start()
if self.azure_oms_output:
self.azure_oms_interface.start()
if self.prtg_output:
self.prtg_interface.start()
if self.graylog_output:
self.graylog_interface.start()
for interface in self.all_enabled_interfaces:
interface.start()

def _stop_interfaces(self):

if self.file_output:
self.file_interface.stop()
if self.azure_oms_output:
self.azure_oms_interface.stop()
if self.prtg_output:
self.prtg_interface.stop()
if self.graylog_output:
self.graylog_interface.stop()
for interface in self.all_enabled_interfaces:
interface.stop()

def _monitor_blobs_to_collect(self):
"""
Expand Down Expand Up @@ -477,14 +485,8 @@ def _output_results(self, results, content_type):
:param content_type: Type of API being retrieved for, e.g. 'Audit.Exchange' (str)
:param results: list of JSON
"""
if self.file_output:
self.file_interface.send_messages(*results, content_type=content_type)
if self.prtg_output:
self.prtg_interface.send_messages(*results, content_type=content_type)
if self.graylog_output:
self.graylog_interface.send_messages(*results, content_type=content_type)
if self.azure_oms_output:
self.azure_oms_interface.send_messages(*results, content_type=content_type)
for interface in self.all_enabled_interfaces:
interface.send_messages(*results, content_type=content_type)

def _check_filters(self, log, content_type):
"""
Expand Down Expand Up @@ -610,6 +612,8 @@ def known_content(self):
parser.add_argument('secret_key', type=str, help='Secret key generated by Azure application', action='store')
parser.add_argument('--config', metavar='config', type=str, help='Path to YAML config file',
action='store', dest='config')
parser.add_argument('--sql-string', metavar='sql_string', type=str,
help='Connection string for SQL output interface', action='store', dest='sql_string')
parser.add_argument('--interactive-subscriber', action='store_true',
help='Manually (un)subscribe to audit log feeds', dest='interactive_subscriber')
parser.add_argument('--general', action='store_true', help='Retrieve General content', dest='general')
Expand Down Expand Up @@ -681,7 +685,8 @@ def known_content(self):
azure_oms_output=argsdict['azure'], workspace_id=argsdict['azure_workspace'],
shared_key=argsdict['azure_key'],
gl_address=argsdict['graylog_addr'], gl_port=argsdict['graylog_port'],
graylog_output=argsdict['graylog'])
graylog_output=argsdict['graylog'],
sql_connection_string=argsdict['sql_string'])
if argsdict['config']:
collector.load_config(path=argsdict['config'])
collector.init_logging()
Expand Down
52 changes: 46 additions & 6 deletions Source/Interfaces/FileInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,67 @@
from . import _Interface
import collections
import pandas
import threading
import time


class FileInterface(_Interface.Interface):

def __init__(self, path='output', separate_by_content_type=True, separator=';', **kwargs):
def __init__(self, path='output', separate_by_content_type=True, separator=';', cache_size=500000, **kwargs):
"""
Interface to send logs to an Azure Log Analytics Workspace.
Interface to send logs to CSV file(s). Not every audit log has every possible column, so columns in the CSV
file might change over time. Therefore, the CSV file is recreated each time the cache_size is hit to insure
integrity, taking the performance hit.
"""
super().__init__(**kwargs)
self.path = path
self.paths = {}
self.separate_by_content_type = separate_by_content_type
self.separator = separator
self.cache_size = cache_size
self.results_cache = collections.defaultdict(collections.deque)

def _send_message(self, msg, content_type, **kwargs):
@property
def total_cache_length(self):

return sum([len(self.results_cache[k]) for k in self.results_cache.keys()])

def _path_for(self, content_type):

if content_type not in self.paths:
self.paths[content_type] = "{}_{}.csv".format(self.path, content_type.replace('.', '')) \
if self.separate_by_content_type else self.path
df = pandas.json_normalize(msg)
df.to_csv(self.paths[content_type], index=False, sep=self.separator, mode='a',
header=not os.path.exists(self.paths[content_type]))
return self.paths[content_type]

def _send_message(self, msg, content_type, **kwargs):

self.results_cache[content_type].append(msg)
if self.total_cache_length >= self.cache_size:
self._process_caches()

def exit_callback(self):

self._process_caches()

def _process_caches(self):

for content_type in self.results_cache.keys():
self._process_cache(content_type=content_type)

def _process_cache(self, content_type):

amount = len(self.results_cache[content_type])
try:
df = pandas.DataFrame(self.results_cache[content_type])
self.results_cache[content_type].clear()
if os.path.exists(self._path_for(content_type=content_type)):
existing_df = pandas.read_csv(self._path_for(content_type=content_type), sep=self.separator)
df = pandas.concat([existing_df, df])
logging.info("Writing {} logs of type {} to {}".format(amount, content_type, self._path_for(content_type)))
df.to_csv(self._path_for(content_type=content_type), index=False, sep=self.separator, mode='w',
header=not os.path.exists(self.paths[content_type]))
except Exception as e:
self.unsuccessfully_sent += amount
raise e
else:
self.successfully_sent += amount
5 changes: 5 additions & 0 deletions Source/Interfaces/PRTGInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ def output(self):
csr.error = "Python Script execution error: %s" % str(e)
print(csr.json_result)

def exit_callback(self):

super().exit_callback()
self.output()

Loading

0 comments on commit db8e04b

Please sign in to comment.