From 6026bdd613486fff7608b96be0473d6affc26c3e Mon Sep 17 00:00:00 2001 From: Ruoyu Ying Date: Fri, 14 Jun 2024 10:14:53 +0800 Subject: [PATCH] tpm: fix tpm get_quote() Signed-off-by: Ruoyu Ying --- src/python/cc_quote_cli.py | 16 +++++- src/python/cctrusted_vm/cvm.py | 98 ++++++++++++++++++++++++++++++++-- 2 files changed, 108 insertions(+), 6 deletions(-) diff --git a/src/python/cc_quote_cli.py b/src/python/cc_quote_cli.py index bc5f501..84917ed 100644 --- a/src/python/cc_quote_cli.py +++ b/src/python/cc_quote_cli.py @@ -71,14 +71,28 @@ def main(): help="Output format: raw/human. Default raw.", type=out_format_validator ) + parser.add_argument( + "--pcr-selection", + type=str, + default="sha256:1,2,10", + help="PCR Selection to generate quote", + dest="pcr_selection" + ) + parser.add_argument("--ak-context", type=str, help="Path to ak context", dest="ak_context") args = parser.parse_args() nonce = make_nounce() LOG.info("demo random number in base64: %s", nonce.decode("utf-8")) userdata = make_userdata() LOG.info("demo user data in base64: %s", userdata.decode("utf-8")) + extra_args = {} + extra_args["pcr_selection"] = args.pcr_selection + extra_args["ak_context"] = args.ak_context - quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata) + if ConfidentialVM.detect_cc_type() == CCTrustedApi.TYPE_CC_TPM: + quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata, extra_args) + else: + quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata) if quote is not None: quote.dump(args.out_format == OUT_FORMAT_RAW) else: diff --git a/src/python/cctrusted_vm/cvm.py b/src/python/cctrusted_vm/cvm.py index f5145ae..e485978 100644 --- a/src/python/cctrusted_vm/cvm.py +++ b/src/python/cctrusted_vm/cvm.py @@ -22,7 +22,12 @@ from cctrusted_base.tdx.common import TDX_VERSION_1_0, TDX_VERSION_1_5 from cctrusted_base.tdx.rtmr import TdxRTMR from cctrusted_base.tdx.quote import TdxQuoteReq10, TdxQuoteReq15, TdxQuote, TdxQuoteReq +from cctrusted_base.tpm.pcr import TpmPCR +from cctrusted_base.tpm.quote import Tpm2Quote from cctrusted_base.tdx.report import TdxReportReq10, TdxReportReq15 +from tpm2_pytss import ESAPI +from tpm2_pytss.types import TPML_PCR_SELECTION, TPMS_CONTEXT, TPM2B_DATA + LOG = logging.getLogger(__name__) @@ -144,6 +149,10 @@ def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcRepor Returns: The ``CcReport`` object. """ + # In tpm case, skip get report through configfs-tsm + if self.cc_type == CCTrustedApi.TYPE_CC_TPM: + return None + if not os.path.exists(self.tsm_prefix): return None @@ -248,9 +257,6 @@ def inst(): LOG.error("Fail to initialize the confidential VM.") return ConfidentialVM._inst -from tpm2_pytss import ESAPI -from cctrusted_base.tpm.pcr import TpmPCR - class TpmVM(ConfidentialVM): DEFAULT_TPM_DEVICE_NODE="/dev/tpm0" @@ -265,26 +271,108 @@ def __init__(self, dev_node=DEFAULT_TPM_DEVICE_NODE): def default_algo_id(self): return TcgAlgorithmRegistry.TPM_ALG_SHA256 + @property + def version(self): + return "TPM2" + def process_cc_report(self, report_data=None) -> bool: """ For TPM, we do not need to get integrited measurement register """ for index in range(24): - _, _, digests = self._esapi.pcr_read("sha256:%d" % index) + _, _, digests = self._esapi.pcr_read(f"sha256:{index}") assert digests.count == 1 self._imrs[index] = TpmPCR(index, bytes.fromhex(str(digests.digests[0]))) return True def process_eventlog(self) -> bool: + """ + Process TPM event logs from /sys/kernel/security/tpm0/binary_bios_measurements + """ try: with open(TpmVM.BIOS_MEAUSREMENT, "rb") as f: self._boot_time_event_log = f.read() assert len(self._boot_time_event_log) > 0 except (PermissionError, OSError): - LOG.error("Need root permission to open file %s", TdxVM.BIOS_MEAUSREMENT) + LOG.error("Need root permission to open file %s", TpmVM.BIOS_MEAUSREMENT) return False return True + def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcReport: + """ + Fetch TPM quote with user provided attestation key + """ + pcr_selection = None + ak_context_path = None + ak_handle = None + input_data = None + + # Get user defined params including attestation key context and PCRSelections + if extraArgs is not None and "pcr_selection" in extraArgs.keys() \ + and "ak_context" in extraArgs.keys(): + try: + pcr_selection = TPML_PCR_SELECTION.parse(extraArgs["pcr_selection"]) + except ValueError: + LOG.error("Invalid PCRSelection provided for quote generation") + return None + ak_context_path = extraArgs["ak_context"] + else: + LOG.error( + "Required params(pcr_selection or ak_context) not provided for quote generation.") + return None + + # Prepare user defined data which could include nonce + if nonce is not None: + nonce = base64.b64decode(nonce, validate=True) + if data is not None: + data = base64.b64decode(data, validate=True) + + # the algorithm to concatenate nonce and user data will depend on + # algorithm defined in the pcr_selection + algo = extraArgs["pcr_selection"].split(":")[0] + if algo == "sha1": + hash_algo = hashlib.sha1() + elif algo == "sha256": + hash_algo = hashlib.sha256() + elif algo == "sha384": + hash_algo = hashlib.sha384() + elif algo == "sha512": + hash_algo = hashlib.sha512() + else: + LOG.error("Unsupported hash algorithm.") + return None + + LOG.info("Calculate report data by nonce and user data") + if nonce is not None: + hash_algo.update(bytes(nonce)) + if data is not None: + hash_algo.update(bytes(data)) + input_data = hash_algo.digest() + + # Open attestation key context from file + if os.path.exists(ak_context_path): + try: + with open(ak_context_path, 'rb') as ak_file: + ak_context = ak_file.read() + ak_handle = self._esapi.context_load(TPMS_CONTEXT.from_tools(ak_context)) + except(PermissionError, OSError): + LOG.error("Lack of permission to open file %s for ak context.", + ak_context_path) + else: + LOG.error("ak_context not found under path %s", ak_context_path) + + # Generate quote and signature + quote, signature = self._esapi.quote(ak_handle, pcr_selection, TPM2B_DATA(input_data)) + + # Flush the loaded context after use + self._esapi.flush_context(ak_handle) + + # Save the tpm quote + structured_quote = Tpm2Quote(None, CCTrustedApi.TYPE_CC_TPM) + structured_quote.set_quoted_data(quote) + structured_quote.set_sig(signature) + return structured_quote + class TdxVM(ConfidentialVM): DEVICE_NODE_PATH = {