diff --git a/src/lil_pwny/__init__.py b/src/lil_pwny/__init__.py index 7d60adf..12261af 100644 --- a/src/lil_pwny/__init__.py +++ b/src/lil_pwny/__init__.py @@ -4,31 +4,30 @@ import tempfile import time import traceback -import warnings from datetime import timedelta from importlib import metadata from lil_pwny import password_audit, hashing -from lil_pwny.loggers import JSONLogger, StdoutLogger +from lil_pwny.custom_password_enhancer import CustomPasswordEnhancer from lil_pwny.exceptions import FileReadError -from lil_pwny.custom_list_enhancer import CustomListEnhancer +from lil_pwny.loggers import JSONLogger, StdoutLogger output_logger = JSONLogger -def init_logger(logging_type: str, debug: bool) -> JSONLogger or StdoutLogger: +def init_logger(logging_type: str, verbose: bool) -> JSONLogger or StdoutLogger: """ Create a logger object. Defaults to stdout if no option is given Args: logging_type: Type of logging to use - debug: Whether to use debug level logging or not + verbose: Whether to use verbose logging or not Returns: JSONLogger or StdoutLogger """ if not logging_type or logging_type == 'stdout': - return StdoutLogger(debug=debug) - return JSONLogger(debug=debug) + return StdoutLogger(debug=verbose) + return JSONLogger(debug=verbose) def get_readable_file_size(file_path: str) -> str: @@ -103,10 +102,10 @@ def main(): default=False, help='Obfuscate hashes from discovered matches by hashing with a random salt') parser.add_argument( - '--debug', - dest='debug', + '--verbose', + dest='verbose', action='store_true', - help='Turn on debug level logging') + help='Turn on verbose logging') args = parser.parse_args() hibp_file = args.hibp @@ -115,18 +114,18 @@ def main(): duplicates = args.d logging_type = args.logging_type obfuscate = args.obfuscate - debug = args.debug + verbose = args.verbose custom_enhance = args.custom_enhance hasher = hashing.Hashing() if logging_type == 'file': logging_type = 'stdout' - logger = init_logger(logging_type, debug) + logger = init_logger(logging_type, verbose) logger.log('WARNING', 'File output is no longer supported.' ' Select JSON output and redirect this to file. Defaulting to stdout') else: - logger = init_logger(logging_type, debug) + logger = init_logger(logging_type, verbose) logger.log('SUCCESS', 'Lil Pwny started execution') logger.log('INFO', f'Version: {project_metadata.get("version")}') @@ -177,37 +176,64 @@ def main(): try: logger.log('INFO', 'Loading custom password list...') with open(custom_passwords, 'r') as f: - custom_passwords = [line.strip() for line in f] + custom_passwords = [line.strip() for line in f if line.strip()] logger.log('SUCCESS', f'Loaded {len(custom_passwords)} custom passwords') if custom_enhance: + custom_count = 0 + variants_count = 0 logger.log('INFO', 'Enhancing custom password list by adding variations...') - custom_client = CustomListEnhancer(min_password_length=int(custom_enhance)) - custom_passwords = custom_client.enhance_list(custom_passwords) - logger.log('SUCCESS', f'Enhanced custom password list to {len(custom_passwords)} ' - f'plaintext passwords') - - logger.log('INFO', 'Converting custom passwords to NTLM hashes...') - custom_password_hashes = hasher.get_hashes(custom_passwords) - logger.log('SUCCESS', f'Generated {len(custom_password_hashes)} custom password hashes') - - with tempfile.NamedTemporaryFile('w', delete=False) as temp_file: - for h in custom_password_hashes: - temp_file.write(f'{h}\n') - temp_file_path = temp_file.name - - logger.log('INFO', f'Comparing {ad_lines} Active Directory' - f' users against {len(custom_password_hashes)} custom password hashes...') - custom_matches = password_audit.search( - log_handler=logger, - hibp_hashes_filepath=temp_file_path, - ad_user_hashes=ad_users, - finding_type='custom', - obfuscated=obfuscate) - custom_count = len(custom_matches) - if logging_type != 'stdout': - for result in custom_matches: - logger.log('NOTIFY', result, notify_type='custom') + custom_client = CustomPasswordEnhancer(min_password_length=int(custom_enhance)) + for custom_pwd in custom_passwords: + logger.log('DEBUG', f'Generating variants for `{custom_pwd}`...') + temp_custom_passwords = custom_client.enhance_password(custom_pwd) + logger.log('DEBUG', 'Converting custom passwords to NTLM hashes...') + custom_password_hashes = hasher.get_hashes(temp_custom_passwords) + variants_count += len(custom_password_hashes) + logger.log('SUCCESS', f'Generated {len(custom_password_hashes)} variants for `{custom_pwd}`') + with tempfile.NamedTemporaryFile('w', delete=False) as temp_file: + for h in custom_password_hashes: + temp_file.write(f'{h}\n') + temp_file_path = temp_file.name + logger.log('DEBUG', f'Custom hashes written to temp file {temp_file_path}') + + logger.log('INFO', f'Comparing {ad_lines} Active Directory' + f' users against {len(custom_password_hashes)} custom password hashes...') + custom_matches = password_audit.search( + log_handler=logger, + hibp_hashes_filepath=temp_file_path, + ad_user_hashes=ad_users, + finding_type='custom', + obfuscated=obfuscate) + os.remove(temp_file_path) + logger.log('DEBUG', f'Temp file {temp_file_path} deleted') + custom_count += len(custom_matches) + if logging_type != 'stdout': + for result in custom_matches: + logger.log('NOTIFY', result, notify_type='custom') + else: + logger.log('DEBUG', 'Converting custom passwords to NTLM hashes...') + custom_password_hashes = hasher.get_hashes(custom_passwords) + with tempfile.NamedTemporaryFile('w', delete=False) as temp_file: + for h in custom_password_hashes: + temp_file.write(f'{h}\n') + temp_file_path = temp_file.name + logger.log('DEBUG', f'Custom hashes written to temp file {temp_file_path}') + + logger.log('INFO', f'Comparing {ad_lines} Active Directory' + f' users against {len(custom_password_hashes)} custom password hashes...') + custom_matches = password_audit.search( + log_handler=logger, + hibp_hashes_filepath=temp_file_path, + ad_user_hashes=ad_users, + finding_type='custom', + obfuscated=obfuscate) + os.remove(temp_file_path) + logger.log('DEBUG', f'Temp file {temp_file_path} deleted') + custom_count = len(custom_matches) + if logging_type != 'stdout': + for result in custom_matches: + logger.log('NOTIFY', result, notify_type='custom') except FileNotFoundError as e: logger.log('CRITICAL', f'Custom password file not found: {e.filename}') sys.exit(1) @@ -238,6 +264,9 @@ def main(): logger.log('SUCCESS', f'Total compromised passwords: {total_comp_count}') logger.log('SUCCESS', f'Passwords matching HIBP: {hibp_count}') logger.log('SUCCESS', f'Passwords matching custom password dictionary: {custom_count}') + if custom_enhance: + logger.log('SUCCESS', f'Variant passwords generated from {len(custom_passwords)} custom passwords:' + f' {variants_count}') logger.log('SUCCESS', f'Passwords duplicated (being used by multiple user accounts): {duplicate_count}') logger.log('SUCCESS', f'Time taken: {str(timedelta(seconds=time_taken))}') diff --git a/src/lil_pwny/custom_list_enhancer.py b/src/lil_pwny/custom_password_enhancer.py similarity index 63% rename from src/lil_pwny/custom_list_enhancer.py rename to src/lil_pwny/custom_password_enhancer.py index f68af72..8084f6b 100644 --- a/src/lil_pwny/custom_list_enhancer.py +++ b/src/lil_pwny/custom_password_enhancer.py @@ -1,27 +1,43 @@ from typing import List from datetime import datetime -import multiprocessing as mp -class CustomListEnhancer: - """ Enhances the custom password list with additional variations """ +class CustomPasswordEnhancer: + """ Enhances the custom password with additional variations + """ def __init__(self, min_password_length: int = 8): self.min_password_length = min_password_length def _deduplicate(self, password_list: List) -> List: - """ Remove duplicates from the list """ + """ Remove duplicates from the given list + """ + return list(set(password_list)) - def _add_leet_speak(self, password_list: List) -> List: - """ Add leet speak variations to the list """ + def _remove_too_short(self, password_list: List) -> List: + """ Remove passwords that do not match the length requirements + """ + + return [password for password in password_list if len(password) >= self.min_password_length] + + def _add_leet_speak(self, password: str) -> List[str]: + """ Add leetspeak variations to a single password""" + leet_speak_mappings = { - 'a': ['4', '@'], 'b': ['8'], 'c': ['<'], 'e': ['3'], - 'g': ['6'], 'h': ['#'], 'i': ['1', '!'], 'l': ['1'], - 'o': ['0'], 's': ['5', '$'], 't': ['7'], 'z': ['2'], + 'a': ['4', '@'], + 'b': ['8'], + 'e': ['3'], + 'g': ['6'], + 'i': ['1', '!'], + 'l': ['1'], + 'o': ['0'], + 's': ['5', '$'], + 't': ['7'], + 'z': ['2'], } - def _generate_variations(word: str, index: int = 0): + def _generate_variations(word: str, index: int = 0) -> List[str]: if index == len(word): return [word] current_char = word[index] @@ -34,16 +50,19 @@ def _generate_variations(word: str, index: int = 0): variations.extend(additional_variations) return variations - return [variation for password in password_list for variation in _generate_variations(password)] + return _generate_variations(password) def _capitalise_first_character(self, password_list: List) -> List: - """ Capitalise the first letter of each password """ + """ Capitalise the first letter of each password in the list + """ + return [password.capitalize() for password in password_list] def _pad_password(self, password_list: List) -> List: """ Pad the password with the original word and additional characters to meet the minimum password length - Characters include alphanumeric characters and special characters + Characters include alphanumeric characters and special characters """ + output_list = [] for password in password_list: if len(password) < self.min_password_length: @@ -60,43 +79,33 @@ def _pad_password(self, password_list: List) -> List: return output_list def _append_years(self, password_list: List) -> List: - """ Append years from 1950 to ten years greater than the current year to each password in the list """ + """ Append years from 1950 to ten years greater than the current year to each password in the list + """ + current_year = datetime.now().year - end_year = current_year + 10 + end_year = current_year years = [str(year) for year in range(1950, end_year + 1)] return [password + year for password in password_list for year in years] def _append_special_characters(self, password_list: List) -> List: - """ Append special characters commonly used in passwords to the end of each password in the list """ + """ Append special characters commonly used in passwords to the end of each password in the list + """ + special_characters = ['!', '@', '#', '$', '%', '&', '*', '?'] return [password + char for password in password_list for char in special_characters] - def enhance_list(self, password_list: List) -> List: - """ Enhance a plaintext password list with additional variations """ - with mp.Pool() as pool: - chunks = [password_list[i::mp.cpu_count()] for i in range(mp.cpu_count())] - results = pool.map(self._enhance_chunk, chunks) - enhanced_list = [item for sublist in results for item in sublist] - return self._deduplicate(enhanced_list) - - def _enhance_chunk(self, password_list: List) -> List: - """ Helper method to enhance a chunk of passwords """ - enhanced_list = password_list - enhanced_list += self._add_leet_speak(password_list) + def enhance_password(self, password: str) -> List: + """ Enhance a plaintext password list with additional variations + + Args: + password: The custom password to enhance + Returns: + Enhanced list of passwords + """ + + enhanced_list = self._add_leet_speak(password) enhanced_list += self._capitalise_first_character(enhanced_list) enhanced_list += self._pad_password(enhanced_list) enhanced_list += self._append_years(enhanced_list) enhanced_list += self._append_special_characters(enhanced_list) - return enhanced_list - - -def main(): - password_list = ['ocado', '123456'] - enhancer = CustomListEnhancer(15) - enhanced_list = enhancer.enhance_list(password_list) - print(enhanced_list) - print(len(enhanced_list)) - - -if __name__ == '__main__': - main() + return self._deduplicate(self._remove_too_short(enhanced_list)) diff --git a/src/lil_pwny/exceptions.py b/src/lil_pwny/exceptions.py index 6d655a7..da15bbe 100644 --- a/src/lil_pwny/exceptions.py +++ b/src/lil_pwny/exceptions.py @@ -9,12 +9,12 @@ def __init__(self, message): class HashingError(Exception): - """Base class for exceptions in this module.""" + """ Base class for exceptions in this module.""" pass class FileReadError(HashingError): - """Exception raised for errors in the input file. + """ Exception raised for errors in the input file. Attributes: filename: The name of the input file which caused the error. diff --git a/src/lil_pwny/hashing.py b/src/lil_pwny/hashing.py index 154caa2..0ed76d5 100644 --- a/src/lil_pwny/hashing.py +++ b/src/lil_pwny/hashing.py @@ -17,7 +17,7 @@ def __init__(self): @staticmethod def _hashify(input_string: str) -> str: - """Converts the input string to a NTLM hash and returns the hash + """ Converts the input string to a NTLM hash and returns the hash Args: input_string: string to be converted to NTLM hash @@ -33,7 +33,7 @@ def _process_password(self, password: str) -> str: return f'{self._hashify(password)}:0:{password}' def get_hashes(self, password_list: List[str]) -> List[str]: - """Converts a list of strings to NTLM hashes using multiprocessing + """ Converts a list of strings to NTLM hashes using multiprocessing Args: password_list: list of strings to convert to NTLM hashes @@ -45,7 +45,7 @@ def get_hashes(self, password_list: List[str]) -> List[str]: return hashes def obfuscate(self, input_hash: str) -> str: - """Further hashes the input NTLM hash with a random salt + """ Further hashes the input NTLM hash with a random salt Args: input_hash: hash to be obfuscated diff --git a/src/lil_pwny/password_audit.py b/src/lil_pwny/password_audit.py index e7673ac..ad0d6d9 100644 --- a/src/lil_pwny/password_audit.py +++ b/src/lil_pwny/password_audit.py @@ -52,7 +52,7 @@ def import_users(filepath: str) -> Dict[str, List[str]]: def find_duplicates(ad_hash_dict: Dict, obfuscated: bool) -> List[dict]: - """Returns users using the same hash in the input file. Outputs + """ Returns users using the same hash in the input file. Outputs a file grouping all users of a hash being used more than once Args: @@ -130,7 +130,7 @@ def search(log_handler: JSONLogger or StdoutLogger, def _nonblank_lines(f: TextIO) -> str: - """Generator to filter out blank lines from the input list + """ Generator to filter out blank lines from the input list Args: f: input file @@ -215,7 +215,7 @@ def _multi_pro_search(log_handler: JSONLogger or StdoutLogger, worker_function: callable, worker_function_args: List, skip_lines: int = 0) -> List[dict]: - """Breaks the [HIBP|custom passwords] file into blocks and uses multiprocessing to iterate through them and return + """ Breaks the [HIBP|custom passwords] file into blocks and uses multiprocessing to iterate through them and return any matches against AD users. Args: @@ -239,8 +239,8 @@ def _multi_pro_search(log_handler: JSONLogger or StdoutLogger, jobs = _divide_blocks(hibp_filepath, 1024 * 1024 * block_size, skip_lines) jobs = [list(j) + [worker_function, encoding] + worker_function_args for j in jobs] - log_handler.log('INFO', f'Split into {len(jobs)} parallel jobs ') - log_handler.log('INFO', f'{cores} cores being utilised') + log_handler.log('DEBUG', f'Split into {len(jobs)} parallel jobs ') + log_handler.log('DEBUG', f'{cores} cores being utilised') pool = mp.Pool(cores - 1, maxtasksperchild=1000)