diff --git a/sssd_test_framework/hosts/samba.py b/sssd_test_framework/hosts/samba.py index 352189b..2bd50d3 100644 --- a/sssd_test_framework/hosts/samba.py +++ b/sssd_test_framework/hosts/samba.py @@ -30,6 +30,9 @@ def __init__(self, *args, **kwargs) -> None: self._features: dict[str, bool] | None = None + self.admin: str = self.config.get("username", "Administrator") + """Username of the admin user, defaults to value of ``Administrator``.""" + self.adminpw: str = self.config.get("adminpw", self.bindpw) """Password of the admin user, defaults to value of ``bindpw``.""" diff --git a/sssd_test_framework/roles/ad.py b/sssd_test_framework/roles/ad.py index 5efdf34..fe9f84c 100644 --- a/sssd_test_framework/roles/ad.py +++ b/sssd_test_framework/roles/ad.py @@ -549,7 +549,7 @@ def _add(self, attrs: CLIBuilderArgs) -> None: def _modify(self, attrs: CLIBuilderArgs) -> None: """ - Modifiy Active Directory object. + Modify Active Directory object. :param attrs: Object attributes in :class:`pytest_mh.cli.CLIBuilder` format, defaults to dict() :type attrs: pytest_mh.cli.CLIBuilderArgs, optional @@ -1652,19 +1652,21 @@ def __init__(self, role: AD, name: str) -> None: self.target: str | None = None """Group policy target.""" - self._search_base: str = f"cn=policies,cn=system,{self.role.host.naming_context}" + self.search_base: str = f"cn=policies,cn=system,{self.role.host.naming_context}" """Group policy search base.""" - self._dn = self.get("DistinguishedName") + self.dn = self.get("DistinguishedName") """Group policy dn.""" - self._cn = self.get("CN") + self.cn = self.get("CN") """Group policy cn.""" def get(self, key: str) -> str | None: """ Get group policy attributes. + This method is unique for the GPO class, unlike SambaGPO class, the ADObject class is not inherited. + :param key: Attribute to get. :type key: str :return: Key value. 
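Review bot: The sentence added to the `GPO.get` docstring in the roles/ad.py `@@ -1652` hunk, `This method is unique for the GPO class, unlike SambaGPO class, the ADObject class is not inherited.`, is hard to parse; a clearer form is `Unlike SambaGPO, this class does not inherit from ADObject, so attribute lookup is implemented here.` The body of `get` continues outside the hunk. In the hosts/samba.py hunk, `admin` is read from the `username` config key while `adminpw` is read from the `adminpw` key, and the docstring does not say which key feeds `admin`; `Username of the admin user, read from the ``username`` config key, defaults to ``Administrator``.` would make the asymmetry visible to whoever writes the multihost configuration.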
@@ -1673,7 +1675,7 @@ def get(self, key: str) -> str | None: result = self.role.host.conn.run( rf""" $query = "(&(ObjectClass=groupPolicyContainer)(DisplayName={self.name}))" - Get-ADObject -SearchBase "{self._search_base}" -Properties "*" -LDAPFilter $query + Get-ADObject -SearchBase "{self.search_base}" -Properties "*" -LDAPFilter $query """ ).stdout_lines @@ -1694,7 +1696,7 @@ def delete(self) -> None: """ Delete group policy object. """ - self.role.host.conn.run(f'Remove-GPO -Guid "{self._cn}" -Confirm:$false') + self.role.host.conn.run(f'Remove-GPO -Guid "{self.cn}" -Confirm:$false') def add(self) -> GPO: """ @@ -1713,13 +1715,13 @@ def add(self) -> GPO: """ self.role.host.conn.run(f'New-GPO -name "{self.name}"') - self._cn = self.get("CN") - self._dn = self.get("DistinguishedName") + self.cn = self.get("CN") + self.dn = self.get("DistinguishedName") self.role.host.conn.run( rf""" Import-Module GroupPolicy, PSIni - $path = "C:\\Windows\\SYSVOL\\domain\\Policies\\{self._cn}\\Machine\\Microsoft\\Windows NT\\SecEdit" + $path = "C:\\Windows\\SYSVOL\\domain\\Policies\\{self.cn}\\Machine\\Microsoft\\Windows NT\\SecEdit" $file = Join-Path $path GptTmpl.inf $content = @{{'Unicode'=@{{'Unicode'='yes'}};'Version'=@{{'signature'='"$CHICAGO$"';'Revision'='1'}}}} New-Item -Path "$path" -ItemType Directory 
@@ -1733,43 +1735,50 @@ def add(self) -> GPO: def link( self, - op: str | None = "New", target: str | None = None, - args: list[str] | str | None = None, + enforced: bool | None = None, + disabled: bool | None = False, + order: int | None = 0, ) -> GPO: """ - Link the group policy to the a target object inside the directory, a site, domain or an ou. - - ..Note:: - The New and Set cmdlets are identical. To modify an an existing link, - change the $op parameter to "Set", i.e. to disable 'Enforced' - - ou_policy.link("Set", args=["-Enforced No"]) + Link the group policy to the target object inside the directory, a site, domain or an ou. - :param op: Cmdlet operation, defaults to "New" - :type op: str, optional :param target: Group policy target :type target: str, optional - :param args: Additional arguments - :type args: list[str] | None, optional + :param enforced: Enforced the policy + :type enforced: bool, optional + :param disabled: Disable the policy + :type disabled: bool, optional + :param order: Order number + :type order: int, optional :return: Group policy object :rtype: GPO """ - if args is None: - args = [] - - if isinstance(args, list): - args = " ".join(args) - elif args is None: - args = "" - if target is None and self.target is None: self.target = "Default-First-Site-Name" if target is not None and self.target is None: self.target = target - self.role.host.conn.run(f'{op}-GPLink -Guid "{self._cn}" -Target "{self.target}" -LinkEnabled Yes {args}') + args: CLIBuilderArgs = { + "Guid": (self.cli.option.VALUE, self.cn), + "Target": (self.cli.option.VALUE, self.target), + "Enforced": (self.cli.option.VALUE, "Yes" if enforced else "No"), + "LinkEnabled": (self.cli.option.VALUE, "Yes" if not disabled else "No"), + "Order": (self.cli.option.VALUE, order), + } + + # The cmdlets take the same arguments, but one is for new links and the other is for existing links. + # This is combined to simplify gpo management. 
+ new_link = self.role.host.conn.run( + self.cli.command("New-GPLink", args), + raise_on_error=False, + ) + if new_link.rc != 0: + self.role.host.conn.run( + self.cli.command("Set-GPLink", args), + raise_on_error=False, + ) return self @@ -1780,7 +1789,7 @@ def unlink(self) -> GPO: :return: Group policy object :rtype: GPO """ - self.role.host.conn.run(f'Remove-GPLink -Guid "{self._cn}" -Target "{self.target}"') + self.role.host.conn.run(f'Remove-GPLink -Guid "{self.cn}" -Target "{self.target}"') return self @@ -1806,7 +1815,7 @@ def permissions(self, target: str, permission_level: str, target_type: str | Non # Setting the permission using ADSI is a workaround for automation. $authenticated_users = New-Object System.Security.Principal.SecurityIdentifier("S-1-5-11") - $gpo = Get-GPO -Guid "{self._cn}" + $gpo = Get-GPO -Guid "{self.cn}" $gid = $gpo.id $search_base = "cn=policies,cn=system," + "{self.host.naming_context}" $filter = "(&(objectClass=groupPolicyContainer)(cn={{$gid}}))" @@ -1822,7 +1831,7 @@ def permissions(self, target: str, permission_level: str, target_type: str | Non ) else: self.role.host.conn.run( - f'Set-GPPermission -Guid "{self._cn}" ' + f'Set-GPPermission -Guid "{self.cn}" ' f'-TargetName "{target}" ' f'-PermissionLevel "{permission_level}" ' f'-TargetType "{target_type}" -Replace:$True -Confirm:$False' @@ -1836,7 +1845,7 @@ def policy(self, logon_rights: dict[str, list[ADObject]], cfg: dict[str, Any] | This method does the remaining configuration of the group policy. It updates 'GptTmpl.inf' with security logon right keys with the SIDs of users and groups - objects. The *Remote* keys can be omitted, in which the corresponding keys values + objects. The *Remote* keys can be omitted, in which the interactive key's value will then be used. To add users and groups to the policy, the SID must be used for the values. The 
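Review bot: In `GPO.link` the `Set-GPLink` fallback is also run with `raise_on_error=False`, so when both cmdlets fail the method still returns `self` and the caller never learns the policy was not linked; drop the flag on the fallback, `self.role.host.conn.run(self.cli.command("Set-GPLink", args))`, or check its `rc` and raise. The new signature also diverges from the `GenericGPO.link` interface added later in this diff: `enforced` defaults to `None` here versus `False` there, and `order` has no counterpart in the interface, so code typed against the generic class cannot reach it. `-Order` is emitted on every call, including the default `0`; it is worth confirming that `New-GPLink` accepts that value and that `Set-GPLink` does not reorder existing links when the caller did not ask for an order, or else only add the key when `order` is not the default.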
@@ -1887,7 +1896,7 @@ def policy(self, logon_rights: dict[str, list[ADObject]], cfg: dict[str, Any] | self.host.conn.run( rf""" Import-Module PSIni - $path = "C:\\Windows\\SYSVOL\\domain\\Policies\\{self._cn}\\Machine\\Microsoft\\Windows NT\\SecEdit" + $path = "C:\\Windows\\SYSVOL\\domain\\Policies\\{self.cn}\\Machine\\Microsoft\\Windows NT\\SecEdit" $file = Join-Path $path GptTmpl.inf $policy = @{{"Privilege Rights"={ps_logon_rights}}} Out-IniFile -InputObject $policy -FilePath "$file" @@ -1900,7 +1909,7 @@ def policy(self, logon_rights: dict[str, list[ADObject]], cfg: dict[str, Any] | self.host.conn.run( rf""" Import-Module PSIni - $path = "C:\\Windows\\SYSVOL\\domain\\Policies\\{self._cn}\\Machine\\Microsoft\\Windows NT\\SecEdit" + $path = "C:\\Windows\\SYSVOL\\domain\\Policies\\{self.cn}\\Machine\\Microsoft\\Windows NT\\SecEdit" $file = Join-Path $path GptTmpl.inf $policy = {ps_cfg} Out-IniFile -InputObject $policy -FilePath "$file" @@ -1911,7 +1920,7 @@ def policy(self, logon_rights: dict[str, list[ADObject]], cfg: dict[str, Any] | self.host.conn.run( rf""" $gpc = "[{{827D319E-6EAC-11D2-A4EA-00C04F79F83A}}{{803E14A0-B4FB-11D0-A0D0-00A0C90F574B}}]" - Set-ADObject -Identity "{self._dn}" -Replace @{{gPCMachineExtensionNames=$gpc}} + Set-ADObject -Identity "{self.dn}" -Replace @{{gPCMachineExtensionNames=$gpc}} Exit 0 """ ) diff --git a/sssd_test_framework/roles/generic.py b/sssd_test_framework/roles/generic.py index b1892f6..9ca6b99 100644 --- a/sssd_test_framework/roles/generic.py +++ b/sssd_test_framework/roles/generic.py @@ -16,14 +16,18 @@ "ProtocolName", "GenericProvider", "GenericADProvider", + "GenericOrganizationalUnit", "GenericUser", "GenericGroup", + "GenericComputer", + "GenericSite", "GenericNetgroup", "GenericNetgroupMember", "GenericSudoRule", "GenericAutomount", "GenericAutomountMap", "GenericAutomountKey", + "GenericGPO", ] 
@@ -37,7 +41,7 @@ class ProtocolName(Protocol): class GenericProvider(ABC, MultihostRole[BaseHost]): """ - Generic provider interface. All providers implements this interface. + Generic provider interface. All providers implement this interface. .. note:: @@ -93,7 +97,7 @@ def test_example(client: Client, provider: GenericProvider): assert result is not None assert result.user.name == 'user-1' - :param name: User name. + :param name: Username. :type name: str :return: New user object. :rtype: GenericUser 
@@ -283,11 +287,140 @@ def fqn(self, name: str) -> str: """ pass + @property + @abstractmethod + def dn(self) -> str: + """ + Distinguished Name. + """ + pass + @property @abstractmethod def firewall(self) -> Firewall: pass + @abstractmethod + def ou(self, name: str) -> GenericOrganizationalUnit: + """ + Get OU object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopologyGroup.AnyAD) + def test_example(client: Client, provider: GenericADProvider): + # Create OU + provider.ou("test_ou").add() + + :param name: OU name. + :type name: str + :return: OU object. + :rtype: GenericOrganizationalUnit + """ + pass + + @abstractmethod + def computer(self, name: str) -> GenericComputer: + """ + Get computer object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopologyGroup.AnyAD) + def test_example(client: Client, provider: GenericADProvider): + # Create a new OU + ou = provider.ou("test_ou").add().dn + + # Moves a computer object, takes the hostname and gets the shortname + provider.computer(client.host.hostname.split("."[0])).move(ou) + + :param name: Computer name. + :type name: str + :return: OU object. + :rtype: GenericComputer + """ + pass + + @abstractmethod + def site(self, name: str) -> GenericSite: + """ + Get site object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopologyGroup.AnyAD) + def test_example(client: Client, provider: GenericADProvider): + # Create New Site, this name cannot contain spaces + site = provider.site('New-Site').add() + + :param name: Site name. + :type name: str, cannot contain spaces + :return: Site object. + :rtype: GenericSite + """ + pass + + @abstractmethod + def gpo(self, name: str) -> GenericGPO: + """ + Get group policy object. + + .. 
code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopologyGroup.AnyAD) + def test_gpo_is_set_to_enforcing(client: Client, provider: GenericADProvider): + user = provider.user("user").add() + allow_user = provider.user("allow_user").add() + deny_user = provider.user("deny_user").add() + + provider.gpo("test policy").add().policy( + { + "SeInteractiveLogonRight": [allow_user, provider.group("Domain Admins")], + "SeRemoteInteractiveLogonRight": [allow_user, provider.group("Domain Admins")], + "SeDenyInteractiveLogonRight": [deny_user], + "SeDenyRemoteInteractiveLogonRight": [deny_user], + } + ).link() + + client.sssd.domain["ad_gpo_access_control"] = "enforcing" + client.sssd.start() + + assert client.auth.ssh.password(username="allow_user", password="Secret123") + assert not client.auth.ssh.password(username="user", password="Secret123") + assert not client.auth.ssh.password(username="deny_user", password="Secret123") + + """ + pass + + +class GenericOrganizationalUnit(ABC, BaseObject): + """ + Generic ou management. + """ + + @property + @abstractmethod + def name(self): + """ + OU name. + """ + pass + + @abstractmethod + def add(self, name: str) -> GenericOrganizationalUnit: + """ + Create a new OU. + :param name: + :type name: str + :return: self + :rtype: GenericOrganizationalUnit + """ + pass + class GenericUser(ABC, BaseObject): """ 
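Review bot: The example in the new `GenericProvider.computer` docstring has a misplaced index, `provider.computer(client.host.hostname.split("."[0])).move(ou)`, which indexes the string `"."` and passes a list to `computer()`; it should read `provider.computer(client.host.hostname.split(".")[0]).move(ou)`, and the same docstring says `:return: OU object.` for a method that returns a computer object. `GenericOrganizationalUnit.add` is declared as `add(self, name: str)` with an empty `:param name:` entry, yet every example in this diff calls `provider.ou("test_ou").add()` with no argument, so the abstract signature appears to disagree with the implementations it describes; if they take no argument it should be `def add(self) -> GenericOrganizationalUnit:`. That field list also follows the summary line with no blank line in between, which reStructuredText may fold into the paragraph instead of rendering as fields.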
@@ -535,6 +668,55 @@ def remove_members(self, members: list[GenericUser | GenericGroup]) -> GenericGr pass +class GenericComputer(ABC, BaseObject): + """ + Generic computer management. + """ + + @property + @abstractmethod + def name(self): + """ + Computer name. + """ + pass + + @abstractmethod + def move(self, target: str) -> GenericComputer: + """ + Move a computer object. + :param target: Target path. + :type target: str + :return: Self. + :rtype: GenericComputer + """ + pass + + +class GenericSite(ABC, BaseObject): + """ + Generic site management. + """ + + @property + @abstractmethod + def name(self): + """ + Site name. + """ + pass + + @abstractmethod + def add(self) -> GenericSite: + """ + Create new site. + + :return: Self. + :type: GenericSite + """ + pass + + class GenericNetgroup(ABC, BaseObject): """ Generic netgroup management. 
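Review bot: `GenericSite.add` documents its return value with `:type: GenericSite`; the return-type field is `:rtype: GenericSite`. `GenericComputer.move` places `:param target:` directly under `Move a computer object.` with no blank line, which reStructuredText may treat as a continuation of the paragraph rather than a field list; insert a blank line after the summary.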
@@ -961,3 +1143,104 @@ def dump(self) -> str: @abstractmethod def __str__(self) -> str: pass + + +class GenericGPO( + ABC, + BaseObject, +): + """ + Generic GPO management. + """ + + @property + @abstractmethod + def name(self): + """ + GPO name. + """ + pass + + @abstractmethod + def get(self, key: str) -> str | None: + """ + Get GPO attribute. + + :param key: Attribute key. + :type key: str + :return: Attribute value, optional + :rtype: str | None + """ + pass + + @abstractmethod + def delete(self) -> None: + """ + Delete GPO. + """ + pass + + @abstractmethod + def add(self) -> GenericGPO: + """ + Add GPO. + """ + pass + + @abstractmethod + def link( + self, + target: str | None = None, + enforced: bool | None = False, + disabled: bool | None = False, + ) -> GenericGPO: + """ + Link GPO. + + :param target: Target location, optional. + :type target: str | None + :param enforced: Enforce boolean. + :type enforced: bool | None + :param disabled: Disabled boolean. + :type disabled: bool | None + :return: Self. + :rtype: GenericGPO + """ + pass + + @abstractmethod + def unlink(self) -> None: + """ + Unlink GPO. + """ + pass + + @abstractmethod + def permissions(self, target: str, permission_level: str, target_type: str | None = "Group") -> GenericGPO: + """ + Configure GPO permissions. + + :param target: Target location + :type target: str | None + :param permission_level: Permission level + :type permission_level: str + :param target_type: Target type, defaults to "Group" + :type target_type: str | None = "Group" + :return: Self. + :rtype: GenericGPO + """ + pass + + @abstractmethod + def policy(self, logon_rights: dict[str, list[GenericUser]], cfg: dict[str, Any] | None = None) -> GenericGPO: + """ + GPO configuration. + + :param logon_rights: Logon rights. + :type logon_rights: dict[str, list[GenericUser]] + :param cfg: Extra configuration parameters. + :type cfg: dict[str, Any] | None + :return: Self. + :rtype: GenericGPO + """ + pass 
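Review bot: `GenericGPO.unlink` is declared `-> None` while both implementations in this diff, `GPO.unlink` in roles/ad.py and `SambaGPO.unlink` in roles/samba.py, return `self`; the interface should read `def unlink(self) -> GenericGPO:`. `GenericGPO.link` also does not match the AD implementation changed in this diff, which defaults `enforced` to `None` and adds an `order` parameter, so either the interface gains `order` or the AD signature is brought back to the generic one. `policy` is annotated `dict[str, list[GenericUser]]`, but the examples pass `provider.group("Domain Admins")` and the implementations accept `list[ADObject]` and `list[SambaObject]`; `dict[str, list[GenericUser | GenericGroup]]` would describe actual usage.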
diff --git a/sssd_test_framework/roles/samba.py b/sssd_test_framework/roles/samba.py index f6dd21f..adffc5e 100644 --- a/sssd_test_framework/roles/samba.py +++ b/sssd_test_framework/roles/samba.py @@ -2,27 +2,30 @@ from __future__ import annotations +import base64 +import configparser from typing import Any, TypeAlias import ldap.modlist from pytest_mh.cli import CLIBuilderArgs from pytest_mh.conn import ProcessResult -from sssd_test_framework.utils.ldap import LDAPRecordAttributes - from ..hosts.samba import SambaHost from ..misc import attrs_parse, to_list_of_strings +from ..utils.ldap import LDAPRecordAttributes from .base import BaseLinuxLDAPRole, BaseObject, DeleteAttribute from .ldap import LDAPAutomount, LDAPNetgroup, LDAPNetgroupMember, LDAPObject, LDAPOrganizationalUnit, LDAPSudoRule __all__ = [ "Samba", "SambaObject", + "SambaComputer", "SambaUser", "SambaGroup", "SambaOrganizationalUnit", "SambaAutomount", "SambaSudoRule", + "SambaGPO", ] @@ -143,7 +146,7 @@ def test_example(client: Client, samba: Samba): assert result.user.name == 'user-1' assert result.group.name == 'domain users' - :param name: User name. + :param name: Username. :type name: str :return: New user object. :rtype: SambaUser 
@@ -225,6 +228,65 @@ def test_example_netgroup(client: Client, samba: Samba): """ return SambaNetgroup(self, name, basedn) + def computer(self, name: str) -> SambaComputer: + """ + Get computer object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.Samba) + def test_example(client: Client, samba: Samba): + # Create OU + ou = samba.ou("test").add().dn + # Move computer object + samba.computer(client.host.hostname.split(".")[0]).move(ou) + + client.sssd.start() + + :param name: Computer name. + :type name: str + :return: New computer object. + :rtype: ADComputer + """ + return SambaComputer(self, name) + + def gpo(self, name: str) -> SambaGPO: + """ + Get group policy object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.AD) + def test_ad__gpo_is_set_to_enforcing(client: Client, samba: Samba): + user = ad.user("user").add() + allow_user = ad.user("allow_user").add() + deny_user = ad.user("deny_user").add() + + provider.gpo("test policy").add().policy( + { + "SeInteractiveLogonRight": [allow_user, provider.group("Domain Admins")], + "SeRemoteInteractiveLogonRight": [allow_user, provider.group("Domain Admins")], + "SeDenyInteractiveLogonRight": [deny_user], + "SeDenyRemoteInteractiveLogonRight": [deny_user], + } + ).link() + + client.sssd.domain["ad_gpo_access_control"] = "enforcing" + client.sssd.start() + + assert client.auth.ssh.password(username="allow_user", password="Secret123") + assert not client.auth.ssh.password(username="user", password="Secret123") + assert not client.auth.ssh.password(username="deny_user", password="Secret123") + + :param name: Name of the GPO. + :type name: str + :return: New GPO object. + :rtype: SambaGPO + """ + return SambaGPO(self, name) + def ou(self, name: str, basedn: LDAPObject | str | None = None) -> SambaOrganizationalUnit: """ Get organizational unit object. 
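Review bot: The new `Samba.computer` docstring gives `:rtype: ADComputer` for a method that returns `SambaComputer`. The `Samba.gpo` example appears to be copied from the AD role: it is marked `KnownTopology.AD`, calls `ad.user(...)` and `provider.gpo(...)`, but the only fixture in its signature is `samba`; replacing those with `KnownTopology.Samba`, `samba.user(...)` and `samba.gpo(...)` makes the example runnable as written.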
@@ -258,6 +320,25 @@ def test_example(client: Client, samba: Samba): """ return SambaOrganizationalUnit(self, name, basedn) + def site(self, name: str) -> SambaSite: + """ + Get site object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.Samba) + def test_example(client: Client, samba: Samba): + # Create New Site, this name cannot contain spaces + site = samba.site('New-Site').add() + + :param name: Site name. + :type name: str, cannot contain spaces + :return: New site object. + :rtype: SambaSite + """ + return SambaSite(self, name) + def sudorule(self, name: str, basedn: LDAPObject | str | None = "ou=sudoers") -> SambaSudoRule: """ Get sudo rule object. @@ -311,8 +392,15 @@ def __init__(self, role: Samba, command: str, name: str) -> None: self.name: str = name """Object name.""" + self.naming_context: str = role.ldap.naming_context + """Domain naming context.""" + self.__dn: str | None = None + self.__sid: str | None = None + + self.__cn: str | None = None + def _exec(self, op: str, args: list[str] | None = None, **kwargs) -> ProcessResult: """ Execute samba-tool command. @@ -332,6 +420,9 @@ def _exec(self, op: str, args: list[str] | None = None, **kwargs) -> ProcessResu if args is None: args = [] + if self.command == "gpo": + return self.role.host.conn.exec(["samba-tool", self.command, op, self.__cn, *args], **kwargs) + return self.role.host.conn.exec(["samba-tool", self.command, op, self.name, *args], **kwargs) def _add(self, attrs: CLIBuilderArgs) -> None: 
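Review bot: The new `gpo` branch in `SambaObject._exec` reads the mangled attribute `self.__cn` directly, which stays `None` until the `cn` property has been evaluated, so the command becomes `samba-tool gpo <op> None ...` unless a caller happened to touch `.cn` first; use the property, `return self.role.host.conn.exec(["samba-tool", self.command, op, self.cn, *args], **kwargs)`. Whether any current caller reaches this branch is not visible here, since `SambaGPO` in this diff builds its commands from strings, but the branch should resolve the name correctly regardless.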
@@ -397,8 +488,34 @@ def get(self, attrs: list[str] | None = None) -> dict[str, list[str]]: :return: Dictionary with attribute name as a key. :rtype: dict[str, list[str]] """ - cmd = self._exec("show") - return attrs_parse(cmd.stdout_lines, attrs) + + # The samba-tool gpo show command returns a limited list of attributes, so we use LDAP instead + # The LDAP output is formatted to be like samba-tool + if self.command == "gpo": + result = self.role.host.ldap_conn.search_s( + f"cn=system,{self.naming_context}", + ldap.SCOPE_SUBTREE, + f"(&(objectClass=groupPolicyContainer)(displayName={self.name}))", + attrlist=attrs, + ) + + (_, result_attrs) = result[0] + out: list[str] = [] + for key, values in result_attrs.items(): + for value in values: + try: + decoded = value.decode("utf-8") + except UnicodeDecodeError: + decoded = base64.b64encode(value).decode("utf-8") + # The dn is missing from the output + if key == "distinguishedName": + out.insert(0, f"dn: {decoded}") + out.append(f"{key}: {decoded}") + cmd = out + else: + cmd = self._exec("show").stdout_lines + + return attrs_parse(cmd, attrs) @property def dn(self) -> str: @@ -412,6 +529,30 @@ def dn(self) -> str: self.__dn = obj.pop("dn")[0] return self.__dn + @property + def cn(self) -> str: + """ + Object's distinguished name. + """ + if self.__cn is not None: + return self.__cn + + obj = self.get(["cn"]) + self.__cn = obj.pop("cn")[0] + return self.__cn + + @property + def sid(self) -> str: + """ + Object's security identifier. + """ + if self.__sid is not None: + return self.__sid + + obj = self.get(["objectSid"]) + self.__sid = obj.pop("objectSid")[0] + return self.__sid + class SambaUser(SambaObject): """ 
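Review bot: `SambaObject.get` interpolates `self.name` into the LDAP filter unescaped, so a GPO name containing `(`, `)`, `*` or `\` changes the filter's meaning and the lookup either fails or matches a different container. Escape it with `ldap.filter.escape_filter_chars` (add `import ldap.filter` next to the existing `import ldap.modlist`): `name = ldap.filter.escape_filter_chars(self.name)` and then `f"(&(objectClass=groupPolicyContainer)(displayName={name}))"`. `result[0]` raises a bare `IndexError` when nothing matches; `raise ValueError(f"GPO {self.name} not found under cn=system,{self.naming_context}")` would make a missing or not-yet-created policy obvious at the call site. In the following hunk the `cn` property docstring says `Object's distinguished name.`; it should say common name.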
@@ -679,6 +820,238 @@ def __get_member_args(self, members: list[SambaUser | SambaGroup]) -> list[str]: return [",".join([x.name for x in members])] +class SambaComputer(SambaObject): + """ + AD computer management. + """ + + def __init__(self, role: Samba, name: str) -> None: + """ + :param role: AD role object. + :type role: AD + :param name: Computer name. + :type name: str + """ + super().__init__(role, "computer", name) + + def move(self, target: str) -> SambaComputer: + """ + Move a computer object. + + :param target: Target path. + :type target: str + :return: Self. + :rtype: SambaComputer + """ + self._exec("move", [target]) + + return self + + +class SambaSite(SambaObject): + """ + AD Sites management. + """ + + def __init__(self, role: Samba, name: str) -> None: + """ + :param role: Samba role object. + :type role: Samba + :param name: Site name, cannot contain spaces. + :type name: str + """ + super().__init__(role, "sites", name) + + def add(self) -> SambaSite: + """ + Create new Samba site. + + :return: Self. + :rtype: SambaSite + """ + self._exec("create") + + return self + + +class SambaGPO(SambaObject): + """ + Group policy object management. + """ + + def __init__(self, role: Samba, name: str) -> None: + """ + :param name: GPO name, defaults to 'Domain Test Policy' + :type name: str, optional + """ + super().__init__(role, "gpo", name) + + self.target: str | None = None + """Group policy target.""" + + self.search_base: str = f"cn=policies,cn=system,{self.role.host.naming_context}" + """Group policy search base.""" + + # samba-tool gpo commands edit the database files directly and need to be authenticated. + self.credentials: str = f" --username={self.role.host.admin} --password={self.role.host.adminpw}" + """Credentials to manage GPOs.""" + + def add(self) -> SambaGPO: + """ + Add a group policy object. 
+ + :return: Samba group policy object + :rtype: SambaGPO + """ + self.host.conn.run(f'samba-tool gpo create "{self.name}" {self.credentials}') + + return self + + def delete(self) -> None: + """ + Delete group policy object. + """ + self.role.host.conn.run(f'samba-tool gpo del "{self.cn}" {self.credentials}') + + def link( + self, + target: str | None = None, + enforced: bool | None = False, + disabled: bool | None = False, + ) -> SambaGPO: + """ + Link the group policy to the target object inside the directory, a site, domain or an ou. + + :param target: Group policy target, defaults to 'Default-First-Site-Name' + :type target: str, optional + :param enforced: Enforced the policy + :type enforced: bool, optional + :param disabled: Disable the policy + :type disabled: bool, optional + :return: Samba group policy object + :rtype: SambaGPO + """ + if target is None and self.target is None: + self.target = f"CN=Default-First-Site-Name,CN=Sites,CN=Configuration,{self.role.host.naming_context}" + + if target is not None and self.target is None: + self.target = target + + args: CLIBuilderArgs = { + "Target": (self.cli.option.POSITIONAL, self.target), + "Guid": (self.cli.option.POSITIONAL, self.cn), + "enforce": (self.cli.option.SWITCH, enforced), + "disable": (self.cli.option.SWITCH, disabled), + "username": (self.cli.option.VALUE, self.role.host.admin), + "password": (self.cli.option.VALUE, self.role.host.adminpw), + } + + self.host.conn.run(self.cli.command("samba-tool gpo setlink", args)) + + return self + + def unlink(self) -> SambaGPO: + """ + Unlink the group policy from the target. + + :return: Samba group policy object + :rtype: SambaGPO + """ + self.host.conn.run(f'samba-tool gpo dellink "{self.target}" "{self.cn}" {self.credentials}') + + return self + + def policy(self, logon_rights: dict[str, list[SambaObject]], cfg: dict[str, Any] | None = None) -> SambaGPO: + """ + Group policy configuration. 
+ + This method does the remaining configuration of the group policy. It updates + 'GptTmpl.inf' with security logon right keys with the SIDs of users and groups + objects. The *Remote* keys can be omitted, in which the interactive key's value + will then be used. + + To add users and groups to the policy, the SID must be used for the values. The + values need to be prefixed with an '*' and use a comma for a de-limiter, i.e. + `*SID1-2-3-4,*SID-5-6-7-8` + + Additionally, gPCMachineExtensionNames need to be updated in the directory so + the GPO is readable to the client. The value is a list of Client Side + Extensions (CSEs), that is an index of what part of the policy is pushed and + processed by the client. + + :param logon_rights: List of logon rights. + :type logon_rights: dict[str, list[SambaObject]] + :param cfg: Extra configuration for GptTmpl.inf file, defaults to None + :type cfg: dict[str, Any] | None, optional + :return: Samba Group policy object + :rtype: SambaGPO + """ + _path: str = ( + f"/var/lib/samba/sysvol/" + f"{self.role.domain}/" + f"Policies/{self.cn}" + f"/MACHINE/Microsoft/Windows " + f"NT/SecEdit/" + ) + _full_path: str = f"{_path}GptTmpl.inf" + + _keys: list[str] = [ + "SeInteractiveLogonRight", + "SeRemoteInteractiveLogonRight", + "SeDenyInteractiveLogonRight", + "SeDenyRemoteInteractiveLogonRight", + ] + + for i in _keys: + if i not in logon_rights.keys() and i == "SeRemoteInteractiveLogonRight": + logon_rights[i] = logon_rights["SeInteractiveLogonRight"] + if i not in logon_rights.keys() and i == "SeDenyRemoteInteractiveLogonRight": + logon_rights[i] = logon_rights["SeDenyInteractiveLogonRight"] + + for i in _keys: + if i not in logon_rights.keys(): + raise KeyError(f"Expected {i} but got {logon_rights.keys()}") + + _logon_rights: dict[str, Any] = {} + for k, v in logon_rights.items(): + sids: list[str] = [] + for j in v: + sids.append(f"*{j.sid}") + _logon_rights = {**_logon_rights, **{k: ",".join(sids)}} + + config = 
configparser.ConfigParser(interpolation=None) + config.optionxform = str # type: ignore + config["Unicode"] = {} + config["Unicode"]["Unicode"] = "yes" + config["Version"] = {} + config["Version"]["signature"] = '"$CHICAGO$"' + config["Version"]["Revision"] = "1" + config["Privilege Rights"] = {} + + for k, v in _logon_rights.items(): + config["Privilege Rights"][k] = v # type: ignore + + if cfg is not None: + for _k, _v in cfg.items(): + config[_k] = {} + for __k, __v in _v.items(): + config[_k][__k] = __v + + config.write(open("/tmp/GptTmpl.inf", "w")) + + # The enable the GPO the gPCMachineExtensionNames attributes needs to be updated with the proper CSEs + attrs: LDAPRecordAttributes = { + "gPCMachineExtensionNames": "[{827D319E-6EAC-11D2-A4EA-00C04F79F83A}" + "{803E14A0-B4FB-11D0-A0D0-00A0C90F574B}]" + } + self._modify(attrs) + + self.role.fs.mkdir_p(_path, mode="750", user="BUILTIN\\administrators", group="users") + self.role.fs.upload("/tmp/GptTmpl.inf", _full_path, mode="750", user="BUILTIN\\administrators", group="users") + + return self + + SambaOrganizationalUnit: TypeAlias = LDAPOrganizationalUnit[SambaHost, Samba] SambaAutomount: TypeAlias = LDAPAutomount[SambaHost, Samba] SambaSudoRule: TypeAlias = LDAPSudoRule[SambaHost, Samba, SambaUser, SambaGroup]
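Review bot: `SambaGPO.credentials` embeds the admin password into shell command strings in `add`, `delete` and `unlink`, so it appears in process listings and in whatever command logging the connection performs, and because the value is unquoted a password with spaces or shell metacharacters breaks the command; `link` already builds its command through `self.cli.command` with `CLIBuilderArgs`, so the other three methods should take the same route, and if the host's samba-tool supports `--authentication-file` the password can stay off the command line entirely. In `policy`, `config.write(open("/tmp/GptTmpl.inf", "w"))` relies on the interpreter closing the handle before `fs.upload` reads it, and the fixed, predictable `/tmp` path is writable by every local user on whichever host runs the framework; write through `tempfile.NamedTemporaryFile("w", delete=False)` inside a `with` block and upload `fh.name`. `policy` also mutates the caller's `logon_rights` dict when it fills in the `Remote` keys; take a copy first, `logon_rights = dict(logon_rights)`. Documentation nits: `SambaComputer` is described as `AD computer management` with `:type role: AD`, and `SambaGPO.__init__` says `name` `defaults to 'Domain Test Policy'` although it has no default and omits `:param role:`.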