Skip to content

Commit

Permalink
Refactor for custom password enhancement logic
Browse files Browse the repository at this point in the history
  • Loading branch information
PaperMtn committed Aug 13, 2024
1 parent db07b97 commit 1fc245a
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 91 deletions.
109 changes: 69 additions & 40 deletions src/lil_pwny/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,30 @@
import tempfile
import time
import traceback
import warnings
from datetime import timedelta
from importlib import metadata

from lil_pwny import password_audit, hashing
from lil_pwny.loggers import JSONLogger, StdoutLogger
from lil_pwny.custom_password_enhancer import CustomPasswordEnhancer
from lil_pwny.exceptions import FileReadError
from lil_pwny.custom_list_enhancer import CustomListEnhancer
from lil_pwny.loggers import JSONLogger, StdoutLogger

output_logger = JSONLogger


def init_logger(logging_type: str, debug: bool) -> JSONLogger or StdoutLogger:
def init_logger(logging_type: str, verbose: bool) -> JSONLogger or StdoutLogger:
""" Create a logger object. Defaults to stdout if no option is given
Args:
logging_type: Type of logging to use
debug: Whether to use debug level logging or not
verbose: Whether to use verbose logging or not
Returns:
JSONLogger or StdoutLogger
"""

if not logging_type or logging_type == 'stdout':
return StdoutLogger(debug=debug)
return JSONLogger(debug=debug)
return StdoutLogger(debug=verbose)
return JSONLogger(debug=verbose)


def get_readable_file_size(file_path: str) -> str:
Expand Down Expand Up @@ -103,10 +102,10 @@ def main():
default=False,
help='Obfuscate hashes from discovered matches by hashing with a random salt')
parser.add_argument(
'--debug',
dest='debug',
'--verbose',
dest='verbose',
action='store_true',
help='Turn on debug level logging')
help='Turn on verbose logging')

args = parser.parse_args()
hibp_file = args.hibp
Expand All @@ -115,18 +114,18 @@ def main():
duplicates = args.d
logging_type = args.logging_type
obfuscate = args.obfuscate
debug = args.debug
verbose = args.verbose
custom_enhance = args.custom_enhance

hasher = hashing.Hashing()

if logging_type == 'file':
logging_type = 'stdout'
logger = init_logger(logging_type, debug)
logger = init_logger(logging_type, verbose)
logger.log('WARNING', 'File output is no longer supported.'
' Select JSON output and redirect this to file. Defaulting to stdout')
else:
logger = init_logger(logging_type, debug)
logger = init_logger(logging_type, verbose)

logger.log('SUCCESS', 'Lil Pwny started execution')
logger.log('INFO', f'Version: {project_metadata.get("version")}')
Expand Down Expand Up @@ -177,37 +176,64 @@ def main():
try:
logger.log('INFO', 'Loading custom password list...')
with open(custom_passwords, 'r') as f:
custom_passwords = [line.strip() for line in f]
custom_passwords = [line.strip() for line in f if line.strip()]
logger.log('SUCCESS', f'Loaded {len(custom_passwords)} custom passwords')

if custom_enhance:
custom_count = 0
variants_count = 0
logger.log('INFO', 'Enhancing custom password list by adding variations...')
custom_client = CustomListEnhancer(min_password_length=int(custom_enhance))
custom_passwords = custom_client.enhance_list(custom_passwords)
logger.log('SUCCESS', f'Enhanced custom password list to {len(custom_passwords)} '
f'plaintext passwords')

logger.log('INFO', 'Converting custom passwords to NTLM hashes...')
custom_password_hashes = hasher.get_hashes(custom_passwords)
logger.log('SUCCESS', f'Generated {len(custom_password_hashes)} custom password hashes')

with tempfile.NamedTemporaryFile('w', delete=False) as temp_file:
for h in custom_password_hashes:
temp_file.write(f'{h}\n')
temp_file_path = temp_file.name

logger.log('INFO', f'Comparing {ad_lines} Active Directory'
f' users against {len(custom_password_hashes)} custom password hashes...')
custom_matches = password_audit.search(
log_handler=logger,
hibp_hashes_filepath=temp_file_path,
ad_user_hashes=ad_users,
finding_type='custom',
obfuscated=obfuscate)
custom_count = len(custom_matches)
if logging_type != 'stdout':
for result in custom_matches:
logger.log('NOTIFY', result, notify_type='custom')
custom_client = CustomPasswordEnhancer(min_password_length=int(custom_enhance))
for custom_pwd in custom_passwords:
logger.log('DEBUG', f'Generating variants for `{custom_pwd}`...')
temp_custom_passwords = custom_client.enhance_password(custom_pwd)
logger.log('DEBUG', 'Converting custom passwords to NTLM hashes...')
custom_password_hashes = hasher.get_hashes(temp_custom_passwords)
variants_count += len(custom_password_hashes)
logger.log('SUCCESS', f'Generated {len(custom_password_hashes)} variants for `{custom_pwd}`')
with tempfile.NamedTemporaryFile('w', delete=False) as temp_file:
for h in custom_password_hashes:
temp_file.write(f'{h}\n')
temp_file_path = temp_file.name
logger.log('DEBUG', f'Custom hashes written to temp file {temp_file_path}')

logger.log('INFO', f'Comparing {ad_lines} Active Directory'
f' users against {len(custom_password_hashes)} custom password hashes...')
custom_matches = password_audit.search(
log_handler=logger,
hibp_hashes_filepath=temp_file_path,
ad_user_hashes=ad_users,
finding_type='custom',
obfuscated=obfuscate)
os.remove(temp_file_path)
logger.log('DEBUG', f'Temp file {temp_file_path} deleted')
custom_count += len(custom_matches)
if logging_type != 'stdout':
for result in custom_matches:
logger.log('NOTIFY', result, notify_type='custom')
else:
logger.log('DEBUG', 'Converting custom passwords to NTLM hashes...')
custom_password_hashes = hasher.get_hashes(custom_passwords)
with tempfile.NamedTemporaryFile('w', delete=False) as temp_file:
for h in custom_password_hashes:
temp_file.write(f'{h}\n')
temp_file_path = temp_file.name
logger.log('DEBUG', f'Custom hashes written to temp file {temp_file_path}')

logger.log('INFO', f'Comparing {ad_lines} Active Directory'
f' users against {len(custom_password_hashes)} custom password hashes...')
custom_matches = password_audit.search(
log_handler=logger,
hibp_hashes_filepath=temp_file_path,
ad_user_hashes=ad_users,
finding_type='custom',
obfuscated=obfuscate)
os.remove(temp_file_path)
logger.log('DEBUG', f'Temp file {temp_file_path} deleted')
custom_count = len(custom_matches)
if logging_type != 'stdout':
for result in custom_matches:
logger.log('NOTIFY', result, notify_type='custom')
except FileNotFoundError as e:
logger.log('CRITICAL', f'Custom password file not found: {e.filename}')
sys.exit(1)
Expand Down Expand Up @@ -238,6 +264,9 @@ def main():
logger.log('SUCCESS', f'Total compromised passwords: {total_comp_count}')
logger.log('SUCCESS', f'Passwords matching HIBP: {hibp_count}')
logger.log('SUCCESS', f'Passwords matching custom password dictionary: {custom_count}')
if custom_enhance:
logger.log('SUCCESS', f'Variant passwords generated from {len(custom_passwords)} custom passwords:'
f' {variants_count}')
logger.log('SUCCESS', f'Passwords duplicated (being used by multiple user accounts): {duplicate_count}')
logger.log('SUCCESS', f'Time taken: {str(timedelta(seconds=time_taken))}')

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
from typing import List
from datetime import datetime
import multiprocessing as mp


class CustomListEnhancer:
""" Enhances the custom password list with additional variations """
class CustomPasswordEnhancer:
""" Enhances the custom password with additional variations
"""

def __init__(self, min_password_length: int = 8):
self.min_password_length = min_password_length

def _deduplicate(self, password_list: List) -> List:
""" Remove duplicates from the list """
""" Remove duplicates from the given list
"""

return list(set(password_list))

def _add_leet_speak(self, password_list: List) -> List:
""" Add leet speak variations to the list """
def _remove_too_short(self, password_list: List) -> List:
""" Remove passwords that do not match the length requirements
"""

return [password for password in password_list if len(password) >= self.min_password_length]

def _add_leet_speak(self, password: str) -> List[str]:
""" Add leetspeak variations to a single password"""

leet_speak_mappings = {
'a': ['4', '@'], 'b': ['8'], 'c': ['<'], 'e': ['3'],
'g': ['6'], 'h': ['#'], 'i': ['1', '!'], 'l': ['1'],
'o': ['0'], 's': ['5', '$'], 't': ['7'], 'z': ['2'],
'a': ['4', '@'],
'b': ['8'],
'e': ['3'],
'g': ['6'],
'i': ['1', '!'],
'l': ['1'],
'o': ['0'],
's': ['5', '$'],
't': ['7'],
'z': ['2'],
}

def _generate_variations(word: str, index: int = 0):
def _generate_variations(word: str, index: int = 0) -> List[str]:
if index == len(word):
return [word]
current_char = word[index]
Expand All @@ -34,16 +50,19 @@ def _generate_variations(word: str, index: int = 0):
variations.extend(additional_variations)
return variations

return [variation for password in password_list for variation in _generate_variations(password)]
return _generate_variations(password)

def _capitalise_first_character(self, password_list: List) -> List:
""" Capitalise the first letter of each password """
""" Capitalise the first letter of each password in the list
"""

return [password.capitalize() for password in password_list]

def _pad_password(self, password_list: List) -> List:
""" Pad the password with the original word and additional characters to meet the minimum password length
Characters include alphanumeric characters and special characters
Characters include alphanumeric characters and special characters
"""

output_list = []
for password in password_list:
if len(password) < self.min_password_length:
Expand All @@ -60,43 +79,33 @@ def _pad_password(self, password_list: List) -> List:
return output_list

def _append_years(self, password_list: List) -> List:
""" Append years from 1950 to ten years greater than the current year to each password in the list """
""" Append years from 1950 to ten years greater than the current year to each password in the list
"""

current_year = datetime.now().year
end_year = current_year + 10
end_year = current_year
years = [str(year) for year in range(1950, end_year + 1)]
return [password + year for password in password_list for year in years]

def _append_special_characters(self, password_list: List) -> List:
""" Append special characters commonly used in passwords to the end of each password in the list """
""" Append special characters commonly used in passwords to the end of each password in the list
"""

special_characters = ['!', '@', '#', '$', '%', '&', '*', '?']
return [password + char for password in password_list for char in special_characters]

def enhance_list(self, password_list: List) -> List:
""" Enhance a plaintext password list with additional variations """
with mp.Pool() as pool:
chunks = [password_list[i::mp.cpu_count()] for i in range(mp.cpu_count())]
results = pool.map(self._enhance_chunk, chunks)
enhanced_list = [item for sublist in results for item in sublist]
return self._deduplicate(enhanced_list)

def _enhance_chunk(self, password_list: List) -> List:
""" Helper method to enhance a chunk of passwords """
enhanced_list = password_list
enhanced_list += self._add_leet_speak(password_list)
def enhance_password(self, password: str) -> List:
""" Enhance a plaintext password list with additional variations
Args:
password: The custom password to enhance
Returns:
Enhanced list of passwords
"""

enhanced_list = self._add_leet_speak(password)
enhanced_list += self._capitalise_first_character(enhanced_list)
enhanced_list += self._pad_password(enhanced_list)
enhanced_list += self._append_years(enhanced_list)
enhanced_list += self._append_special_characters(enhanced_list)
return enhanced_list


def main():
password_list = ['ocado', '123456']
enhancer = CustomListEnhancer(15)
enhanced_list = enhancer.enhance_list(password_list)
print(enhanced_list)
print(len(enhanced_list))


if __name__ == '__main__':
main()
return self._deduplicate(self._remove_too_short(enhanced_list))
4 changes: 2 additions & 2 deletions src/lil_pwny/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ def __init__(self, message):


class HashingError(Exception):
"""Base class for exceptions in this module."""
""" Base class for exceptions in this module."""
pass


class FileReadError(HashingError):
"""Exception raised for errors in the input file.
""" Exception raised for errors in the input file.
Attributes:
filename: The name of the input file which caused the error.
Expand Down
6 changes: 3 additions & 3 deletions src/lil_pwny/hashing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self):

@staticmethod
def _hashify(input_string: str) -> str:
"""Converts the input string to a NTLM hash and returns the hash
""" Converts the input string to a NTLM hash and returns the hash
Args:
input_string: string to be converted to NTLM hash
Expand All @@ -33,7 +33,7 @@ def _process_password(self, password: str) -> str:
return f'{self._hashify(password)}:0:{password}'

def get_hashes(self, password_list: List[str]) -> List[str]:
"""Converts a list of strings to NTLM hashes using multiprocessing
""" Converts a list of strings to NTLM hashes using multiprocessing
Args:
password_list: list of strings to convert to NTLM hashes
Expand All @@ -45,7 +45,7 @@ def get_hashes(self, password_list: List[str]) -> List[str]:
return hashes

def obfuscate(self, input_hash: str) -> str:
"""Further hashes the input NTLM hash with a random salt
""" Further hashes the input NTLM hash with a random salt
Args:
input_hash: hash to be obfuscated
Expand Down
10 changes: 5 additions & 5 deletions src/lil_pwny/password_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def import_users(filepath: str) -> Dict[str, List[str]]:


def find_duplicates(ad_hash_dict: Dict, obfuscated: bool) -> List[dict]:
"""Returns users using the same hash in the input file. Outputs
""" Returns users using the same hash in the input file. Outputs
a file grouping all users of a hash being used more than once
Args:
Expand Down Expand Up @@ -130,7 +130,7 @@ def search(log_handler: JSONLogger or StdoutLogger,


def _nonblank_lines(f: TextIO) -> str:
"""Generator to filter out blank lines from the input list
""" Generator to filter out blank lines from the input list
Args:
f: input file
Expand Down Expand Up @@ -215,7 +215,7 @@ def _multi_pro_search(log_handler: JSONLogger or StdoutLogger,
worker_function: callable,
worker_function_args: List,
skip_lines: int = 0) -> List[dict]:
"""Breaks the [HIBP|custom passwords] file into blocks and uses multiprocessing to iterate through them and return
""" Breaks the [HIBP|custom passwords] file into blocks and uses multiprocessing to iterate through them and return
any matches against AD users.
Args:
Expand All @@ -239,8 +239,8 @@ def _multi_pro_search(log_handler: JSONLogger or StdoutLogger,
jobs = _divide_blocks(hibp_filepath, 1024 * 1024 * block_size, skip_lines)
jobs = [list(j) + [worker_function, encoding] + worker_function_args for j in jobs]

log_handler.log('INFO', f'Split into {len(jobs)} parallel jobs ')
log_handler.log('INFO', f'{cores} cores being utilised')
log_handler.log('DEBUG', f'Split into {len(jobs)} parallel jobs ')
log_handler.log('DEBUG', f'{cores} cores being utilised')

pool = mp.Pool(cores - 1, maxtasksperchild=1000)

Expand Down

0 comments on commit 1fc245a

Please sign in to comment.