diff --git a/.github/workflows/ci-ui-tests.yaml b/.github/workflows/ci-ui-tests.yaml new file mode 100644 index 000000000..53c743c49 --- /dev/null +++ b/.github/workflows/ci-ui-tests.yaml @@ -0,0 +1,58 @@ +name: ui-tests +on: + push: + branches: + - "main" + - "develop" + - "next" + - "sc4snmp-ui-tests" +jobs: + run-ui-e2e-tests: + name: run UI e2e tests + runs-on: ubuntu-latest + timeout-minutes: 120 + env: + CI_EXECUTION_TYPE: ci + + strategy: + matrix: + execution-type: ["basic", "extended"] + + steps: + - name: Checkout Project + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: "3.10" + + - name: prepare values.yaml for configuration storing + working-directory: integration_tests + run: | + sed -i "s|/home/splunker|$(pwd)|g" values.yaml + + - name: install microk8s + run: | + sudo snap install microk8s --classic --channel=1.25/stable + sudo apt-get install snmp -y + sudo apt-get install python3-dev -y + + - name: run automatic_setup.sh + run: integration_tests/automatic_setup.sh + +# - name: run tests +# working-directory: integration_tests +# run: | +# poetry run pytest --splunk_host="localhost" --splunk_password="changeme2" --trap_external_ip="$(hostname -I | cut -d " " -f1)" +# + - name: install dependencies + working-directory: ui_tests + run: | + pip install -r requirements.txt + export PATH="/home/ubuntu/.local/bin:$PATH" + + - name: run tests + working-directory: ui_tests + run: | + pytest -vvv --splunk-user=admin --splunk-password="changeme2" --splunk-host="localhost" --device-simulator="$(hostname -I | cut -d " " -f1)" -k ${{ matrix.execution-type }} \ No newline at end of file diff --git a/integration_tests/automatic_setup.sh b/integration_tests/automatic_setup.sh index 085bf40f5..a02a3cadd 100755 --- a/integration_tests/automatic_setup.sh +++ b/integration_tests/automatic_setup.sh @@ -26,6 +26,7 @@ wait_for_splunk() { } function define_python() { + echo $(yellow "define python") if command -v python &> /dev/null; then PYTHON=python elif command -v python3 &> /dev/null; then @@ -97,6 +98,7 @@ cd integration_tests chmod u+x prepare_splunk.sh echo $(green "Preparing Splunk instance") ./prepare_splunk.sh +./install_sck.sh sed -i "s/###SPLUNK_TOKEN###/$(cat hec_token)/" values.yaml sed -i "s/###LOAD_BALANCER_ID###/$(hostname -I | cut -d " " -f1)/" values.yaml sudo docker run -d -p 161:161/udp tandrup/snmpsim @@ -128,6 +130,6 @@ wait_for_pod_initialization wait_for_sc4snmp_pods_to_be_up check_metallb_status -define_python +#define_python -deploy_poetry +#deploy_poetry diff --git a/integration_tests/install_sck.sh b/integration_tests/install_sck.sh new file mode 100755 index 000000000..96f28173a --- /dev/null +++ b/integration_tests/install_sck.sh @@ -0,0 +1,10 @@ +sudo microk8s helm3 repo add splunk-otel-collector-chart https://signalfx.github.io/splunk-otel-collector-chart +sudo microk8s helm3 upgrade --install sck \ + --set="clusterName=my-cluster" \ + --set="splunkPlatform.endpoint=https://$(hostname -I | cut -d " " -f1):8088/services/collector" \ + --set="splunkPlatform.insecureSkipVerify=true" \ + --set="splunkPlatform.token=$(cat hec_token)" \ + --set="splunkPlatform.metricsEnabled=true" \ + --set="splunkPlatform.metricsIndex=em_metrics" \ + --set="splunkPlatform.index=em_logs" \ + splunk-otel-collector-chart/splunk-otel-collector diff --git a/integration_tests/prepare_splunk.sh b/integration_tests/prepare_splunk.sh index 183e68680..9a0cb091f 100644 --- a/integration_tests/prepare_splunk.sh +++ b/integration_tests/prepare_splunk.sh @@ -1,6 +1,6 @@ create_splunk_indexes() { - index_names=("netmetrics" "netops") - index_types=("metric" "event") + index_names=("netmetrics" "em_metrics" "netops" "em_logs") + index_types=("metric" "metric" "event" "event") for index in "${!index_names[@]}" ; do if ! curl -k -u admin:"changeme2" "https://localhost:8089/services/data/indexes" \ -d datatype="${index_types[${index}]}" -d name="${index_names[${index}]}" ; then diff --git a/integration_tests/values.yaml b/integration_tests/values.yaml index 8f92d8a6c..eca9f5f13 100644 --- a/integration_tests/values.yaml +++ b/integration_tests/values.yaml @@ -1,3 +1,33 @@ +UI: + enable: true + frontEnd: + NodePort: 30001 + repository: ghcr.io/splunk/sc4snmp-ui/frontend/container + tag: "develop" + pullPolicy: "Always" + backEnd: + NodePort: 30002 + repository: ghcr.io/splunk/sc4snmp-ui/backend/container + tag: "develop" + pullPolicy: "Always" + init: + image: registry.access.redhat.com/ubi9/ubi + pullPolicy: IfNotPresent + + # valuesFileDirectory is obligatory if UI is used. It is an absolute directory path on the host machine + # where values.yaml is located and where configuration files from the UI will be generated. + valuesFileDirectory: "/home/splunker" + + # valuesFileName is an exact name of yaml file with user's configuration, located inside directory specified in + # valuesFileDirectory. It is optional. If it is provided then this file fill be updated with configuration from the UI. + # If the valuesFileName is empty, or provided file name can't be found inside valuesFileDirectory directory, + # then configuration from the UI will be saved in few files, each file for different section, inside + # valuesFileDirectory directory. + valuesFileName: "values.yaml" + + # If keepSectionFiles is set to true, separate configration files for different sections will be saved in + # valuesFileDirectory directory regardless of valuesFileName proper configuration. + keepSectionFiles: false splunk: enabled: true protocol: https @@ -62,6 +92,8 @@ scheduler: # - ['SNMPv2-MIB', 'sysName', 0] # - ['IF-MIB'] # - ['TCP-MIB'] + groups: | + {} poller: usernameSecrets: - sv3poller diff --git a/ui_tests/config/config.py b/ui_tests/config/config.py new file mode 100644 index 000000000..c350a8f07 --- /dev/null +++ b/ui_tests/config/config.py @@ -0,0 +1,34 @@ +import os + + +def get_execution_type(): + execution_type = os.environ.get("CI_EXECUTION_TYPE") + if execution_type is None: + return EXECUTION_TYPE_LOCAL + else: + return execution_type + + +def get_ui_host_ip_address(): + if EXECUTION_TYPE != EXECUTION_TYPE_LOCAL: + # test executed in pipeline in GitHub actions so the UI is on the same VM where tests are executed + return "localhost" + else: + return UI_HOST_FOR_LOCAL_EXECUTION + + +EXECUTION_TYPE_LOCAL = "local" +EXECUTION_TYPE = get_execution_type() + +UI_HOST_FOR_LOCAL_EXECUTION = "PUT_HERE_IP_ADDRESS_OF_SC4SNMP_VM" +UI_HOST = get_ui_host_ip_address() +UI_URL = f"http://{UI_HOST}:30001/" +EVENT_INDEX = "netops" +LOGS_INDEX = "em_logs" + +# timers +IMPLICIT_WAIT_TIMER = 10 + +# yaml file +YAML_FILE_PATH = "./../integration_tests/values.yaml" +DEFAULT_PORT = 161 diff --git a/ui_tests/conftest.py b/ui_tests/conftest.py new file mode 100644 index 000000000..6066c2fa9 --- /dev/null +++ b/ui_tests/conftest.py @@ -0,0 +1,32 @@ +import pytest +from logger.logger import Logger +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() + + +def pytest_addoption(parser): + parser.addoption( + "--device-simulator", + action="store", + dest="device-simulator", + default="127.0.0.1", + help="Device Simulator external IP, basically external IP of VM", + ) + + +@pytest.fixture(scope="function") +def setup(request): + config = {} + host = request.config.getoption("--splunk-host") + config["splunkd_url"] = "https://" + host + ":8089" + config["splunk_user"] = request.config.getoption("--splunk-user") + config["splunk_password"] = request.config.getoption("--splunk-password") + config["device_simulator"] = request.config.getoption("device-simulator") + + return config + + +def pytest_unconfigure(): + logger.info("Closing Web Driver") + WebDriverFactory.close_driver() diff --git a/ui_tests/logger/logger.py b/ui_tests/logger/logger.py new file mode 100644 index 000000000..ba6e261c5 --- /dev/null +++ b/ui_tests/logger/logger.py @@ -0,0 +1,25 @@ +import logging +import sys + +log = None + + +class Logger: + logger = None + + @classmethod + def initialize_logger(cls): + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + formatter = logging.Formatter("%(message)s") + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(formatter) + logger.addHandler(handler) + # return logger + cls.logger = logger + + @classmethod + def get_logger(cls): + if cls.logger is None: + cls.initialize_logger() + return cls.logger diff --git a/ui_tests/pages/groups_page.py b/ui_tests/pages/groups_page.py new file mode 100644 index 000000000..a5dc86128 --- /dev/null +++ b/ui_tests/pages/groups_page.py @@ -0,0 +1,282 @@ +import time + +import pages.helper as helper +from logger.logger import Logger +from selenium.webdriver.common.by import By +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory.get_driver() + + +class GroupsPage: + def check_if_groups_table_is_displayed(self): + logger.info("Check if groups page is displayed") + groups_table_xpath = "//div[@data-test='sc4snmp:group-table']" + groups_container = driver.find_element(By.XPATH, groups_table_xpath) + return groups_container.is_displayed() + + def click_add_new_group_button(self): + logger.info(f"Click add new group button") + add_group_button_xpath = "//button[@data-test='sc4snmp:new-item-button']//span" + add_grp_btn = driver.find_element(By.XPATH, add_group_button_xpath) + add_grp_btn.click() + time.sleep(1) + + def set_group_name(self, group_name): + logger.info(f"Set group name: {group_name}") + add_grp_input = self._get_group_name_input() + add_grp_input.send_keys(group_name) + + def _get_group_name_input(self): + add_group_input_xpath = ( + "//div[@data-test='sc4snmp:form:group-name-input']//span//input" + ) + add_grp_input = driver.find_element(By.XPATH, add_group_input_xpath) + return add_grp_input + + def click_submit_button_for_add_group(self): + logger.info(f"Click submit button") + add_group_button_xpath = ( + "//button[@data-test='sc4snmp:form:submit-form-button']" + ) + add_grp_btn = driver.find_element(By.XPATH, add_group_button_xpath) + add_grp_btn.click() + # wait for group to be shown on the list + time.sleep(5) + + def click_submit_button_for_add_device(self): + self.click_submit_button_for_add_group() + + def click_cancel_button_for_add_device(self): + logger.info(f"Click cancel button") + cancel_button_xpath = "//button[@data-test='sc4snmp:form:cancel-button']" + cancel_btn = driver.find_element(By.XPATH, cancel_button_xpath) + cancel_btn.click() + + def check_if_groups_is_on_list(self, group_name): + logger.info(f"Checking if group is configured (is on list)") + group_entry_on_list_xpath = "//div[@data-test='sc4snmp:group']//p" + groups_entries = driver.find_elements(By.XPATH, group_entry_on_list_xpath) + for el in groups_entries: + # logger.info(f"group name > |{el.text}|") # debug + if group_name == el.text: + return True + logger.info("Group has not been found on list") + return False + + def delete_group_from_list(self, group_name): + logger.info(f"Removing group from groups list: {group_name}") + self.click_delete_group_button(group_name) + self.confirm_delete() + self.close_delete_popup() + + def click_delete_group_button(self, group_name): + logger.info(f"Clicking delete group button for: {group_name}") + delete_btn_for_group_with_name_xpath = f"//div[@data-test='sc4snmp:group' and child::*[text()='{group_name}']]//button[@data-test='sc4snmp:group:delete-group-button']" + delete_btn = driver.find_element(By.XPATH, delete_btn_for_group_with_name_xpath) + delete_btn.click() + time.sleep(1) + + def close_delete_popup(self): + logger.info(f"Closing profile delete popup") + close_profile_delete_popup_btn_xpath = ( + "//button[@data-test='sc4snmp:errors-modal:cancel-button']" + ) + close_btn = driver.find_element(By.XPATH, close_profile_delete_popup_btn_xpath) + close_btn.click() + time.sleep(1) + + def click_add_device_to_group(self, group_name): + logger.info(f"Click add device to group: {group_name}") + add_device_for_group_with_name_xpath = f"//div[@data-test='sc4snmp:group' and child::*[text()='{group_name}']]//button[@data-test='sc4snmp:group:new-device-button']" + add_device_btn = driver.find_element( + By.XPATH, add_device_for_group_with_name_xpath + ) + add_device_btn.click() + time.sleep(1) + + def get_error_message_while_adding_device_with_no_data(self): + logger.info(f"getting error message while adding device with no data") + error_msg_xpath = f"//p[@data-test='sc4snmp:ip-error']" + err_msg = driver.find_element(By.XPATH, error_msg_xpath) + return err_msg.text + + def get_number_of_devices_for_group(self, group_name): + logger.info(f"getting number of devices for group: {group_name}") + device_row_xpath = "//tr[@data-test='sc4snmp:group-row']" + rows = driver.find_elements(By.XPATH, device_row_xpath) + return len(rows) + + def set_device_ip(self, device_ip, edit=False): + logger.info(f"set device ip: {device_ip}") + device_ip_field_xpath = "//div[@data-test='sc4snmp:form:ip-input']//span//input" + ip_field = driver.find_element(By.XPATH, device_ip_field_xpath) + if edit: + helper.clear_input(ip_field) + ip_field.send_keys(device_ip) + + def check_if_device_is_configured(self, device_ip): + logger.info(f"Checking if device is configured (is on group list)") + device_entry_on_list_xpath = "//td[@data-test='sc4snmp:host-address']" + devices_entries = driver.find_elements(By.XPATH, device_entry_on_list_xpath) + for el in devices_entries: + # logger.info(f"device name > |{el.text}|") # debug + if device_ip == el.text: + return True + logger.info("Device has not been found on list") + return False + + def edit_group_name(self, group_name, new_group_name): + logger.info(f"change group name: {group_name} -> {new_group_name}") + edit_group_button_xpath = f"//div[@data-test='sc4snmp:group' and child::*[text()='{group_name}']]//button[@data-test='sc4snmp:group:edit-group-button']" + edit_group_btn = driver.find_element(By.XPATH, edit_group_button_xpath) + edit_group_btn.click() + add_grp_input = self._get_group_name_input() + helper.clear_input(add_grp_input) + add_grp_input.send_keys(new_group_name) + self.click_submit_button_for_add_group() + + def get_submit_edited_group_name_popup_message(self): + logger.info(f"Get submit edited group name popup text") + edited_group_popup_text_xpath = f"//div[@data-test='modal']//div//p" + edited_group_popup_text = driver.find_element( + By.XPATH, edited_group_popup_text_xpath + ) + return edited_group_popup_text.text + + def close_edited_profile_popup(self): + logger.info(f"Closing edited group popup") + close_popup_btn_xpath = ( + f"//button[@data-test='sc4snmp:errors-modal:cancel-button']" + ) + close_popup_btn = driver.find_element(By.XPATH, close_popup_btn_xpath) + close_popup_btn.click() + time.sleep(2) + + def delete_device_from_group(self, device_ip): + logger.info(f"Delete device from group popup") + delete_device_btn_xpath = f"//button[@data-test='sc4snmp:group-row-delete' and ancestor::tr//td[text()='{device_ip}']]" + delete_device_btn = driver.find_element(By.XPATH, delete_device_btn_xpath) + delete_device_btn.click() + time.sleep(2) + self.confirm_delete() + self.close_delete_popup() + + def click_edit_device(self, device_ip): + logger.info(f"Click edit device button") + edit_device_btn_xpath = f"//button[@data-test='sc4snmp:group-row-edit' and ancestor::tr//td[text()='{device_ip}']]" + edit_device_btn = driver.find_element(By.XPATH, edit_device_btn_xpath) + edit_device_btn.click() + time.sleep(2) + + def confirm_delete(self): + logger.info(f"Confirm delete device from group popup") + confirm_delete_xpath = ( + "//button[@data-test='sc4snmp:delete-modal:delete-button']" + ) + confirm_btn = driver.find_element(By.XPATH, confirm_delete_xpath) + confirm_btn.click() + time.sleep(1) + + def set_device_port(self, port, edit=False): + logger.info(f"set device port: {port}") + self._set_group_field("port", port, edit) + + def set_community_string(self, community_string, edit=False): + logger.info(f"set device community string: {community_string}") + self._set_group_field("community_string", community_string, edit) + + def set_secret(self, secret, edit=False): + logger.info(f"set device secret: {secret}") + self._set_group_field("secret", secret, edit) + + def set_security_engine(self, security_engine, edit=False): + logger.info(f"set security engine: {security_engine}") + self._set_group_field("security_engine", security_engine, edit) + + def _set_group_field(self, field_name, value, edit=False): + xpath = { + "port": "//div[@data-test='sc4snmp:form:port-input']//span//input", + "community_string": "//div[@data-test='sc4snmp:form:community-input']//span//input", + "secret": "//div[@data-test='sc4snmp:form:secret-input']//span//input", + "security_engine": "//div[@data-test='sc4snmp:form:security-engine-input']//span//input", + } + field_input = driver.find_element(By.XPATH, xpath[field_name]) + if edit: + helper.clear_input(field_input) + field_input.send_keys(value) + + def set_snmp_version(self, snmp_version): + logger.info(f"set device snmp version: {snmp_version}") + options = { + "From inventory": "//button[@data-test='sc4snmp:form:version-from-inventory']", + "1": "//button[@data-test='sc4snmp:form:version-1']", + "2c": "//button[@data-test='sc4snmp:form:version-2c']", + "3": "//button[@data-test='sc4snmp:form:version-3']", + } + snmp_version_expander_xpath = ( + "//button[@data-test='sc4snmp:form:select-version']" + ) + expander = driver.find_element(By.XPATH, snmp_version_expander_xpath) + expander.click() + time.sleep(1) + option = driver.find_element(By.XPATH, options[snmp_version]) + option.click() + + def get_device_port(self, device_ip): + logger.info(f"get device port: {device_ip}") + return self._get_group_field_value("port", device_ip) + + def get_device_snmp_version(self, device_ip): + logger.info(f"get device snmp_version: {device_ip}") + return self._get_group_field_value("snmp_version", device_ip) + + def get_device_community_string(self, device_ip): + logger.info(f"get device community string: {device_ip}") + return self._get_group_field_value("community_string", device_ip) + + def get_device_secret(self, device_ip): + logger.info(f"get device secret: {device_ip}") + return self._get_group_field_value("secret", device_ip) + + def get_device_security_engine(self, device_ip): + logger.info(f"get device security engine {device_ip}") + return self._get_group_field_value("security_engine", device_ip) + + def _get_group_field_value(self, field, device_ip): + xpath = { + "port": f"//td[@data-test='sc4snmp:host-port' and ancestor::tr//td[text()='{device_ip}']]", + "snmp_version": f"//td[@data-test='sc4snmp:host-version' and ancestor::tr//td[text()='{device_ip}']]", + "community_string": f"//td[@data-test='sc4snmp:host-community' and ancestor::tr//td[text()='{device_ip}']]", + "secret": f"//td[@data-test='sc4snmp:host-secret' and ancestor::tr//td[text()='{device_ip}']]", + "security_engine": f"//td[@data-test='sc4snmp:host-security-engine' and ancestor::tr//td[text()='{device_ip}']]", + } + community = driver.find_element(By.XPATH, xpath[field]) + return community.text + + def get_warning_message_when_removing_group_which_is_configured_in_inventory(self): + logger.info( + f"getting error message while removing group which is configured in inventory" + ) + warning_msg_xpath = ( + f"//div[@data-test-type='warning' and @data-test='message']//div" + ) + warning_msg = driver.find_element(By.XPATH, warning_msg_xpath) + return warning_msg.text + + def clear_groups(self): + logger.info(f"remove all groups") + group_delete_btn_xpath = ( + f"//button[@data-test='sc4snmp:group:delete-group-button']" + ) + delete_btns = driver.find_elements(By.XPATH, group_delete_btn_xpath) + logger.info(f"Need to remove {len(delete_btns)} items") + while len(delete_btns) > 0: + delete_btns[0].click() + time.sleep(1) + self.confirm_delete() + self.close_delete_popup() + time.sleep(1) + delete_btns = driver.find_elements(By.XPATH, group_delete_btn_xpath) + logger.info(f" {len(delete_btns)} more items for removal") diff --git a/ui_tests/pages/header_page.py b/ui_tests/pages/header_page.py new file mode 100644 index 000000000..eee8d5dc0 --- /dev/null +++ b/ui_tests/pages/header_page.py @@ -0,0 +1,83 @@ +import re +import time + +from logger.logger import Logger +from selenium.webdriver.common.by import By +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.ui import WebDriverWait +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory.get_driver() + + +class HeaderPage: + def switch_to_profiles(self): + self._switch_to_page("profiles") + + def switch_to_groups(self): + self._switch_to_page("groups") + + def switch_to_inventory(self): + self._switch_to_page("inventory") + + def _switch_to_page(self, page_name): + logger.info(f"Switching to {page_name} tab") + page_button_xpath = { + "profiles": "//button[@data-test='sc4snmp:profiles-tab']", + "groups": "//button[@data-test='sc4snmp:groups-tab']", + "inventory": "//button[@data-test='sc4snmp:inventory-tab']", + } + xpath_button = page_button_xpath[page_name] + tab = driver.find_element(By.XPATH, xpath_button) + tab.click() + page_table_xpath = { + "profiles": "//div[@data-test='sc4snmp:profiles-table']", + "groups": "//div[@data-test='sc4snmp:group-table']", + "inventory": "//div[@data-test='sc4snmp:inventory-table']", + } + WebDriverWait(driver, 10).until( + EC.visibility_of_element_located((By.XPATH, page_table_xpath[page_name])) + ) + + def apply_changes(self): + logger.info("Apply changes") + apply_changes_button_xpath = ( + "//button[@data-test='sc4snmp:apply-changes-button']" + ) + apply_btn = driver.find_element(By.XPATH, apply_changes_button_xpath) + apply_btn.click() + time.sleep(3) + + def close_configuration_applied_notification_popup(self): + logger.info("Close configuration applied popup") + popup_xpath = "//button[@data-test='sc4snmp:errors-modal:cancel-button']" + close_popup_button = driver.find_element(By.XPATH, popup_xpath) + close_popup_button.click() + time.sleep(3) + + def get_time_to_upgrade(self): + logger.info("Get time to upgrade") + popup_text_xpath = "//div[@data-test='modal']//div//p" + popup_txt_element = driver.find_element(By.XPATH, popup_text_xpath) + text = popup_txt_element.text + matches = re.search(r"\d+", text) + number = int(matches.group()) + logger.info(f"Extracted number: {number}") + time.sleep(3) + return number + + def get_popup_error_message(self): + logger.info("Get popup error message") + popup_text_xpath = "//div[@data-test='modal']//div//div" + popup_txt_element = driver.find_element(By.XPATH, popup_text_xpath) + return popup_txt_element.text + + def close_error_popup(self): # two similar methods on profile page + logger.info("Close popup error message") + close_profile_delete_popup_btn_xpath = ( + "//button[@data-test='sc4snmp:errors-modal:cancel-button']" + ) + close_btn = driver.find_element(By.XPATH, close_profile_delete_popup_btn_xpath) + close_btn.click() + time.sleep(1) diff --git a/ui_tests/pages/helper.py b/ui_tests/pages/helper.py new file mode 100644 index 000000000..779fab440 --- /dev/null +++ b/ui_tests/pages/helper.py @@ -0,0 +1,11 @@ +from logger.logger import Logger +from selenium.webdriver.common.keys import Keys + +logger = Logger().get_logger() + + +def clear_input(input_element): + logger.info("Clearing input") + text = input_element.get_attribute("value") + for num in range(len(text)): + input_element.send_keys(Keys.BACKSPACE) diff --git a/ui_tests/pages/inventory_page.py b/ui_tests/pages/inventory_page.py new file mode 100644 index 000000000..d36281360 --- /dev/null +++ b/ui_tests/pages/inventory_page.py @@ -0,0 +1,320 @@ +import time + +import pages.helper as helper +import selenium.common.exceptions +from logger.logger import Logger +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory.get_driver() + + +class InventoryPage: + def check_if_inventory_table_is_displayed(self): + logger.info("Check if inventory page is displayed") + inventory_table_xpath = "//div[@data-test='sc4snmp:inventory-table']" + inventory_container = driver.find_element(By.XPATH, inventory_table_xpath) + return inventory_container.is_displayed() + + def check_if_entry_is_on_list(self, host_ip): + logger.info(f"Checking if host/group entry is configured (is on list)") + entries_on_list_xpath = "//td[@data-test='sc4snmp:inventory-address']" + entries = driver.find_elements(By.XPATH, entries_on_list_xpath) + logger.info(f"list length: {len(entries)}") + for el in entries: + # logger.info(f"entry name > |{el.text}|") # debug + if host_ip == el.text: + return True + logger.info("Entry has not been found on list") + return False + + def click_add_new_device_group_button(self): + logger.info(f"Click add new device/group entry button") + add_group_device_button_xpath = ( + "//button[@data-test='sc4snmp:new-item-button']//span//span" + ) + add_grp_device_btn = driver.find_element( + By.XPATH, add_group_device_button_xpath + ) + add_grp_device_btn.click() + time.sleep(3) + + def click_submit_button_for_add_entry(self): + self._click_submit_button() + + def _click_submit_button(self): + logger.info(f"Click submit button") + add_group_device_item_button_xpath = ( + "//button[@data-test='sc4snmp:form:submit-form-button']" + ) + add_grp_device_btn = driver.find_element( + By.XPATH, add_group_device_item_button_xpath + ) + add_grp_device_btn.click() + time.sleep(5) # wait for group to be shown on the list + + def click_submit_button_for_edit_entry(self): + self._click_submit_button() + + def delete_entry_from_list(self, host_ip): + logger.info(f"Removing entry from inventory list: {host_ip}") + delete_btn_for_inventory_with_host_ip_xpath = f"//button[@data-test='sc4snmp:inventory-row-delete' and ancestor::tr//td[text()='{host_ip}']]" + delete_btn = driver.find_element( + By.XPATH, delete_btn_for_inventory_with_host_ip_xpath + ) + delete_btn.click() + time.sleep(1) + self.confirm_delete() + self.close_delete_popup() + + def close_delete_popup(self): + logger.info(f"Closing inventory delete popup") + self._close_notification_popup() + + def _close_notification_popup(self): + close_inventory_delete_popup_btn_xpath = ( + "//button[@data-test='sc4snmp:errors-modal:cancel-button']" + ) + close_btn = driver.find_element( + By.XPATH, close_inventory_delete_popup_btn_xpath + ) + close_btn.click() + time.sleep(1) + + def close_edit_inventory_entry(self): + logger.info(f"Closing inventory edit popup") + self._close_notification_popup() + + def confirm_delete(self): + logger.info(f"Confirm delete entry") + confirm_delete_xpath = ( + "//button[@data-test='sc4snmp:delete-modal:delete-button']" + ) + confirm_btn = driver.find_element(By.XPATH, confirm_delete_xpath) + confirm_btn.click() + time.sleep(1) + + def set_community_string(self, community_string, edit=False): + logger.info(f"Set community string: {community_string}") + community_input_field_xpath = ( + "//div[@data-test='sc4snmp:form:community-input']//span//input" + ) + community_input_field = driver.find_element( + By.XPATH, community_input_field_xpath + ) + if edit: + helper.clear_input(community_input_field) + community_input_field.send_keys(community_string) + + def click_edit_inventory_entry(self, host_ip): + logger.info(f"Edit entry from inventory list with: {host_ip}") + edit_inventory_entry_btn_xpath = f"//button[@data-test='sc4snmp:inventory-row-edit' and ancestor::tr//td[text()='{host_ip}']]" + edit_inventory_entry_btn = driver.find_element( + By.XPATH, edit_inventory_entry_btn_xpath + ) + edit_inventory_entry_btn.click() + + def get_edit_inventory_notice(self): + logger.info(f"Get edited inventory popup text") + edited_inventory_popup_text_xpath = f"//div[@data-test='modal']//div//p" + edited_inventory_popup_text = driver.find_element( + By.XPATH, edited_inventory_popup_text_xpath + ) + return edited_inventory_popup_text.text + + def select_group_inventory_type(self): + logger.info(f"Select group inventory type") + group_inventory_type_btn_xpath = ( + f"//button[@data-test='sc4snmp:form:inventory-type-group']" + ) + group_inventory_type_btn = driver.find_element( + By.XPATH, group_inventory_type_btn_xpath + ) + group_inventory_type_btn.click() + + def get_host_missing_error(self): + logger.info(f"Get host missing error") + return self._get_error_for_missing_or_invalid_inventory_field("host_missing") + + def get_community_string_missing_error(self): + logger.info(f"Get community string missing error") + return self._get_error_for_missing_or_invalid_inventory_field( + "community_string_missing" + ) + + def get_walk_invalid_value_error(self): + logger.info(f"Get walk interval invalid value error") + return self._get_error_for_missing_or_invalid_inventory_field( + "walk_invalid_value" + ) + + def _get_error_for_missing_or_invalid_inventory_field(self, field): + xpath = { + "host_missing": f"//p[@data-test='sc4snmp:ip-group-error']", + "community_string_missing": f"//p[@data-test='sc4snmp:community-error']", + "walk_invalid_value": f"//p[@data-test='sc4snmp:walk-interval-error']", + } + try: + error_msg = driver.find_element(By.XPATH, xpath[field]) + return error_msg.text + except selenium.common.exceptions.NoSuchElementException: + return None + + def edit_device_port(self, port): + logger.info(f"set/edit inventory device port: {port}") + device_port_field_xpath = ( + "//div[@data-test='sc4snmp:form:port-input']//span//input" + ) + port_field = driver.find_element(By.XPATH, device_port_field_xpath) + helper.clear_input(port_field) + port_field.send_keys(port) + + def select_snmp_version(self, snmp_version): + logger.info(f"set device snmp version: {snmp_version}") + options = { + "1": "//button[@data-test='sc4snmp:form:version-1']", + "2c": "//button[@data-test='sc4snmp:form:version-2c']", + "3": "//button[@data-test='sc4snmp:form:version-3']", + } + snmp_version_expander_xpath = ( + "//button[@data-test='sc4snmp:form:select-version']" + ) + expander = driver.find_element(By.XPATH, snmp_version_expander_xpath) + expander.click() + time.sleep(1) + option = driver.find_element(By.XPATH, options[snmp_version]) + option.click() + + def set_host_or_group_name(self, host_ip, edit=False): + logger.info(f"Set host/group item name: {host_ip}") + self._set_inventory_field("host_group_name", host_ip, edit) + + def set_secret(self, secret, edit=False): + logger.info(f"set inventory device secret: {secret}") + self._set_inventory_field("secret", secret, edit) + + def set_security_engine(self, security_engine, edit=False): + logger.info(f"set inventory device security engine: {security_engine}") + self._set_inventory_field("security_engine", security_engine, edit) + + def _set_inventory_field(self, field, value, edit=False): + xpath = { + "host_group_name": "//div[@data-test='sc4snmp:form:group-ip-input']//span//input", + "secret": "//div[@data-test='sc4snmp:form:secret-input']//span//input", + "security_engine": "//div[@data-test='sc4snmp:form:security-engine-input']//span//input", + } + field_input = driver.find_element(By.XPATH, xpath[field]) + if edit: + helper.clear_input(field_input) + field_input.send_keys(value) + + def set_walk_interval(self, walk_interval): + logger.info(f"set/edit inventory device walk interval: {walk_interval}") + sec_engine_field_xpath = ( + "//div[@data-test='sc4snmp:form:walk-interval-input']//span//input" + ) + sec_engine = driver.find_element(By.XPATH, sec_engine_field_xpath) + helper.clear_input(sec_engine) + sec_engine.send_keys(walk_interval) + time.sleep(1) + + def set_smart_profiles(self, param): + logger.info(f"set inventory device smart profiles enabled to: {param}") + if param == "true" or param == "false": + smart_profile_true_xpath = ( + f"//button[@data-test='sc4snmp:form:smart-profile-{param}']" + ) + option = driver.find_element(By.XPATH, smart_profile_true_xpath) + option.click() + else: + logger.error( + f"Wrong parameter specified. Expected: true or false, received: {param}" + ) + + def select_profiles(self, profiles, edit=False): + logger.info(f"select profiles: {profiles}") + profiles_input_xpath = ( + "//div[@data-test='sc4snmp:form:profiles-multiselect']//div//input" + ) + profile_input = driver.find_element(By.XPATH, profiles_input_xpath) + if edit: + profile_options_xpath = "//button[@data-test='selected-option']" + options = driver.find_elements(By.XPATH, profile_options_xpath) + for option in options: + option.click() + time.sleep(0.5) + time.sleep(1) + for profile in profiles: + profile_input.send_keys(profile) + profile_input.send_keys(Keys.ENTER) + time.sleep(2) + # we need to hide profile list, + # otherwise it can break test execution and popup can intercept clicking on smart profiles + profile_input.send_keys(Keys.ESCAPE) + + def _get_inventory_data(self, host, field): + field_xpath = { + "snmp_version": f"//td[@data-test='sc4snmp:inventory-version' and ancestor::tr//td[text()='{host}']]", + "port": f"//td[@data-test='sc4snmp:inventory-port' and ancestor::tr//td[text()='{host}']]", + "community_string": f"//td[@data-test='sc4snmp:inventory-community' and ancestor::tr//td[text()='{host}']]", + "secret": f"//td[@data-test='sc4snmp:inventory-secret' and ancestor::tr//td[text()='{host}']]", + "security_engine": f"//td[@data-test='sc4snmp:inventory-security-engine' and ancestor::tr//td[text()='{host}']]", + "walk_interval": f"//td[@data-test='sc4snmp:inventory-walk-interval' and ancestor::tr//td[text()='{host}']]", + "profiles": f"//td[@data-test='sc4snmp:inventory-profiles' and ancestor::tr//td[text()='{host}']]", + "smart_profiles": f"//td[@data-test='sc4snmp:inventory-smart-profiles' and ancestor::tr//td[text()='{host}']]", + } + field = driver.find_element(By.XPATH, field_xpath[field]) + return field.text + + def get_snmp_version_for_entry(self, host): + logger.info(f"get {host} inventory -> snmp_version") + return self._get_inventory_data(host, "snmp_version") + + def get_port_for_entry(self, host): + logger.info(f"get {host} inventory -> port") + return self._get_inventory_data(host, "port") + + def get_community_string_for_entry(self, host): + logger.info(f"get {host} inventory -> community_string") + return self._get_inventory_data(host, "community_string") + + def get_secret_for_entry(self, host): + logger.info(f"get {host} inventory -> secret") + return self._get_inventory_data(host, "secret") + + def get_security_engine_for_entry(self, host): + logger.info(f"get {host} inventory -> security_engine") + return self._get_inventory_data(host, "security_engine") + + def get_walk_interval_for_entry(self, host): + logger.info(f"get {host} inventory -> walk_interval") + return self._get_inventory_data(host, "walk_interval") + + def get_profiles_for_entry(self, host): + logger.info(f"get {host} inventory -> profiles") + return self._get_inventory_data(host, "profiles") + + def get_smart_profiles_for_entry(self, host): + logger.info(f"get {host} inventory -> smart_profiles") + return self._get_inventory_data(host, "smart_profiles") + + def clear_inventory(self): + logger.info(f"remove all inventory entries") + delete_btn_for_inventory_with_host_ip_xpath = ( + f"//button[@data-test='sc4snmp:inventory-row-delete']" + ) + delete_btns = driver.find_elements( + By.XPATH, delete_btn_for_inventory_with_host_ip_xpath + ) + logger.info(f"Need to remove {len(delete_btns)} items") + while len(delete_btns) > 0: + delete_btns[0].click() + time.sleep(1) + self.confirm_delete() + self.close_delete_popup() + delete_btns = driver.find_elements( + By.XPATH, delete_btn_for_inventory_with_host_ip_xpath + ) + logger.info(f" {len(delete_btns)} more items for removal") diff --git a/ui_tests/pages/profiles_page.py b/ui_tests/pages/profiles_page.py new file mode 100644 index 000000000..8c29eca0c --- /dev/null +++ b/ui_tests/pages/profiles_page.py @@ -0,0 +1,299 @@ +import time + +import pages.helper as helper +from logger.logger import Logger +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory.get_driver() + + +class ProfilesPage: + def check_if_profiles_table_is_displayed(self): + logger.info("Check if profiles page is displayed") + profiles_table_xpath = "//div[@data-test='sc4snmp:profiles-table']" + profiles_container = driver.find_element(By.XPATH, profiles_table_xpath) + return profiles_container.is_displayed() + + def click_add_profile_button(self): + logger.info("Click Add New Profile button") + xpath = "//button[@data-test='sc4snmp:new-item-button']" + btn = driver.find_element(By.XPATH, xpath) + btn.click() + + def click_submit_button(self): + logger.info("Click Submit button") + xpath = "//button[@data-test='sc4snmp:form:submit-form-button']" + btn = driver.find_element(By.XPATH, xpath) + btn.click() + time.sleep(5) # wait for profile to be shown on the list + + def select_profile_type(self, profile_type): + logger.info(f"Set profile type: {profile_type}") + profiles = { + "standard": "//button[@data-test='sc4snmp:form:condition-standard']", + "base": "//button[@data-test='sc4snmp:form:condition-base']", + "smart": "//button[@data-test='sc4snmp:form:condition-smart']", + "walk": "//button[@data-test='sc4snmp:form:condition-walk']", + "conditional": "//button[@data-test='sc4snmp:form:condition-conditional']", + } + profile_type_expander_xpath = ( + "//button[@data-test='sc4snmp:form:select-condition']" + ) + expander = driver.find_element(By.XPATH, profile_type_expander_xpath) + expander.click() + option = driver.find_element(By.XPATH, profiles[profile_type]) + option.click() + + def set_frequency(self, freq_value): + logger.info(f"Setting profile frequency: {freq_value}") + xpath = "//div[@data-test='sc4snmp:form:frequency-input']//span//input" + freq_field = driver.find_element(By.XPATH, xpath) + helper.clear_input(freq_field) + # freq_field.send_keys(Keys.BACKSPACE) # clear() is not working here + freq_field.send_keys(freq_value) + + def set_profile_name(self, name): + logger.info(f"Setting profile frequency: {name}") + xpath = "//div[@data-test='sc4snmp:form:profile-name-input']//span//input" + name_input = driver.find_element(By.XPATH, xpath) + helper.clear_input(name_input) # this is useful when editing profile name + name_input.send_keys(name) + + def add_varBind(self, mcomponent, mobject=None, mindex=None): + logger.info(f"Adding varBind: {mcomponent, mobject, mindex}") + add_varBind_button_xpath = "//div[@data-test='sc4snmp:form:add-varbinds']//span[contains(text(),'Add varBind')]" + add_varBind_btn = driver.find_element(By.XPATH, add_varBind_button_xpath) + add_varBind_btn.click() + varbind_row_xpath = "//div[@data-test='sc4snmp:form:varbind-row']" + varBinds_rows = driver.find_elements(By.XPATH, varbind_row_xpath) + component_xpath = ( + "//div[@data-test='sc4snmp:form:varbind-mib-component-input']/span/input" + ) + component_input = varBinds_rows[-1].find_element(By.XPATH, component_xpath) + component_input.send_keys(mcomponent) + if mobject is not None: + object_xpath = ( + "//div[@data-test='sc4snmp:form:varbind-mib-object-input']/span/input" + ) + object_input = varBinds_rows[-1].find_element(By.XPATH, object_xpath) + object_input.send_keys(mobject) + if mindex is not None: + index_xpath = ( + "//div[@data-test='sc4snmp:form:varbind-mib-index-input']/span/input" + ) + index_input = varBinds_rows[-1].find_element(By.XPATH, index_xpath) + index_input.send_keys(mindex) + + def edit_varBind(self, new_mcomponent, new_mobject, new_mindex): + logger.info( + f"Editing varBind new values: {new_mcomponent}, {new_mobject}, {new_mindex}" + ) + varbind_row_xpath = "//div[@data-test='sc4snmp:form:varbind-row']" + varBinds_row = driver.find_element(By.XPATH, varbind_row_xpath) + component_xpath = ( + "//div[@data-test='sc4snmp:form:varbind-mib-component-input']/span/input" + ) + component_input = varBinds_row.find_element(By.XPATH, component_xpath) + helper.clear_input(component_input) + component_input.send_keys(new_mcomponent) + + object_xpath = ( + "//div[@data-test='sc4snmp:form:varbind-mib-object-input']/span/input" + ) + object_input = varBinds_row.find_element(By.XPATH, object_xpath) + helper.clear_input(object_input) + object_input.send_keys(new_mobject) + + index_xpath = ( + "//div[@data-test='sc4snmp:form:varbind-mib-index-input']/span/input" + ) + index_input = varBinds_row.find_element(By.XPATH, index_xpath) + helper.clear_input(index_input) + index_input.send_keys(new_mindex) + + def check_if_profile_is_configured(self, profile_name): + logger.info(f"Checking if profile is on profiles list: {profile_name}") + profiles_name_xpath = "//td[@data-test='sc4snmp:profile-name']" + profile_names = driver.find_elements(By.XPATH, profiles_name_xpath) + for element in profile_names: + # logger.info(f"profile name > |{element.text}|") # debug + if profile_name == element.text: + return True + logger.info("Profile has not been found on list") + return False + + def delete_profile_from_list(self, profile_name): + logger.info(f"Removing profile from profiles list: {profile_name}") + self.click_delete_profile_button(profile_name) + self._confirm_delete_profile() + self.close_profile_delete_popup() + + def click_delete_profile_button(self, profile_name): + logger.info(f"click delete profile button -> {profile_name}") + delete_btn_for_profile_with_name_xpath = f"//button[@data-test='sc4snmp:profile-row-delete' and ancestor::tr//td[text()='{profile_name}']]" + delete_btn = driver.find_element( + By.XPATH, delete_btn_for_profile_with_name_xpath + ) + delete_btn.click() + time.sleep(1) + + def _confirm_delete_profile(self): + confirm_delete_xpath = ( + "//button[@data-test='sc4snmp:delete-modal:delete-button']" + ) + confirm_btn = driver.find_element(By.XPATH, confirm_delete_xpath) + confirm_btn.click() + time.sleep(1) + + def close_profile_delete_popup(self): + logger.info(f"Closing profile delete popup") + close_profile_delete_popup_btn_xpath = ( + "//button[@data-test='sc4snmp:errors-modal:cancel-button']" + ) + close_btn = driver.find_element(By.XPATH, close_profile_delete_popup_btn_xpath) + close_btn.click() + time.sleep(1) + + def get_profile_type_for_profile_entry(self, profile_name): + logger.info(f"getting profile type for profile {profile_name}") + profile_type_for_profile_with_name_xpath = f"//td[@data-test='sc4snmp:profile-type' and ancestor::tr//td[text()='{profile_name}']]" + profile_type = driver.find_element( + By.XPATH, profile_type_for_profile_with_name_xpath + ) + return profile_type.text + + def set_smart_profile_field(self, field_value): + logger.info(f"Setting smart profile field {field_value}") + smart_profile_field_xpath = ( + "//div[@data-test='sc4snmp:form:condition-field-input']//span//input" + ) + field = driver.find_element(By.XPATH, smart_profile_field_xpath) + field.send_keys(field_value) + + def add_smart_profile_pattern(self, pattern): + logger.info(f"Add smart profile pattern {pattern}") + add_pattern_button_xpath = "//span[contains(text(),'Add pattern')]" + add_pattern_button = driver.find_element(By.XPATH, add_pattern_button_xpath) + add_pattern_button.click() + time.sleep(1) + pattern_row_xpath = ( + "//div[@data-test='sc4snmp:form:field-pattern']//span//input" + ) + pattern_rows = driver.find_elements(By.XPATH, pattern_row_xpath) + pattern_rows[-1].send_keys(pattern) + + def check_if_frequency_setting_field_is_visible(self): + logger.info(f"Checking if frequency setting field is visible") + xpath = "//div[@data-test='sc4snmp:form:frequency-input']//span//input" + try: + freq_field = driver.find_element(By.XPATH, xpath) + return freq_field.is_displayed() + except Exception as e: + return False + + def add_condition(self, field_value, operation, value): + logger.info(f"Adding condition: {field_value}, {operation}, {value}") + add_condition_button_xpath = ( + "//div[@data-test='sc4snmp:form:add-conditional-profile']//button" + ) + add_condition_btn = driver.find_element(By.XPATH, add_condition_button_xpath) + add_condition_btn.click() + time.sleep(1) + # set field + set_field_xpath = ( + "//div[@data-test='sc4snmp:form:conditional-field']//span//input" + ) + field = driver.find_element(By.XPATH, set_field_xpath) + field.send_keys(field_value) + # select operation + operation_expander_xpath = ( + "//button[@data-test='sc4snmp:form:conditional-select-operation']" + ) + operation_expander = driver.find_element(By.XPATH, operation_expander_xpath) + operation_expander.click() + operation_option_xpath = ( + f"//button[@data-test='sc4snmp:form:conditional-{operation}']" + ) + operation_option = driver.find_element(By.XPATH, operation_option_xpath) + operation_option.click() + # set value + value_field_xpath = ( + "//div[@data-test='sc4snmp:form:conditional-condition']//span//input" + ) + value_field = driver.find_element(By.XPATH, value_field_xpath) + value_field.send_keys(value) + + def click_edit_profile(self, profile_name): + logger.info(f"Edit profile: {profile_name}") + edit_btn_for_profile_with_name_xpath = f"//button[@data-test='sc4snmp:profile-row-edit' and ancestor::tr//td[text()='{profile_name}']]" + edit_btn = driver.find_element(By.XPATH, edit_btn_for_profile_with_name_xpath) + edit_btn.click() + time.sleep(1) + + def close_edited_profile_popup(self): + logger.info(f"Closing edited profile popup") + close_popup_btn_xpath = ( + f"//button[@data-test='sc4snmp:errors-modal:cancel-button']" + ) + close_popup_btn = driver.find_element(By.XPATH, close_popup_btn_xpath) + close_popup_btn.click() + time.sleep(2) + + def get_submit_edited_profile_text(self): + logger.info(f"Get submit edited profile popup text") + edited_profile_popup_text_xpath = f"//div[@data-test='modal']//div//p" + edited_profile_popup_text = driver.find_element( + By.XPATH, edited_profile_popup_text_xpath + ) + return edited_profile_popup_text.text + + def get_profile_freq(self, profile_name): + logger.info(f"Get profile frequency {profile_name}") + profile_freq_xpath = f"//td[@data-test='sc4snmp:profile-frequency' and ancestor::tr//td[text()='{profile_name}']]" + profile_freq = driver.find_element(By.XPATH, profile_freq_xpath) + return profile_freq.text + + def expand_profile(self, profile_name): + logger.info(f"Clik profile expand button: {profile_name}") + profile_expand_btn_xpath = f"//tr[@data-test='sc4snmp:profile-row' and child::td[text()='{profile_name}']]//td[@data-test='expand']" + profile_expand_btn = driver.find_element(By.XPATH, profile_expand_btn_xpath) + profile_expand_btn.click() + time.sleep(1) + + def get_profile_varbind(self, profile_name): + logger.info(f"Get profile varBind {profile_name}") + profile_mcomponent_xpath = ( + f"//td[@data-test='sc4snmp:profile-mib-component-expanded']//p" + ) + mcomponent = driver.find_element(By.XPATH, profile_mcomponent_xpath) + profile_mobject_xpath = ( + f"//td[@data-test='sc4snmp:profile-mib-object_expanded']//p" + ) + mobject = driver.find_element(By.XPATH, profile_mobject_xpath) + profile_mindex_xpath = ( + f"//td[@data-test='sc4snmp:profile-mib-index-expanded']//p" + ) + mindex = driver.find_element(By.XPATH, profile_mindex_xpath) + varBind = { + "mcomponent": mcomponent.text, + "mobject": mobject.text, + "mindex": int(mindex.text), + } + return varBind + + def clear_profiles(self): + logger.info(f"remove all profiles") + profile_delete_btn_xpath = f"//button[@data-test='sc4snmp:profile-row-delete']" + delete_btns = driver.find_elements(By.XPATH, profile_delete_btn_xpath) + logger.info(f"Need to remove {len(delete_btns)} items") + while len(delete_btns) > 0: + delete_btns[0].click() + time.sleep(1) + self._confirm_delete_profile() + self.close_profile_delete_popup() + time.sleep(1) + delete_btns = driver.find_elements(By.XPATH, profile_delete_btn_xpath) + logger.info(f" {len(delete_btns)} more items for removal") diff --git a/ui_tests/pages/yaml_values_reader.py b/ui_tests/pages/yaml_values_reader.py new file mode 100644 index 000000000..aa6c25c04 --- /dev/null +++ b/ui_tests/pages/yaml_values_reader.py @@ -0,0 +1,51 @@ +import yaml +from config import config + + +class YamlValuesReader: + _yaml_file_path = config.YAML_FILE_PATH + + def _open_and_read_yaml_file(self): + with open(self._yaml_file_path) as yaml_file: + try: + # try to load YAML data into a Python dictionary + data = yaml.safe_load(yaml_file) + + # print(data) + return data + + except yaml.YAMLError as e: + print(f"Error reading YAML file: {e}") + + def get_scheduler_profiles(self): + data = self._open_and_read_yaml_file() + profiles = data["scheduler"]["profiles"] + return profiles + + def get_scheduler_groups(self): + data = self._open_and_read_yaml_file() + profiles = data["scheduler"]["groups"] + return profiles + + def get_inventory_entries(self): + data = self._open_and_read_yaml_file() + profiles = data["poller"]["inventory"] + return profiles + + def get_field_value(self, field): + return field + + +## DEBUG -> +# if __name__ == "__main__": +# helper = YamlValuesReader() +# print(helper.get_scheduler_profiles()) +# data = helper.get_scheduler_profiles() +# +# print("-") +# print(helper.get_scheduler_groups()) +# groups = helper.get_scheduler_groups() +# print("-") +# inventory =helper.get_inventory_entries() +# print(inventory) +# print("-") diff --git a/ui_tests/pytest.ini b/ui_tests/pytest.ini new file mode 100644 index 000000000..2eebd2cc5 --- /dev/null +++ b/ui_tests/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +log_level = INFO +addopts = -rfps --durations=10 --disable-pytest-warnings --continue-on-collection-errors + +;log_level = DEBUG \ No newline at end of file diff --git a/ui_tests/requirements.txt b/ui_tests/requirements.txt new file mode 100644 index 000000000..d23ff5c49 --- /dev/null +++ b/ui_tests/requirements.txt @@ -0,0 +1,4 @@ +pytest-splunk-addon +selenium +webdriver_manager +pyyaml \ No newline at end of file diff --git a/ui_tests/splunk_search.py b/ui_tests/splunk_search.py new file mode 100644 index 000000000..be22b775e --- /dev/null +++ b/ui_tests/splunk_search.py @@ -0,0 +1,213 @@ +""" +Copyright 2018-2019 Splunk, Inc.. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import json +import logging +import os +import sys +import time + +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry + +TIMEROUT = 500 + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +formatter = logging.Formatter( + "%(asctime)s - %(name)s -" + " %(levelname)s - %(message)s" +) +handler = logging.StreamHandler(sys.stdout) +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def check_events_from_splunk( + start_time="-1h@h", + end_time="now", + url="", + user="", + query="", + password="", +): + """ + send a search request to splunk and return the events from the result + """ + logger.info( + f"search query = {str(query)} ,start_time: {start_time}, end_time: {end_time}" + ) + events = _collect_events(query, start_time, end_time, url, user, password) + + return events + + +def _collect_events(query, start_time, end_time, url="", user="", password=""): + """ + Collect events by running the given search query + @param: query (search query) + @param: start_time (search start time) + @param: end_time (search end time) + returns events + """ + + search_url = "{}/services/search/jobs?output_mode=json".format(url) + logger.debug("requesting: %s", search_url) + data = { + "search": query, + "earliest_time": start_time, + "latest_time": end_time, + } + logger.debug("SEARCH DATA: {}".format(data)) + create_job = _requests_retry_session().post( + search_url, auth=(user, password), verify=False, data=data + ) + _check_request_status(create_job) + + json_res = create_job.json() + job_id = json_res["sid"] + events = _wait_for_job_and_get_events(job_id, url, user, password) + + return events + + +def _collect_metrics( + start_time, end_time, url="", user="", password="", index="", metric_name="" +): + """ + Verify metrics by running the given api query + @param: dimension (metric dimension) + @param: metric_name (metric name) + @param: start_time (search start time) + @param: end_time (search end time) + returns events + """ + api_url = ( + url + + "/services/catalog/metricstore/dimensions/host/values?filter=index%3d" + + index + + "&metric_name=" + + metric_name + + "&earliest=" + + start_time + + "&latest=" + + end_time + + "&output_mode=json" + ) + logger.debug("requesting: %s", api_url) + + create_job = _requests_retry_session().get( + api_url, auth=(user, password), verify=False + ) + + _check_request_status(create_job) + + json_res = create_job.json() + + events = json_res["entry"] + # logger.info('events: %s', events) + + return events + + +def _wait_for_job_and_get_events(job_id, url="", user="", password=""): + """ + Wait for the search job to finish and collect the result events + @param: job_id + returns events + """ + events = [] + job_url = "{}/services/search/jobs/{}?output_mode=json".format(url, str(job_id)) + logger.debug("requesting: %s", job_url) + + for _ in range(TIMEROUT): + res = _requests_retry_session().get( + job_url, auth=(user, password), verify=False + ) + _check_request_status(res) + + job_res = res.json() + dispatch_state = job_res["entry"][0]["content"]["dispatchState"] + + if dispatch_state == "DONE": + events = _get_events(job_id, url, user, password) + break + if dispatch_state == "FAILED": + raise Exception("Search job: {} failed".format(job_url)) + time.sleep(1) + + return events + + +def _get_events(job_id, url="", user="", password=""): + """ + collect the result events from a search job + @param: job_id + returns events + """ + event_url = "{}/services/search/jobs/{}/events?output_mode=json".format( + url, str(job_id) + ) + logger.debug("requesting: %s", event_url) + + event_job = _requests_retry_session().get( + event_url, auth=(user, password), verify=False + ) + _check_request_status(event_job) + + event_job_json = event_job.json() + events = event_job_json["results"] + logger.debug("Events from get_events method returned %s events", len(events)) + + return events + + +def _check_request_status(req_obj): + """ + check if a request is successful + @param: req_obj + returns True/False + """ + if not req_obj.ok: + raise Exception( + "status code: {} \n details: {}".format( + str(req_obj.status_code), req_obj.text + ) + ) + + +def _requests_retry_session( + retries=10, backoff_factor=0.1, status_forcelist=(500, 502, 504) +): + """ + create a retry session for HTTP/HTTPS requests + @param: retries (num of retry time) + @param: backoff_factor + @param: status_forcelist (list of error status code to trigger retry) + @param: session + returns: session + """ + session = requests.Session() + retry = Retry( + total=int(retries), + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount("http://", adapter) + session.mount("https://", adapter) + + return session diff --git a/ui_tests/tests/test_basic.py b/ui_tests/tests/test_basic.py new file mode 100644 index 000000000..5b4f8b717 --- /dev/null +++ b/ui_tests/tests/test_basic.py @@ -0,0 +1,56 @@ +import pytest +from logger.logger import Logger +from pages.groups_page import GroupsPage +from pages.header_page import HeaderPage +from pages.inventory_page import InventoryPage +from pages.profiles_page import ProfilesPage +from webdriver.webriver_factory import WebDriverFactory + +driver = WebDriverFactory().get_driver() +logger = Logger().get_logger() +p_header = HeaderPage() +p_profiles = ProfilesPage() +p_groups = GroupsPage() +p_inventory = InventoryPage() + + +@pytest.mark.basic +def test_check_page_title_is_correct(): + """ + Test that SC4SNMP UI page tile is correct + """ + page_title = driver.title + + logger.info(f"Page Title: {page_title}") + assert "SC4SNMP Manager" == page_title + + +@pytest.mark.basic +def test_check_selected_tab_behaviour(): + """ + Test that selected tab stays selected upon refreshing page + check if corresponding tables are displayed + """ + p_header.switch_to_profiles() + url = driver.current_url + assert "/?tab=Profiles" in url + assert p_profiles.check_if_profiles_table_is_displayed() + driver.refresh() + url = driver.current_url + assert "/?tab=Profiles" in url + + p_header.switch_to_groups() + url = driver.current_url + assert "/?tab=Groups" in url + assert p_groups.check_if_groups_table_is_displayed() + driver.refresh() + url = driver.current_url + assert "/?tab=Groups" in url + + p_header.switch_to_inventory() + url = driver.current_url + assert "/?tab=Inventory" in url + assert p_inventory.check_if_inventory_table_is_displayed() + driver.refresh() + url = driver.current_url + assert "/?tab=Inventory" in url diff --git a/ui_tests/tests/test_error_handling_and_complex_scenarios.py b/ui_tests/tests/test_error_handling_and_complex_scenarios.py new file mode 100644 index 000000000..edbc2da2b --- /dev/null +++ b/ui_tests/tests/test_error_handling_and_complex_scenarios.py @@ -0,0 +1,524 @@ +import time + +import pytest +from config import config +from logger.logger import Logger +from pages.groups_page import GroupsPage +from pages.header_page import HeaderPage +from pages.inventory_page import InventoryPage +from pages.profiles_page import ProfilesPage +from pages.yaml_values_reader import YamlValuesReader +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory().get_driver() +p_header = HeaderPage() +p_profiles = ProfilesPage() +p_groups = GroupsPage() +p_inventory = InventoryPage() +values_reader = YamlValuesReader() + + +@pytest.mark.basic +def test_trying_to_configure_profle_with_the_same_name(): + """ + Configure profile + try to configure profile with the same name again + check error message + """ + profile_name = "same_profile" + profile_freq = 10 + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.set_frequency(profile_freq) + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.set_frequency(profile_freq) + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + + message = p_header.get_popup_error_message() + assert ( + message + == f"Profile with name {profile_name} already exists. Profile was not added." + ) + p_header.close_error_popup() + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + + p_profiles.delete_profile_from_list(profile_name) + + +@pytest.mark.basic +def test_trying_to_configure_group_with_the_same_name(): + """ + Configure group + try to configure group with the same name again + check error message + """ + group_name = "same_group" + + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + + # try to add same group again + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + # check error message + message = p_header.get_popup_error_message() + assert ( + message == f"Group with name {group_name} already exists. Group was not added." + ) + p_header.close_error_popup() + + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + + p_groups.delete_group_from_list(group_name) + + +@pytest.mark.basic +def test_trying_to_add_group_device_which_already_exists(): + """ + Configure group with device + try to add the same device to the group + check error message + """ + group_name = "same_group_device" + device_ip = "10.20.20.10" + port = 324 + snmp_version = "2c" + community_string = "test-device" + + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + # add device to grp + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(device_ip) + p_groups.set_device_port(port) + p_groups.set_snmp_version(snmp_version) + p_groups.set_community_string(community_string) + p_groups.click_submit_button_for_add_device() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + + # try to add same device again + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(device_ip) + p_groups.set_device_port(port) + p_groups.click_submit_button_for_add_device() + is_configured = p_groups.check_if_device_is_configured(device_ip) + assert is_configured is True + + # check error message + message = p_header.get_popup_error_message() + assert ( + message + == f"Host {device_ip}:{port} already exists in group {group_name}. Record was not added." + ) + p_header.close_error_popup() + + is_configured = p_groups.check_if_device_is_configured(device_ip) + assert is_configured is True + + p_groups.delete_group_from_list(group_name) + + +@pytest.mark.basic +def test_trying_to_add_inventory_with_host_which_already_exists(): + """ + Configure inventory with host + try to add the same host as another inventory entry + check error message + """ + host_ip = "100.200.100.200" + community_string = "test-device" + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host_ip) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is True + + # try to add same device again + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host_ip) + p_inventory.set_community_string("different_string") + p_inventory.click_submit_button_for_add_entry() + + # check error message + message = p_header.get_popup_error_message() + assert ( + message + == f"Host {host_ip}:{config.DEFAULT_PORT} already exists in the inventory. Record was not added." + ) + p_header.close_error_popup() + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is True + + p_inventory.delete_entry_from_list(host_ip) + + +@pytest.mark.basic +def test_trying_to_add_inventory_with_group_which_is_already_added(): + """ + Configure inventory with group + try to add the same group as another inventory entry + check error message + """ + # add group + group_name = f"test-group-inventory" + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + + community_string = "public" + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.select_group_inventory_type() + p_inventory.set_host_or_group_name(group_name) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is True + + # try to add same device again + p_inventory.click_add_new_device_group_button() + p_inventory.select_group_inventory_type() + p_inventory.set_host_or_group_name(group_name) + p_inventory.set_community_string("public_test_same_group") + p_inventory.click_submit_button_for_add_entry() + + # check error message + message = p_header.get_popup_error_message() + assert ( + message + == f"Group {group_name} has already been added to the inventory. Record was not added." + ) + p_header.close_error_popup() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is True + + # delete + p_inventory.delete_entry_from_list(group_name) + p_header.switch_to_groups() + p_groups.delete_group_from_list(group_name) + + +@pytest.mark.basic +def test_trying_to_add_inventory_group_with_host_which_is_configured_as_host(): + """ + Configure inventory with group with host + try to add the inventory entry with the same host which is configured in group + check error message + """ + # add group + group_name = f"test-group-inventory" + device_ip = "40.50.60.70" + community_string = "public" + + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(device_ip) + p_groups.click_submit_button_for_add_device() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.select_group_inventory_type() + p_inventory.set_host_or_group_name(group_name) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is True + + # try to add the same host as inventory entry again + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(device_ip) + p_inventory.set_community_string("public_test_same_host") + p_inventory.click_submit_button_for_add_entry() + + # check error message + message = p_header.get_popup_error_message() + assert ( + message + == f"Host {device_ip}:{config.DEFAULT_PORT} already exists in group {group_name}. Record was not added." + ) + p_header.close_error_popup() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is True + + # delete + p_inventory.delete_entry_from_list(group_name) + p_header.switch_to_groups() + p_groups.delete_group_from_list(group_name) + + +@pytest.mark.basic +def test_removing_group_which_is_configured_in_inventory(): + """ + Configure inventory -> add group as inventory entry + remove group which was added into inventory + check that upon removing group inventory entry is also removed + """ + # add group + group_name = f"test-group-inventory" + community_string = "public" + + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.select_group_inventory_type() + p_inventory.set_host_or_group_name(group_name) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is True + + # delete group + p_header.switch_to_groups() + # p_groups.delete_group_from_list(group_name) + p_groups.click_delete_group_button(group_name) + message = ( + p_groups.get_warning_message_when_removing_group_which_is_configured_in_inventory() + ) + assert message == "WARNING: This group is configured in the inventory" + p_groups.confirm_delete() + p_groups.close_delete_popup() + + # check inventory is also removed + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + p_header.switch_to_inventory() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_removing_profile_which_is_configured_in_inventory(): + """ + Configure inventory with profile + remove profile which was added into inventory + check that upon removing profile, this profile in inventory entry is also removed + """ + # add group + profile_name = "removing_profile" + host = "99.99.99.99" + community_string = "public" + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host) + p_inventory.select_profiles([profile_name]) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # delete profile + p_header.switch_to_profiles() + p_profiles.click_delete_profile_button(profile_name) + + message = ( + p_groups.get_warning_message_when_removing_group_which_is_configured_in_inventory() + ) + assert ( + message + == "WARNING: This profile is configured in some records in the inventory" + ) + p_profiles._confirm_delete_profile() + p_profiles.close_profile_delete_popup() + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + + # check inventory - no profile + p_header.switch_to_inventory() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + received_profiles = p_inventory.get_profiles_for_entry(host) + assert "" == received_profiles + # delete inventory entry + p_inventory.delete_entry_from_list(host) + + +@pytest.mark.basic +def test_try_to_add_to_inventory_group_which_does_not_exist(): + """ + Configure inventory with group which does not exist + check error message + """ + + group_name = "does_not_exist" + community_string = "abcd" + p_header.switch_to_inventory() + + p_inventory.click_add_new_device_group_button() + p_inventory.select_group_inventory_type() + p_inventory.set_host_or_group_name(group_name) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + + # check error message + message = p_header.get_popup_error_message() + assert ( + message + == f"Group {group_name} doesn't exist in the configuration. Record was not added." + ) + p_header.close_error_popup() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_trying_to_edit_profile_name_into_profile_name_that_exists(): + """ + Configure two profiles + try to change one profile to the second + check error message + """ + profile_name_1 = "profile_1" + profile_name_2 = "profile_2" + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name_1) + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name_2) + p_profiles.add_varBind("IP-MIB") + p_profiles.click_submit_button() + + # edit profile name + p_profiles.click_edit_profile(profile_name_1) + p_profiles.set_profile_name(profile_name_2) + p_profiles.click_submit_button() + + message = p_header.get_popup_error_message() + assert ( + message + == f"Profile with name {profile_name_2} already exists. Profile was not edited." + ) + p_header.close_error_popup() + exist = p_profiles.check_if_profile_is_configured(profile_name_1) + assert exist is True + exist = p_profiles.check_if_profile_is_configured(profile_name_2) + assert exist is True + + p_profiles.delete_profile_from_list(profile_name_1) + p_profiles.delete_profile_from_list(profile_name_2) + + +@pytest.mark.basic +def test_trying_to_edit_group_name_into_another_group_name(): + """ + Configure two groups + try to change one group to the second + check error message + """ + group_name_1 = "group_1" + group_name_2 = "group_2" + + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name_1) + p_groups.click_submit_button_for_add_group() + + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name_2) + p_groups.click_submit_button_for_add_group() + + # edit group name + p_groups.edit_group_name(group_name_1, group_name_2) + + message = p_header.get_popup_error_message() + assert ( + message + == f"Group with name {group_name_2} already exists. Group was not edited." + ) + p_header.close_error_popup() + is_on_list = p_groups.check_if_groups_is_on_list(group_name_1) + assert is_on_list is True + is_on_list = p_groups.check_if_groups_is_on_list(group_name_2) + assert is_on_list is True + + p_groups.delete_group_from_list(group_name_1) + p_groups.delete_group_from_list(group_name_2) + + +@pytest.mark.basic +def test_trying_to_edit_inventory_host_into_host_which_exists(): + """ + Configure two inventory hosts + try to change one host to the second + check error message + """ + host_1 = "11.11.11.11" + community_1 = "com1" + host_2 = "22.22.22.22" + community_2 = "abcs" + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host_1) + p_inventory.set_community_string(community_1) + p_inventory.click_submit_button_for_add_entry() + + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host_2) + p_inventory.set_community_string(community_2) + p_inventory.click_submit_button_for_add_entry() + + # edit inventory host + p_inventory.click_edit_inventory_entry(host_1) + p_inventory.set_host_or_group_name(host_2, True) + p_inventory.click_submit_button_for_add_entry() + + message = p_header.get_popup_error_message() + assert ( + message + == f"Host {host_2}:{config.DEFAULT_PORT} already exists in the inventory. Record was not edited." + ) + p_header.close_error_popup() + is_on_list = p_inventory.check_if_entry_is_on_list(host_1) + assert is_on_list is True + is_on_list = p_inventory.check_if_entry_is_on_list(host_2) + assert is_on_list is True + + p_inventory.delete_entry_from_list(host_1) + p_inventory.delete_entry_from_list(host_2) diff --git a/ui_tests/tests/test_groups_basic.py b/ui_tests/tests/test_groups_basic.py new file mode 100644 index 000000000..7b4b3014a --- /dev/null +++ b/ui_tests/tests/test_groups_basic.py @@ -0,0 +1,249 @@ +import time + +import pytest +from logger.logger import Logger +from pages.groups_page import GroupsPage +from pages.header_page import HeaderPage +from pages.inventory_page import InventoryPage +from pages.profiles_page import ProfilesPage +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory().get_driver() +p_header = HeaderPage() +p_profiles = ProfilesPage() +p_groups = GroupsPage() +p_inventory = InventoryPage() + + +@pytest.mark.basic +def test_add_and_remove_group(): + """ + Test that user is able to add group, + check newly added group is displayed on groups list + remove group and check it is not on the list + """ + group_name = f"test-group" + p_header.switch_to_groups() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + p_groups.delete_group_from_list(group_name) + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_change_group_name(): + """ + Test that user is able to add group, + check that user is able to change group name + """ + group_name = f"change-name" + new_group_name = "new-group-name" + p_header.switch_to_groups() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + is_on_list_new = p_groups.check_if_groups_is_on_list(new_group_name) + assert is_on_list_new is False + # edit name + p_groups.edit_group_name(group_name, new_group_name) + message = p_groups.get_submit_edited_group_name_popup_message() # common method? + expected_message = ( + f"{group_name} was also renamed to {new_group_name} in the inventory" + ) + assert expected_message == message + p_groups.close_edited_profile_popup() # common method? + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + is_on_list_new = p_groups.check_if_groups_is_on_list(new_group_name) + assert is_on_list_new is True + + p_groups.delete_group_from_list(new_group_name) + is_on_list = p_groups.check_if_groups_is_on_list(new_group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_try_adding_device_to_group_with_no_data(): + """ + Test that user is not able to add device with no data + check error message + then click cancel + check no device on list + """ + group_name = f"device-with-no-data" + p_header.switch_to_groups() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + p_groups.click_add_device_to_group(group_name) + p_groups.click_submit_button_for_add_device() + message = p_groups.get_error_message_while_adding_device_with_no_data() + assert message == "Address or host name is required" + p_groups.click_cancel_button_for_add_device() + number_of_devices = p_groups.get_number_of_devices_for_group(group_name) + assert 0 == number_of_devices + p_groups.delete_group_from_list(group_name) + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_add_and_remove_device_into_group(): + """ + Test that user is able to add device into group, + After adding device into group that group is auto selected + check added device displayed on devices list + remove device and check it is not on the list anymore + """ + group_name = f"test-add-one-device" + device_ip = "1.2.3.4" + p_header.switch_to_groups() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(device_ip) + p_groups.click_submit_button_for_add_device() + number_of_devices = p_groups.get_number_of_devices_for_group(group_name) + assert 1 == number_of_devices + is_configured = p_groups.check_if_device_is_configured(device_ip) + assert is_configured is True + p_groups.delete_device_from_group(device_ip) + number_of_devices = p_groups.get_number_of_devices_for_group(group_name) + assert 0 == number_of_devices + is_configured = p_groups.check_if_device_is_configured(device_ip) + assert is_configured is False + + p_groups.delete_group_from_list(group_name) + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_add_device_with_all_fields(): + """ + Test that user is able to add device into group, + After adding device into group that group is auto selected + check added device displayed on devices list + remove device and check it is not on the list anymore + """ + group_name = f"test-add-one-device" + device_ip = "1.2.3.4" + port = 1234 + snmp_version = "2c" + community_string = "public" + secret = "secret" + security_engine = "8000000903000AAAEF536715" + + p_header.switch_to_groups() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + # add device to grp + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(device_ip) + p_groups.set_device_port(port) + p_groups.set_snmp_version(snmp_version) + p_groups.set_community_string(community_string) + p_groups.set_secret(secret) + p_groups.set_security_engine(security_engine) + + p_groups.click_submit_button_for_add_device() + is_configured = p_groups.check_if_device_is_configured(device_ip) + assert is_configured is True + + p_groups.delete_group_from_list(group_name) + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_edit_device_with_all_fields(): + """ + Test that user is able to add device into group, + User is able to edit device + remove device and check it is not on the list anymore + """ + group_name = f"test-edit-device" + device_ip = "1.2.3.4" + port = 1234 + snmp_version = "2c" + community_string = "public" + secret = "secret" + security_engine = "8000000903000AAAEF536715" + + p_header.switch_to_groups() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + # add device to grp + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(device_ip) + p_groups.set_device_port(port) + p_groups.set_snmp_version(snmp_version) + p_groups.set_community_string(community_string) + p_groups.set_secret(secret) + p_groups.set_security_engine(security_engine) + p_groups.click_submit_button_for_add_device() + + # edit device data + new_device_ip = "4.3.2.1" + new_port = 4321 + new_snmp_version = "1" + new_community_string = "community" + new_secret = "test" + new_security_engine = "8000000903000AAAEF511115" + + p_groups.click_edit_device(device_ip) + p_groups.set_device_ip(new_device_ip, True) + p_groups.set_device_port(new_port, True) + p_groups.set_snmp_version(new_snmp_version) + p_groups.set_community_string(new_community_string, True) + p_groups.set_secret(new_secret, True) + p_groups.set_security_engine(new_security_engine, True) + p_groups.click_submit_button_for_add_device() + # verify + is_configured = p_groups.check_if_device_is_configured(device_ip) + assert is_configured is False + is_configured = p_groups.check_if_device_is_configured(new_device_ip) + assert is_configured is True + port = p_groups.get_device_port(new_device_ip) + assert int(port) == new_port + snmp_version_received = p_groups.get_device_snmp_version(new_device_ip) + assert snmp_version_received == new_snmp_version + community_string_received = p_groups.get_device_community_string(new_device_ip) + assert community_string_received == new_community_string + secret_received = p_groups.get_device_secret(new_device_ip) + assert secret_received == new_secret + security_engine_received = p_groups.get_device_security_engine(new_device_ip) + assert security_engine_received == new_security_engine + + p_groups.delete_group_from_list(group_name) + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False diff --git a/ui_tests/tests/test_inventory_basic.py b/ui_tests/tests/test_inventory_basic.py new file mode 100644 index 000000000..b8cb71b33 --- /dev/null +++ b/ui_tests/tests/test_inventory_basic.py @@ -0,0 +1,407 @@ +import time + +import pytest +from logger.logger import Logger +from pages.groups_page import GroupsPage +from pages.header_page import HeaderPage +from pages.inventory_page import InventoryPage +from pages.profiles_page import ProfilesPage +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory().get_driver() +p_header = HeaderPage() +p_profiles = ProfilesPage() +p_groups = GroupsPage() +p_inventory = InventoryPage() + + +@pytest.mark.basic +def test_add_and_remove_inventory_entry(): + """ + Test that user is able to add inventory entry, + check newly added inventory is displayed on inventory list + remove inventory entry and check it is not on the list + """ + host_ip = "1.2.3.4" + community_string = "public" + p_header.switch_to_inventory() + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is False + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host_ip) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is True + p_inventory.delete_entry_from_list(host_ip) + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is False + + +@pytest.mark.basic +def test_add_device_into_inventory_then_change_it(): + """ + Test that user is able to add inventory entry, + check newly added inventory is displayed on inventory list + user is able to edit host + changed host is visible in inventory + remove inventory entry and check it is not on the list + """ + host_ip = "1.2.3.4" + community_string = "public" + p_header.switch_to_inventory() + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is False + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host_ip) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is True + # change + new_host_ip = "10.20.30.40" + p_inventory.click_edit_inventory_entry(host_ip) + p_inventory.set_host_or_group_name(new_host_ip, True) + p_inventory.click_submit_button_for_edit_entry() + + expected_notice = "Address or port was edited which resulted in deleting the old device and creating the new one at the end of the list." + received_notice = p_inventory.get_edit_inventory_notice() + assert expected_notice == received_notice + p_inventory.close_edit_inventory_entry() + + is_on_list = p_inventory.check_if_entry_is_on_list(host_ip) + assert is_on_list is False + is_on_list = p_inventory.check_if_entry_is_on_list(new_host_ip) + assert is_on_list is True + # delete + p_inventory.delete_entry_from_list(new_host_ip) + is_on_list = p_inventory.check_if_entry_is_on_list(new_host_ip) + assert is_on_list is False + + +@pytest.mark.basic +def test_add_group_into_inventory_entry(): + """ + Test that user is able to add inventory entry, + check newly added inventory is displayed on inventory list + user is able to edit host + changed host is visible in inventory + remove inventory entry and check it is not on the list + """ + # add group + group_name = f"test-group-inventory" + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is True + + community_string = "public" + p_header.switch_to_inventory() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is False + p_inventory.click_add_new_device_group_button() + p_inventory.select_group_inventory_type() + p_inventory.set_host_or_group_name(group_name) + p_inventory.set_community_string(community_string) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is True + + # delete + p_inventory.delete_entry_from_list(group_name) + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is False + p_header.switch_to_groups() + p_groups.delete_group_from_list(group_name) + is_on_list = p_groups.check_if_groups_is_on_list(group_name) + assert is_on_list is False + + +@pytest.mark.basic +def test_try_to_add_device_with_no_data_into_inventory(): + """ + Test that user is not able to add inventory entry with no data + set host, check community string required + set community + check inventory added + remove inventory entry and check it is not on the list + """ + host = "1.2.2.1" + community = "teststring" + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.click_submit_button_for_add_entry() + error = p_inventory.get_host_missing_error() + assert error == "Address or host name is required" + error = p_inventory.get_community_string_missing_error() + assert ( + error == "When using SNMP version 1 or 2c, community string must be specified" + ) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + + p_inventory.set_host_or_group_name(host) + p_inventory.click_submit_button_for_add_entry() + error = p_inventory.get_host_missing_error() + assert error is None + error = p_inventory.get_community_string_missing_error() + assert ( + error == "When using SNMP version 1 or 2c, community string must be specified" + ) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + + p_inventory.set_community_string(community) + error = p_inventory.get_community_string_missing_error() + assert ( + error == "When using SNMP version 1 or 2c, community string must be specified" + ) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # delete + p_inventory.delete_entry_from_list(host) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + + +@pytest.mark.basic +def test_setting_min_walk_interval_value_in_inventory(): + """ + Test that user able to set walk interval value + acceptable valus are in range 1800 - 604800 + test min boundary + remove inventory entry and check it is not on the list + """ + host = "3.3.3.3" + community = "public" + + # min + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + + p_inventory.set_walk_interval("1799") + p_inventory.click_submit_button_for_add_entry() + error = p_inventory.get_walk_invalid_value_error() + assert error == "Walk Interval number must be an integer in range 1800-604800." + p_inventory.set_walk_interval("1800") + + p_inventory.click_submit_button_for_add_entry() + error = p_inventory.get_walk_invalid_value_error() + assert error is None + + # this two fields are set at the end to validate behavior of setting walk interval + p_inventory.set_host_or_group_name(host) + p_inventory.set_community_string(community) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # delete + p_inventory.delete_entry_from_list(host) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + + +@pytest.mark.basic +def test_setting_max_walk_interval_value_in_inventory(): + """ + Test that user able to set walk interval value + acceptable valus are in range 1800 - 604800 + test max boundary + remove inventory entry and check it is not on the list + """ + host = "4.4.4.4" + community = "pub_test" + + # min + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + + p_inventory.set_walk_interval("604801") + p_inventory.click_submit_button_for_add_entry() + error = p_inventory.get_walk_invalid_value_error() + assert error == "Walk Interval number must be an integer in range 1800-604800." + p_inventory.set_walk_interval("604800") + p_inventory.click_submit_button_for_add_entry() + error = p_inventory.get_walk_invalid_value_error() + assert error is None + + # this two fields are set at the end to validate behavior of setting walk interval + p_inventory.set_host_or_group_name(host) + p_inventory.set_community_string(community) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # delete + p_inventory.delete_entry_from_list(host) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + + +@pytest.mark.basic +def test_try_to_add_device_with_all_available_fields_into_inventory(): + """ + Test that user is not able to add inventory entry all available fields + then remove inventory entry and check it is not on the list + """ + host = "1.2.2.1" + port = "1234" + snmp_version = "3" + community = "teststring" + secret = "test_secret" + security_engine = "8000000903000AAAEF536715" + walk_interval = "3600" + profile_1 = "profile_1" + profile_2 = "profile_2" + profiles = [profile_1, profile_2] + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_1) + p_profiles.add_varBind("IP-MIB", "ifDescr") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_2) + p_profiles.add_varBind("IP-MIB", "ifError") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host) + p_inventory.edit_device_port(port) + p_inventory.select_snmp_version(snmp_version) + p_inventory.set_community_string(community) + p_inventory.set_secret(secret) + p_inventory.set_security_engine(security_engine) + p_inventory.set_walk_interval(walk_interval) + p_inventory.select_profiles(profiles) + p_inventory.set_smart_profiles("true") + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # delete + p_inventory.delete_entry_from_list(host) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + time.sleep(10) + + p_header.switch_to_profiles() + p_profiles.delete_profile_from_list(profile_1) + p_profiles.delete_profile_from_list(profile_2) + + +@pytest.mark.basic +def test_edit_inventory_entry_with_all_available_fields(): + """ + Test that user is not able to add inventory entry all available fields + check that user can edit all fields + check values of edited fields + then remove inventory entry and check it is not on the list + """ + host = "99.20.10.10" + port = "1234" + snmp_version = "3" + community = "teststring" + secret = "test_secret" + security_engine = "8000000903000AAAEF536715" + walk_interval = "3600" + smart_profiles = "false" + profile_1 = "profile_1_edit" + profile_2 = "profile_2_edit" + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_1) + p_profiles.add_varBind("IP-MIB", "ifDescr") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_2) + p_profiles.add_varBind("IP-MIB", "ifError") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host) + p_inventory.edit_device_port(port) + p_inventory.select_snmp_version(snmp_version) + p_inventory.set_community_string(community) + p_inventory.set_secret(secret) + p_inventory.set_security_engine(security_engine) + p_inventory.set_walk_interval(walk_interval) + p_inventory.select_profiles([profile_1]) + p_inventory.set_smart_profiles(smart_profiles) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # edit + new_host = "10.20.30.40" + new_port = "55555" + new_snmp_version = "2c" + new_community = "test_new_community" + new_secret = "changed_secret" + new_security_engine = "800000090BC0DD111101" + new_walk_interval = "10000" + new_smart_profiles = "true" + + p_inventory.click_edit_inventory_entry(host) + p_inventory.set_host_or_group_name(new_host, True) + p_inventory.edit_device_port(new_port) + p_inventory.select_snmp_version(new_snmp_version) + p_inventory.set_community_string(new_community, True) + p_inventory.set_secret(new_secret, True) + p_inventory.set_security_engine(new_security_engine, True) + p_inventory.set_walk_interval(new_walk_interval) + p_inventory.select_profiles([profile_2], True) + p_inventory.set_smart_profiles(new_smart_profiles) + p_inventory.click_submit_button_for_add_entry() + + expected_notice = "Address or port was edited which resulted in deleting the old device and creating the new one at the end of the list." + received_notice = p_inventory.get_edit_inventory_notice() + assert expected_notice == received_notice + p_inventory.close_edit_inventory_entry() + + # check + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + is_on_list = p_inventory.check_if_entry_is_on_list(new_host) + assert is_on_list is True + received_port = p_inventory.get_port_for_entry(new_host) + assert new_port == received_port + received_snmp_version = p_inventory.get_snmp_version_for_entry(new_host) + assert new_snmp_version == received_snmp_version + received_community_string = p_inventory.get_community_string_for_entry(new_host) + assert new_community == received_community_string + received_secret = p_inventory.get_secret_for_entry(new_host) + assert new_secret == received_secret + received_sec_engine = p_inventory.get_security_engine_for_entry(new_host) + assert new_security_engine == received_sec_engine + received_walk_interval = p_inventory.get_walk_interval_for_entry(new_host) + assert new_walk_interval == received_walk_interval + received_profiles = p_inventory.get_profiles_for_entry(new_host) + assert profile_2 == received_profiles + received_smart_profiles = p_inventory.get_smart_profiles_for_entry(new_host) + assert new_smart_profiles == received_smart_profiles + + # delete + p_inventory.delete_entry_from_list(new_host) + is_on_list = p_inventory.check_if_entry_is_on_list(new_host) + assert is_on_list is False + + p_header.switch_to_profiles() + p_profiles.delete_profile_from_list(profile_1) + p_profiles.delete_profile_from_list(profile_2) diff --git a/ui_tests/tests/test_profiles_basic.py b/ui_tests/tests/test_profiles_basic.py new file mode 100644 index 000000000..22eafce6a --- /dev/null +++ b/ui_tests/tests/test_profiles_basic.py @@ -0,0 +1,199 @@ +import time + +import pytest +from logger.logger import Logger +from pages.groups_page import GroupsPage +from pages.header_page import HeaderPage +from pages.inventory_page import InventoryPage +from pages.profiles_page import ProfilesPage +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory().get_driver() +p_header = HeaderPage() +p_profiles = ProfilesPage() +p_groups = GroupsPage() +p_inventory = InventoryPage() + + +@pytest.mark.basic +@pytest.mark.parametrize("profile_type", ["standard", "base"]) +def test_add_profile(profile_type): + """ + Test that user is able to add profile, + check newly added profile is displayed on profiles list + remove profile and check it is not on the list + """ + profile_name = f"test-profile-{profile_type}" + p_header.switch_to_profiles() + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.set_frequency(100) + p_profiles.select_profile_type(profile_type) + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + time.sleep(5) # wait for profile to be shown on the list + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + profile_type_of_profile = p_profiles.get_profile_type_for_profile_entry( + profile_name + ) + assert profile_type == profile_type_of_profile + p_profiles.delete_profile_from_list(profile_name) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + + +@pytest.mark.basic +def test_add_smart_profile(): + """ + Test that user is able to add smart profile, + check newly added profile is displayed on profiles list + remove profile and check it is not on the list + """ + profile_type = "smart" + profile_name = f"test-profile-{profile_type}" + p_header.switch_to_profiles() + time.sleep(5) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.set_frequency(3600) + p_profiles.select_profile_type(profile_type) + p_profiles.set_smart_profile_field("SNMPv2-MIB.sysDescr") + p_profiles.add_smart_profile_pattern(".*linux.*") + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + time.sleep(5) # wait for profile to be shown on the list + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + profile_type_of_profile = p_profiles.get_profile_type_for_profile_entry( + profile_name + ) + assert profile_type == profile_type_of_profile + p_profiles.delete_profile_from_list(profile_name) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + + +@pytest.mark.basic +def test_add_walk_profile(): + """ + Test that user is able to add walk profile, + check newly added profile is displayed on profiles list + remove profile and check it is not on the list + """ + profile_type = "walk" + profile_name = f"test-profile-{profile_type}" + p_header.switch_to_profiles() + time.sleep(5) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.select_profile_type(profile_type) + visible = p_profiles.check_if_frequency_setting_field_is_visible() + assert visible is False + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + time.sleep(5) # wait for profile to be shown on the list + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + profile_type_of_profile = p_profiles.get_profile_type_for_profile_entry( + profile_name + ) + assert profile_type == profile_type_of_profile + p_profiles.delete_profile_from_list(profile_name) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + + +@pytest.mark.basic +def test_add_conditional_profile(): + """ + Test that user is able to add conditional profile, + check newly added profile is displayed on profiles list + remove profile and check it is not on the list + """ + profile_type = "conditional" + profile_name = f"test-profile-{profile_type}" + p_header.switch_to_profiles() + time.sleep(5) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.select_profile_type(profile_type) + p_profiles.add_condition("IF-MIB.ifAdminStatus", "equals", "up") + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + profile_type_of_profile = p_profiles.get_profile_type_for_profile_entry( + profile_name + ) + assert profile_type == profile_type_of_profile + p_profiles.delete_profile_from_list(profile_name) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + + +@pytest.mark.basic +def test_edit_profile(): + """ + Test that user is able to edit profile, + editing profile name works + editing frequency works + editing varBinds works + """ + profile_type = "standard" + profile_name = f"test-profile-{profile_type}" + p_header.switch_to_profiles() + time.sleep(5) + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.set_frequency(100) + p_profiles.select_profile_type(profile_type) + p_profiles.add_varBind("IP-MIB", "ifDescr", 1) + p_profiles.click_submit_button() + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is True + # edit profile + new_freq = 45 + new_profile_name = "new_name" + new_varBind = {"mcomponent": "IP-MIBv2", "mobject": "ifDescr_v2", "mindex": 2} + + p_profiles.click_edit_profile(profile_name) + p_profiles.set_profile_name(new_profile_name) + p_profiles.set_frequency(new_freq) + p_profiles.edit_varBind( + new_varBind["mcomponent"], new_varBind["mobject"], new_varBind["mindex"] + ) + p_profiles.click_submit_button() + + # verify notice : If {pname} was used in some records in the inventory, it was updated to {new_pname} + received = p_profiles.get_submit_edited_profile_text() + expected = f"If {profile_name} was used in some records in the inventory, it was updated to {new_profile_name}" + assert expected == received + p_profiles.close_edited_profile_popup() + # check edited fields + # name + exist = p_profiles.check_if_profile_is_configured(profile_name) + assert exist is False + exist = p_profiles.check_if_profile_is_configured(new_profile_name) + assert exist is True + # freq + received_freq = p_profiles.get_profile_freq(new_profile_name) + assert new_freq == int(received_freq) + # varBinds - this verification is very case specific as profile row and expanded row does not have same Web element container + p_profiles.expand_profile(new_profile_name) + varBind = p_profiles.get_profile_varbind(new_profile_name) + assert new_varBind == varBind + p_profiles.delete_profile_from_list(new_profile_name) + exist = p_profiles.check_if_profile_is_configured(new_profile_name) + assert exist is False diff --git a/ui_tests/tests/test_save_update_configuration.py b/ui_tests/tests/test_save_update_configuration.py new file mode 100644 index 000000000..ac8a2eda2 --- /dev/null +++ b/ui_tests/tests/test_save_update_configuration.py @@ -0,0 +1,259 @@ +import time + +import pytest +from logger.logger import Logger +from pages.groups_page import GroupsPage +from pages.header_page import HeaderPage +from pages.inventory_page import InventoryPage +from pages.profiles_page import ProfilesPage +from pages.yaml_values_reader import YamlValuesReader +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory().get_driver() +p_header = HeaderPage() +p_profiles = ProfilesPage() +p_groups = GroupsPage() +p_inventory = InventoryPage() +values_reader = YamlValuesReader() + + +@pytest.fixture(autouse=True, scope="module") +def setup_and_teardown(): + # clear profiles + p_header.switch_to_profiles() + p_profiles.clear_profiles() + + # clear groups + p_header.switch_to_groups() + p_groups.clear_groups() + + # clear inventory + # we should wait for timer expiry to not have temporary records in inventory with delete flag set to true as this will cause test failures + p_header.switch_to_inventory() + p_inventory.clear_inventory() + p_header.apply_changes() + time_to_upgrade = p_header.get_time_to_upgrade() + p_header.close_configuration_applied_notification_popup() + time.sleep(time_to_upgrade + 30) # wait for upgrade + walk time + polling + yield + # teardown here if needed + + +@pytest.mark.extended +def test_check_that_profile_config_is_stored_upon_applying_configuration(): + """ + Configure profile + check that profile is stored in yaml file + edit profile, change freq + add new profile + click apply changes once again + changes stored even when the timer has not yet expired + """ + profile_name_1 = "store_profile" + profile_freq_1 = 10 + profile_freq_1_new = 10 + profile_name_2 = "profile_next" + profile_freq_2 = 77 + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name_1) + p_profiles.set_frequency(profile_freq_1) + p_profiles.add_varBind("IF-MIB", "ifInErrors", "1") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + # check no profile + profiles = values_reader.get_scheduler_profiles() + assert "{}\n" == profiles # profiles should be empty + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check that configuration is stored + expected_profile_output = f"{profile_name_1}:\n frequency: {profile_freq_1}\n varBinds:\n - ['IF-MIB', 'ifInErrors', '1']\n" + profiles = values_reader.get_scheduler_profiles() + assert expected_profile_output == profiles + + # edit profile + p_profiles.click_edit_profile(profile_name_1) + p_profiles.set_frequency(profile_freq_1_new) + p_profiles.click_submit_button() + # add another profile + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name_2) + p_profiles.set_frequency(profile_freq_2) + p_profiles.add_varBind("SNMPv2-MIB", "sysDescr") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + # check that configuration is not changed because it has been not applied + profiles = values_reader.get_scheduler_profiles() + assert expected_profile_output == profiles + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check that configuration is stored + expected_profile_output_2 = f"{profile_name_1}:\n frequency: {profile_freq_1_new}\n varBinds:\n - ['IF-MIB', 'ifInErrors', '1']\n{profile_name_2}:\n frequency: {profile_freq_2}\n varBinds:\n - ['SNMPv2-MIB', 'sysDescr']\n" + profiles = values_reader.get_scheduler_profiles() + assert expected_profile_output_2 == profiles + + # finalize - clear + p_profiles.delete_profile_from_list(profile_name_1) + p_profiles.delete_profile_from_list(profile_name_2) + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check no profile + profiles = values_reader.get_scheduler_profiles() + assert "{}\n" == profiles # profiles should be empty + + +@pytest.mark.extended +def test_check_that_group_config_is_stored_upon_applying_configuration(): + """ + Configure group + check that group is stored in yaml file + add device to group + click apply changes once again + changes stored even when the timer has not yet expired + """ + + group_name = f"test-group-store" + device_ip = "11.22.33.44" + port = 1234 + snmp_version = "2c" + community_string = "public" + secret = "secret" + security_engine = "8000000903000AAAEF536715" + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + + # check no group + groups = values_reader.get_scheduler_groups() + assert "{}\n" == groups # groups should be empty + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check that configuration is stored + expected_group_output = f"{group_name}: []\n" + groups = values_reader.get_scheduler_groups() + assert expected_group_output == groups + + # edit group + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(device_ip) + p_groups.set_device_port(port) + p_groups.set_snmp_version(snmp_version) + p_groups.set_community_string(community_string) + p_groups.set_secret(secret) + p_groups.set_security_engine(security_engine) + p_groups.click_submit_button_for_add_device() + + # check that configuration is not changed because it has been not applied + groups = values_reader.get_scheduler_groups() + assert expected_group_output == groups + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check that configuration is stored + expected_groups_output_2 = f"test-group-store:\n- address: {device_ip}\n port: {port}\n version: '{snmp_version}'\n community: '{community_string}'\n secret: '{secret}'\n security_engine: {security_engine}\n" + groups = values_reader.get_scheduler_groups() + assert expected_groups_output_2 == groups + + # finalize - clear + p_groups.delete_group_from_list(group_name) + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check no group + groups = values_reader.get_scheduler_groups() + assert "{}\n" == groups # groups should be empty + + +@pytest.mark.extended +def test_check_that_inventory_config_is_stored_upon_applying_configuration(): + """ + add inventory entry + check that inventory is stored in yaml file + remove inventory + click apply changes once again + changes stored even when the timer has not yet expired + """ + inventory_first_row = "address,port,version,community,secret,security_engine,walk_interval,profiles,smart_profiles,delete" + host = "88.77.66.55" + port = "612" + snmp_version = "2c" + community = "green" + walk_interval = "3600" + smart_profiles = "false" + profile = "test_profile_1" + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile) + p_profiles.add_varBind("IP-MIB", "ifDescr") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host) + p_inventory.edit_device_port(port) + p_inventory.select_snmp_version(snmp_version) + p_inventory.set_community_string(community) + p_inventory.set_walk_interval(walk_interval) + p_inventory.select_profiles([profile]) + p_inventory.set_smart_profiles(smart_profiles) + p_inventory.click_submit_button_for_add_entry() + + # check no inventory entry + inventory = values_reader.get_inventory_entries() + assert inventory_first_row == inventory # groups should be empty + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check that configuration is stored + expected_inventory_output = f"{inventory_first_row}\n{host},{port},{snmp_version},{community},,,{walk_interval},{profile},f,f" + inventory = values_reader.get_inventory_entries() + assert expected_inventory_output == inventory + + # remove inventory + p_inventory.delete_entry_from_list(host) + + # check that configuration is not changed because it has been not applied + inventory = values_reader.get_inventory_entries() + assert expected_inventory_output == inventory + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # check that configuration is stored + expected_inventory_output_2 = f"{inventory_first_row}\n{host},{port},{snmp_version},{community},,,{walk_interval},{profile},f,t" + inventory = values_reader.get_inventory_entries() + assert expected_inventory_output_2 == inventory + + # finalize - clear + p_header.switch_to_profiles() + p_profiles.delete_profile_from_list(profile) + + # apply changes + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() diff --git a/ui_tests/tests/test_splunk_integration.py b/ui_tests/tests/test_splunk_integration.py new file mode 100644 index 000000000..2738ce1d7 --- /dev/null +++ b/ui_tests/tests/test_splunk_integration.py @@ -0,0 +1,505 @@ +import time + +import pytest +from config import config +from logger.logger import Logger +from pages.groups_page import GroupsPage +from pages.header_page import HeaderPage +from pages.inventory_page import InventoryPage +from pages.profiles_page import ProfilesPage +from splunk_search import check_events_from_splunk +from webdriver.webriver_factory import WebDriverFactory + +logger = Logger().get_logger() +driver = WebDriverFactory().get_driver() +p_header = HeaderPage() +p_profiles = ProfilesPage() +p_groups = GroupsPage() +p_inventory = InventoryPage() + + +@pytest.fixture(autouse=True, scope="module") +def setup_and_teardown(): + # clear profiles + p_header.switch_to_profiles() + p_profiles.clear_profiles() + + # clear groups + p_header.switch_to_groups() + p_groups.clear_groups() + + # clear inventory + p_header.switch_to_inventory() + p_inventory.clear_inventory() + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + yield + # teardown here if needed + + +@pytest.mark.extended +def test_applying_changes_for_device_that_does_not_exists(setup): + """ + Configure device which does not exist + walk checking, and no polling + Test that after applying changes: + walk is scheduled + no polling is scheduled + no events received on netops index + """ + host = "1.2.3.4" + community = "public" + profile_name = "splunk_profile_1" + profile_freq = 10 + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.set_frequency(profile_freq) + p_profiles.add_varBind("IF-MIB", "ifInErrors") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host) + p_inventory.select_profiles([profile_name]) + p_inventory.set_community_string(community) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # apply changes + p_header.apply_changes() + time_to_upgrade = p_header.get_time_to_upgrade() + p_header.close_configuration_applied_notification_popup() + time.sleep(time_to_upgrade + 60) # wait for upgrade + + # check data in Splunk + # check walk scheduled + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';walk"' + ) + events = check_events_from_splunk( + start_time="-3m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 1 + + # check no profiles polling + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';*;poll"' + ) + events = check_events_from_splunk( + start_time="-3m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 0 + + # check no events + search_query = "index=" + config.EVENT_INDEX + " *" + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 0 + + # delete + p_inventory.delete_entry_from_list(host) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + # clear inventory + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # clear profiles + p_header.switch_to_profiles() + p_profiles.delete_profile_from_list(profile_name) + + +@pytest.mark.extended +def test_setting_group_in_inventory(setup): + """ + Configure group with device, + configure smart profiles - disabled, + configure one standard profile + apply changes + check no polling on smart profiles + check standard profile is working + """ + group_name = "splk-interaction-grp" + host = setup["device_simulator"] + community = "public" + profile_name = "standard_profile_12s" + profile_freq = 12 + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_name) + p_profiles.set_frequency(profile_freq) + p_profiles.add_varBind("IF-MIB", "ifDescr") + p_profiles.click_submit_button() + time.sleep(1) # wait for profile to be shown on the list + + p_header.switch_to_groups() + p_groups.click_add_new_group_button() + p_groups.set_group_name(group_name) + p_groups.click_submit_button_for_add_group() + p_groups.click_add_device_to_group(group_name) + p_groups.set_device_ip(host) + p_groups.click_submit_button_for_add_device() + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.select_group_inventory_type() + p_inventory.set_host_or_group_name(group_name) + p_inventory.select_profiles([profile_name]) + p_inventory.set_community_string(community) + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is True + + # apply changes + p_header.apply_changes() + time_to_upgrade = p_header.get_time_to_upgrade() + p_header.close_configuration_applied_notification_popup() + time.sleep(time_to_upgrade + 60) # wait for upgrade + walk time + polling + + # check data in Splunk + # check walk scheduled + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';walk"' + ) + events = check_events_from_splunk( + start_time="-2m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 1 + + # check profiles polling + search_query = ( + "index=" + + config.LOGS_INDEX + + ' "Sending due task sc4snmp;' + + host + + ';12;poll"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + # checking smart profiles not working + search_query = ( + "index=" + + config.LOGS_INDEX + + ' "Sending due task sc4snmp;' + + host + + ';600;poll"' + ) + events = check_events_from_splunk( + start_time="-2m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 0 + + # checking polling for mandatory profile - 1200 - this should be visible even when smart profiles are disabled + search_query = ( + "index=" + + config.LOGS_INDEX + + ' "Sending due task sc4snmp;' + + host + + ';1200;poll"' + ) + events = check_events_from_splunk( + start_time="-2m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 1 + + # check events received + search_query = "index=" + config.EVENT_INDEX + " *" + events = check_events_from_splunk( + start_time="-2m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + # delete + p_inventory.delete_entry_from_list(group_name) + is_on_list = p_inventory.check_if_entry_is_on_list(group_name) + assert is_on_list is False + # clear inventory + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # clear groups + p_header.switch_to_groups() + p_groups.delete_group_from_list(group_name) + + # clear profiles + p_header.switch_to_profiles() + p_profiles.delete_profile_from_list(profile_name) + + +@pytest.mark.extended +def test_setting_host_in_inventory(setup): + """ + Configure device, enable smart profiles and two standard profiles, and one base profile + check smart profiles are working + check standard profiles are working + remove one profile freq: 10s + check profile is not working anymore + check second profile is still working + """ + + host = setup["device_simulator"] + community = "public" + new_community = "test1234" + profile_1_name = "standard_profile_10s" + profile_1_freq = 10 + profile_2_name = "standard_profile_7s" + profile_2_freq = 7 + base_profile_name = "base" + base_profile_freq = 5 + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_1_name) + p_profiles.set_frequency(profile_1_freq) + p_profiles.add_varBind("IF-MIB", "ifDescr") + p_profiles.click_submit_button() + + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(profile_2_name) + p_profiles.set_frequency(profile_2_freq) + p_profiles.add_varBind("SNMPv2-MIB", "sysName") + p_profiles.click_submit_button() + + p_header.switch_to_profiles() + p_profiles.click_add_profile_button() + p_profiles.set_profile_name(base_profile_name) + p_profiles.select_profile_type("base") + p_profiles.set_frequency(base_profile_freq) + p_profiles.add_varBind("IF-MIB", "ifDescr") + p_profiles.click_submit_button() + + p_header.switch_to_inventory() + p_inventory.click_add_new_device_group_button() + p_inventory.set_host_or_group_name(host) + p_inventory.select_profiles([profile_1_name, profile_2_name]) + p_inventory.set_community_string(community) + p_inventory.set_smart_profiles("true") + p_inventory.click_submit_button_for_add_entry() + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is True + + # apply changes + p_header.apply_changes() + time_to_upgrade = p_header.get_time_to_upgrade() + p_header.close_configuration_applied_notification_popup() + time.sleep(time_to_upgrade + 30) # wait for upgrade + walk time + polling + + # check data in Splunk + # check walk scheduled + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';walk"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 1 + + # check profiles polling + time.sleep(60) # wait to be sure that profile are being polled + search_query = ( + "index=" + + config.LOGS_INDEX + + ' "Sending due task sc4snmp;' + + host + + ';10;poll"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';7;poll"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + # checking smart/base profiles + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';5;poll"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + # check events received + search_query = "index=" + config.EVENT_INDEX + " *" + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + # remove profiles + p_inventory.click_edit_inventory_entry(host) + p_inventory.select_profiles([profile_2_name], True) + p_inventory.set_smart_profiles("false") + # p_inventory.set_community_string(new_community, True) + p_inventory.click_submit_button_for_add_entry() + # apply changes + p_header.apply_changes() + time_to_upgrade = p_header.get_time_to_upgrade() + p_header.close_configuration_applied_notification_popup() + time.sleep(time_to_upgrade + 90) # wait for upgrade + walk time + polling + + # check walk scheduled + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';walk"' + ) + events = check_events_from_splunk( + start_time="-2m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 1 + + # check profiles polling + time.sleep(60) # wait to be sure that disabled profile is no more polled + search_query = ( + "index=" + + config.LOGS_INDEX + + ' "Sending due task sc4snmp;' + + host + + ';10;poll"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 0 + + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';7;poll"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + # checking smart/base profiles - no polling + search_query = ( + "index=" + config.LOGS_INDEX + ' "Sending due task sc4snmp;' + host + ';5;poll"' + ) + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) == 0 + + # check events received + search_query = "index=" + config.EVENT_INDEX + " *" + events = check_events_from_splunk( + start_time="-1m@m", + url=setup["splunkd_url"], + user=setup["splunk_user"], + query=["search {}".format(search_query)], + password=setup["splunk_password"], + ) + logger.info("Splunk received %s events in the last minute", len(events)) + assert len(events) > 1 + + # delete + p_inventory.delete_entry_from_list(host) + is_on_list = p_inventory.check_if_entry_is_on_list(host) + assert is_on_list is False + # clear inventory + p_header.apply_changes() + p_header.close_configuration_applied_notification_popup() + + # clear profiles + p_header.switch_to_profiles() + p_profiles.delete_profile_from_list(profile_1_name) + p_profiles.delete_profile_from_list(profile_2_name) + p_profiles.delete_profile_from_list(base_profile_name) diff --git a/ui_tests/webdriver/webriver_factory.py b/ui_tests/webdriver/webriver_factory.py new file mode 100644 index 000000000..638c7be69 --- /dev/null +++ b/ui_tests/webdriver/webriver_factory.py @@ -0,0 +1,52 @@ +import time + +import config.config as config +import pytest +from logger.logger import Logger +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service as ChromeService +from webdriver_manager.chrome import ChromeDriverManager + +logger = Logger().get_logger() + + +class WebDriverFactory: + _driver = None + + @classmethod + def get_driver(cls): + if cls._driver is None: + logger.info(f"Execution type: {config.EXECUTION_TYPE}") + logger.info(f"UI URL: {config.UI_URL}") + chrome_options = Options() + if config.EXECUTION_TYPE != "local": + logger.info(f"Enable headless execution") + chrome_options.add_argument("--headless") + chrome_options.add_argument("--disable-gpu") + chrome_options.add_argument("--window-size=1920x1080") + # web_driver = webdriver.Chrome(options=chrome_options) + + cls._driver = webdriver.Chrome( + service=ChromeService(ChromeDriverManager().install()), + options=chrome_options, + ) + + cls._driver.maximize_window() + cls._driver.implicitly_wait(config.IMPLICIT_WAIT_TIMER) + cls._driver.get(config.UI_URL) + return cls._driver + + @classmethod + def close_driver(cls): + if cls._driver is not None: + logger.info("Killing webdriver and closing browser") + cls._driver.quit() + cls._driver = None + else: + logger.warn("Unable to kill driver it does not exist") + + @classmethod + def restart_driver(cls): + cls.close_driver() + return cls.get_driver()