diff --git a/commands.py b/commands.py index ede960e..32df33a 100644 --- a/commands.py +++ b/commands.py @@ -10,372 +10,372 @@ class Commands: - _label_color_cache = {} - _current_color_index = 0 - - def __init__(self, repos): - self._last_printed_lines = 0 - self.repos = repos - - # `mud info` command implementation - def info(self, repos: Dict[str, List[str]]) -> None: - table = self._get_table() - for path, labels in repos.items(): - output = subprocess.check_output('git status --porcelain', shell=True, text=True, cwd=path) - files = output.splitlines() - - formatted_path = self._get_formatted_path(path) - branch = self._get_branch_status(path) - status = self._get_status_string(files) - colored_labels = self._get_formatted_labels(labels, utils.GLYPHS["label"]) - - # Sync with origin status - ahead_behind_cmd = subprocess.run('git rev-list --left-right --count HEAD...@{upstream}', shell=True, text=True, cwd=path, capture_output=True) - stdout = ahead_behind_cmd.stdout.strip().split() - origin_sync = '' - if len(stdout) >= 2: - ahead, behind = stdout[0], stdout[1] - if ahead and ahead != '0': - origin_sync += f'{BRIGHT_GREEN}{utils.GLYPHS["ahead"]} {ahead}{RESET}' - if behind and behind != '0': - if origin_sync: - origin_sync += ' ' - origin_sync += f'{BRIGHT_BLUE}{utils.GLYPHS["behind"]} {behind}{RESET}' - - if not origin_sync.strip(): - origin_sync = f'{BLUE}{utils.GLYPHS["synced"]}{RESET}' - - table.add_row([formatted_path, branch, origin_sync, status, colored_labels]) - - self._print_table(table) - - # `mud status` command implementation - def status(self, repos: Dict[str, List[str]]): - table = self._get_table() - for path, labels in repos.items(): - output = subprocess.check_output('git status --porcelain', shell=True, text=True, cwd=path) - files = output.splitlines() - - formatted_path = self._get_formatted_path(path) - branch = self._get_branch_status(path) - status = self._get_status_string(files) - - colored_output = [] - - for file in files[:5]: - file_status = file[:2].strip() - filename = file[3:].strip() - parts = filename.split(os.sep) - if file_status == 'M': - color = YELLOW - elif file_status == 'A': - color = GREEN - elif file_status == 'R': - color = BLUE - elif file_status == 'D': - color = RED - else: - color = CYAN - - shortened_parts = [part[0] if index < len(parts) - 1 and part else f'{RESET}{color}{part}' for index, part in enumerate(parts)] - filename = os.sep.join(shortened_parts) - colored_output.append(f'{color}{DIM}{filename}{RESET}') - if len(files) > 5: - colored_output.append('...') - - table.add_row([formatted_path, branch, status, ', '.join(colored_output)]) - - self._print_table(table) - - # `mud labels` command implementation - def labels(self, repos: Dict[str, List[str]]): - table = self._get_table() - for path, labels in repos.items(): - formatted_path = self._get_formatted_path(path) - colored_labels = self._get_formatted_labels(labels, utils.GLYPHS["label"]) - table.add_row([formatted_path, colored_labels]) - - self._print_table(table) - - # `mud log` command implementation - def log(self, repos: Dict[str, List[str]]) -> None: - table = self._get_table() - for path in repos.keys(): - formatted_path = self._get_formatted_path(path) - branch = self._get_branch_status(path) - author = self._get_authors_name(path) - commit = self._get_commit_message(path, 35) - - # Commit time - commit_time_cmd = subprocess.run('git log -1 --pretty=format:%cd --date=relative', shell=True, text=True, cwd=path, capture_output=True) - commit_time = commit_time_cmd.stdout.strip() - - table.add_row([formatted_path, branch, author, commit_time, commit]) - - self._print_table(table) - - # `mud branch` command implementation - def branches(self, repos: Dict[str, List[str]]) -> None: - table = self._get_table() - all_branches = {} - - # Preparing branches for sorting to display them in the right order. - for path in repos.keys(): - raw_branches = [line.strip() for line in subprocess.check_output('git branch', shell=True, text=True, cwd=path).split('\n') if line.strip()] - for branch in raw_branches: - branch = branch.replace(' ', '').replace('*', '') - if branch not in all_branches: - all_branches[branch] = 0 - all_branches[branch] += 1 - branch_counter = Counter(all_branches) - - for path, labels in repos.items(): - formatted_path = self._get_formatted_path(path) - branches = subprocess.check_output('git branch', shell=True, text=True, cwd=path).splitlines() - current_branch = next((branch.lstrip('* ') for branch in branches if branch.startswith('*')), None) - branches = [branch.lstrip('* ') for branch in branches] - sorted_branches = sorted(branches, key=lambda x: branch_counter.get(x, 0), reverse=True) - - if current_branch and current_branch in sorted_branches: - sorted_branches.remove(current_branch) - sorted_branches.insert(0, current_branch) - - formatted_branches = self._get_formatted_branches(sorted_branches, current_branch) - table.add_row([formatted_path, formatted_branches]) - - self._print_table(table) - - # `mud tags` command implementation - def tags(self, repos: Dict[str, List[str]]): - table = self._get_table() - - for path, labels in repos.items(): - formatted_path = self._get_formatted_path(path) - tags = [line.strip() for line in subprocess.check_output('git tag', shell=True, text=True, cwd=path).splitlines() if line.strip()] - tags = [f"{utils.GLYPHS['tag']} {tag}" for tag in tags] - tags = ' '.join(tags) - table.add_row([formatted_path, tags]) - - self._print_table(table) - - # `mud ` when run_async = 0 and run_table = 0 - def run_ordered(self, repos: List[str], command: [str]) -> None: - command_str = ' '.join(command) - for path in repos: - result = subprocess.run(command_str, shell=True, cwd=path, capture_output=True, text=True) - print(f'{self._get_formatted_path(path)}{RESET} {command_str}{RESET} {RED + utils.GLYPHS["failed"] if result.stderr else GREEN + utils.GLYPHS["finished"]}{RESET}') - if result.stdout and not result.stdout.strip().isspace(): - print(result.stdout.strip()) - if result.stderr and not result.stderr.strip().isspace(): - print(result.stderr.strip()) - - # `mud ` when run_async = 1 and run_table = 0 - async def run_async(self, repos: List[str], command: List[str]) -> None: - sem = asyncio.Semaphore(len(repos)) - - async def run_process(path: str) -> None: - async with sem: - process = await asyncio.create_subprocess_exec(*command, cwd=path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = await process.communicate() - print(f'{self._get_formatted_path(path)}{GRAY}>{RESET} {YELLOW}{" ".join(command)}{RESET}') - if stderr: - print(stderr.decode()) - if stdout and not stdout.isspace(): - print(stdout.decode()) - - await asyncio.gather(*(run_process(path) for path in repos)) - - # `mud ` when run_async = 1 and run_table = 1 - async def run_async_table_view(self, repos: List[str], command: List[str]) -> None: - sem = asyncio.Semaphore(len(repos)) - table = {repo: ['', ''] for repo in repos} - - async def task(repo: str) -> None: - async with sem: - await self._run_process(repo, table, command) - - tasks = [asyncio.create_task(task(repo)) for repo in repos] - await asyncio.gather(*tasks) - - async def _run_process(self, repo_path: str, table: Dict[str, List[str]], command: List[str]) -> None: - process = await asyncio.create_subprocess_exec(*command, cwd=repo_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - table[repo_path] = ['', f'{YELLOW}{utils.GLYPHS["running"]}'] - - while True: - line = await process.stdout.readline() - if not line: - break - line = line.decode().strip() - line = table[repo_path][0] if not line.strip() else line - table[repo_path] = [line, f'{YELLOW}{utils.GLYPHS["running"]}'] - self._print_process(table) - - return_code = await process.wait() - if return_code == 0: - status = f'{GREEN}{utils.GLYPHS["finished"]}' - else: - status = f'{RED}{utils.GLYPHS["failed"]} Code: {return_code}' - - table[repo_path] = [table[repo_path][0], status] - self._print_process(table) - - def _print_process(self, info: Dict[str, List[str]]) -> None: - table = self._get_table() - - for path, (line, status) in info.items(): - formatted_path = self._get_formatted_path(path) - table.add_row([formatted_path, line, status]) - - table_str = self._table_to_str(table) - num_lines = table_str.count('\n') + 1 - - if hasattr(self, '_last_printed_lines') and self._last_printed_lines > 0: - for _ in range(self._last_printed_lines): - print('\033[A\033[K', end='') - - print(f'{table_str}\n', end='') - self._last_printed_lines = num_lines - - def _print_table(self, table: PrettyTable): - table = self._table_to_str(table) - if len(table) != 0: - print(table) - - @staticmethod - def _get_status_string(files: List[str]): - modified, added, removed, moved = 0, 0, 0, 0 - - for file in files: - file = file.lstrip() - if file.startswith('M'): - modified += 1 - elif file.startswith('A') or file.startswith('??'): - added += 1 - elif file.startswith('D'): - removed += 1 - elif file.startswith('R'): - moved += 1 - status = '' - if added: - status += f'{BRIGHT_GREEN}{added} {utils.GLYPHS["added"]}{RESET} ' - if modified: - status += f'{YELLOW}{modified} {utils.GLYPHS["modified"]}{RESET} ' - if moved: - status += f'{BLUE}{moved} {utils.GLYPHS["moved"]}{RESET} ' - if removed: - status += f'{RED}{removed} {utils.GLYPHS["removed"]}{RESET} ' - if not files: - status = f'{GREEN}{utils.GLYPHS["clear"]}{RESET}' - return status - - @staticmethod - def _table_to_str(table: PrettyTable) -> str: - table = table.get_string() - table = '\n'.join(line.lstrip() for line in table.splitlines()) - return table - - @staticmethod - def _get_table() -> PrettyTable: - return PrettyTable(border=False, header=False, style=PLAIN_COLUMNS, align='l') - - @staticmethod - def _get_formatted_path(path: str) -> str: - return f'{DIM}{GRAY}../{RESET}{DIM}{path}{RESET}' - - @staticmethod - def _get_branch_status(path: str) -> str: - branch_cmd = subprocess.run('git rev-parse --abbrev-ref HEAD', shell=True, text=True, cwd=path, capture_output=True) - branch_stdout = branch_cmd.stdout.strip() - if branch_stdout == 'master' or branch_stdout == 'main': - branch = f'{YELLOW}{utils.GLYPHS["master"]}{RESET}{utils.GLYPHS["space"]}{branch_stdout}' - elif branch_stdout == 'develop': - branch = f'{GREEN}{utils.GLYPHS["feature"]}{RESET}{utils.GLYPHS["space"]}{branch_stdout}' - elif '/' in branch_stdout: - branch_path = branch_stdout.split('/') - icon = Commands._get_branch_icon(branch_path[0]) - branch_color = Commands._get_branch_color(branch_path[0]) - branch = f'{branch_color}{icon}{RESET}{utils.GLYPHS["space"]}{branch_path[0]}{RESET}/{BOLD}{("/".join(branch_path[1:]))}' - else: - branch = f'{CYAN}{utils.GLYPHS["branch"]}{RESET}{utils.GLYPHS["space"]}{branch_stdout}' - return branch - - @staticmethod - def _get_authors_name(path: str) -> str: - cmd = subprocess.run('git log -1 --pretty=format:%an', shell=True, text=True, cwd=path, capture_output=True) - git_config_user_cmd = subprocess.run(['git', 'config', 'user.name'], text=True, capture_output=True) - committer_color = '' if cmd.stdout.strip() == git_config_user_cmd.stdout.strip() else DIM - author = cmd.stdout.strip() - author = author[:20] + '...' if len(author) > 20 else author - author = f'{committer_color}{author}{RESET}' - return author - - @staticmethod - def _get_commit_message(path: str, max_chars: int) -> str: - cmd = subprocess.run('git log -1 --pretty=format:%s', shell=True, text=True, cwd=path, capture_output=True) - log = cmd.stdout.strip() - log = log[:max_chars] + '...' if len(log) > max_chars else log - return log - - @staticmethod - def _get_formatted_labels(labels: List[str], glyph: str) -> str: - if len(labels) == 0: - return '' - colored_label = '' - for label in labels: - color_index = Commands._get_color_index(label) % len(TEXT) - colored_label += f'{TEXT[color_index + 3]}{glyph} {label}{RESET} ' - return colored_label - - @staticmethod - def _get_formatted_branches(branches: List[str], current_branch: str) -> str: - if len(branches) == 0: - return '' - - simplify_branches = utils.settings.config['mud'].getboolean('simplify_branches') is True - output = '' - for branch in branches: - is_origin = branch.startswith('origin/') - branch = branch.replace('origin/', '') if is_origin else branch - current_prefix = f'{UNDERLINE}' if current_branch == branch else '' - current_prefix = current_prefix + DIM if is_origin else current_prefix - origin_prefix = f'{MAGENTA}{DIM}o/' if is_origin else '' - color = WHITE - icon = utils.GLYPHS['branch'] - if branch == 'master' or branch == 'main': - color = YELLOW - icon = f'{utils.GLYPHS["master"]}' - elif branch == 'develop': - color = GREEN - icon = f'{utils.GLYPHS["feature"]}' - elif '/' in branch: - parts = branch.split('/') - end_dim = '' if is_origin else END_DIM - branch = '/'.join([p[0] for p in parts[:-1]] + [end_dim + ( - parts[-1][:10] + '..' if len(parts[-1]) > 10 else parts[-1])]) if simplify_branches else '/'.join( - [p for p in parts[:-1]] + [end_dim + (parts[-1][:10] + '..' if len(parts[-1]) > 10 else parts[-1])]) - branch = f'{DIM}{branch}' - color = Commands._get_branch_color(parts[0]) - icon = Commands._get_branch_icon(parts[0]) - output += f'{current_prefix}{color}{icon}{utils.GLYPHS["space"]}{origin_prefix}{color}{branch}{RESET} ' - return output - - @staticmethod - def _get_branch_icon(branch_prefix: str) -> str: - return f'{utils.GLYPHS["bugfix"]}' if branch_prefix in ['bugfix', 'bug', 'hotfix'] else \ - f'{utils.GLYPHS["release"]}' if branch_prefix == 'release' else \ - f'{utils.GLYPHS["feature"]}' if branch_prefix in ['feature', 'feat', 'develop'] else \ - f'{utils.GLYPHS["branch"]}' - - @staticmethod - def _get_branch_color(branch_name: str) -> str: - return RED if branch_name in ['bugfix', 'bug', 'hotfix'] else \ - BLUE if branch_name == 'release' else \ - GREEN if branch_name in ['feature', 'feat', 'develop'] else \ - GREEN - - @staticmethod - def _get_color_index(label: str) -> (str, str): - if label not in Commands._label_color_cache: - Commands._label_color_cache[label] = Commands._current_color_index - Commands._current_color_index = (Commands._current_color_index + 1) % len(BKG) - return Commands._label_color_cache[label] + _label_color_cache = {} + _current_color_index = 0 + + def __init__(self, repos): + self._last_printed_lines = 0 + self.repos = repos + + # `mud info` command implementation + def info(self, repos: Dict[str, List[str]]) -> None: + table = self._get_table() + for path, labels in repos.items(): + output = subprocess.check_output('git status --porcelain', shell=True, text=True, cwd=path) + files = output.splitlines() + + formatted_path = self._get_formatted_path(path) + branch = self._get_branch_status(path) + status = self._get_status_string(files) + colored_labels = self._get_formatted_labels(labels, utils.GLYPHS["label"]) + + # Sync with origin status + ahead_behind_cmd = subprocess.run('git rev-list --left-right --count HEAD...@{upstream}', shell=True, text=True, cwd=path, capture_output=True) + stdout = ahead_behind_cmd.stdout.strip().split() + origin_sync = '' + if len(stdout) >= 2: + ahead, behind = stdout[0], stdout[1] + if ahead and ahead != '0': + origin_sync += f'{BRIGHT_GREEN}{utils.GLYPHS["ahead"]} {ahead}{RESET}' + if behind and behind != '0': + if origin_sync: + origin_sync += ' ' + origin_sync += f'{BRIGHT_BLUE}{utils.GLYPHS["behind"]} {behind}{RESET}' + + if not origin_sync.strip(): + origin_sync = f'{BLUE}{utils.GLYPHS["synced"]}{RESET}' + + table.add_row([formatted_path, branch, origin_sync, status, colored_labels]) + + self._print_table(table) + + # `mud status` command implementation + def status(self, repos: Dict[str, List[str]]): + table = self._get_table() + for path, labels in repos.items(): + output = subprocess.check_output('git status --porcelain', shell=True, text=True, cwd=path) + files = output.splitlines() + + formatted_path = self._get_formatted_path(path) + branch = self._get_branch_status(path) + status = self._get_status_string(files) + + colored_output = [] + + for file in files[:5]: + file_status = file[:2].strip() + filename = file[3:].strip() + parts = filename.split(os.sep) + if file_status == 'M': + color = YELLOW + elif file_status == 'A': + color = GREEN + elif file_status == 'R': + color = BLUE + elif file_status == 'D': + color = RED + else: + color = CYAN + + shortened_parts = [part[0] if index < len(parts) - 1 and part else f'{RESET}{color}{part}' for index, part in enumerate(parts)] + filename = os.sep.join(shortened_parts) + colored_output.append(f'{color}{DIM}{filename}{RESET}') + if len(files) > 5: + colored_output.append('...') + + table.add_row([formatted_path, branch, status, ', '.join(colored_output)]) + + self._print_table(table) + + # `mud labels` command implementation + def labels(self, repos: Dict[str, List[str]]): + table = self._get_table() + for path, labels in repos.items(): + formatted_path = self._get_formatted_path(path) + colored_labels = self._get_formatted_labels(labels, utils.GLYPHS["label"]) + table.add_row([formatted_path, colored_labels]) + + self._print_table(table) + + # `mud log` command implementation + def log(self, repos: Dict[str, List[str]]) -> None: + table = self._get_table() + for path in repos.keys(): + formatted_path = self._get_formatted_path(path) + branch = self._get_branch_status(path) + author = self._get_authors_name(path) + commit = self._get_commit_message(path, 35) + + # Commit time + commit_time_cmd = subprocess.run('git log -1 --pretty=format:%cd --date=relative', shell=True, text=True, cwd=path, capture_output=True) + commit_time = commit_time_cmd.stdout.strip() + + table.add_row([formatted_path, branch, author, commit_time, commit]) + + self._print_table(table) + + # `mud branch` command implementation + def branches(self, repos: Dict[str, List[str]]) -> None: + table = self._get_table() + all_branches = {} + + # Preparing branches for sorting to display them in the right order. + for path in repos.keys(): + raw_branches = [line.strip() for line in subprocess.check_output('git branch', shell=True, text=True, cwd=path).split('\n') if line.strip()] + for branch in raw_branches: + branch = branch.replace(' ', '').replace('*', '') + if branch not in all_branches: + all_branches[branch] = 0 + all_branches[branch] += 1 + branch_counter = Counter(all_branches) + + for path, labels in repos.items(): + formatted_path = self._get_formatted_path(path) + branches = subprocess.check_output('git branch', shell=True, text=True, cwd=path).splitlines() + current_branch = next((branch.lstrip('* ') for branch in branches if branch.startswith('*')), None) + branches = [branch.lstrip('* ') for branch in branches] + sorted_branches = sorted(branches, key=lambda x: branch_counter.get(x, 0), reverse=True) + + if current_branch and current_branch in sorted_branches: + sorted_branches.remove(current_branch) + sorted_branches.insert(0, current_branch) + + formatted_branches = self._get_formatted_branches(sorted_branches, current_branch) + table.add_row([formatted_path, formatted_branches]) + + self._print_table(table) + + # `mud tags` command implementation + def tags(self, repos: Dict[str, List[str]]): + table = self._get_table() + + for path, labels in repos.items(): + formatted_path = self._get_formatted_path(path) + tags = [line.strip() for line in subprocess.check_output('git tag', shell=True, text=True, cwd=path).splitlines() if line.strip()] + tags = [f"{utils.GLYPHS['tag']} {tag}" for tag in tags] + tags = ' '.join(tags) + table.add_row([formatted_path, tags]) + + self._print_table(table) + + # `mud ` when run_async = 0 and run_table = 0 + def run_ordered(self, repos: List[str], command: [str]) -> None: + command_str = ' '.join(command) + for path in repos: + result = subprocess.run(command_str, shell=True, cwd=path, capture_output=True, text=True) + print(f'{self._get_formatted_path(path)}{RESET} {command_str}{RESET} {RED + utils.GLYPHS["failed"] if result.stderr else GREEN + utils.GLYPHS["finished"]}{RESET}') + if result.stdout and not result.stdout.strip().isspace(): + print(result.stdout.strip()) + if result.stderr and not result.stderr.strip().isspace(): + print(result.stderr.strip()) + + # `mud ` when run_async = 1 and run_table = 0 + async def run_async(self, repos: List[str], command: List[str]) -> None: + sem = asyncio.Semaphore(len(repos)) + + async def run_process(path: str) -> None: + async with sem: + process = await asyncio.create_subprocess_exec(*command, cwd=path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = await process.communicate() + print(f'{self._get_formatted_path(path)}{GRAY}>{RESET} {YELLOW}{" ".join(command)}{RESET}') + if stderr: + print(stderr.decode()) + if stdout and not stdout.isspace(): + print(stdout.decode()) + + await asyncio.gather(*(run_process(path) for path in repos)) + + # `mud ` when run_async = 1 and run_table = 1 + async def run_async_table_view(self, repos: List[str], command: List[str]) -> None: + sem = asyncio.Semaphore(len(repos)) + table = {repo: ['', ''] for repo in repos} + + async def task(repo: str) -> None: + async with sem: + await self._run_process(repo, table, command) + + tasks = [asyncio.create_task(task(repo)) for repo in repos] + await asyncio.gather(*tasks) + + async def _run_process(self, repo_path: str, table: Dict[str, List[str]], command: List[str]) -> None: + process = await asyncio.create_subprocess_exec(*command, cwd=repo_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + table[repo_path] = ['', f'{YELLOW}{utils.GLYPHS["running"]}'] + + while True: + line = await process.stdout.readline() + if not line: + break + line = line.decode().strip() + line = table[repo_path][0] if not line.strip() else line + table[repo_path] = [line, f'{YELLOW}{utils.GLYPHS["running"]}'] + self._print_process(table) + + return_code = await process.wait() + if return_code == 0: + status = f'{GREEN}{utils.GLYPHS["finished"]}' + else: + status = f'{RED}{utils.GLYPHS["failed"]} Code: {return_code}' + + table[repo_path] = [table[repo_path][0], status] + self._print_process(table) + + def _print_process(self, info: Dict[str, List[str]]) -> None: + table = self._get_table() + + for path, (line, status) in info.items(): + formatted_path = self._get_formatted_path(path) + table.add_row([formatted_path, line, status]) + + table_str = self._table_to_str(table) + num_lines = table_str.count('\n') + 1 + + if hasattr(self, '_last_printed_lines') and self._last_printed_lines > 0: + for _ in range(self._last_printed_lines): + print('\033[A\033[K', end='') + + print(f'{table_str}\n', end='') + self._last_printed_lines = num_lines + + def _print_table(self, table: PrettyTable): + table = self._table_to_str(table) + if len(table) != 0: + print(table) + + @staticmethod + def _get_status_string(files: List[str]): + modified, added, removed, moved = 0, 0, 0, 0 + + for file in files: + file = file.lstrip() + if file.startswith('M'): + modified += 1 + elif file.startswith('A') or file.startswith('??'): + added += 1 + elif file.startswith('D'): + removed += 1 + elif file.startswith('R'): + moved += 1 + status = '' + if added: + status += f'{BRIGHT_GREEN}{added} {utils.GLYPHS["added"]}{RESET} ' + if modified: + status += f'{YELLOW}{modified} {utils.GLYPHS["modified"]}{RESET} ' + if moved: + status += f'{BLUE}{moved} {utils.GLYPHS["moved"]}{RESET} ' + if removed: + status += f'{RED}{removed} {utils.GLYPHS["removed"]}{RESET} ' + if not files: + status = f'{GREEN}{utils.GLYPHS["clear"]}{RESET}' + return status + + @staticmethod + def _table_to_str(table: PrettyTable) -> str: + table = table.get_string() + table = '\n'.join(line.lstrip() for line in table.splitlines()) + return table + + @staticmethod + def _get_table() -> PrettyTable: + return PrettyTable(border=False, header=False, style=PLAIN_COLUMNS, align='l') + + @staticmethod + def _get_formatted_path(path: str) -> str: + return f'{DIM}{GRAY}../{RESET}{DIM}{path}{RESET}' + + @staticmethod + def _get_branch_status(path: str) -> str: + branch_cmd = subprocess.run('git rev-parse --abbrev-ref HEAD', shell=True, text=True, cwd=path, capture_output=True) + branch_stdout = branch_cmd.stdout.strip() + if branch_stdout == 'master' or branch_stdout == 'main': + branch = f'{YELLOW}{utils.GLYPHS["master"]}{RESET}{utils.GLYPHS["space"]}{branch_stdout}' + elif branch_stdout == 'develop': + branch = f'{GREEN}{utils.GLYPHS["feature"]}{RESET}{utils.GLYPHS["space"]}{branch_stdout}' + elif '/' in branch_stdout: + branch_path = branch_stdout.split('/') + icon = Commands._get_branch_icon(branch_path[0]) + branch_color = Commands._get_branch_color(branch_path[0]) + branch = f'{branch_color}{icon}{RESET}{utils.GLYPHS["space"]}{branch_path[0]}{RESET}/{BOLD}{("/".join(branch_path[1:]))}' + else: + branch = f'{CYAN}{utils.GLYPHS["branch"]}{RESET}{utils.GLYPHS["space"]}{branch_stdout}' + return branch + + @staticmethod + def _get_authors_name(path: str) -> str: + cmd = subprocess.run('git log -1 --pretty=format:%an', shell=True, text=True, cwd=path, capture_output=True) + git_config_user_cmd = subprocess.run(['git', 'config', 'user.name'], text=True, capture_output=True) + committer_color = '' if cmd.stdout.strip() == git_config_user_cmd.stdout.strip() else DIM + author = cmd.stdout.strip() + author = author[:20] + '...' if len(author) > 20 else author + author = f'{committer_color}{author}{RESET}' + return author + + @staticmethod + def _get_commit_message(path: str, max_chars: int) -> str: + cmd = subprocess.run('git log -1 --pretty=format:%s', shell=True, text=True, cwd=path, capture_output=True) + log = cmd.stdout.strip() + log = log[:max_chars] + '...' if len(log) > max_chars else log + return log + + @staticmethod + def _get_formatted_labels(labels: List[str], glyph: str) -> str: + if len(labels) == 0: + return '' + colored_label = '' + for label in labels: + color_index = Commands._get_color_index(label) % len(TEXT) + colored_label += f'{TEXT[color_index + 3]}{glyph} {label}{RESET} ' + return colored_label + + @staticmethod + def _get_formatted_branches(branches: List[str], current_branch: str) -> str: + if len(branches) == 0: + return '' + + simplify_branches = utils.settings.config['mud'].getboolean('simplify_branches') is True + output = '' + for branch in branches: + is_origin = branch.startswith('origin/') + branch = branch.replace('origin/', '') if is_origin else branch + current_prefix = f'{UNDERLINE}' if current_branch == branch else '' + current_prefix = current_prefix + DIM if is_origin else current_prefix + origin_prefix = f'{MAGENTA}{DIM}o/' if is_origin else '' + color = WHITE + icon = utils.GLYPHS['branch'] + if branch == 'master' or branch == 'main': + color = YELLOW + icon = f'{utils.GLYPHS["master"]}' + elif branch == 'develop': + color = GREEN + icon = f'{utils.GLYPHS["feature"]}' + elif '/' in branch: + parts = branch.split('/') + end_dim = '' if is_origin else END_DIM + branch = '/'.join([p[0] for p in parts[:-1]] + [end_dim + ( + parts[-1][:10] + '..' if len(parts[-1]) > 10 else parts[-1])]) if simplify_branches else '/'.join( + [p for p in parts[:-1]] + [end_dim + (parts[-1][:10] + '..' if len(parts[-1]) > 10 else parts[-1])]) + branch = f'{DIM}{branch}' + color = Commands._get_branch_color(parts[0]) + icon = Commands._get_branch_icon(parts[0]) + output += f'{current_prefix}{color}{icon}{utils.GLYPHS["space"]}{origin_prefix}{color}{branch}{RESET} ' + return output + + @staticmethod + def _get_branch_icon(branch_prefix: str) -> str: + return f'{utils.GLYPHS["bugfix"]}' if branch_prefix in ['bugfix', 'bug', 'hotfix'] else \ + f'{utils.GLYPHS["release"]}' if branch_prefix == 'release' else \ + f'{utils.GLYPHS["feature"]}' if branch_prefix in ['feature', 'feat', 'develop'] else \ + f'{utils.GLYPHS["branch"]}' + + @staticmethod + def _get_branch_color(branch_name: str) -> str: + return RED if branch_name in ['bugfix', 'bug', 'hotfix'] else \ + BLUE if branch_name == 'release' else \ + GREEN if branch_name in ['feature', 'feat', 'develop'] else \ + GREEN + + @staticmethod + def _get_color_index(label: str) -> (str, str): + if label not in Commands._label_color_cache: + Commands._label_color_cache[label] = Commands._current_color_index + Commands._current_color_index = (Commands._current_color_index + 1) % len(BKG) + return Commands._label_color_cache[label] diff --git a/config.py b/config.py index 15e7d49..1872c1a 100644 --- a/config.py +++ b/config.py @@ -9,110 +9,110 @@ class Config: - def __init__(self): - self.data = {} - - def save(self, file_path: str) -> None: - root = ElementTree.Element('mud') - - def _filter_labels(label: str): - return bool(re.match(r'^\w+$', label)) - - for path, labels in self.data.items(): - dir_element = ElementTree.SubElement(root, 'dir') - dir_element.set('path', path) - - valid_labels = [label for label in labels if _filter_labels(label)] - if valid_labels: - if len(valid_labels) == 1: - formatted_labels = valid_labels[0] - else: - formatted_labels = ', '.join(valid_labels) - dir_element.set('label', formatted_labels) - - rough_string = ElementTree.tostring(root) - parsed = minidom.parseString(rough_string) - pretty_xml = parsed.toprettyxml(indent='\t') - - with open(file_path, 'w') as file: - file.write(pretty_xml) - - def find(self) -> None: - if os.path.exists(utils.CONFIG_FILE_NAME): - self.load(utils.CONFIG_FILE_NAME) - return - - directory = os.getcwd() - - current_path = directory - while os.path.dirname(current_path) != current_path: - os.chdir(current_path) - if os.path.exists(utils.CONFIG_FILE_NAME): - self.load(utils.CONFIG_FILE_NAME) - return utils.CONFIG_FILE_NAME - current_path = os.path.dirname(current_path) - - if utils.settings.mud_settings['config_path'] != '' and os.path.exists( - utils.settings.mud_settings['config_path']): - directory = os.path.dirname(utils.settings.mud_settings['config_path']) - os.chdir(directory) - os.environ['PWD'] = directory - self.load(utils.CONFIG_FILE_NAME) - return - - utils.print_error(f'{BOLD}.mudconfig{RESET} file was not found. Type `mud init` to create configuration file.') - return - - def load(self, file_path: str) -> None: - self.data = {} - tree = ElementTree.parse(file_path) - root = tree.getroot() - for dir_element in root.findall('dir'): - path = dir_element.get('path') - if not os.path.isdir(path): - utils.print_error(f'Invalid path {BOLD}{path}{RESET}.') - continue - - if not os.path.isdir(os.path.join(path, '.git')): - utils.print_error(f'{BOLD}.git{RESET} directory not found at target "{path}".') - continue - - labels = [label.strip() for label in dir_element.get('label', '').split(',') if label.strip()] - self.data[path] = labels - - def all(self) -> Dict[str, List[str]]: - return self.data - - def paths(self) -> List[str]: - return list(self.data.keys()) - - def with_label(self, label: str) -> Dict[str, List[str]]: - if label == '': - return self.all() - result = {} - for path, labels in self.data.items(): - if label in labels: - result[path] = labels - return result - - def add_label(self, path: str, label: str) -> None: - if path is None: - path = label - label = None - if not os.path.isdir(path): - utils.print_error(f'Invalid path {BOLD}{path}{RESET}. Remember that path should be relative.') - return - if path not in self.data: - self.data[path] = [] - if label is not None and label not in self.data[path]: - self.data[path].append(label) - - def remove_path(self, path: str) -> None: - if path in self.data: - del self.data[path] - - def remove_label(self, path: str, label: str) -> None: - if path in self.data and label in self.data[path]: - self.data[path].remove(label) - if not self.data[path]: - del self.data[path] + def __init__(self): + self.data = {} + + def save(self, file_path: str) -> None: + root = ElementTree.Element('mud') + + def _filter_labels(label: str): + return bool(re.match(r'^\w+$', label)) + + for path, labels in self.data.items(): + dir_element = ElementTree.SubElement(root, 'dir') + dir_element.set('path', path) + + valid_labels = [label for label in labels if _filter_labels(label)] + if valid_labels: + if len(valid_labels) == 1: + formatted_labels = valid_labels[0] + else: + formatted_labels = ', '.join(valid_labels) + dir_element.set('label', formatted_labels) + + rough_string = ElementTree.tostring(root) + parsed = minidom.parseString(rough_string) + pretty_xml = parsed.toprettyxml(indent='\t') + + with open(file_path, 'w') as file: + file.write(pretty_xml) + + def find(self) -> None: + if os.path.exists(utils.CONFIG_FILE_NAME): + self.load(utils.CONFIG_FILE_NAME) + return + + directory = os.getcwd() + + current_path = directory + while os.path.dirname(current_path) != current_path: + os.chdir(current_path) + if os.path.exists(utils.CONFIG_FILE_NAME): + self.load(utils.CONFIG_FILE_NAME) + return utils.CONFIG_FILE_NAME + current_path = os.path.dirname(current_path) + + if utils.settings.mud_settings['config_path'] != '' and os.path.exists( + utils.settings.mud_settings['config_path']): + directory = os.path.dirname(utils.settings.mud_settings['config_path']) + os.chdir(directory) + os.environ['PWD'] = directory + self.load(utils.CONFIG_FILE_NAME) + return + + utils.print_error(f'{BOLD}.mudconfig{RESET} file was not found. Type `mud init` to create configuration file.') + return + + def load(self, file_path: str) -> None: + self.data = {} + tree = ElementTree.parse(file_path) + root = tree.getroot() + for dir_element in root.findall('dir'): + path = dir_element.get('path') + if not os.path.isdir(path): + utils.print_error(f'Invalid path {BOLD}{path}{RESET}.') + continue + + if not os.path.isdir(os.path.join(path, '.git')): + utils.print_error(f'{BOLD}.git{RESET} directory not found at target "{path}".') + continue + + labels = [label.strip() for label in dir_element.get('label', '').split(',') if label.strip()] + self.data[path] = labels + + def all(self) -> Dict[str, List[str]]: + return self.data + + def paths(self) -> List[str]: + return list(self.data.keys()) + + def with_label(self, label: str) -> Dict[str, List[str]]: + if label == '': + return self.all() + result = {} + for path, labels in self.data.items(): + if label in labels: + result[path] = labels + return result + + def add_label(self, path: str, label: str) -> None: + if path is None: + path = label + label = None + if not os.path.isdir(path): + utils.print_error(f'Invalid path {BOLD}{path}{RESET}. Remember that path should be relative.') + return + if path not in self.data: + self.data[path] = [] + if label is not None and label not in self.data[path]: + self.data[path].append(label) + + def remove_path(self, path: str) -> None: + if path in self.data: + del self.data[path] + + def remove_label(self, path: str, label: str) -> None: + if path in self.data and label in self.data[path]: + self.data[path].remove(label) + if not self.data[path]: + del self.data[path] diff --git a/main.py b/main.py index feaa80f..7192602 100755 --- a/main.py +++ b/main.py @@ -5,10 +5,10 @@ from mud import Mud if __name__ == '__main__': - try: - utils.settings = settings.Settings(utils.SETTINGS_FILE_NAME) - utils.set_up() - mud = Mud() - mud.run() - except KeyboardInterrupt: - utils.print_error('Stopped by user.') \ No newline at end of file + try: + utils.settings = settings.Settings(utils.SETTINGS_FILE_NAME) + utils.set_up() + mud = Mud() + mud.run() + except KeyboardInterrupt: + utils.print_error('Stopped by user.') \ No newline at end of file diff --git a/mud.py b/mud.py index e80743c..1b26634 100644 --- a/mud.py +++ b/mud.py @@ -18,240 +18,240 @@ DIVERGED_ATTR = '-d', '--diverged' # Commands COMMANDS = { - 'help': ['help', '--help', '-h'], - 'update': ['update'], - 'configure': ['configure', 'config'], - 'version': ['--version', '-v', 'version'], - 'set-global': ['--set-global'], - 'init': ['init'], - 'add': ['add', 'a'], - 'remove': ['remove', 'rm'], - 'info': ['info', 'i'], - 'log': ['log', 'l'], - 'tags': ['tags', 'tag', 't'], - 'labels': ['labels', 'lb'], - 'status': ['status', 'st'], - 'branches': ['branch', 'branches', 'br'], + 'help': ['help', '--help', '-h'], + 'update': ['update'], + 'configure': ['configure', 'config'], + 'version': ['--version', '-v', 'version'], + 'set-global': ['--set-global'], + 'init': ['init'], + 'add': ['add', 'a'], + 'remove': ['remove', 'rm'], + 'info': ['info', 'i'], + 'log': ['log', 'l'], + 'tags': ['tags', 'tag', 't'], + 'labels': ['labels', 'lb'], + 'status': ['status', 'st'], + 'branches': ['branch', 'branches', 'br'], } class Mud: - def __init__(self): - self.cmd_runner = None - self.config = None - self.parser = self._create_parser() + def __init__(self): + self.cmd_runner = None + self.config = None + self.parser = self._create_parser() - @staticmethod - def _create_parser() -> ArgumentParser: - parser = argparse.ArgumentParser(description=f'{BOLD}mud{RESET} allows you to run commands in multiple repositories.') - subparsers = parser.add_subparsers(dest='command') + @staticmethod + def _create_parser() -> ArgumentParser: + parser = argparse.ArgumentParser(description=f'{BOLD}mud{RESET} allows you to run commands in multiple repositories.') + subparsers = parser.add_subparsers(dest='command') - subparsers.add_parser(COMMANDS['configure'][0], aliases=COMMANDS['configure'][1:], help='Runs the interactive configuration wizard.') - subparsers.add_parser(COMMANDS['update'][0], aliases=COMMANDS['update'][1:], help='Update mud to the latest version.') - subparsers.add_parser(COMMANDS['init'][0], aliases=COMMANDS['init'][1:], help=f'Initializes the {BOLD}.mudconfig{RESET} and adds all repositories in this directory to {BOLD}.mudconfig{RESET}.') - subparsers.add_parser(COMMANDS['info'][0], aliases=COMMANDS['info'][1:], help='Displays branch divergence and working directory changes') - subparsers.add_parser(COMMANDS['log'][0], aliases=COMMANDS['log'][1:], help='Displays log of latest commit messages for all repositories in a table view.') - subparsers.add_parser(COMMANDS['tags'][0], aliases=COMMANDS['tags'][1:], help='Displays git tags in repositories.') - subparsers.add_parser(COMMANDS['labels'][0], aliases=COMMANDS['labels'][1:], help='Displays mud labels across repositories.') - subparsers.add_parser(COMMANDS['status'][0], aliases=COMMANDS['status'][1:], help='Displays working directory changes.') - subparsers.add_parser(COMMANDS['branches'][0], aliases=COMMANDS['branches'][1:], help='Displays all branches in repositories.') + subparsers.add_parser(COMMANDS['configure'][0], aliases=COMMANDS['configure'][1:], help='Runs the interactive configuration wizard.') + subparsers.add_parser(COMMANDS['update'][0], aliases=COMMANDS['update'][1:], help='Update mud to the latest version.') + subparsers.add_parser(COMMANDS['init'][0], aliases=COMMANDS['init'][1:], help=f'Initializes the {BOLD}.mudconfig{RESET} and adds all repositories in this directory to {BOLD}.mudconfig{RESET}.') + subparsers.add_parser(COMMANDS['info'][0], aliases=COMMANDS['info'][1:], help='Displays branch divergence and working directory changes') + subparsers.add_parser(COMMANDS['log'][0], aliases=COMMANDS['log'][1:], help='Displays log of latest commit messages for all repositories in a table view.') + subparsers.add_parser(COMMANDS['tags'][0], aliases=COMMANDS['tags'][1:], help='Displays git tags in repositories.') + subparsers.add_parser(COMMANDS['labels'][0], aliases=COMMANDS['labels'][1:], help='Displays mud labels across repositories.') + subparsers.add_parser(COMMANDS['status'][0], aliases=COMMANDS['status'][1:], help='Displays working directory changes.') + subparsers.add_parser(COMMANDS['branches'][0], aliases=COMMANDS['branches'][1:], help='Displays all branches in repositories.') - add_parser = subparsers.add_parser(COMMANDS['add'][0], aliases=COMMANDS['add'][1:], help='Adds repository or labels an existing repository.') - add_parser.add_argument('label', help='The label to add (optional).', nargs='?', default='', type=str) - add_parser.add_argument('path', help='Repository to add (optional).', nargs='?', type=str) + add_parser = subparsers.add_parser(COMMANDS['add'][0], aliases=COMMANDS['add'][1:], help='Adds repository or labels an existing repository.') + add_parser.add_argument('label', help='The label to add (optional).', nargs='?', default='', type=str) + add_parser.add_argument('path', help='Repository to add (optional).', nargs='?', type=str) - remove_parser = subparsers.add_parser(COMMANDS['remove'][0], aliases=COMMANDS['remove'][1:], help='Removes repository or removes the label from an existing repository.') - remove_parser.add_argument('label', help='Label to remove from repository (optional).', nargs='?', default='', type=str) - remove_parser.add_argument('path', help='Repository to remove (optional).', nargs='?', type=str) + remove_parser = subparsers.add_parser(COMMANDS['remove'][0], aliases=COMMANDS['remove'][1:], help='Removes repository or removes the label from an existing repository.') + remove_parser.add_argument('label', help='Label to remove from repository (optional).', nargs='?', default='', type=str) + remove_parser.add_argument('path', help='Repository to remove (optional).', nargs='?', type=str) - parser.add_argument(*TABLE_ATTR, metavar='TABLE', nargs='?', default='', type=str, help=f'Switches table view, runs in table view it is disabled in {BOLD}.mudsettings{RESET}.') - parser.add_argument(*LABEL_PREFIX, metavar='LABEL', nargs='?', default='', type=str, help='Filters repositories by provided label.') - parser.add_argument(*BRANCH_PREFIX, metavar='BRANCH', nargs='?', default='', type=str, help='Filter repositories by provided branch.') - parser.add_argument(*MODIFIED_ATTR, action='store_true', help='Filters modified repositories.') - parser.add_argument(*DIVERGED_ATTR, action='store_true', help='Filters repositories with diverged branches.') - parser.add_argument(COMMANDS['set-global'][0], help=f'Sets {BOLD}.mudconfig{RESET} in the current repository as your fallback {BOLD}.mudconfig{RESET}.', action='store_true') - parser.add_argument(COMMANDS['version'][0], help='Displays the current version of mud.', action='store_true') - parser.add_argument('catch_all', nargs='*', help='Type any commands to execute among repositories.') - return parser + parser.add_argument(*TABLE_ATTR, metavar='TABLE', nargs='?', default='', type=str, help=f'Switches table view, runs in table view it is disabled in {BOLD}.mudsettings{RESET}.') + parser.add_argument(*LABEL_PREFIX, metavar='LABEL', nargs='?', default='', type=str, help='Filters repositories by provided label.') + parser.add_argument(*BRANCH_PREFIX, metavar='BRANCH', nargs='?', default='', type=str, help='Filter repositories by provided branch.') + parser.add_argument(*MODIFIED_ATTR, action='store_true', help='Filters modified repositories.') + parser.add_argument(*DIVERGED_ATTR, action='store_true', help='Filters repositories with diverged branches.') + parser.add_argument(COMMANDS['set-global'][0], help=f'Sets {BOLD}.mudconfig{RESET} in the current repository as your fallback {BOLD}.mudconfig{RESET}.', action='store_true') + parser.add_argument(COMMANDS['version'][0], help='Displays the current version of mud.', action='store_true') + parser.add_argument('catch_all', nargs='*', help='Type any commands to execute among repositories.') + return parser - def run(self) -> None: - # Displays default help message - if len(sys.argv) == 1 or sys.argv[1] in COMMANDS['help']: - self.parser.print_help() - return - # Sets global repository in .mudsettings - if sys.argv[1] in COMMANDS['set-global']: - config_path = os.path.join(os.getcwd(), utils.CONFIG_FILE_NAME) - if os.path.exists(config_path): - utils.settings.config.set('mud', 'config_path', config_path) - utils.settings.save() - print(f'Current {BOLD}.mudconfig{RESET} set as a global configuration.') - return - # Prints version - elif sys.argv[1] in COMMANDS['version']: - utils.version() - return - # Checks for available updates - elif sys.argv[1] in COMMANDS['update']: - utils.check_updates(True) - return - # Runs configuration wizard - elif sys.argv[1] in COMMANDS['configure']: - utils.configure() - return - current_directory = os.getcwd() - self.config = config.Config() + def run(self) -> None: + # Displays default help message + if len(sys.argv) == 1 or sys.argv[1] in COMMANDS['help']: + self.parser.print_help() + return + # Sets global repository in .mudsettings + if sys.argv[1] in COMMANDS['set-global']: + config_path = os.path.join(os.getcwd(), utils.CONFIG_FILE_NAME) + if os.path.exists(config_path): + utils.settings.config.set('mud', 'config_path', config_path) + utils.settings.save() + print(f'Current {BOLD}.mudconfig{RESET} set as a global configuration.') + return + # Prints version + elif sys.argv[1] in COMMANDS['version']: + utils.version() + return + # Checks for available updates + elif sys.argv[1] in COMMANDS['update']: + utils.check_updates(True) + return + # Runs configuration wizard + elif sys.argv[1] in COMMANDS['configure']: + utils.configure() + return + current_directory = os.getcwd() + self.config = config.Config() - # Discovers repositories in current directory - if sys.argv[1] in COMMANDS['init']: - self.init(self.parser.parse_args()) - return + # Discovers repositories in current directory + if sys.argv[1] in COMMANDS['init']: + self.init(self.parser.parse_args()) + return - self.config.find() - self._filter_repos() + self.config.find() + self._filter_repos() - self.cmd_runner = commands.Commands(self.config) - # Handling commands - if len(sys.argv) > 1 and sys.argv[1] in [cmd for group in COMMANDS.values() for cmd in group]: - args = self.parser.parse_args() - if args.command in COMMANDS['init']: - os.chdir(current_directory) - self.init(args) - elif args.command in COMMANDS['add']: - self.add(args) - elif args.command in COMMANDS['remove']: - self.remove(args) - else: - if len(self.repos) == 0: - utils.print_error('No repositories are matching this filter.') - return - if utils.settings.config['mud'].getboolean('auto_fetch'): - self._fetch_all() - if args.command in COMMANDS['info']: - self.cmd_runner.info(self.repos) - elif args.command in COMMANDS['log']: - self.cmd_runner.log(self.repos) - elif args.command in COMMANDS['branches']: - self.cmd_runner.branches(self.repos) - elif args.command in COMMANDS['labels']: - self.cmd_runner.labels(self.repos) - elif args.command in COMMANDS['tags']: - self.cmd_runner.tags(self.repos) - elif args.command in COMMANDS['status']: - self.cmd_runner.status(self.repos) - # Handling subcommands - else: - del sys.argv[0] - if len(sys.argv) == 0: - self.parser.print_help() - return - self._parse_aliases() - if utils.settings.config['mud'].getboolean('run_async'): - try: - if self.table: - asyncio.run(self.cmd_runner.run_async_table_view(self.repos.keys(), sys.argv)) - else: - asyncio.run(self.cmd_runner.run_async(self.repos.keys(), sys.argv)) - except Exception as exception: - utils.print_error('Invalid command.') - print(type(exception)) - else: - self.cmd_runner.run_ordered(self.repos.keys(), sys.argv) + self.cmd_runner = commands.Commands(self.config) + # Handling commands + if len(sys.argv) > 1 and sys.argv[1] in [cmd for group in COMMANDS.values() for cmd in group]: + args = self.parser.parse_args() + if args.command in COMMANDS['init']: + os.chdir(current_directory) + self.init(args) + elif args.command in COMMANDS['add']: + self.add(args) + elif args.command in COMMANDS['remove']: + self.remove(args) + else: + if len(self.repos) == 0: + utils.print_error('No repositories are matching this filter.') + return + if utils.settings.config['mud'].getboolean('auto_fetch'): + self._fetch_all() + if args.command in COMMANDS['info']: + self.cmd_runner.info(self.repos) + elif args.command in COMMANDS['log']: + self.cmd_runner.log(self.repos) + elif args.command in COMMANDS['branches']: + self.cmd_runner.branches(self.repos) + elif args.command in COMMANDS['labels']: + self.cmd_runner.labels(self.repos) + elif args.command in COMMANDS['tags']: + self.cmd_runner.tags(self.repos) + elif args.command in COMMANDS['status']: + self.cmd_runner.status(self.repos) + # Handling subcommands + else: + del sys.argv[0] + if len(sys.argv) == 0: + self.parser.print_help() + return + self._parse_aliases() + if utils.settings.config['mud'].getboolean('run_async'): + try: + if self.table: + asyncio.run(self.cmd_runner.run_async_table_view(self.repos.keys(), sys.argv)) + else: + asyncio.run(self.cmd_runner.run_async(self.repos.keys(), sys.argv)) + except Exception as exception: + utils.print_error('Invalid command.') + print(type(exception)) + else: + self.cmd_runner.run_ordered(self.repos.keys(), sys.argv) - def init(self, args) -> None: - self.config.data = {} - index = 0 - directories = [d for d in os.listdir('.') if os.path.isdir(d) and os.path.isdir(os.path.join(d, '.git'))] - print(directories) - print(os.getcwd()) - for directory in directories: - if directory in self.config.paths(): - continue - self.config.add_label(directory, getattr(args, 'label', '')) - index += 1 - path = f'{DIM}{GRAY}../{RESET}{DIM}{directory}{RESET}' - print(f'{path} {GREEN}added{RESET}') - if index == 0: - utils.print_error('No git repositories were found in this directory.') - return - self.config.save(utils.CONFIG_FILE_NAME) + def init(self, args) -> None: + self.config.data = {} + index = 0 + directories = [d for d in os.listdir('.') if os.path.isdir(d) and os.path.isdir(os.path.join(d, '.git'))] + print(directories) + print(os.getcwd()) + for directory in directories: + if directory in self.config.paths(): + continue + self.config.add_label(directory, getattr(args, 'label', '')) + index += 1 + path = f'{DIM}{GRAY}../{RESET}{DIM}{directory}{RESET}' + print(f'{path} {GREEN}added{RESET}') + if index == 0: + utils.print_error('No git repositories were found in this directory.') + return + self.config.save(utils.CONFIG_FILE_NAME) - def add(self, args) -> None: - self.config.add_label(args.path, args.label) - self.config.save(utils.CONFIG_FILE_NAME) + def add(self, args) -> None: + self.config.add_label(args.path, args.label) + self.config.save(utils.CONFIG_FILE_NAME) - def remove(self, args) -> None: - if args.path: - self.config.remove_label(args.path, args.label) - elif args.label: - self.config.remove_path(args.label) - else: - utils.print_error(f'Invalid input. Please provide a value to remove.') - self.config.save(utils.CONFIG_FILE_NAME) + def remove(self, args) -> None: + if args.path: + self.config.remove_label(args.path, args.label) + elif args.label: + self.config.remove_path(args.label) + else: + utils.print_error(f'Invalid input. Please provide a value to remove.') + self.config.save(utils.CONFIG_FILE_NAME) - # Filter out repositories if user provided filters - def _filter_repos(self) -> None: - self.repos = self.config.all() - branch = None - modified = False - diverged = False - self.table = utils.settings.config['mud'].getboolean('run_table') - index = 1 - while index < len(sys.argv): - arg = sys.argv[index] - if arg.startswith('-'): - arg = sys.argv[1:][index - 1] - if any(arg.startswith(prefix) for prefix in LABEL_PREFIX): - label = arg.split('=', 1)[1] - self.repos = self.config.with_label(label) - elif any(arg.startswith(prefix) for prefix in BRANCH_PREFIX): - branch = arg.split('=', 1)[1] - elif arg in TABLE_ATTR: - self.table = not self.table - elif arg in MODIFIED_ATTR: - modified = True - elif arg in DIVERGED_ATTR: - diverged = True - else: - index += 1 - continue - del sys.argv[index] - continue - break - directory = os.getcwd() - to_delete = [] - for repo in self.repos: - os.chdir(os.path.join(directory, repo)) - has_modifications = subprocess.check_output('git status --porcelain', shell=True) - branch_filter = (branch is not None and branch.strip() and subprocess.check_output('git rev-parse --abbrev-ref HEAD', shell=True, text=True).splitlines()[0] != branch) - is_diverged = not any('ahead' in line or 'behind' in line for line in subprocess.check_output('git status --branch --porcelain', shell=True, text=True).splitlines() if line.startswith('##')) - if (modified and not has_modifications) or (branch and branch_filter) or (diverged and is_diverged): - to_delete.append(repo) + # Filter out repositories if user provided filters + def _filter_repos(self) -> None: + self.repos = self.config.all() + branch = None + modified = False + diverged = False + self.table = utils.settings.config['mud'].getboolean('run_table') + index = 1 + while index < len(sys.argv): + arg = sys.argv[index] + if arg.startswith('-'): + arg = sys.argv[1:][index - 1] + if any(arg.startswith(prefix) for prefix in LABEL_PREFIX): + label = arg.split('=', 1)[1] + self.repos = self.config.with_label(label) + elif any(arg.startswith(prefix) for prefix in BRANCH_PREFIX): + branch = arg.split('=', 1)[1] + elif arg in TABLE_ATTR: + self.table = not self.table + elif arg in MODIFIED_ATTR: + modified = True + elif arg in DIVERGED_ATTR: + diverged = True + else: + index += 1 + continue + del sys.argv[index] + continue + break + directory = os.getcwd() + to_delete = [] + for repo in self.repos: + os.chdir(os.path.join(directory, repo)) + has_modifications = subprocess.check_output('git status --porcelain', shell=True) + branch_filter = (branch is not None and branch.strip() and subprocess.check_output('git rev-parse --abbrev-ref HEAD', shell=True, text=True).splitlines()[0] != branch) + is_diverged = not any('ahead' in line or 'behind' in line for line in subprocess.check_output('git status --branch --porcelain', shell=True, text=True).splitlines() if line.startswith('##')) + if (modified and not has_modifications) or (branch and branch_filter) or (diverged and is_diverged): + to_delete.append(repo) - for repo in to_delete: - del self.repos[repo] - os.chdir(directory) + for repo in to_delete: + del self.repos[repo] + os.chdir(directory) - def _fetch_all(self) -> None: - if utils.settings.config['mud'].getboolean('run_async'): - asyncio.run(self._fetch_all_async()) - else: - for repo in self.repos: - subprocess.run('git fetch', shell=True, cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + def _fetch_all(self) -> None: + if utils.settings.config['mud'].getboolean('run_async'): + asyncio.run(self._fetch_all_async()) + else: + for repo in self.repos: + subprocess.run('git fetch', shell=True, cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - async def _fetch_all_async(self) -> None: - tasks = [self._fetch_repo_async(repo) for repo in self.repos] - await asyncio.gather(*tasks) + async def _fetch_all_async(self) -> None: + tasks = [self._fetch_repo_async(repo) for repo in self.repos] + await asyncio.gather(*tasks) - @staticmethod - async def _fetch_repo_async(repo: str) -> None: - await asyncio.create_subprocess_exec('git fetch', shell=True, cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + @staticmethod + async def _fetch_repo_async(repo: str) -> None: + await asyncio.create_subprocess_exec('git fetch', shell=True, cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - @staticmethod - def _parse_aliases(): - if utils.settings.alias_settings is None: - return - for alias, command in dict(utils.settings.alias_settings).items(): - if sys.argv[0] == alias: - del sys.argv[0] - sys.argv = command.split(' ') + sys.argv + @staticmethod + def _parse_aliases(): + if utils.settings.alias_settings is None: + return + for alias, command in dict(utils.settings.alias_settings).items(): + if sys.argv[0] == alias: + del sys.argv[0] + sys.argv = command.split(' ') + sys.argv diff --git a/settings.py b/settings.py index 75d50d4..2349520 100644 --- a/settings.py +++ b/settings.py @@ -6,48 +6,48 @@ class Settings: - def __init__(self, file_name: str) -> None: - self.file_name = file_name - self.mud_settings = None - self.alias_settings = None - self.config = configparser.ConfigParser() - self.settings_file = os.path.join(os.path.expanduser('~'), self.file_name) - self.defaults = { - 'mud': { - 'config_path': '', - 'nerd_fonts': True, - 'auto_fetch': False, - 'run_async': True, - 'run_table': True, - 'simplify_branches': True, - 'ask_updates': True - }, - 'alias': { - 'to': 'git checkout', - 'fetch': 'git fetch', - 'pull': 'git pull', - 'push': 'git push' - } - } - self.load_settings() + def __init__(self, file_name: str) -> None: + self.file_name = file_name + self.mud_settings = None + self.alias_settings = None + self.config = configparser.ConfigParser() + self.settings_file = os.path.join(os.path.expanduser('~'), self.file_name) + self.defaults = { + 'mud': { + 'config_path': '', + 'nerd_fonts': True, + 'auto_fetch': False, + 'run_async': True, + 'run_table': True, + 'simplify_branches': True, + 'ask_updates': True + }, + 'alias': { + 'to': 'git checkout', + 'fetch': 'git fetch', + 'pull': 'git pull', + 'push': 'git push' + } + } + self.load_settings() - def load_settings(self) -> None: - if not os.path.exists(self.settings_file): - self.config.read_dict(self.defaults) - self.save() - else: - self.config.read(self.settings_file) + def load_settings(self) -> None: + if not os.path.exists(self.settings_file): + self.config.read_dict(self.defaults) + self.save() + else: + self.config.read(self.settings_file) - self.mud_settings = {} - for key in self.defaults[MAIN_SCOPE]: - if isinstance(self.defaults[MAIN_SCOPE][key], bool): - self.mud_settings[key] = self.config.getboolean(MAIN_SCOPE, key, fallback=self.defaults[MAIN_SCOPE][key]) - else: - self.mud_settings[key] = self.config.get(MAIN_SCOPE, key, fallback=self.defaults[MAIN_SCOPE][key]) + self.mud_settings = {} + for key in self.defaults[MAIN_SCOPE]: + if isinstance(self.defaults[MAIN_SCOPE][key], bool): + self.mud_settings[key] = self.config.getboolean(MAIN_SCOPE, key, fallback=self.defaults[MAIN_SCOPE][key]) + else: + self.mud_settings[key] = self.config.get(MAIN_SCOPE, key, fallback=self.defaults[MAIN_SCOPE][key]) - if ALIAS_SCOPE in self.config: - self.alias_settings = self.config[ALIAS_SCOPE] + if ALIAS_SCOPE in self.config: + self.alias_settings = self.config[ALIAS_SCOPE] - def save(self) -> None: - with open(self.settings_file, 'w') as configfile: - self.config.write(configfile) + def save(self) -> None: + with open(self.settings_file, 'w') as configfile: + self.config.write(configfile) diff --git a/utils.py b/utils.py index 8b143d3..e9e8fe3 100644 --- a/utils.py +++ b/utils.py @@ -9,72 +9,72 @@ CONFIG_FILE_NAME = '.mudconfig' GLYPHS = {} ICON_GLYPHS = { - 'ahead': '\uf062', - 'behind': '\uf063', - 'modified': '\uf040', - 'added': '\uf067', - 'removed': '\uf1f8', - 'moved': '\uf064', - 'clear': '\uf00c', - 'synced': '\uf00c', - 'master': '\uf015', - 'bugfix': '\uf188', - 'release': '\uf135', - 'feature': '\uf0ad', - 'branch': '\ue725', - 'failed': '\uf00d', - 'finished': '\uf00c', - 'running': '\uf46a', - 'label': '\uf412', - 'tag': '\uf02b', - '(': '\ue0b6', - ')': '\ue0b4', - 'space': ' ', + 'ahead': '\uf062', + 'behind': '\uf063', + 'modified': '\uf040', + 'added': '\uf067', + 'removed': '\uf1f8', + 'moved': '\uf064', + 'clear': '\uf00c', + 'synced': '\uf00c', + 'master': '\uf015', + 'bugfix': '\uf188', + 'release': '\uf135', + 'feature': '\uf0ad', + 'branch': '\ue725', + 'failed': '\uf00d', + 'finished': '\uf00c', + 'running': '\uf46a', + 'label': '\uf412', + 'tag': '\uf02b', + '(': '\ue0b6', + ')': '\ue0b4', + 'space': ' ', } TEXT_GLYPHS = { - 'ahead': 'Ahead', - 'behind': 'Behind', - 'modified': '*', - 'added': '+', - 'removed': '-', - 'moved': 'M', - 'clear': 'Clear', - 'synced': 'Up to date', - 'master': '', - 'bugfix': '', - 'release': '', - 'feature': '', - 'branch': '', - 'failed': 'Failed', - 'finished': 'Finished', - 'running': 'Running', - 'label': '', - 'tag': ' ', - '(': '', - ')': ' ', - 'space': '', + 'ahead': 'Ahead', + 'behind': 'Behind', + 'modified': '*', + 'added': '+', + 'removed': '-', + 'moved': 'M', + 'clear': 'Clear', + 'synced': 'Up to date', + 'master': '', + 'bugfix': '', + 'release': '', + 'feature': '', + 'branch': '', + 'failed': 'Failed', + 'finished': 'Finished', + 'running': 'Running', + 'label': '', + 'tag': ' ', + '(': '', + ')': ' ', + 'space': '', } settings: Settings def set_up(): - global GLYPHS - GLYPHS = ICON_GLYPHS if settings.mud_settings['nerd_fonts'] else TEXT_GLYPHS + global GLYPHS + GLYPHS = ICON_GLYPHS if settings.mud_settings['nerd_fonts'] else TEXT_GLYPHS - if settings.config['mud'].getboolean('ask_updates') and check_updates(): - sys.exit() + if settings.config['mud'].getboolean('ask_updates') and check_updates(): + sys.exit() def version() -> None: - os.chdir(os.path.dirname(os.path.abspath(__file__))) - hash = subprocess.check_output('git rev-parse --short HEAD', shell=True, text=True).splitlines()[0] - m = random.choice(TEXT[3:]) - u = random.choice(TEXT[3:]) - d = random.choice(TEXT[3:]) - t = random.choice(TEXT[3:]) - v = random.choice(TEXT[3:]) - print(fr''' + os.chdir(os.path.dirname(os.path.abspath(__file__))) + hash = subprocess.check_output('git rev-parse --short HEAD', shell=True, text=True).splitlines()[0] + m = random.choice(TEXT[3:]) + u = random.choice(TEXT[3:]) + d = random.choice(TEXT[3:]) + t = random.choice(TEXT[3:]) + v = random.choice(TEXT[3:]) + print(fr''' {m} __ __{u} __ __{d} _____ {m}/\ '-./ \{u}/\ \/\ \{d}/\ __-. {BOLD}{t}Multi-directory runner{RESET} [{v}{hash}{RESET}] {m}\ \ \-./\ \{u} \ \_\ \{d} \ \/\ \ {RESET}Jasur Sadikov @@ -84,75 +84,75 @@ def version() -> None: def check_updates(explicit: bool = False) -> bool: - target_directory = os.getcwd() - os.chdir(os.path.dirname(os.path.abspath(__file__))) + target_directory = os.getcwd() + os.chdir(os.path.dirname(os.path.abspath(__file__))) - subprocess.run('git fetch', shell=True, check=True) - result = subprocess.run('git status -uno', shell=True, capture_output=True, text=True) + subprocess.run('git fetch', shell=True, check=True) + result = subprocess.run('git status -uno', shell=True, capture_output=True, text=True) - if 'Your branch is behind' in result.stdout: - m = random.choice(TEXT[3:]) - u = random.choice(TEXT[3:]) - d = random.choice(TEXT[3:]) - print(fr''' + if 'Your branch is behind' in result.stdout: + m = random.choice(TEXT[3:]) + u = random.choice(TEXT[3:]) + d = random.choice(TEXT[3:]) + print(fr''' {m} __ __{u} __ __{d} _____ {m}/\ '-./ \{u}/\ \/\ \{d}/\ __-.{RESET} {m}\ \ \-./\ \{u} \ \_\ \{d} \ \/\ \{RESET} {m} \ \_\ \ \_\{u} \_____\{d} \____-{RESET} {m} \/_/ \/_/{u}\/_____/{d}\/____/{RESET} ''') - print(f'{BOLD}New update(s) is available!{RESET}\n') + print(f'{BOLD}New update(s) is available!{RESET}\n') - log = subprocess.run('git log HEAD..@{u} --oneline --color=always', shell=True, text=True, stdout=subprocess.PIPE).stdout - print(log) + log = subprocess.run('git log HEAD..@{u} --oneline --color=always', shell=True, text=True, stdout=subprocess.PIPE).stdout + print(log) - if ask('Do you want to update?'): - update_process = subprocess.run('git pull --force', shell=True, text=False, stdout=subprocess.DEVNULL) - if update_process.returncode == 0: - print(f'{GREEN}{BOLD}Update successful!{RESET}') - else: - print_error('Update failed', update_process.returncode) - os.chdir(target_directory) - return True + if ask('Do you want to update?'): + update_process = subprocess.run('git pull --force', shell=True, text=False, stdout=subprocess.DEVNULL) + if update_process.returncode == 0: + print(f'{GREEN}{BOLD}Update successful!{RESET}') + else: + print_error('Update failed', update_process.returncode) + os.chdir(target_directory) + return True - if explicit: - print('No updates available') + if explicit: + print('No updates available') - os.chdir(target_directory) - return False + os.chdir(target_directory) + return False def configure(): - settings.config['mud']['run_async'] = str(ask('Do you want to run commands simultaneously for multiple repositories?')) - settings.config['mud']['run_table'] = str(ask('Do you want to see command execution progress in table view? This will limit output content.')) - settings.config['mud']['auto_fetch'] = str(ask(f'Do you want to automatically run {BOLD}\'git fetch\'{RESET} whenever you run commands such as {BOLD}\'mud info\'{RESET}?')) - settings.config['mud']['ask_updates'] = str(ask(f'Do you want to get information about latest updates?')) - settings.config['mud']['nerd_fonts'] = str(ask(f'Do you want to use {BOLD}nerd-fonts{RESET}?')) - settings.config['mud']['simplify_branches'] = str(ask(f'Do you want to simplify branches? (ex. {BOLD}feature/name{RESET} -> {BOLD}f/name{RESET}')) - settings.save() - print('Your settings are updated!') - pass + settings.config['mud']['run_async'] = str(ask('Do you want to run commands simultaneously for multiple repositories?')) + settings.config['mud']['run_table'] = str(ask('Do you want to see command execution progress in table view? This will limit output content.')) + settings.config['mud']['auto_fetch'] = str(ask(f'Do you want to automatically run {BOLD}\'git fetch\'{RESET} whenever you run commands such as {BOLD}\'mud info\'{RESET}?')) + settings.config['mud']['ask_updates'] = str(ask(f'Do you want to get information about latest updates?')) + settings.config['mud']['nerd_fonts'] = str(ask(f'Do you want to use {BOLD}nerd-fonts{RESET}?')) + settings.config['mud']['simplify_branches'] = str(ask(f'Do you want to simplify branches? (ex. {BOLD}feature/name{RESET} -> {BOLD}f/name{RESET}')) + settings.save() + print('Your settings are updated!') + pass def ask(text: str) -> bool: - print(f'{text} [Y/n] ', end='', flush=True) - if sys.platform.startswith('win'): - from msvcrt import getch - response = getch().decode().lower() - else: - import tty, termios - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(fd) - response = sys.stdin.read(1).lower() - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - - print() - return response in ['y', '\r', '\n'] + print(f'{text} [Y/n] ', end='', flush=True) + if sys.platform.startswith('win'): + from msvcrt import getch + response = getch().decode().lower() + else: + import tty, termios + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(fd) + response = sys.stdin.read(1).lower() + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + + print() + return response in ['y', '\r', '\n'] def print_error(text: str, code: int = 255) -> None: - print(f'{RED}Error:{RESET} {text}') - sys.exit(code) \ No newline at end of file + print(f'{RED}Error:{RESET} {text}') + sys.exit(code) \ No newline at end of file