Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1ce27f1
feat: Add keyvault copy command
jcassanji-southworks Feb 4, 2026
2e41392
keyvault secret copy command improvements
jcassanji-southworks Feb 4, 2026
ec4d378
Move test case to file
jcassanji-southworks Feb 4, 2026
fe468ec
Apply PR feedbacks
jcassanji-southworks Feb 5, 2026
ad02119
Update src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/…
jcassanji-southworks Feb 5, 2026
0b8e4cc
Update src/azure-cli/azure/cli/command_modules/keyvault/custom.py
jcassanji-southworks Feb 5, 2026
e90b9bd
Update src/azure-cli/azure/cli/command_modules/keyvault/custom.py
jcassanji-southworks Feb 5, 2026
e9722b8
Update src/azure-cli/azure/cli/command_modules/keyvault/tests/latest/…
jcassanji-southworks Feb 5, 2026
90c664f
Update src/azure-cli/azure/cli/command_modules/keyvault/_params.py
jcassanji-southworks Feb 5, 2026
a140d6c
Apply PR feedback
jcassanji-southworks Feb 5, 2026
c44cf06
Merge branch 'jcassanji-southworks/feature-keyvault-copy' of https://…
jcassanji-southworks Feb 5, 2026
de2f265
Update src/azure-cli/azure/cli/command_modules/keyvault/_params.py
jcassanji-southworks Feb 5, 2026
4fc1c56
Update src/azure-cli/azure/cli/command_modules/keyvault/custom.py
jcassanji-southworks Feb 5, 2026
429bfcc
Update src/azure-cli/azure/cli/command_modules/keyvault/custom.py
jcassanji-southworks Feb 5, 2026
9b3b669
Update src/azure-cli/azure/cli/command_modules/keyvault/custom.py
jcassanji-southworks Feb 5, 2026
69e3d2c
Enhance keyvault secret copy command: update overwrite flag to store_…
jcassanji-southworks Feb 5, 2026
f241f0a
Merge branch 'jcassanji-southworks/feature-keyvault-copy' of https://…
jcassanji-southworks Feb 5, 2026
8b11ea1
Fix copy_secret function: remove http_logging_policy from client kwar…
jcassanji-southworks Feb 5, 2026
3376e7d
Implement secret copying functionality: add _copy_single_secret funct…
jcassanji-southworks Feb 5, 2026
56bf92a
Add unit tests for keyvault secret copying functionality
jcassanji-southworks Feb 5, 2026
41ad995
Merge remote-tracking branch 'upstream/dev' into jcassanji-southworks…
jcassanji-southworks Feb 5, 2026
e3e8f0e
Update KeyVault preparation to disable RBAC authorization for copy tests
jcassanji-southworks Feb 5, 2026
398c73a
Add User-Agent filter to prevent recording mismatch in KeyVault copy …
jcassanji-southworks Feb 5, 2026
2a79899
Fix unused import in keyvault custom.py
jcassanji-southworks Feb 5, 2026
db40168
implement PR feedback
jcassanji-southworks Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/azure-cli/azure/cli/command_modules/keyvault/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,20 @@
the secret will be downloaded. This operation requires the secrets/backup permission.
"""

helps['keyvault secret copy'] = """
type: command
short-summary: Copy a secret from one Key Vault to another.
long-summary: Copies the latest version of a secret from a source Key Vault to a destination Key Vault.
This operation copies the secret value and its metadata (tags, content-type, attributes).
examples:
- name: Copy a specific secret from one vault to another.
text: az keyvault secret copy --source-vault SourceVault --destination-vault DestVault --name MySecret
- name: Copy all secrets from one vault to another.
text: az keyvault secret copy --source-vault SourceVault --destination-vault DestVault --all
- name: Copy a secret and overwrite if it already exists in the destination.
text: az keyvault secret copy --source-vault SourceVault --destination-vault DestVault --name MySecret --overwrite
"""

helps['keyvault secret restore'] = """
type: command
short-summary: Restores a backed up secret to a vault.
Expand Down
14 changes: 14 additions & 0 deletions src/azure-cli/azure/cli/command_modules/keyvault/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,20 @@ class CLISecurityDomainOperation(str, Enum):
with self.argument_context('keyvault secret restore') as c:
c.extra('vault_base_url', vault_name_type, required=True, arg_group='Id',
type=get_vault_base_url_type(self.cli_ctx), id_part=None)

with self.argument_context('keyvault secret copy') as c:
c.extra('vault_base_url', vault_name_type, type=get_vault_base_url_type(self.cli_ctx),
options_list=['--source-vault'], help='Name of the source Key Vault.', required=True)
c.extra('destination_vault', vault_name_type, type=get_vault_base_url_type(self.cli_ctx),
options_list=['--destination-vault'], help='Name of the destination Key Vault.', required=True)
c.argument('name', options_list=['--name', '-n'],
help='Name of the secret to copy. Mutually exclusive with --all. If neither --name nor --all is '
'specified, all secrets will be copied.',
required=False)
c.extra('all_secrets', arg_type=get_three_state_flag(), options_list=['--all'],
help='Copy all secrets from the source vault. Mutually exclusive with --name. If neither --name nor '
'--all is specified, all secrets will be copied.')
c.extra('overwrite', arg_type=get_three_state_flag(), help='Overwrite secrets in the destination vault if they already exist.')
# endregion

# region keyvault security-domain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def load_command_table(self, _):
g.keyvault_custom('download', 'download_secret')
g.keyvault_custom('backup', 'backup_secret')
g.keyvault_custom('restore', 'restore_secret', transform=transform_secret_set_attributes)
g.keyvault_custom('copy', 'copy_secret')

# certificate track2
with self.command_group('keyvault certificate', data_certificate_entity.command_type) as g:
Expand Down
142 changes: 142 additions & 0 deletions src/azure-cli/azure/cli/command_modules/keyvault/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -2517,3 +2517,145 @@ def set_attributes_certificate(client, certificate_name, version=None, policy=No
if kwargs.get('enabled') is not None or kwargs.get('tags') is not None:
return client.update_certificate_properties(certificate_name=certificate_name, version=version, **kwargs)
return client.get_certificate(certificate_name=certificate_name)


def _copy_single_secret(source_client, dest_client, secret_name, overwrite, is_single_mode):
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError

try:
# Check destination
if not overwrite:
try:
dest_client.get_secret(secret_name)
logger.warning("Secret '%s' already exists in destination. Skipping.", secret_name)
return None # Skipped
except ResourceNotFoundError:
pass
except HttpResponseError as e:
status_code = getattr(e, "status_code", None)
if status_code == 403:
logger.error("Access denied (403) checking secret '%s' in destination: %s", secret_name, str(e))
elif status_code is not None and 400 <= status_code < 500:
logger.warning("Client error (%s) checking secret '%s' in destination: %s",
status_code, secret_name, str(e))
elif status_code is not None and status_code >= 500:
logger.error("Server error (%s) checking secret '%s' in destination: %s",
status_code, secret_name, str(e))
else:
logger.error("Unexpected error checking secret '%s' in destination: %s", secret_name, str(e))
return False # Failed

# Copy
logger.info("Copying secret: %s", secret_name)
s = source_client.get_secret(secret_name)

try:
new_secret = dest_client.set_secret(
s.name,
s.value,
content_type=s.properties.content_type,
tags=s.properties.tags,
enabled=s.properties.enabled,
not_before=s.properties.not_before,
expires_on=s.properties.expires_on
)
except HttpResponseError as e:
from azure.cli.core.azclierror import CLIError
if is_single_mode:
raise CLIError(f"Failed to copy secret '{secret_name}': {str(e)}")

logger.error("Failed to copy secret '%s': %s", secret_name, str(e))
return False

logger.info("Successfully copied secret: %s", secret_name)
return {'name': new_secret.name, 'id': new_secret.id}

except ResourceNotFoundError:
if is_single_mode:
raise CLIError("Secret '{}' not found in source vault.".format(secret_name))
logger.error("Secret '%s' not found in source vault.", secret_name)
return False
except HttpResponseError as e:
if is_single_mode:
raise CLIError("Failed to copy secret '{}': {}".format(secret_name, str(e)))

if e.status_code == 403: # Forbidden
logger.error("Access denied (403) for secret '%s': %s", secret_name, str(e))
else:
logger.error("Failed to copy secret '%s': %s", secret_name, str(e))
return False


def copy_secret(cmd, client, destination_vault, name=None, all_secrets=None, overwrite=False):
from azure.core.exceptions import ResourceNotFoundError, HttpResponseError
from azure.keyvault.secrets import SecretClient
from azure.cli.core._profile import Profile
from azure.cli.core.commands.client_factory import prepare_client_kwargs_track2

# If neither a specific secret name nor --all is provided, default to copying all secrets.
if not name and not all_secrets:
all_secrets = True

# A specific secret name and --all are mutually exclusive.
if name and all_secrets:
raise MutuallyExclusiveArgumentError("Specify either a secret name or --all, but not both.")
# Validation
if client.vault_url.rstrip('/') == destination_vault.rstrip('/'):
raise CLIError("Source and destination Key Vaults cannot be the same.")

profile = Profile(cli_ctx=cmd.cli_ctx)
credential, _, _ = profile.get_login_credentials(subscription_id=cmd.cli_ctx.data.get('subscription_id'))

# Use standard client kwargs for consistent logging/telemetry
client_kwargs = prepare_client_kwargs_track2(cmd.cli_ctx)
# KeyVault clients handle this internally or differently sometimes, mimicking _client_factory
client_kwargs.pop('http_logging_policy', None)

dest_client = SecretClient(
vault_url=destination_vault,
credential=credential,
api_version='7.4',
verify_challenge_resource=False,
**client_kwargs
)

# Fail fast if destination vault is not accessible or does not exist
try:
# Perform a lightweight call to validate vault accessibility.
# A 404 for a dummy secret name means the vault is reachable but the secret does not exist.
dest_client.get_secret("azure-cli-validation-dummy")
except ResourceNotFoundError:
# Vault is accessible but the dummy secret does not exist, which is expected.
pass
except HttpResponseError as e:
raise CLIError(f"Failed to access destination Key Vault '{destination_vault}': {str(e)}")

secrets_to_copy = []
if name:
secrets_to_copy.append(name)
else:
logger.info("Copying all secrets from source...")
try:
source_secrets = client.list_properties_of_secrets()
for s in source_secrets:
if s.managed:
logger.warning("Skipping managed secret: %s", s.name)
continue
secrets_to_copy.append(s.name)
except HttpResponseError as e:
raise CLIError(f"Failed to list secrets from source: {str(e)}")

copied_secrets = []
failed_secrets = []
for secret_name in secrets_to_copy:
result = _copy_single_secret(client, dest_client, secret_name, overwrite, bool(name))
if result:
copied_secrets.append(result)
elif result is False:
failed_secrets.append(secret_name)

if failed_secrets:
logger.warning("Operation completed with failures. %s secrets failed to copy: %s",
len(failed_secrets), ', '.join(failed_secrets))

return copied_secrets
Original file line number Diff line number Diff line change
Expand Up @@ -2778,5 +2778,96 @@ def test_keyvault_mhsm_region(self, resource_group, managed_hsm):
self.cmd('keyvault region remove -g {rg} --hsm-name {hsm_name} -r uksouth')


class KeyVaultCopyScenarioTest(ScenarioTest):
# Filter User-Agent to prevent recording mismatch between recording env (Windows) and CI (Linux)
FILTER_HEADERS = ScenarioTest.FILTER_HEADERS + ['user-agent']

@ResourceGroupPreparer(name_prefix='cli_test_keyvault_copy')
@KeyVaultPreparer(name_prefix='cli-test-kv-src-', additional_params='--enable-rbac-authorization false')
def test_keyvault_secret_copy(self, resource_group, key_vault):
self.kwargs.update({
'src_kv': key_vault,
'dest_kv': self.create_random_name('cli-test-kv-dest-', 24),
'secret_name': self.create_random_name('secret-', 24),
'secret_value': 'mysecretvalue',
'new_val': 'newval',
'secret_name_2': self.create_random_name('secret2-', 24)
})

# Create Dest KV
# Use simple creation to ensure speed and reliability in playback
self.cmd('keyvault create -g {rg} -n {dest_kv} --enable-rbac-authorization false')
self.addCleanup(self.cmd, 'keyvault delete -g {rg} -n {dest_kv}')
self.addCleanup(self.cmd, 'keyvault purge -n {dest_kv}')

# Set secret in Source with tags and content-type
self.cmd('keyvault secret set --vault-name {kv} -n {secret_name} --value {secret_value} --tags tag1=value1 --content-type text/plain')

# 1. Copy specific secret
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {dest_kv} --name {secret_name}')
self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret_name}', checks=[
self.check('value', '{secret_value}'),
self.check('tags.tag1', 'value1'),
self.check('contentType', 'text/plain')
])

# 2. Copy all secrets
# Add another secret to source
self.cmd('keyvault secret set --vault-name {kv} -n {secret_name_2} --value {secret_value}')

# Run copy --all
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {dest_kv} --all')

# Verify both exist in dest
self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret_name_2}', checks=[
self.check('value', '{secret_value}')
])

# 3. Test overwrite protection (default behavior: skip)
# Update source
self.cmd('keyvault secret set --vault-name {kv} -n {secret_name} --value {new_val}')

# Copy without overwrite (should skip)
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {dest_kv} --name {secret_name}')

# Verify destination still has old value
self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret_name}', checks=[
self.check('value', '{secret_value}')
])

# 4. Test overwrite
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {dest_kv} --name {secret_name} --overwrite')

# Verify destination has new value
self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret_name}', checks=[
self.check('value', '{new_val}')
])

# 5. Test Mutual Exclusivity
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {dest_kv} --name {secret_name} --all', expect_failure=True)

# 6. Test Source == Destination (Should fail)
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {kv} --name {secret_name}', expect_failure=True)

# 7. Test Non-existent Destination (Should fail fast)
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault non_existent_kv_12345 --name {secret_name}', expect_failure=True)

# 8. Test Non-existent Secret in Source (Should fail)
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {dest_kv} --name non_existent_secret_123', expect_failure=True)

# 9. Test Default Behavior (Implicit --all)
# Add a unique secret to check implicit copy
secret_name_3 = self.create_random_name('secret3-', 24)
self.kwargs['secret_name_3'] = secret_name_3
self.cmd('keyvault secret set --vault-name {kv} -n {secret_name_3} --value {secret_value}')

# Run copy without --name or --all
self.cmd('keyvault secret copy --source-vault {kv} --destination-vault {dest_kv}')

# Verify it was copied
self.cmd('keyvault secret show --vault-name {dest_kv} -n {secret_name_3}', checks=[
self.check('value', '{secret_value}')
])

if __name__ == '__main__':
unittest.main()
Loading
Loading