Dinindu-Wick/nbdev-squ

SIEM Query Utils

Install

pip install https://github.com/wagov/nbdev-squ/archive/refs/heads/main.tar.gz

How to use

Note: If you create or use a GitHub Codespace on any of the wagov repos, SQU_CONFIG should be configured automatically.

Before use, configuration must be loaded into squ.core.cache. This happens automatically, from JSON stored in a key vault, when the environment variable SQU_CONFIG is set to "keyvault/tenantid".
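
As a rough illustration of the expected value format, the sketch below splits an SQU_CONFIG string into its two parts. The helper name parse_squ_config and the placeholder values are hypothetical and not part of nbdev_squ; the library does its own loading, and this only shows the "keyvault/tenantid" shape described above.

```python
import os


def parse_squ_config(value: str) -> tuple[str, str]:
    """Split a 'keyvault/tenantid' string into (keyvault, tenantid).

    Hypothetical helper for illustration only; not part of nbdev_squ.
    """
    keyvault, sep, tenant_id = value.partition("/")
    if not sep or not keyvault or not tenant_id:
        raise ValueError("SQU_CONFIG should look like 'keyvault/tenantid'")
    return keyvault, tenant_id


# Placeholder values; substitute a real key vault name and tenant id.
os.environ.setdefault("SQU_CONFIG", "examplevault/00000000-0000-0000-0000-000000000000")
keyvault, tenant_id = parse_squ_config(os.environ["SQU_CONFIG"])
print(keyvault, tenant_id)
```

Checking the value up front like this can make a malformed setting easier to spot than a later authentication failure.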

export SQU_CONFIG="{{ keyvault }}/{{ tenantid }}"

from nbdev_squ import api, clients
import io, pandas

# Load workspace info from datalake blob storage
df = api.list_workspaces(fmt="df"); print(df.shape)

# Load workspace info from introspection of azure graph
df = api.list_securityinsights(); print(df.shape)

# Kusto query to Sentinel workspaces via Azure Lighthouse
df = api.query_all("SecurityIncident | take 20", fmt="df"); print(df.shape)

# Kusto queries to Sentinel workspaces via Azure Lighthouse (batches up to 100 queries at a time)
df = api.query_all(["SecurityAlert | take 20" for _ in range(10)]); print(df.shape)

# Kusto query to ADX
#df = api.adxtable2df(api.adx_query("kusto query | take 20"))

# General azure cli cmd
api.azcli(["config", "set", "extension.use_dynamic_install=yes_without_prompt"])
print(len(api.azcli(["account", "list"])))

# Various pre-configured api clients

# RunZero
#response = clients.runzero.get("/export/org/assets.csv", params={"search": "has_public:t AND alive:t AND (protocol:rdp OR protocol:vnc OR protocol:teamviewer OR protocol:telnet OR protocol:ftp)"})
#runzero_assets = pandas.read_csv(io.StringIO(response.text))

# Jira
#issues = clients.jira.jql("updated > -1d")["issues"]

# AbuseIPDB
#clients.abuseipdb.check_ip("1.1.1.1")
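
The batching noted above (up to 100 queries at a time) can be pictured with a small chunking helper. This is only a sketch of the general idea, not how api.query_all is implemented; the name chunked and its size argument are assumptions made for illustration.

```python
from typing import Iterator, List, Sequence


def chunked(items: Sequence[str], size: int = 100) -> Iterator[List[str]]:
    """Yield consecutive slices of `items`, each holding at most `size` entries."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# 250 illustrative queries, grouped into batches of at most 100
queries = [f"SecurityAlert | take {n}" for n in range(1, 251)]
batches = list(chunked(queries))
print([len(batch) for batch in batches])  # [100, 100, 50]
```
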

# Distinct entity IP addresses from alerts linked to true-positive incidents (last 45 days)
badips_df = api.query_all("""
SecurityIncident
| where Classification == "TruePositive"
| mv-expand AlertIds
| project tostring(AlertIds)
| join SecurityAlert on $left.AlertIds == $right.SystemAlertId
| mv-expand todynamic(Entities)
| project Entities.Address
| where isnotempty(Entities_Address)
| distinct tostring(Entities_Address)
""", timespan=pandas.Timedelta("45d"))
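
# Find records in any table whose ClientIP starts with 172.16. (limited by take 40000)
df = api.query_all("find where ClientIP startswith '172.16.' | evaluate bag_unpack(pack_) | take 40000")

# Take one sample row per table per hour of ingestion time, packed as JSON
df = api.query_all("""union withsource="_table" *
| extend _ingestion_time_bin = bin(ingestion_time(), 1h)
| summarize take_any(*) by _table, _ingestion_time_bin
| project pack=pack_all(true)""")

# Parse the packed JSON strings back into DataFrame columns
import json
pandas.DataFrame(list(df["pack"].apply(json.loads)))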
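
The last step above parses each JSON string in the pack column back into a record. The same idea is sketched here with plain dictionaries and made-up sample rows; the field names and values are invented for illustration, and real rows returned by the query will differ.

```python
import json

# Invented sample of what a "pack" column might hold: one JSON string per row.
packed_rows = [
    '{"_table": "SecurityAlert", "AlertName": "Example alert"}',
    '{"_table": "SigninLogs", "ResultType": "0"}',
]

records = [json.loads(row) for row in packed_rows]

# Collect the union of keys so rows from different tables share one set of columns.
columns = sorted({key for record in records for key in record})
table = [{column: record.get(column) for column in columns} for record in records]

for row in table:
    print(row)
```

Passing the list of records to pandas.DataFrame performs a similar alignment, filling fields a row lacks with NaN.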
