forked from machacekondra/python-rrmngmnt
-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9c492e1
commit c470943
Showing
4 changed files
with
319 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
from rrmngmnt.service import Service | ||
|
||
IPTABLES = 'iptables' | ||
|
||
|
||
class Firewall(Service): | ||
""" | ||
Class for firewall services | ||
""" | ||
def __init__(self, host): | ||
""" | ||
Args: | ||
host (host): Host object to run commands on | ||
""" | ||
super(Firewall, self).__init__(host) | ||
self.host = host | ||
|
||
def is_active(self, firewall_service): | ||
""" | ||
Check if the relevant firewall service is active on the host | ||
Args: | ||
firewall_service (str): Service name | ||
Returns: | ||
bool: True if the service is active on host, False if not | ||
""" | ||
return self.host.service(firewall_service).status() | ||
|
||
def chain(self, chain_name): | ||
""" | ||
Return Chain class to run commands on specefic firewall chain | ||
Args: | ||
chain_name (str): Name of chain to make changes | ||
Returns: | ||
chain: Chain class object | ||
""" | ||
return Chain(self.host, chain_name) | ||
|
||
|
||
class Chain(Service): | ||
""" | ||
Class for Firewall specific chain commands | ||
""" | ||
def __init__(self, host, chain_name): | ||
""" | ||
Args: | ||
host (host): Host object to run commands on | ||
chain_name (str): Name of the firewall chain | ||
""" | ||
super(Chain, self).__init__(host) | ||
self.host = host | ||
self.firewall_service = IPTABLES | ||
self.chain_name = chain_name.upper() | ||
if self.chain_name == 'OUTPUT': | ||
self.address_type = '--destination' | ||
elif self.chain_name == 'INPUT': | ||
self.address_type = '--source' | ||
else: | ||
raise NotImplementedError("only INPUT/OUTPUT chains are supported") | ||
|
||
def edit_chain( | ||
self, action, chain_name, address_type, dest, target, protocol='all', | ||
ports=None | ||
): | ||
""" | ||
Changes firewall configuration | ||
Args: | ||
action (str): action to perform | ||
chain_name (str): affected chain name | ||
address_type (str): '--destination' for outgoing rules, | ||
'--source' for incoming | ||
dest (dict): 'address' key and value containing destination host or | ||
list of destination hosts | ||
target (str): target rule to apply | ||
protocol (str): affected network protocol, Default is 'all' | ||
ports (list): list of ports to configure | ||
Returns: | ||
bool: True if configuration change succeeded, False otherwise | ||
Raises: | ||
NotImplementedError: In case the users specifies more than 15 ports | ||
to block | ||
Example: | ||
edit_chain( | ||
action='--append',chain='OUTPUT', address_type='--destination', | ||
dest={'address': nfs_server}, target='DROP' | ||
) | ||
""" | ||
dest = ",".join(dest['address']) | ||
cmd = [ | ||
self.firewall_service, action, chain_name, address_type, dest, | ||
'--jump', target.upper(), '--protocol', protocol | ||
] | ||
|
||
if ports: | ||
# Iptables multiport module accepts up to 15 ports | ||
if len(ports) > 15: | ||
raise NotImplementedError("Up to 15 ports can be specified") | ||
ports = ",".join(ports) | ||
|
||
if protocol.lower() == 'all': | ||
# Adjust the protocol type, '--dports' option requires specific | ||
# type | ||
cmd[-1] = 'tcp' | ||
|
||
cmd.extend(['--match', 'multiport', '--dports', ports]) | ||
|
||
return not self.host.executor().run_cmd(cmd)[0] | ||
|
||
def list_rules(self): | ||
""" | ||
List all existing rules in a specific Chain | ||
Returns: | ||
list: List of existing rules | ||
""" | ||
cmd = [self.firewall_service, '--list-rules', self.chain_name] | ||
rules = self.host.executor().run_cmd(cmd)[1] | ||
return rules.splitlines() | ||
|
||
def add_rule(self, dest, target, protocol='all', ports=None): | ||
""" | ||
Add new firewall rule to a specific chain | ||
Args: | ||
dest (dict): 'address' key and value containing destination host or | ||
list of destination hosts | ||
target (str): Target rule to apply | ||
protocol (str): affected network protocol, Default is 'all' | ||
ports (list): list of ports to configure | ||
Returns: | ||
bool: False if adding new rule failed, True if it succeeded | ||
""" | ||
return self.edit_chain( | ||
'--append', self.chain_name, self.address_type, dest, target, | ||
protocol, ports | ||
) | ||
|
||
def delete_rule(self, dest, target, protocol='all', ports=None): | ||
""" | ||
Delete existing firewall rule from a specific chain | ||
Args: | ||
dest (dict): 'address' key and value containing destination host or | ||
list of destination hosts | ||
target (str): Target rule to apply | ||
protocol (str): affected network protocol, Default is 'all' | ||
ports (list): list of ports to configure | ||
Returns: | ||
bool: False if deleting rule failed, True if it succeeded | ||
""" | ||
return self.edit_chain( | ||
'--delete', self.chain_name, self.address_type, dest, target, | ||
protocol, ports | ||
) | ||
|
||
def clean_rules(self): | ||
""" | ||
Delete all rules in a specific chain | ||
Returns: | ||
bool: True if succeeded, False otherwise | ||
""" | ||
cmd = [self.firewall_service, '--flush', self.chain_name] | ||
return not self.host.executor().run_cmd(cmd)[0] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
# -*- coding: utf-8 -*- | ||
import pytest | ||
from rrmngmnt import Host | ||
from rrmngmnt.user import RootUser | ||
from .common import FakeExecutor | ||
|
||
host_executor = Host.executor | ||
|
||
|
||
def teardown_module(): | ||
Host.executor = host_executor | ||
|
||
|
||
def fake_cmd_data(cmd_to_data): | ||
def executor(self, user=None, pkey=False): | ||
e = FakeExecutor(user, self.ip) | ||
e.cmd_to_data = cmd_to_data.copy() | ||
return e | ||
Host.executor = executor | ||
|
||
|
||
def get_host(ip='1.1.1.1'): | ||
h = Host(ip) | ||
h.users.append(RootUser('123456')) | ||
return h | ||
|
||
|
||
class TestFirewall(object): | ||
|
||
data = { | ||
'which systemctl': (0, '/usr/bin/systemctl', ''), | ||
'systemctl list-unit-files | grep -o ^[^.][^.]*.service ' | ||
'| cut -d. -f1 | sort | uniq': ( | ||
0, | ||
'\n'.join( | ||
[ | ||
'iptables', | ||
'noniptables', | ||
] | ||
), | ||
'' | ||
), | ||
'systemctl status iptables.service': (0, '', ''), | ||
'systemctl status noniptables.service': (1, '', ''), | ||
} | ||
|
||
@classmethod | ||
def setup_class(cls): | ||
fake_cmd_data(cls.data) | ||
|
||
def test_running_service_positive(self): | ||
assert get_host().firewall.is_active('iptables') | ||
|
||
|
||
class TestChain(object): | ||
|
||
data = { | ||
'iptables --append OUTPUT --destination 2.2.2.2 --jump DROP ' | ||
'--protocol all': (0, '', ''), | ||
'iptables --append INPUT --source 2.2.2.2 --jump DROP ' | ||
'--protocol all': (0, '', ''), | ||
'iptables --delete OUTPUT --destination 2.2.2.2 --jump DROP ' | ||
'--protocol all': (0, '', ''), | ||
'iptables --delete INPUT --source 2.2.2.2 --jump DROP ' | ||
'--protocol all': (0, '', ''), | ||
'iptables --append OUTPUT --destination 2.2.2.2 --jump DROP ' | ||
'--protocol tcp --match multiport --dports ' | ||
'1,2,3,4,5,6,7,8,9,10,11,12,13,14,15': (0, '', ''), | ||
'iptables --append OUTPUT --destination 2.2.2.2 --jump DROP ' | ||
'--protocol tcp --match multiport --dports ' | ||
'1,2,3,4,5,6,7,8,9,10,11,12,13,14,15, 16': ( | ||
4, '', 'iptables v1.4.21: too many ports specified' | ||
), | ||
'iptables --flush OUTPUT': (0, '', '') | ||
} | ||
|
||
destination_host = {'address': ['2.2.2.2']} | ||
ports = [ | ||
'1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', | ||
'14', '15' | ||
] | ||
too_many_ports = [ | ||
'1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', | ||
'14', '15', '16' | ||
] | ||
|
||
@classmethod | ||
def setup_class(cls): | ||
fake_cmd_data(cls.data) | ||
|
||
def test_wrong_chain_name(self): | ||
with pytest.raises(NotImplementedError): | ||
get_host().firewall.chain('CHAIN') | ||
|
||
def test_add_outgoing_rule(self): | ||
assert get_host().firewall.chain('OUTPUT').add_rule( | ||
self.destination_host, 'DROP' | ||
) | ||
|
||
def test_add_incoming_rule(self): | ||
assert get_host().firewall.chain('INPUT').add_rule( | ||
self.destination_host, 'DROP' | ||
) | ||
|
||
def test_delete_outgoing_rule(self): | ||
assert get_host().firewall.chain('OUTPUT').delete_rule( | ||
self.destination_host, 'DROP' | ||
) | ||
|
||
def test_delete_incoming_rule(self): | ||
assert get_host().firewall.chain('OUTPUT').delete_rule( | ||
self.destination_host, 'DROP' | ||
) | ||
|
||
def test_add_outgoing_rule_with_ports(self): | ||
assert get_host().firewall.chain('OUTPUT').add_rule( | ||
self.destination_host, 'DROP', ports=self.ports | ||
) | ||
|
||
def test_add_outgoing_rule_with_too_many_ports(self): | ||
with pytest.raises(NotImplementedError): | ||
get_host().firewall.chain('OUTPUT').add_rule( | ||
self.destination_host, 'DROP', ports=self.too_many_ports | ||
) | ||
|
||
def test_clean_firewall_rules(self): | ||
assert get_host().firewall.chain('OUTPUT').clean_rules() |