From 38dbb80d6ef31231f68c7e06821baa817422e159 Mon Sep 17 00:00:00 2001 From: Ionut Balutoiu Date: Thu, 19 Oct 2023 13:16:04 +0300 Subject: [PATCH] Add group policy configuration Allow configuration of a zone group default sync policy. This is useful in scenarios where we want to have selective buckets sync. Valuable especially with the new `cloud-sync` relation. This is based on Ceph multisite sync policy: https://docs.ceph.com/en/latest/radosgw/multisite-sync-policy/ Additionally, three more Juju actions are added to selectively enable, disable, or reset buckets sync: * `enable-buckets-sync` * `disable-buckets-sync` * `reset-buckets-sync` These new actions are meant to be used in conjunction with a default zone group sync policy that allows syncing, but it's disabled by default. Change-Id: I4a8076192269aaeaca50668ebcebc0a52c6d2c84 func-test-pr: https://github.com/openstack-charmers/zaza-openstack-tests/pull/1269 Signed-off-by: Ionut Balutoiu (cherry picked from commit 4e608c14859c3d151254fcb8209dee59917ad7bc) --- README.md | 3 + actions.yaml | 26 ++ actions/actions.py | 178 ++++++++ actions/disable-buckets-sync | 1 + actions/enable-buckets-sync | 1 + actions/reset-buckets-sync | 1 + config.yaml | 28 ++ hooks/hooks.py | 91 ++++ hooks/multisite.py | 401 ++++++++++++++++++ hooks/primary-relation-changed | 1 + unit_tests/test_actions.py | 175 ++++++++ unit_tests/test_ceph_radosgw_context.py | 1 + unit_tests/test_ceph_radosgw_utils.py | 6 +- unit_tests/test_hooks.py | 105 +++++ unit_tests/test_multisite.py | 230 ++++++++++ .../testdata/test_create_sync_group_flow.json | 20 + .../testdata/test_create_sync_group_pipe.json | 49 +++ unit_tests/testdata/test_get_sync_group.json | 45 ++ .../testdata/test_list_sync_groups.json | 45 ++ 19 files changed, 1404 insertions(+), 3 deletions(-) create mode 120000 actions/disable-buckets-sync create mode 120000 actions/enable-buckets-sync create mode 120000 actions/reset-buckets-sync create mode 120000 hooks/primary-relation-changed create mode 100644 unit_tests/testdata/test_create_sync_group_flow.json create mode 100644 unit_tests/testdata/test_create_sync_group_pipe.json create mode 100644 unit_tests/testdata/test_get_sync_group.json create mode 100644 unit_tests/testdata/test_list_sync_groups.json diff --git a/README.md b/README.md index 335ae407..b6dd248c 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,9 @@ not deployed then see file `actions.yaml`. * `readwrite` * `resume` * `tidydefaults` +* `enable-buckets-sync` +* `disable-buckets-sync` +* `reset-buckets-sync` # Documentation diff --git a/actions.yaml b/actions.yaml index abe46ad2..5b24635a 100644 --- a/actions.yaml +++ b/actions.yaml @@ -19,3 +19,29 @@ force-enable-multisite: zonegroup: type: string description: Existing Zonegroup to be reconfigured as the 'zonegroup' config value. +enable-buckets-sync: + description: | + Enable buckets sync in the multi-site replication. This is meant to be + used only when the default zonegroup sync policy is not "enabled", but it is + "allowed". + params: + buckets: + type: string + description: Comma-separated list of buckets' names to enable syncing. +disable-buckets-sync: + description: | + Forbid buckets sync in the multi-site replication. This is useful when you + want to disable syncing for some buckets, but you want to sync all the + other buckets. + params: + buckets: + type: string + description: Comma-separated list of buckets' names to disable syncing. +reset-buckets-sync: + description: | + Reset buckets sync policy. After this is executed, the buckets will be + synced according to the default zone group sync policy. + params: + buckets: + type: string + description: Comma-separated list of buckets' names to reset sync policy. diff --git a/actions/actions.py b/actions/actions.py index db0aa548..bced130b 100755 --- a/actions/actions.py +++ b/actions/actions.py @@ -49,6 +49,8 @@ service_restart, ) +DEFAULT_SYNC_POLICY_ID = 'default' + def pause(args): """Pause the Ceilometer services. @@ -227,6 +229,179 @@ def force_enable_multisite(args): action_fail(message + " : {}".format(cpe.output)) +def is_multisite_sync_policy_action_allowed(): + """Check if the current Juju unit is allowed to run sync policy actions. + + This method checks if the current Juju unit is allowed to execute + the Juju actions to configure Multisite sync policies: + * enable-buckets-sync + * disable-buckets-sync + * reset-buckets-sync + These Juju actions are allowed to run only on the leader unit of the + primary RGW zone. + + :return: Whether the current Juju unit is allowed to run the Multisite + sync policy Juju actions. + :rtype: Boolean + """ + if not is_leader(): + action_fail("This action can only be executed on leader unit.") + return False + + realm = config('realm') + zone = config('zone') + zonegroup = config('zonegroup') + + if not all((realm, zonegroup, zone)): + action_fail("Missing required charm configurations realm({}), " + "zonegroup({}) and zone({}).".format( + realm, zonegroup, zone + )) + return False + + if not multisite.is_multisite_configured(zone=zone, zonegroup=zonegroup): + action_fail("Multisite is not configured") + return False + + zonegroup_info = multisite.get_zonegroup_info(zonegroup) + if zonegroup_info is None: + action_fail("Failed to fetch zonegroup ({}) info".format(zonegroup)) + return False + + zone_info = multisite.get_zone_info(zone) + if zone_info is None: + action_fail("Failed to fetch zone ({}) info".format(zone)) + return False + + if zonegroup_info['master_zone'] != zone_info['id']: + action_fail('This action can only be executed on primary RGW ' + 'application units.') + return False + + return True + + +def update_buckets_sync_policy(buckets, sync_policy_state): + """Update the sync policy state for all the given buckets. + + This method gets a list of bucket names and a sync policy state to set + for all of them. The sync policy state can be one of the following: + "allowed", "enabled", or "forbidden". Validation for the sync policy + state is done in the "multisite.create_sync_group" module method. + + The sync policy state is set by creating a bucket-level sync group with + the given state, followed by a sync group pipe that match all the source + and destination buckets. If the bucket already has a sync group, it is + updated with the new state. + + :param buckets: List of bucket names. + :type buckets: list + :param sync_policy_state: The sync policy state to set for the buckets. + :type sync_policy_state: str + """ + zone = config('zone') + zonegroup = config('zonegroup') + existing_buckets = multisite.list_buckets(zonegroup=zonegroup, zone=zone) + messages = [] + for bucket in buckets: + if bucket in existing_buckets: + multisite.create_sync_group( + bucket=bucket, + group_id=DEFAULT_SYNC_POLICY_ID, + status=sync_policy_state) + multisite.create_sync_group_pipe( + bucket=bucket, + group_id=DEFAULT_SYNC_POLICY_ID, + pipe_id=DEFAULT_SYNC_POLICY_ID, + source_zones=['*'], + dest_zones=['*']) + message = 'Updated "{}" bucket sync policy to "{}"'.format( + bucket, sync_policy_state) + else: + message = ('Bucket "{}" does not exist in the zonegroup "{}" and ' + 'zone "{}"'.format(bucket, zonegroup, zone)) + log(message) + messages.append(message) + action_set( + values={ + 'message': '\n'.join(messages) + } + ) + + +def reset_buckets_sync_policy(buckets): + """Reset the sync policy state for all the given buckets. + + For every bucket in the given list, this method resets the sync policy + state. This is done by removing the bucket-level sync group. + + :param buckets: List of bucket names. + :type buckets: list + """ + zone = config('zone') + zonegroup = config('zonegroup') + existing_buckets = multisite.list_buckets(zonegroup=zonegroup, zone=zone) + messages = [] + for bucket in buckets: + if bucket in existing_buckets: + multisite.remove_sync_group( + bucket=bucket, + group_id=DEFAULT_SYNC_POLICY_ID) + message = 'Reset "{}" bucket sync policy'.format(bucket) + else: + message = ('Bucket "{}" does not exist in the zonegroup "{}" and ' + 'zone "{}"'.format(bucket, zonegroup, zone)) + log(message) + messages.append(message) + action_set( + values={ + 'message': '\n'.join(messages) + } + ) + + +def enable_buckets_sync(args): + """Enable sync for the given buckets""" + if not is_multisite_sync_policy_action_allowed(): + return + try: + update_buckets_sync_policy( + buckets=action_get('buckets').split(','), + sync_policy_state=multisite.SYNC_POLICY_ENABLED, + ) + except subprocess.CalledProcessError as cpe: + message = "Failed to enable sync for the given buckets" + log(message, level=ERROR) + action_fail(message + " : {}".format(cpe.output)) + + +def disable_buckets_sync(args): + """Disable sync for the given buckets""" + if not is_multisite_sync_policy_action_allowed(): + return + try: + update_buckets_sync_policy( + buckets=action_get('buckets').split(','), + sync_policy_state=multisite.SYNC_POLICY_FORBIDDEN, + ) + except subprocess.CalledProcessError as cpe: + message = "Failed to disable sync for the given buckets" + log(message, level=ERROR) + action_fail(message + " : {}".format(cpe.output)) + + +def reset_buckets_sync(args): + """Reset sync policy for the given buckets""" + if not is_multisite_sync_policy_action_allowed(): + return + try: + reset_buckets_sync_policy(buckets=action_get('buckets').split(',')) + except subprocess.CalledProcessError as cpe: + message = "Failed to reset sync for the given buckets" + log(message, level=ERROR) + action_fail(message + " : {}".format(cpe.output)) + + # A dictionary of all the defined actions to callables (which take # parsed arguments). ACTIONS = { @@ -237,6 +412,9 @@ def force_enable_multisite(args): "readwrite": readwrite, "tidydefaults": tidydefaults, "force-enable-multisite": force_enable_multisite, + "enable-buckets-sync": enable_buckets_sync, + "disable-buckets-sync": disable_buckets_sync, + "reset-buckets-sync": reset_buckets_sync, } diff --git a/actions/disable-buckets-sync b/actions/disable-buckets-sync new file mode 120000 index 00000000..405a394e --- /dev/null +++ b/actions/disable-buckets-sync @@ -0,0 +1 @@ +actions.py \ No newline at end of file diff --git a/actions/enable-buckets-sync b/actions/enable-buckets-sync new file mode 120000 index 00000000..405a394e --- /dev/null +++ b/actions/enable-buckets-sync @@ -0,0 +1 @@ +actions.py \ No newline at end of file diff --git a/actions/reset-buckets-sync b/actions/reset-buckets-sync new file mode 120000 index 00000000..405a394e --- /dev/null +++ b/actions/reset-buckets-sync @@ -0,0 +1 @@ +actions.py \ No newline at end of file diff --git a/config.yaml b/config.yaml index a64a8f18..c931a840 100644 --- a/config.yaml +++ b/config.yaml @@ -429,6 +429,34 @@ options: description: | Name of RADOS Gateway Zone to create for multi-site replication. This option must be specific to the local site e.g. us-west or us-east. + sync-policy-state: + type: string + default: enabled + description: | + This setting is used by the primary ceph-radosgw in multi-site + replication. + + By default, all the buckets are synced from a primary RGW zone to the + secondary zone. This config option allows us to have selective buckets + sync. If this is set, it will be used as the default policy state for + all the buckets in the zonegroup. + + Valid values are: + * enabled - sync is allowed and enabled + * allowed - sync is allowed + * forbidden - sync is not allowed + sync-policy-flow-type: + type: string + default: symmetrical + description: | + This setting is used by the secondary ceph-radosgw in multi-site + replication, and it's effective only when 'sync-policy-state' config is + set on the primary ceph-radosgw. + + Valid values are: + * directional - data is only synced in one direction, from primary to + secondary. + * symmetrical - data is synced in both directions. namespace-tenants: type: boolean default: False diff --git a/hooks/hooks.py b/hooks/hooks.py index d362a9bb..5d54a4f4 100755 --- a/hooks/hooks.py +++ b/hooks/hooks.py @@ -43,6 +43,7 @@ relation_set, log, DEBUG, + WARNING, Hooks, UnregisteredHookError, status_set, is_leader, @@ -134,6 +135,7 @@ ] MULTISITE_SYSTEM_USER = 'multisite-sync' +MULTISITE_DEFAULT_SYNC_GROUP_ID = 'default' def upgrade_available(): @@ -845,6 +847,86 @@ def primary_relation_joined(relation_id=None): secret=secret) +@hooks.hook('primary-relation-changed') +def primary_relation_changed(relation_id=None, unit=None): + if not is_leader(): + log('Cannot setup multisite configuration, this unit is not the ' + 'leader') + return + if not ready_for_service(legacy=False): + log('unit not ready, deferring multisite configuration') + return + + sync_policy_state = config('sync-policy-state') + if not sync_policy_state: + log("The config sync-policy-state is not set. Skipping zone group " + "default sync policy configuration") + return + + secondary_data = relation_get(rid=relation_id, unit=unit) + if not all((secondary_data.get('zone'), + secondary_data.get('sync_policy_flow_type'))): + log("Defer processing until secondary RGW has provided required data") + return + + zonegroup = config('zonegroup') + primary_zone = config('zone') + secondary_zone = secondary_data['zone'] + sync_flow_type = secondary_data['sync_policy_flow_type'] + + if (secondary_data.get('zone_tier_type') == 'cloud' and + sync_flow_type != multisite.SYNC_FLOW_DIRECTIONAL): + log("The secondary zone is set with cloud tier type. Ignoring " + "configured {} sync policy flow, and using {}.".format( + sync_flow_type, + multisite.SYNC_FLOW_DIRECTIONAL), + level=WARNING) + sync_flow_type = multisite.SYNC_FLOW_DIRECTIONAL + + flow_id = '{}-{}'.format(primary_zone, secondary_zone) + pipe_id = '{}-{}'.format(primary_zone, secondary_zone) + + mutation = multisite.is_sync_group_update_needed( + group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID, + flow_id=flow_id, + pipe_id=pipe_id, + source_zone=primary_zone, + dest_zone=secondary_zone, + desired_status=sync_policy_state, + desired_flow_type=sync_flow_type, + ) + + if mutation: + multisite.create_sync_group( + group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID, + status=sync_policy_state) + multisite.create_sync_group_flow( + group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID, + flow_id=flow_id, + flow_type=sync_flow_type, + source_zone=primary_zone, + dest_zone=secondary_zone) + source_zones = [primary_zone, secondary_zone] + dest_zones = [primary_zone, secondary_zone] + if sync_flow_type == multisite.SYNC_FLOW_DIRECTIONAL: + source_zones = [primary_zone] + dest_zones = [secondary_zone] + multisite.create_sync_group_pipe( + group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID, + pipe_id=pipe_id, + source_zones=source_zones, + dest_zones=dest_zones) + log( + 'Mutation detected. Restarting {}.'.format(service_name()), + 'INFO') + multisite.update_period(zonegroup=zonegroup, zone=primary_zone) + CONFIGS.write_all() + service_restart(service_name()) + leader_set(restart_nonce=str(uuid.uuid4())) + else: + log('No mutation detected.', 'INFO') + + @hooks.hook('primary-relation-departed') @hooks.hook('secondary-relation-departed') def multisite_relation_departed(): @@ -935,6 +1017,9 @@ def secondary_relation_changed(relation_id=None, unit=None): # this operation but a period update will force it to be created. multisite.update_period(fatal=False) + relation_set(relation_id=relation_id, + sync_policy_flow_type=config('sync-policy-flow-type')) + mutation = False # NOTE(utkarshbhatthere): @@ -979,6 +1064,8 @@ def secondary_relation_changed(relation_id=None, unit=None): else: log('No mutation detected.', 'INFO') + relation_set(relation_id=relation_id, zone=zone) + @hooks.hook('master-relation-departed') @hooks.hook('slave-relation-departed') @@ -1016,6 +1103,8 @@ def leader_settings_changed(): # Primary/Secondary relation for r_id in relation_ids('primary'): primary_relation_joined(r_id) + for unit in related_units(r_id): + primary_relation_changed(r_id, unit) for r_id in relation_ids('radosgw-user'): radosgw_user_changed(r_id) @@ -1031,6 +1120,8 @@ def process_multisite_relations(): # Primary/Secondary relation for r_id in relation_ids('primary'): primary_relation_joined(r_id) + for unit in related_units(r_id): + primary_relation_changed(r_id, unit) for r_id in relation_ids('secondary'): for unit in related_units(r_id): secondary_relation_changed(r_id, unit) diff --git a/hooks/multisite.py b/hooks/multisite.py index 18a33410..57f8878f 100644 --- a/hooks/multisite.py +++ b/hooks/multisite.py @@ -24,6 +24,31 @@ RGW_ADMIN = 'radosgw-admin' +SYNC_POLICY_ENABLED = 'enabled' +SYNC_POLICY_ALLOWED = 'allowed' +SYNC_POLICY_FORBIDDEN = 'forbidden' +SYNC_POLICY_STATES = [ + SYNC_POLICY_ENABLED, + SYNC_POLICY_ALLOWED, + SYNC_POLICY_FORBIDDEN +] +SYNC_FLOW_DIRECTIONAL = 'directional' +SYNC_FLOW_SYMMETRICAL = 'symmetrical' +SYNC_FLOW_TYPES = [ + SYNC_FLOW_DIRECTIONAL, + SYNC_FLOW_SYMMETRICAL, +] + + +class UnknownSyncPolicyState(Exception): + """Raised when an unknown sync policy state is encountered""" + pass + + +class UnknownSyncFlowType(Exception): + """Raised when an unknown sync flow type is encountered""" + pass + @decorators.retry_on_exception(num_retries=10, base_delay=5, exc_type=subprocess.CalledProcessError) @@ -370,6 +395,28 @@ def modify_zone(name, endpoints=None, default=False, master=False, return None +def get_zone_info(name, zonegroup=None): + """Fetch detailed info for the provided zone + + :param name: zone name + :type name: str + :param zonegroup: parent zonegroup name + :type zonegroup: str + :rtype: dict + """ + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'zone', 'get', + '--rgw-zone={}'.format(name), + ] + if zonegroup: + cmd.append('--rgw-zonegroup={}'.format(zonegroup)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return None + + def remove_zone_from_zonegroup(zone, zonegroup): """Remove RADOS Gateway zone from provided parent zonegroup @@ -888,3 +935,357 @@ def check_cluster_has_buckets(): if check_zonegroup_has_buckets(zonegroup): return True return False + + +def list_sync_groups(bucket=None): + """List sync policy groups. + + :param bucket: Bucket name. If this this given, the bucket level group + policies are listed. + :type bucket: str + + :return: List of sync policy groups. + :rtype: list + """ + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'sync', 'group', 'get', + ] + if bucket: + cmd.append('--bucket={}'.format(bucket)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return [] + + +def sync_group_exists(group_id, bucket=None): + """Check if the sync policy group exists. + + :param group_id: Sync policy group id. + :type group_id: str + :param bucket: Bucket name. If this this given, the bucket level group + policy is checked. + :type bucket: str + + :rtype: Boolean + """ + for group in list_sync_groups(bucket=bucket): + if group['key'] == group_id: + return True + return False + + +def get_sync_group(group_id, bucket=None): + """Get the sync policy group configuration. + + :param group_id: Sync policy group id. + :type group_id: str + :param bucket: Bucket name. If this this given, the bucket level group + policy is returned. + :type bucket: str + + :return: Sync policy group configuration. + :rtype: dict + """ + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'sync', 'group', 'get', + '--group-id={}'.format(group_id), + ] + if bucket: + cmd.append('--bucket={}'.format(bucket)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return None + + +def create_sync_group(group_id, status, bucket=None): + """Create a sync policy group. + + :param group_id: ID of the sync policy group to be created. + :type group_id: str + :param status: Status of the sync policy group to be created. Must be one + of the following: 'enabled', 'allowed', 'forbidden'. + :type status: str + :param bucket: Bucket name. If this this given, the bucket level group + policy is created. + :type bucket: str + + :raises UnknownSyncPolicyState: if the provided status is not one of the + allowed values. + + :return: Sync policy group configuration. + :rtype: dict + """ + if status not in SYNC_POLICY_STATES: + raise UnknownSyncPolicyState( + 'Unknown sync policy state: {}'.format(status)) + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'sync', 'group', 'create', + '--group-id={}'.format(group_id), + '--status={}'.format(status), + ] + if bucket: + cmd.append('--bucket={}'.format(bucket)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return None + + +def remove_sync_group(group_id, bucket=None): + """Remove a sync group with the given group ID and optional bucket. + + :param group_id: The ID of the sync group to remove. + :type group_id: str + :param bucket: Bucket name. If this this given, the bucket level group + policy is removed. + :type bucket: str + + :return: The output of the command as a dict. + :rtype: dict + """ + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'sync', 'group', 'remove', + '--group-id={}'.format(group_id), + ] + if bucket: + cmd.append('--bucket={}'.format(bucket)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return None + + +def is_sync_group_update_needed(group_id, flow_id, pipe_id, source_zone, + dest_zone, desired_status, desired_flow_type): + """Check if the sync group (with the given ID) needs updating. + + :param group_id: The ID of the sync group to check. + :type group_id: str + :param flow_id: The ID of the sync group flow to check. + :type flow_id: str + :param pipe_id: The ID of the sync group pipe to check. + :type pipe_id: str + :param source_zone: Source zone of the sync group flow to check. + :type source_zone: str + :param dest_zone: Dest zone of the sync group flow to check. + :type dest_zone: str + :param desired_status: Desired status of the sync group. + :type desired_status: str + :param desired_flow_type: Desired flow type of the sync group data flow. + :type desired_flow_type: str + + :rtype: Boolean + """ + # Check if sync group exists. + if not sync_group_exists(group_id): + hookenv.log('Sync group "{}" not configured yet'.format(group_id)) + return True + group = get_sync_group(group_id) + + # Check sync group status. + if group.get('status') != desired_status: + hookenv.log('Sync group "{}" status changed to "{}"'.format( + group["id"], desired_status)) + return True + + # Check if data flow needs to be created or updated. + if is_sync_group_flow_update_needed(group=group, + flow_id=flow_id, + source_zone=source_zone, + dest_zone=dest_zone, + desired_flow_type=desired_flow_type): + return True + + # Check if data pipe needs to be created. + pipes = group.get('pipes', []) + pipes_ids = [pipe['id'] for pipe in pipes] + if pipe_id not in pipes_ids: + hookenv.log('Sync group pipe "{}" not created yet'.format(pipe_id)) + return True + + # Sync group configuration is up-to-date. + return False + + +def create_sync_group_flow(group_id, flow_id, flow_type, source_zone, + dest_zone): + """Create a new sync group data flow with the given parameters. + + :param group_id: The ID of the sync group to create the data flow for. + :type group_id: str + :param flow_id: The ID of the new data flow. + :type flow_id: str + :param flow_type: The type of the new data flow. + :type flow_type: str + :param source_zone: The source zone for the new data flow. + :type source_zone: str + :param dest_zone: The destination zone for the new data flow. + :type dest_zone: str + + :raises UnknownSyncFlowType: If an unknown sync flow type is provided. + + :return: Sync group data flow configuration. + :rtype: dict + """ + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'sync', 'group', 'flow', 'create', + '--group-id={}'.format(group_id), + '--flow-id={}'.format(flow_id), + '--flow-type={}'.format(flow_type), + ] + if flow_type == SYNC_FLOW_SYMMETRICAL: + cmd.append('--zones={},{}'.format(source_zone, dest_zone)) + elif flow_type == SYNC_FLOW_DIRECTIONAL: + cmd.append('--source-zone={}'.format(source_zone)) + cmd.append('--dest-zone={}'.format(dest_zone)) + else: + raise UnknownSyncFlowType( + 'Unknown sync flow type {}'.format(flow_type)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return None + + +def remove_sync_group_flow(group_id, flow_id, flow_type, source_zone=None, + dest_zone=None): + """Remove a sync group data flow. + + :param group_id: The ID of the sync group. + :type group_id: str + :param flow_id: The ID of the flow to remove. + :type flow_id: str + :param flow_type: The type of the flow to remove. + :type flow_type: str + :param source_zone: The source zone of the flow to remove (only for + directional flows). + :type source_zone: str + :param dest_zone: The destination zone of the flow to remove (only for + directional flows). + :type dest_zone: str + + :return: The output of the command as a dict. + :rtype: dict + """ + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'sync', 'group', 'flow', 'remove', + '--group-id={}'.format(group_id), + '--flow-id={}'.format(flow_id), + '--flow-type={}'.format(flow_type), + ] + if flow_type == SYNC_FLOW_DIRECTIONAL: + cmd.append('--source-zone={}'.format(source_zone)) + cmd.append('--dest-zone={}'.format(dest_zone)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return None + + +def create_sync_group_pipe(group_id, pipe_id, source_zones, dest_zones, + source_bucket='*', dest_bucket='*', bucket=None): + """Create a sync group pipe between source and destination zones. + + :param group_id: The ID of the sync group. + :type group_id: str + :param pipe_id: The ID of the sync group pipe. + :type pipe_id: str + :param source_zones: A list of source zones. + :type source_zones: list + :param dest_zones: A list of destination zones. + :type dest_zones: list + :param source_bucket: The source bucket name. Default is '*'. + :type source_bucket: str + :param dest_bucket: The destination bucket name. Default is '*'. + :type dest_bucket: str + :param bucket: The bucket name. If specified, the sync group pipe will be + created for this bucket only. + :type bucket: str + + :return: Sync group pipe configuration. + :rtype: dict + """ + cmd = [ + RGW_ADMIN, '--id={}'.format(_key_name()), + 'sync', 'group', 'pipe', 'create', + '--group-id={}'.format(group_id), + '--pipe-id={}'.format(pipe_id), + '--source-zones={}'.format(','.join(source_zones)), + '--source-bucket={}'.format(source_bucket), + '--dest-zones={}'.format(','.join(dest_zones)), + '--dest-bucket={}'.format(dest_bucket), + ] + if bucket: + cmd.append('--bucket={}'.format(bucket)) + try: + return json.loads(_check_output(cmd)) + except TypeError: + return None + + +def is_sync_group_flow_update_needed(group, flow_id, source_zone, dest_zone, + desired_flow_type): + """Check if the given sync group flow needs updating. + + :param group: The sync policy group configuration. + :type group: dict + :param flow_id: The ID of the sync group flow to check. + :type flow_id: str + :param source_zone: Source zone of the sync group flow to check. + :type source_zone: str + :param dest_zone: Dest zone of the sync group flow to check. + :type dest_zone: str + :param desired_flow_type: Desired flow type of the sync group data flow. + :type desired_flow_type: str + + :rtype: Boolean + """ + symmetrical_flows = group['data_flow'].get('symmetrical', []) + symmetrical_flows_ids = [flow['id'] for flow in symmetrical_flows] + + directional_flows = group['data_flow'].get('directional', []) + directional_flows_ids = [ + # NOTE: Directional flows IDs are not present in the sync group + # configuration. We assume that the ID is a concatenation of the source + # zone and destination zone, as currently configured by the charm code. + # This is a safe assumption, because there are unique directional + # flows for each pair of zones. + "{}-{}".format(flow['source_zone'], flow['dest_zone']) + for flow in directional_flows + ] + + data_flows_ids = symmetrical_flows_ids + directional_flows_ids + if flow_id not in data_flows_ids: + hookenv.log('Data flow "{}" not configured yet'.format(flow_id)) + return True + + # Check if the flow type is consistent with the current configuration. + is_symmetrical = (desired_flow_type == SYNC_FLOW_SYMMETRICAL and + flow_id in symmetrical_flows_ids) + is_directional = (desired_flow_type == SYNC_FLOW_DIRECTIONAL and + flow_id in directional_flows_ids) + if is_symmetrical or is_directional: + # Data flow is consistent with the current configuration. + return False + + # Data flow type has changed. We need to remove the old data flow. + hookenv.log('Data flow "{}" type changed to "{}"'.format( + flow_id, desired_flow_type)) + old_flow_type = ( + SYNC_FLOW_SYMMETRICAL if desired_flow_type == SYNC_FLOW_DIRECTIONAL + else SYNC_FLOW_DIRECTIONAL) + hookenv.log( + 'Removing old data flow "{}" before configuring the new one'.format( + flow_id)) + remove_sync_group_flow( + group_id=group["id"], flow_id=flow_id, flow_type=old_flow_type, + source_zone=source_zone, dest_zone=dest_zone) + return True diff --git a/hooks/primary-relation-changed b/hooks/primary-relation-changed new file mode 120000 index 00000000..9416ca6a --- /dev/null +++ b/hooks/primary-relation-changed @@ -0,0 +1 @@ +hooks.py \ No newline at end of file diff --git a/unit_tests/test_actions.py b/unit_tests/test_actions.py index 1978b68b..01d7407b 100644 --- a/unit_tests/test_actions.py +++ b/unit_tests/test_actions.py @@ -82,6 +82,7 @@ class MultisiteActionsTestCase(CharmTestCase): TO_PATCH = [ 'action_fail', + 'action_get', 'action_set', 'multisite', 'config', @@ -89,6 +90,7 @@ class MultisiteActionsTestCase(CharmTestCase): 'leader_set', 'service_name', 'service_restart', + 'log', ] def setUp(self): @@ -154,3 +156,176 @@ def test_tidydefaults_unconfigured(self): self.test_config.set('zone', None) actions.tidydefaults([]) self.action_fail.assert_called_once() + + def test_enable_buckets_sync(self): + self.multisite.is_multisite_configured.return_value = True + self.multisite.get_zonegroup_info.return_value = { + 'master_zone': 'test-zone-id', + } + self.multisite.get_zone_info.return_value = { + 'id': 'test-zone-id', + } + self.is_leader.return_value = True + self.action_get.return_value = 'testbucket1,testbucket2,non-existent' + self.test_config.set('zone', 'testzone') + self.test_config.set('zonegroup', 'testzonegroup') + self.test_config.set('realm', 'testrealm') + self.multisite.list_buckets.return_value = ['testbucket1', + 'testbucket2'] + + actions.enable_buckets_sync([]) + + self.multisite.is_multisite_configured.assert_called_once() + self.multisite.get_zonegroup_info.assert_called_once_with( + 'testzonegroup', + ) + self.multisite.get_zone_info.assert_called_once_with( + 'testzone', + ) + self.action_get.assert_called_once_with('buckets') + self.multisite.list_buckets.assert_called_once_with( + zonegroup='testzonegroup', zone='testzone', + ) + self.assertEqual(self.multisite.create_sync_group.call_count, 2) + self.multisite.create_sync_group.assert_has_calls([ + mock.call(bucket='testbucket1', + group_id='default', + status=self.multisite.SYNC_POLICY_ENABLED), + mock.call(bucket='testbucket2', + group_id='default', + status=self.multisite.SYNC_POLICY_ENABLED), + ]) + self.assertEqual(self.multisite.create_sync_group_pipe.call_count, 2) + self.multisite.create_sync_group_pipe.assert_has_calls([ + mock.call(bucket='testbucket1', + group_id='default', + pipe_id='default', + source_zones=['*'], + dest_zones=['*']), + mock.call(bucket='testbucket2', + group_id='default', + pipe_id='default', + source_zones=['*'], + dest_zones=['*']), + ]) + expected_messages = [ + 'Updated "testbucket1" bucket sync policy to "{}"'.format( + self.multisite.SYNC_POLICY_ENABLED), + 'Updated "testbucket2" bucket sync policy to "{}"'.format( + self.multisite.SYNC_POLICY_ENABLED), + ('Bucket "non-existent" does not exist in the zonegroup ' + '"testzonegroup" and zone "testzone"'), + ] + self.assertEqual(self.log.call_count, 3) + self.log.assert_has_calls([ + mock.call(expected_messages[0]), + mock.call(expected_messages[1]), + mock.call(expected_messages[2]), + ]) + self.action_set.assert_called_once_with( + values={ + 'message': '\n'.join(expected_messages), + }) + + def test_disable_buckets_sync(self): + self.multisite.is_multisite_configured.return_value = True + self.multisite.get_zonegroup_info.return_value = { + 'master_zone': 'test-zone-id', + } + self.multisite.get_zone_info.return_value = { + 'id': 'test-zone-id', + } + self.is_leader.return_value = True + self.action_get.return_value = 'testbucket1,non-existent' + self.test_config.set('zone', 'testzone') + self.test_config.set('zonegroup', 'testzonegroup') + self.test_config.set('realm', 'testrealm') + self.multisite.list_buckets.return_value = ['testbucket1'] + + actions.disable_buckets_sync([]) + + self.multisite.is_multisite_configured.assert_called_once() + self.multisite.get_zonegroup_info.assert_called_once_with( + 'testzonegroup', + ) + self.multisite.get_zone_info.assert_called_once_with( + 'testzone', + ) + self.action_get.assert_called_once_with('buckets') + self.multisite.list_buckets.assert_called_once_with( + zonegroup='testzonegroup', zone='testzone', + ) + self.multisite.create_sync_group.assert_called_once_with( + bucket='testbucket1', + group_id='default', + status=self.multisite.SYNC_POLICY_FORBIDDEN, + ) + self.multisite.create_sync_group_pipe.assert_called_once_with( + bucket='testbucket1', + group_id='default', + pipe_id='default', + source_zones=['*'], + dest_zones=['*'], + ) + expected_messages = [ + 'Updated "testbucket1" bucket sync policy to "{}"'.format( + self.multisite.SYNC_POLICY_FORBIDDEN), + ('Bucket "non-existent" does not exist in the zonegroup ' + '"testzonegroup" and zone "testzone"'), + ] + self.assertEqual(self.log.call_count, 2) + self.log.assert_has_calls([ + mock.call(expected_messages[0]), + mock.call(expected_messages[1]), + ]) + self.action_set.assert_called_once_with( + values={ + 'message': '\n'.join(expected_messages), + }) + + def test_reset_buckets_sync(self): + self.multisite.is_multisite_configured.return_value = True + self.multisite.get_zonegroup_info.return_value = { + 'master_zone': 'test-zone-id', + } + self.multisite.get_zone_info.return_value = { + 'id': 'test-zone-id', + } + self.is_leader.return_value = True + self.action_get.return_value = 'testbucket1,non-existent' + self.test_config.set('zone', 'testzone') + self.test_config.set('zonegroup', 'testzonegroup') + self.test_config.set('realm', 'testrealm') + self.multisite.list_buckets.return_value = ['testbucket1'] + + actions.reset_buckets_sync([]) + + self.multisite.is_multisite_configured.assert_called_once() + self.multisite.get_zonegroup_info.assert_called_once_with( + 'testzonegroup', + ) + self.multisite.get_zone_info.assert_called_once_with( + 'testzone', + ) + self.action_get.assert_called_once_with('buckets') + self.multisite.list_buckets.assert_called_once_with( + zonegroup='testzonegroup', zone='testzone', + ) + self.multisite.remove_sync_group.assert_called_once_with( + bucket='testbucket1', + group_id='default', + ) + expected_messages = [ + 'Reset "testbucket1" bucket sync policy', + ('Bucket "non-existent" does not exist in the zonegroup ' + '"testzonegroup" and zone "testzone"'), + ] + self.assertEqual(self.log.call_count, 2) + self.log.assert_has_calls([ + mock.call(expected_messages[0]), + mock.call(expected_messages[1]), + ]) + self.action_set.assert_called_once_with( + values={ + 'message': '\n'.join(expected_messages), + }) diff --git a/unit_tests/test_ceph_radosgw_context.py b/unit_tests/test_ceph_radosgw_context.py index f3b3289a..32d6a962 100644 --- a/unit_tests/test_ceph_radosgw_context.py +++ b/unit_tests/test_ceph_radosgw_context.py @@ -287,6 +287,7 @@ def _relation_get(attr, unit, rid): self.assertEqual(expect, mon_ctxt()) self.assertTrue(mock_ensure_rsv_v6.called) + @patch.object(context, 'format_ipv6_addr', lambda *_: None) @patch('ceph_radosgw_context.https') @patch('charmhelpers.contrib.hahelpers.cluster.relation_ids') @patch('charmhelpers.contrib.hahelpers.cluster.config_get') diff --git a/unit_tests/test_ceph_radosgw_utils.py b/unit_tests/test_ceph_radosgw_utils.py index 56315aef..ceaf761e 100644 --- a/unit_tests/test_ceph_radosgw_utils.py +++ b/unit_tests/test_ceph_radosgw_utils.py @@ -305,11 +305,11 @@ def test_multisite_deployment(self): def test_listen_port(self): self.https.return_value = False - self.assertEquals(80, utils.listen_port()) + self.assertEqual(80, utils.listen_port()) self.https.return_value = True - self.assertEquals(443, utils.listen_port()) + self.assertEqual(443, utils.listen_port()) self.test_config.set('port', 42) - self.assertEquals(42, utils.listen_port()) + self.assertEqual(42, utils.listen_port()) def test_set_s3_app(self): self.leader_get.return_value = None diff --git a/unit_tests/test_hooks.py b/unit_tests/test_hooks.py index bb399cb9..a12dd0ac 100644 --- a/unit_tests/test_hooks.py +++ b/unit_tests/test_hooks.py @@ -13,6 +13,7 @@ # limitations under the License. import base64 import json +import os from unittest.mock import ( patch, call, MagicMock, ANY ) @@ -338,6 +339,8 @@ def test_object_store_relation(self, _canonical_url): @patch.object(ceph_hooks, 'leader_get') @patch('charmhelpers.contrib.openstack.ip.service_name', lambda *args: 'ceph-radosgw') + @patch('charmhelpers.contrib.openstack.ip.resolve_address', + lambda *args: 'myserv') @patch('charmhelpers.contrib.openstack.ip.config') def test_identity_joined_early_version(self, _config, _leader_get): self.cmp_pkgrevno.return_value = -1 @@ -687,6 +690,7 @@ class MiscMultisiteTests(CharmTestCase): 'leader_get', 'is_leader', 'primary_relation_joined', + 'primary_relation_changed', 'secondary_relation_changed', 'service_restart', 'service_name', @@ -724,6 +728,12 @@ def test_leader_settings_changed(self): def test_process_multisite_relations(self): ceph_hooks.process_multisite_relations() self.primary_relation_joined.assert_called_once_with('primary:1') + self.assertEqual(self.primary_relation_changed.call_count, 2) + self.primary_relation_changed.assert_has_calls([ + call('primary:1', 'rgw/0'), + call('primary:1', 'rgw/1'), + ]) + self.assertEqual(self.secondary_relation_changed.call_count, 2) self.secondary_relation_changed.assert_has_calls([ call('secondary:1', 'rgw-s/0'), call('secondary:1', 'rgw-s/1'), @@ -889,6 +899,87 @@ def test_primary_relation_joined_not_leader(self): ) self.multisite.list_realms.assert_not_called() + def test_primary_relation_changed_sync_policy_state_unset(self): + self.is_leader.return_value = True + self.test_config.set('sync-policy-state', '') + + ceph_hooks.primary_relation_changed('primary:1') + + self.is_leader.assert_called_once() + self.ready_for_service.assert_called_once_with(legacy=False) + self.config.assert_called_once_with('sync-policy-state') + + def test_primary_relation_changed_sync_rel_data_incomplete(self): + self.is_leader.return_value = True + self.test_config.set('sync-policy-state', 'allowed') + self.relation_get.return_value = {'zone': 'secondary'} + + ceph_hooks.primary_relation_changed('primary:1', 'rgw/0') + + self.is_leader.assert_called_once() + self.ready_for_service.assert_called_once_with(legacy=False) + self.config.assert_called_once_with('sync-policy-state') + self.relation_get.assert_called_once_with(rid='primary:1', + unit='rgw/0') + + def test_primary_relation_changed(self): + self.is_leader.return_value = True + configs = { + 'sync-policy-state': 'allowed', + 'zonegroup': 'testzonegroup', + 'zone': 'zone_a', + } + for k, v in configs.items(): + self.test_config.set(k, v) + self.relation_get.return_value = { + 'zone': 'zone_b', + 'sync_policy_flow_type': 'symmetrical', + # this should force flow type to directional, and ignore the value + # from the relation data. + 'zone_tier_type': 'cloud', + } + self.multisite.is_sync_group_update_needed.return_value = True + group_test_data_file = os.path.join( + os.path.dirname(__file__), 'testdata', 'test_get_sync_group.json') + with open(group_test_data_file, 'r') as f: + self.multisite.get_sync_group.return_value = json.loads(f.read()) + + ceph_hooks.primary_relation_changed('primary:1', 'rgw/0') + + self.is_leader.assert_called_once() + self.ready_for_service.assert_called_once_with(legacy=False) + self.config.assert_has_calls([ + call('sync-policy-state'), + call('zonegroup'), + call('zone'), + ]) + self.relation_get.assert_called_once_with(rid='primary:1', + unit='rgw/0') + self.multisite.is_sync_group_update_needed.assert_called_once_with( + group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID, + flow_id='zone_a-zone_b', + pipe_id='zone_a-zone_b', + source_zone='zone_a', + dest_zone='zone_b', + desired_status='allowed', + desired_flow_type=self.multisite.SYNC_FLOW_DIRECTIONAL) + self.multisite.create_sync_group.assert_called_once_with( + group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID, + status='allowed') + self.multisite.create_sync_group_flow.assert_called_once_with( + group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID, + flow_id='zone_a-zone_b', + flow_type=self.multisite.SYNC_FLOW_DIRECTIONAL, + source_zone='zone_a', dest_zone='zone_b') + self.multisite.create_sync_group_pipe.assert_called_once_with( + group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID, + pipe_id='zone_a-zone_b', + source_zones=['zone_a'], dest_zones=['zone_b']) + self.multisite.update_period.assert_called_once_with( + zonegroup='testzonegroup', zone='zone_a') + self.service_restart.assert_called_once_with('rgw@hostname') + self.leader_set.assert_called_once_with(restart_nonce=ANY) + @patch.object(json, 'loads') def test_multisite_relation_departed(self, json_loads): for k, v in self._complete_config.items(): @@ -916,6 +1007,7 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests): 'realm': 'testrealm', 'zonegroup': 'testzonegroup', 'zone': 'testzone2', + 'sync-policy-flow-type': 'symmetrical', } _test_relation = { @@ -978,6 +1070,16 @@ def test_secondary_relation_changed(self): ]) self.service_restart.assert_called_once() self.leader_set.assert_called_once_with(restart_nonce=ANY) + self.relation_set.assert_has_calls([ + call( + relation_id='secondary:1', + sync_policy_flow_type='symmetrical', + ), + call( + relation_id='secondary:1', + zone='testzone2', + ), + ]) def test_secondary_relation_changed_incomplete_relation(self): for k, v in self._complete_config.items(): @@ -986,6 +1088,7 @@ def test_secondary_relation_changed_incomplete_relation(self): self.relation_get.return_value = {} ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0') self.config.assert_not_called() + self.relation_set.assert_not_called() def test_secondary_relation_changed_mismatching_config(self): for k, v in self._complete_config.items(): @@ -999,11 +1102,13 @@ def test_secondary_relation_changed_mismatching_config(self): call('zone'), ]) self.multisite.list_realms.assert_not_called() + self.relation_set.assert_not_called() def test_secondary_relation_changed_not_leader(self): self.is_leader.return_value = False ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0') self.relation_get.assert_not_called() + self.relation_set.assert_not_called() @patch.object(ceph_hooks, 'apt_install') @patch.object(ceph_hooks, 'services') diff --git a/unit_tests/test_multisite.py b/unit_tests/test_multisite.py index 403935fa..afc7756f 100644 --- a/unit_tests/test_multisite.py +++ b/unit_tests/test_multisite.py @@ -484,3 +484,233 @@ def test_check_zone_has_buckets(self, mock_list_zonegroups, multisite.check_cluster_has_buckets(), True ) + + def test_get_zone_info(self): + multisite.get_zone_info('test_zone', 'test_zonegroup') + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'zone', 'get', + '--rgw-zone=test_zone', '--rgw-zonegroup=test_zonegroup', + ]) + + def test_sync_group_exists(self): + groups = [ + {'key': 'group1'}, + {'key': 'group2'}, + ] + self.subprocess.check_output.return_value = json.dumps(groups).encode() + self.assertTrue(multisite.sync_group_exists('group1')) + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'get', + ]) + + def test_bucket_sync_group_exists(self): + with open(self._testdata('test_list_sync_groups'), 'rb') as f: + self.subprocess.check_output.return_value = f.read() + self.assertTrue(multisite.sync_group_exists('default', + bucket='test')) + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'get', + '--bucket=test', + ]) + + def test_sync_group_does_not_exists(self): + with open(self._testdata('test_list_sync_groups'), 'rb') as f: + self.subprocess.check_output.return_value = f.read() + self.assertFalse(multisite.sync_group_exists('group-non-existent')) + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'get', + ]) + + def test_get_sync_group(self): + with open(self._testdata(whoami()), 'rb') as f: + self.subprocess.check_output.return_value = f.read() + result = multisite.get_sync_group('default') + self.assertEqual(result['id'], 'default') + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'get', + '--group-id=default', + ]) + + def test_create_sync_group(self): + test_group_json = json.dumps({"id": "default"}).encode() + self.subprocess.check_output.return_value = test_group_json + result = multisite.create_sync_group( + group_id='default', + status=multisite.SYNC_POLICY_ENABLED, + ) + self.assertEqual(result['id'], 'default') + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'create', + '--group-id=default', + '--status={}'.format(multisite.SYNC_POLICY_ENABLED), + ]) + + def test_create_sync_group_wrong_status(self): + self.assertRaises( + multisite.UnknownSyncPolicyState, + multisite.create_sync_group, "default", "wrong_status", + ) + + def test_remove_sync_group(self): + multisite.remove_sync_group('default') + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'remove', + '--group-id=default', + ]) + + @mock.patch.object(multisite, 'get_sync_group') + @mock.patch.object(multisite, 'sync_group_exists') + def test_is_sync_group_update_needed(self, mock_sync_group_exists, + mock_get_sync_group): + mock_sync_group_exists.return_value = True + with open(self._testdata('test_get_sync_group'), 'r') as f: + mock_get_sync_group.return_value = json.loads(f.read()) + + result = multisite.is_sync_group_update_needed( + group_id='default', + flow_id='zone_a-zone_b', + pipe_id='zone_a-zone_b', + source_zone='zone_a', + dest_zone='zone_b', + desired_status=multisite.SYNC_POLICY_ALLOWED, + desired_flow_type=multisite.SYNC_FLOW_SYMMETRICAL, + ) + + mock_sync_group_exists.assert_called_with('default') + mock_get_sync_group.assert_called_with('default') + self.assertFalse(result) + + def test_is_sync_group_flow_update_needed(self): + with open(self._testdata('test_get_sync_group'), 'r') as f: + sync_group = json.loads(f.read()) + result = multisite.is_sync_group_flow_update_needed( + sync_group, + flow_id='zone_a-zone_b', + source_zone='zone_a', dest_zone='zone_b', + desired_flow_type=multisite.SYNC_FLOW_SYMMETRICAL, + ) + self.assertFalse(result) + + @mock.patch.object(multisite, 'remove_sync_group_flow') + def test_is_sync_group_flow_update_needed_flow_type_change( + self, mock_remove_sync_group_flow): + with open(self._testdata('test_get_sync_group'), 'r') as f: + sync_group = json.loads(f.read()) + result = multisite.is_sync_group_flow_update_needed( + sync_group, + flow_id='zone_a-zone_b', + source_zone='zone_a', dest_zone='zone_b', + desired_flow_type=multisite.SYNC_FLOW_DIRECTIONAL, + ) + mock_remove_sync_group_flow.assert_called_with( + group_id='default', + flow_id='zone_a-zone_b', + flow_type=multisite.SYNC_FLOW_SYMMETRICAL, + source_zone='zone_a', dest_zone='zone_b', + ) + self.assertTrue(result) + + def test_create_sync_group_flow_symmetrical(self): + with open(self._testdata('test_create_sync_group_flow'), 'rb') as f: + self.subprocess.check_output.return_value = f.read() + result = multisite.create_sync_group_flow( + group_id='default', + flow_id='flow_id', + flow_type=multisite.SYNC_FLOW_SYMMETRICAL, + source_zone='zone_a', + dest_zone='zone_b', + ) + self.assertEqual(result['groups'][0]['id'], 'default') + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'flow', 'create', + '--group-id=default', + '--flow-id=flow_id', + '--flow-type=symmetrical', + '--zones=zone_a,zone_b', + ]) + + def test_create_sync_group_flow_directional(self): + with open(self._testdata('test_create_sync_group_flow'), 'rb') as f: + self.subprocess.check_output.return_value = f.read() + result = multisite.create_sync_group_flow( + group_id='default', + flow_id='flow_id', + flow_type=multisite.SYNC_FLOW_DIRECTIONAL, + source_zone='zone_a', + dest_zone='zone_b', + ) + self.assertEqual(result['groups'][0]['id'], 'default') + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'flow', 'create', + '--group-id=default', + '--flow-id=flow_id', + '--flow-type=directional', + '--source-zone=zone_a', '--dest-zone=zone_b', + ]) + + def test_create_sync_group_flow_wrong_type(self): + self.assertRaises( + multisite.UnknownSyncFlowType, + multisite.create_sync_group_flow, + group_id='default', flow_id='flow_id', flow_type='wrong_type', + source_zone='zone_a', dest_zone='zone_b', + ) + + def test_remove_sync_group_flow_symmetrical(self): + multisite.remove_sync_group_flow( + group_id='default', + flow_id='flow_id', + flow_type=multisite.SYNC_FLOW_SYMMETRICAL, + ) + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'flow', 'remove', + '--group-id=default', + '--flow-id=flow_id', + '--flow-type=symmetrical', + ]) + + def test_remove_sync_group_flow_directional(self): + multisite.remove_sync_group_flow( + group_id='default', + flow_id='flow_id', + flow_type=multisite.SYNC_FLOW_DIRECTIONAL, + source_zone='zone_a', + dest_zone='zone_b', + ) + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'flow', 'remove', + '--group-id=default', + '--flow-id=flow_id', + '--flow-type=directional', + '--source-zone=zone_a', '--dest-zone=zone_b', + ]) + + def test_create_sync_group_pipe(self): + with open(self._testdata(whoami()), 'rb') as f: + self.subprocess.check_output.return_value = f.read() + result = multisite.create_sync_group_pipe( + group_id='default', + pipe_id='pipe_id', + source_zones=['zone_a', 'zone_b'], + dest_zones=['zone_c', 'zone_d'], + ) + self.assertEqual(result['groups'][0]['id'], 'default') + self.subprocess.check_output.assert_called_with([ + 'radosgw-admin', '--id=rgw.testhost', + 'sync', 'group', 'pipe', 'create', + '--group-id=default', + '--pipe-id=pipe_id', + '--source-zones=zone_a,zone_b', '--source-bucket=*', + '--dest-zones=zone_c,zone_d', '--dest-bucket=*', + ]) diff --git a/unit_tests/testdata/test_create_sync_group_flow.json b/unit_tests/testdata/test_create_sync_group_flow.json new file mode 100644 index 00000000..363ecf60 --- /dev/null +++ b/unit_tests/testdata/test_create_sync_group_flow.json @@ -0,0 +1,20 @@ +{ + "groups": [ + { + "id": "default", + "data_flow": { + "symmetrical": [ + { + "id": "zone_a-zone_b", + "zones": [ + "zone_a", + "zone_b" + ] + } + ] + }, + "pipes": [], + "status": "allowed" + } + ] +} diff --git a/unit_tests/testdata/test_create_sync_group_pipe.json b/unit_tests/testdata/test_create_sync_group_pipe.json new file mode 100644 index 00000000..6d2b6630 --- /dev/null +++ b/unit_tests/testdata/test_create_sync_group_pipe.json @@ -0,0 +1,49 @@ +{ + "groups": [ + { + "id": "default", + "data_flow": { + "symmetrical": [ + { + "id": "zone_a-zone_b", + "zones": [ + "zone_a", + "zone_b" + ] + } + ] + }, + "pipes": [ + { + "id": "zone_a-zone_b", + "source": { + "bucket": "*", + "zones": [ + "zone_a", + "zone_b" + ] + }, + "dest": { + "bucket": "*", + "zones": [ + "zone_a", + "zone_b" + ] + }, + "params": { + "source": { + "filter": { + "tags": [] + } + }, + "dest": {}, + "priority": 0, + "mode": "system", + "user": "" + } + } + ], + "status": "allowed" + } + ] +} diff --git a/unit_tests/testdata/test_get_sync_group.json b/unit_tests/testdata/test_get_sync_group.json new file mode 100644 index 00000000..0a3f43d8 --- /dev/null +++ b/unit_tests/testdata/test_get_sync_group.json @@ -0,0 +1,45 @@ +{ + "id": "default", + "data_flow": { + "symmetrical": [ + { + "id": "zone_a-zone_b", + "zones": [ + "zone_a", + "zone_b" + ] + } + ] + }, + "pipes": [ + { + "id": "zone_a-zone_b", + "source": { + "bucket": "*", + "zones": [ + "zone_a", + "zone_b" + ] + }, + "dest": { + "bucket": "*", + "zones": [ + "zone_a", + "zone_b" + ] + }, + "params": { + "source": { + "filter": { + "tags": [] + } + }, + "dest": {}, + "priority": 0, + "mode": "system", + "user": "" + } + } + ], + "status": "allowed" +} diff --git a/unit_tests/testdata/test_list_sync_groups.json b/unit_tests/testdata/test_list_sync_groups.json new file mode 100644 index 00000000..b80c2999 --- /dev/null +++ b/unit_tests/testdata/test_list_sync_groups.json @@ -0,0 +1,45 @@ +[ + { + "key": "default", + "val": { + "id": "default", + "data_flow": { + "directional": [ + { + "source_zone": "zone_a", + "dest_zone": "zone_b" + } + ] + }, + "pipes": [ + { + "id": "zone_a-zone_b", + "source": { + "bucket": "*", + "zones": [ + "zone_a" + ] + }, + "dest": { + "bucket": "*", + "zones": [ + "zone_b" + ] + }, + "params": { + "source": { + "filter": { + "tags": [] + } + }, + "dest": {}, + "priority": 0, + "mode": "system", + "user": "" + } + } + ], + "status": "allowed" + } + } +]