Conversation

Contributor

@Yu-Jack Yu-Jack commented Jun 13, 2025

Which issue(s) this PR fixes:

Issue harvester/harvester#8148

What this PR does / why we need it:

Since Copilot is available to us, I think we should take advantage of it.

It's just a way to create test cases quickly; the output still requires manual adjustment. It's not a magic tool right now, and we still need to improve the prompt and the Copilot instructions. At least I was able to use it to create another integration test PR (#2054).

Special notes for your reviewer:

Before trying this, please add the following to your VS Code settings.json.

{
    "github.copilot.chat.codeGeneration.useInstructionFiles": true,
    "github.copilot.chat.codeGeneration.instructions": [
        {
            "file": "./.github/instructions/harvester-test-instructions.md",
            "description": "Instructions for generating integration/api test cases for the Harvester server"
        }
    ]
}

Additional documentation or context

Copilot Instruction

@Yu-Jack Yu-Jack force-pushed the HARV-8148-copilot branch from a46a2c3 to 3309512 Compare June 13, 2025 03:45
Contributor Author

Yu-Jack commented Jun 13, 2025

This is an example prompt.


What I want to do

I want to test the creation of encrypted and decrypted images in Harvester.

Please help me:

  • Add a new test class in a new file: integrations/test_1_security_images.py
  • Use integrations/test_1_images.py as a reference

Shared Fixtures

Please reuse or define the following shared fixtures:

  1. Create a Secret with encryption parameters by using Harvester API

    {
      "CRYPTO_KEY_CIPHER": "aes-xts-plain64",
      "CRYPTO_KEY_HASH": "sha256",
      "CRYPTO_KEY_PROVIDER": "secret",
      "CRYPTO_KEY_SIZE": "256",
      "CRYPTO_KEY_VALUE": "test",
      "CRYPTO_PBKDF": "argon2i"
    }
  2. Create a StorageClass with encryption settings and secret reference by using Harvester API

    {
      "csi.storage.k8s.io/node-publish-secret-name": "<secret_name>",
      "csi.storage.k8s.io/node-publish-secret-namespace": "<namespace>",
      "csi.storage.k8s.io/node-stage-secret-name": "<secret_name>",
      "csi.storage.k8s.io/node-stage-secret-namespace": "<namespace>",
      "csi.storage.k8s.io/provisioner-secret-name": "<secret_name>",
      "csi.storage.k8s.io/provisioner-secret-namespace": "<namespace>",
      "encrypted": "true",
      "migratable": "true",
      "numberOfReplicas": "3",
      "staleReplicaTimeout": "30"
    }
  3. Shared image

    This is the base image used to create the encrypted image.
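
The first two fixtures above amount to building two parameter dicts before calling the API. As a rough sketch (the helper names here are hypothetical, and the payload shapes follow the snippets above rather than a verified API schema):

```python
# Sketch only: build the Secret and StorageClass payloads described above.
# Helper names are illustrative; payload shapes come from the prompt snippets.

def build_crypto_secret(name, namespace):
    """Secret payload carrying the encryption parameters."""
    return {
        "metadata": {"name": name, "namespace": namespace},
        "stringData": {
            "CRYPTO_KEY_CIPHER": "aes-xts-plain64",
            "CRYPTO_KEY_HASH": "sha256",
            "CRYPTO_KEY_PROVIDER": "secret",
            "CRYPTO_KEY_SIZE": "256",
            "CRYPTO_KEY_VALUE": "test",
            "CRYPTO_PBKDF": "argon2i",
        },
    }

def build_encrypted_storage_class_params(secret_name, namespace):
    """StorageClass parameters referencing the secret for each CSI operation."""
    return {
        f"csi.storage.k8s.io/{op}-secret-{key}": value
        for op in ("node-publish", "node-stage", "provisioner")
        for key, value in (("name", secret_name), ("namespace", namespace))
    } | {
        "encrypted": "true",
        "migratable": "true",
        "numberOfReplicas": "3",
        "staleReplicaTimeout": "30",
    }
```

The comprehension generates the six CSI secret-reference keys from the three operations, so the secret name and namespace are stated once.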


Main Test Cases

Please separate the following into individual test functions within the same class:

  1. Create an Encrypted Image by using Harvester API

    • Use the StorageClass created in step 2 of the shared fixtures.
    • Use the following spec snippet:
      data["spec"]["securityParameters"] = {
        "cryptoOperation": "encrypt",
        "sourceImageName": source_image_name,
        "sourceImageNamespace": namespace,
      }
      data["spec"].pop("checksum", None)
      data["spec"].pop("url", None)
      spec.metadata.annotations["harvesterhci.io/storageClassName"] = storage_class_name
      spec.displayName = "<Encrypted Image Name>"
  2. Create a Decrypted Image by using Harvester API

    • Use the encrypted image created above as the source.
    • Use the following spec snippet:
      data["spec"]["securityParameters"] = {
        "cryptoOperation": "decrypt",
        "sourceImageName": source_encrypted_image_name,
        "sourceImageNamespace": namespace,
      }
      data["spec"].pop("checksum", None)
      data["spec"].pop("url", None)
      spec.displayName = "<Decrypted Image Name>"

Additional Notes

  • Only use image_ubuntu.
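
Both test cases mutate the image payload the same way and differ only in the crypto operation, so the shared step can be sketched as one helper (the `data` shape follows the snippets above; the function name is illustrative):

```python
# Sketch only: apply the securityParameters mutation described in the
# test cases above. `data` is the image-creation payload; its schema is
# taken from the prompt snippets, not verified against the API.

def apply_security_parameters(data, operation, source_image_name, namespace):
    """Turn a plain image spec into an encrypt/decrypt image spec."""
    assert operation in ("encrypt", "decrypt")
    data["spec"]["securityParameters"] = {
        "cryptoOperation": operation,
        "sourceImageName": source_image_name,
        "sourceImageNamespace": namespace,
    }
    # An encrypted/decrypted image is built from a source image,
    # so the download-style fields no longer apply.
    data["spec"].pop("checksum", None)
    data["spec"].pop("url", None)
    return data
```

The second test case would reuse it as `apply_security_parameters(data, "decrypt", encrypted_image_name, namespace)`.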

@Yu-Jack Yu-Jack self-assigned this Jun 13, 2025
@Yu-Jack Yu-Jack force-pushed the HARV-8148-copilot branch from 3309512 to faef991 Compare June 13, 2025 06:10
@Yu-Jack Yu-Jack marked this pull request as ready for review June 13, 2025 06:19
Copilot AI review requested due to automatic review settings June 13, 2025 06:19
Copilot AI left a comment


Pull Request Overview

This PR adds a new documentation file for GitHub Copilot instructions to streamline test case generation. The changes include:

  • Detailed guidelines for writing integration and API tests.
  • Templates for shared fixtures, test cases, and Harvester API manager implementations.
Comments suppressed due to low confidence (2)

.github/instructions/copilot-instructions.md:45

  • The fixture scope 'module or class' is ambiguous as pytest expects a single valid scope (e.g. 'module' or 'class'). Please clarify this or split into separate examples.
@pytest.fixture(scope="module or class")

.github/instructions/copilot-instructions.md:77

  • [nitpick] The placeholder '${some fixtures and shared_fixture}' may confuse users. Consider replacing it with a clear indication of the required fixture names.
def test_xxxx(self, ${some fixtures and shared_fixture}):

Member

@lanfon72 lanfon72 left a comment


The Harvester API would be more complicated, so I feel Copilot would not be that useful for it.

assert code == xxxx, f"Expected status code xxxx, got {code} with data: {data}"
```

## Harvester API Manager Template
Member

The Harvester API client is not tightly coupled to our pytest test cases; we plan to separate it into another repo in the near future, so it would not be used here.

Contributor Author

@Yu-Jack Yu-Jack Jun 20, 2025

If so, I think we can keep it here until we move it out.

## Don't Do

- Do **not** use method names with `_` prefixes in tests, such as `api_client.[resource]._[method]`.
- Do **not** define a function inside another function, for example:
Member

For example:

@pytest.fixture(scope="session")
def vm_checker(api_client, wait_timeout, sleep_timeout, vm_shell):
    from dataclasses import dataclass, field

    @dataclass
    class ResponseContext:
        callee: str
        code: int
        data: dict
        options: dict = field(default_factory=dict, compare=False)

        def __iter__(self):
            ''' handy method be used to unpack'''
            return iter([self.code, self.data])

    @dataclass
    class ShellContext:
        command: str
        stdout: str
        stderr: str
        options: dict = field(default_factory=dict, compare=False)

        def __iter__(self):
            ''' handy method be used to unpack'''
            return iter([self.stdout, self.stderr])

    def default_cb(ctx):
        ''' identity callback function for adjust checking condition.
        :rtype: boolean
        :return: True when hit the additional check
        '''
        return True

    class VMChecker:
        def __init__(self, vm_api, wait_timeout, snooze=3):
            self.vms = vm_api
            self.wait_timeout = wait_timeout
            self.snooze = snooze

        def _endtime(self):
            return datetime.now() + timedelta(seconds=self.wait_timeout)

        @contextmanager
        def configure(self, snooze=None, wait_timeout=None):
            ''' context manager to temporarily change snooze or wait_timeout '''
            s, t = self.snooze, self.wait_timeout
            try:
                self.snooze, self.wait_timeout = snooze or s, wait_timeout or t
                yield self
            finally:
                self.snooze, self.wait_timeout = s, t

        def wait_getable(self, vm_name, endtime=None, callback=default_cb, **kws):
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ResponseContext('vm.get', *self.vms.get(vm_name, **kws))
                if 200 == ctx.code and callback(ctx):
                    break
                sleep(self.snooze)
            else:
                return False, ctx
            return True, ctx

        def wait_stopped(self, vm_name, endtime=None, callback=default_cb, **kws):
            ctx = ResponseContext('vm.stop', *self.vms.stop(vm_name, **kws))
            if 404 == ctx.code and callback(ctx):
                return False, ctx
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ResponseContext('get_status', *self.vms.get_status(vm_name, **kws))
                if 404 == ctx.code and callback(ctx):
                    break
                sleep(self.snooze)
            else:
                return False, ctx
            return True, ctx

        def wait_status_stopped(self, vm_name, endtime=None, callback=default_cb, **kws):
            def cb(ctx):
                if ctx.callee == 'vm.stop':
                    return callback(ctx)
                ctx.code, ctx.data = self.vms.get(vm_name, **kws)
                ctx.callee = 'vm.get'
                return (
                    200 == ctx.code
                    and "Stopped" == ctx.data.get('status', {}).get('printableStatus')
                    and callback(ctx)
                )
            return self.wait_stopped(vm_name, endtime, cb, **kws)

        def wait_status_running(self, vm_name, endtime=None, callback=default_cb, **kws):
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ResponseContext('vm.get', *self.vms.get(vm_name, **kws))
                status = ctx.data.get('status', {}).get('printableStatus')
                if 200 == ctx.code and "Running" == status and callback(ctx):
                    break
                sleep(self.snooze)
            else:
                return False, ctx
            return True, ctx

        def wait_deleted(self, vm_name, endtime=None, callback=default_cb, **kws):
            ctx = ResponseContext('vm.delete', *self.vms.delete(vm_name, **kws))
            if 404 == ctx.code and callback(ctx):
                return False, ctx
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
                if 404 == ctx.code and callback(ctx):
                    break
                sleep(self.snooze)
            else:
                return False, ctx
            return True, ctx

        def wait_restarted(self, vm_name, endtime=None, callback=default_cb, **kws):
            ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
            if 404 == ctx.code and callback(ctx):
                return False, ctx
            options = dict(old_pods=set(ctx.data['status']['activePods'].items()))
            ctx = ResponseContext('vm.restart', *self.vms.restart(vm_name, **kws), options)
            if 404 == ctx.code and callback(ctx):
                return False, ctx
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws),
                                      ctx.options)
                if 404 != ctx.code:
                    old_pods = ctx.options['old_pods']
                    cur_pods = ctx.data['status'].get('activePods', {}).items()
                    if old_pods.difference(cur_pods or old_pods) and callback(ctx):
                        break
                sleep(self.snooze)
            else:
                return False, ctx
            return self.wait_started(vm_name, endtime, callback, **kws)

        def wait_started(self, vm_name, endtime=None, callback=default_cb, **kws):
            ctx = ResponseContext('vm.start', *self.vms.start(vm_name, **kws))
            if 404 == ctx.code and callback(ctx):
                return False, ctx
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
                if (
                    200 == ctx.code
                    and "Running" == ctx.data.get('status', {}).get('phase')
                    and callback(ctx)
                ):
                    break
                sleep(self.snooze)
            else:
                return False, ctx
            return True, ctx

        def wait_agent_connected(self, vm_name, endtime=None, callback=default_cb, **kws):
            def cb(ctx):
                if ctx.callee == 'vm.start':
                    return callback(ctx)
                conds = ctx.data.get('status', {}).get('conditions', [{}])
                return (
                    "AgentConnected" == conds[-1].get('type')
                    and callback(ctx)
                )
            return self.wait_started(vm_name, endtime, cb, **kws)

        def wait_interfaces(self, vm_name, endtime=None, callback=default_cb, **kws):
            def cb(ctx):
                if ctx.callee == 'vm.start':
                    return callback(ctx)
                return (
                    ctx.data.get('status', {}).get('interfaces')
                    and callback(ctx)
                )
            return self.wait_agent_connected(vm_name, endtime, cb, **kws)

        def wait_ip_addresses(self, vm_name, ifnames, endtime=None, callback=default_cb, **kws):
            def cb(ctx):
                if ctx.callee == 'vm.start':
                    return callback(ctx)
                ifaces = {d['name']: d for d in ctx.data.get('status', {}).get('interfaces', {})}
                return (
                    all(ifaces.get(name, {}).get('ipAddress') for name in ifnames)
                    and callback(ctx)
                )
            ifnames = list(ifnames)
            return self.wait_interfaces(vm_name, endtime, cb, **kws)

        def wait_cloudinit_done(self, shell, endtime=None, callback=default_cb, **kws):
            cmd = 'cloud-init status'
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ShellContext(cmd, *shell.exec_command(cmd))
                if 'done' in ctx.stdout and callback(ctx):
                    break
                sleep(self.snooze)
            else:
                return False, (ctx.stdout, ctx.stderr)
            return True, (ctx.stdout, ctx.stderr)

        def wait_migrated(self, vm_name, new_host, endtime=None, callback=default_cb, **kws):
            ctx = ResponseContext('vm.migrate', *self.vms.migrate(vm_name, new_host, **kws))
            if 404 == ctx.code and callback(ctx):
                return False, ctx
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                ctx = ResponseContext('vm.get_status', *self.vms.get_status(vm_name, **kws))
                if (
                    not ctx.data['metadata']['annotations'].get("harvesterhci.io/migrationState")
                    and new_host == ctx.data['status']['nodeName']
                    and callback(ctx)
                ):
                    break
                sleep(self.snooze)
            else:
                return False, ctx
            return True, ctx

        def wait_ssh_connected(
            self, vm_ip, username, password=None, pkey=None, endtime=None, **kws
        ):
            vm_sh = vm_shell(username, password, pkey)
            endtime = endtime or self._endtime()
            while endtime > datetime.now():
                try:
                    vm_sh.connect(vm_ip, **kws)
                except (ChannelException, NoValidConnectionsError) as e:
                    login_ex = e
                    sleep(self.snooze)
                else:
                    break
            else:
                raise AssertionError(f"Unable to login to VM {vm_ip}") from login_ex
            return vm_sh

    return VMChecker(api_client.vms, wait_timeout, sleep_timeout)

So it would not be a "Don't", but it needs more consideration.
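
A stripped-down, self-contained illustration of the unpack pattern those context dataclasses rely on (the `__iter__` override is what lets callers write `code, data = ctx`):

```python
# Minimal illustration of the ResponseContext unpack pattern used above.
from dataclasses import dataclass, field

@dataclass
class ResponseContext:
    callee: str
    code: int
    data: dict
    options: dict = field(default_factory=dict, compare=False)

    def __iter__(self):
        # Lets callers unpack the interesting fields: code, data = ctx
        return iter([self.code, self.data])

ctx = ResponseContext("vm.get", 200, {"status": {"printableStatus": "Running"}})
code, data = ctx  # unpacks via __iter__
```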

Contributor Author

Okay. I didn't notice we have this pattern, thanks!

@Yu-Jack Yu-Jack force-pushed the HARV-8148-copilot branch 6 times, most recently from 976980a to e81c9bc Compare June 23, 2025 08:20
@Yu-Jack Yu-Jack force-pushed the HARV-8148-copilot branch from e81c9bc to 3aca361 Compare June 23, 2025 08:26