diff --git a/common/python/cctrusted_base/tdx/__init__.py b/common/python/cctrusted_base/tdx/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/common/python/cctrusted_base/tdx/common.py b/common/python/cctrusted_base/tdx/common.py new file mode 100644 index 00000000..7a5f960a --- /dev/null +++ b/common/python/cctrusted_base/tdx/common.py @@ -0,0 +1,10 @@ +""" +Common definition for Intel TDX. +""" +TDX_VERSION_1_0 = "1.0" +TDX_VERSION_1_5 = "1.5" + +# The length of the reportdata +TDX_REPORTDATA_LEN = 64 +# The length of the tdreport +TDX_REPORT_LEN = 1024 diff --git a/common/python/cctrusted_base/tdx/report.py b/common/python/cctrusted_base/tdx/report.py new file mode 100644 index 00000000..f4e9a165 --- /dev/null +++ b/common/python/cctrusted_base/tdx/report.py @@ -0,0 +1,327 @@ +""" +Manage the TDX Report +""" +import logging +from abc import ABC, abstractmethod +from cctrusted_base.binaryblob import BinaryBlob +from cctrusted_base.tdx.common import TDX_VERSION_1_0, TDX_VERSION_1_5, \ + TDX_REPORTDATA_LEN, TDX_REPORT_LEN + +LOG = logging.getLogger(__name__) + +class ModuleVersion: + ''' + class ModuleVersion contains version infomation of tdx module + ''' + + VALID_SVN_LENGTH = 16 + + def __init__(self, release_names: list[str], major: int, minor: int, is_debug: bool = False): + self.release_names = release_names + self.major = major + self.minor = minor + self.is_debug = is_debug + + @staticmethod + def from_bytes(tee_tcb_svn: bytes): + ''' + Method from_bytes parses bytes of the svn, if it + is valid, return the instance of the module version. + ''' + if len(tee_tcb_svn) != ModuleVersion.VALID_SVN_LENGTH: + return None, False + version_bytes = tee_tcb_svn[0:2] + for version in VALID_MODULE_VERSIONS: + if version.to_hex().to_bytes(2, byteorder='little') == version_bytes: + return version, True + return None, False + + def to_hex(self): + ''' + Method to_hex converts the module version to the svn in hex. + ''' + return self.major * 16 * 16 + self.minor + + def __str__(self): + sep = " or " + names = sep.join(self.release_names) + return (f'module version: {{ ' + f'release_names: {names},' + f'major: {self.major},' + f'minor: {self.minor},' + f'is_debug: {self.is_debug}' + f' }}' + ) + +VALID_MODULE_VERSIONS = [ + ModuleVersion(["1.0"], 0, 0, True), + ModuleVersion(["1.0"], 0, 3), + ModuleVersion(["1.4", "1.5"], 1, 0, True), + ModuleVersion(["1.4", "1.5"], 1, 3), + ModuleVersion(["2.0"], 2, 0, True), + ModuleVersion(["2.0"], 2, 3), +] + +class ReportMacStruct(BinaryBlob): + """ + Struct REPORTMACSTRUCT + """ + + def __init__(self, data): + super().__init__(data) + self.report_type = None + self.reserverd1 = None + self.cpusvn = None + self.tee_tcb_info_hash = None + self.tee_info_hash = None + self.report_data = None + self.reserverd2 = None + self.mac = None + + def parse(self): + """ + parse the raw data for Struct REPORTMACSTRUCT + + Struct REPORTMACSTRUCT's layout: + offset, len + 0x0, 0x8 report_type + 0x8, 0x8 reserverd1 + 0x10, 0x10 cpusvn + 0x20, 0x30 tee_tcb_info_hash + 0x50, 0x30 tee_info_hash + 0x80, 0x40 report_data + 0xc0, 0x20 reserverd2 + 0xe0, 0x20 mac + """ + offset = 0 + + self.report_type, offset = self.get_bytes(offset, 0x8) + self.reserverd1, offset = self.get_bytes(offset, 0x8) + self.cpusvn, offset = self.get_bytes(offset, 0x10) + self.tee_tcb_info_hash, offset = self.get_bytes(offset, 0x30) + self.tee_info_hash, offset = self.get_bytes(offset, 0x30) + self.report_data, offset = self.get_bytes(offset, 0x40) + self.reserverd2, offset = self.get_bytes(offset, 0x20) + self.mac, offset = self.get_bytes(offset, 0x20) + +class TeeTcbInfo(BinaryBlob): + """ + Struct TEE_TCB_INFO + """ + + def __init__(self, data): + super().__init__(data) + self.module_version = None + + # real fields + self.valid = None + self.tee_tcb_svn = None + self.mrseam = None + self.mrsignerseam = None + self.attributes = None + self.tee_tcb_svn2 = None + self.reserved = None + + def parse(self, tdx_version): + """ + parse the raw data for Struct TEE_TCB_INFO + + Struct TEE_TCB_INFO's layout: + offset, len + 0x0, 0x08 valid + 0x8, 0x10 tee_tcb_svn + 0x18, 0x30 mrseam + 0x48, 0x30 mrsignerseam + 0x78, 0x08 attributes + + # fileds in tdx v1.0 + 0x80, 0x6f reserved + + # fileds in tdx v1.5 + 0x80, 0x10 tee_tcb_svn2 + 0x90, 0x5f reserved + + FIXME: need spec reference to update info # pylint: disable=fixme + about new field tee_tcb_svn2 + """ + offset = 0 + + self.valid, offset = self.get_bytes(offset, 0x8) + self.tee_tcb_svn, offset = self.get_bytes(offset, 0x10) + self.mrseam, offset = self.get_bytes(offset, 0x30) + self.mrsignerseam, offset = self.get_bytes(offset, 0x30) + self.attributes, offset = self.get_bytes(offset, 0x8) + + if tdx_version == TDX_VERSION_1_0: + self.reserved, offset = self.get_bytes(offset, 0x6f) + elif tdx_version == TDX_VERSION_1_5: + self.tee_tcb_svn2, offset = self.get_bytes(offset, 0x10) + self.reserved, offset = self.get_bytes(offset, 0x5f) + + # parse module svn + self.module_version, _ = ModuleVersion.from_bytes(self.tee_tcb_svn) + +class TdInfo(BinaryBlob): + """ + Struct TDINFO_STRUCT + """ + + def __init__(self, data): + super().__init__(data) + # read fields + self.attributes = None + self.xfam = None + self.mrtd = None + self.mrconfigid = None + self.mrowner = None + self.mrownerconfig = None + self.rtmr_0 = None + self.rtmr_1 = None + self.rtmr_2 = None + self.rtmr_3 = None + self.servtd_hash = None + self.reserved = None + + def parse(self, tdx_version): + ''' + parse the raw data for Struct TDINFO_STRUCT + + Struct TDINFO_STRUCT's layout: + offset, len + 0x0, 0x8 attributes + 0x8, 0x8 xfam + 0x10, 0x30 mrtd + 0x40, 0x30 mrconfigid + 0x70, 0x30 mrowner + 0xa0, 0x30 mrownerconfig + 0xd0, 0x30 rtmr_0 + 0x100, 0x30 rtmr_1 + 0x130, 0x30 rtmr_2 + 0x160, 0x30 rtmr_3 + + # fields in tdx v1.0 + 0x190, 0x70 reserved + + # fields in tdx v1.5 + 0x190, 0x30 servtd_hash + 0x1c0, 0x40 reserved + + ref: + Page 40 of IntelĀ® TDX Module v1.5 ABI Specification + from https://www.intel.com/content/www/us/en/developer/articles/technical/ + intel-trust-domain-extensions.html + ''' + + offset = 0 + + self.attributes, offset = self.get_bytes(offset, 0x8) + self.xfam, offset = self.get_bytes(offset, 0x8) + self.mrtd, offset = self.get_bytes(offset, 0x30) + self.mrconfigid, offset = self.get_bytes(offset, 0x30) + self.mrowner, offset = self.get_bytes(offset, 0x30) + self.mrownerconfig, offset = self.get_bytes(offset, 0x30) + self.rtmr_0, offset = self.get_bytes(offset, 0x30) + self.rtmr_1, offset = self.get_bytes(offset, 0x30) + self.rtmr_2, offset = self.get_bytes(offset, 0x30) + self.rtmr_3, offset = self.get_bytes(offset, 0x30) + + if tdx_version == TDX_VERSION_1_0: + self.reserved, offset = self.get_bytes(offset, 0x70) + elif tdx_version == TDX_VERSION_1_5: + self.servtd_hash, offset = self.get_bytes(offset, 0x30) + self.reserved, offset = self.get_bytes(offset, 0x40) + +class TdReport(BinaryBlob): + + def __init__(self, data): + super().__init__(data) + self.report_mac_struct = None + self.tee_tcb_info = None + self.reserved = None + self.td_info = None + + def parse(self, version): + """ + Parse structure + """ + offset = 0 + + data, offset = self.get_bytes(offset, 0x100) + self.report_mac_struct = ReportMacStruct(data) + self.report_mac_struct.parse() + + data, offset = self.get_bytes(offset, 0xef) + self.tee_tcb_info = TeeTcbInfo(data) + self.tee_tcb_info.parse(version) + + data, offset = self.get_bytes(offset, 0x11) + self.reserved = data + + data, offset = self.get_bytes(offset, 0x200) + self.td_info = TdInfo(data) + self.td_info.parse(version) + +class TdxReportReq(ABC): + + def __init__(self, ver): + self._version = ver + + @property + def version(self): + """ + The TDX version + """ + return self._version + + @abstractmethod + def prepare_reqbuf(self, report_data=None) -> bytearray: + """ + Return the request data for TDREPORT + """ + raise NotImplementedError("Should be implemented by inherited class") + + @abstractmethod + def process_output(self, rawdata) -> TdReport: + """ + Process response data from IOCTL or sysfs + """ + raise NotImplementedError("Should be implemented by inherited class") + +class TdxReportReq10(TdxReportReq): + + def __init__(self): + TdxReportReq.__init__(self, TDX_VERSION_1_0) + + def prepare_reqbuf(self, report_data=None) -> bytearray: + assert False, "Need implement later" + + def process_output(self, rawdata) -> TdReport: + assert False, "Need implement later" + +class TdxReportReq15(TdxReportReq): + + def __init__(self): + TdxReportReq.__init__(self, TDX_VERSION_1_5) + + def prepare_reqbuf(self, report_data=None) -> bytearray: + length = 0 + if report_data is not None: + length = len(report_data) + + if length > TDX_REPORTDATA_LEN: + LOG.error("Input report_data is longer than TDX_REPORTDATA_LEN") + return None + + req = bytearray(TDX_REPORTDATA_LEN + TDX_REPORT_LEN) + for index in range(length): + req[index] = report_data[index] + return req + + def process_output(self, rawdata) -> TdReport: + """ + Process response data from IOCTL or sysfs + """ + report_bytes = rawdata[TDX_REPORTDATA_LEN:] + report_obj = TdReport(report_bytes) + report_obj.parse(self.version) + return report_obj diff --git a/vmsdk/python/cc_imr_cli.py b/vmsdk/python/cc_imr_cli.py index 94706874..14b2e942 100644 --- a/vmsdk/python/cc_imr_cli.py +++ b/vmsdk/python/cc_imr_cli.py @@ -7,7 +7,7 @@ LOG = logging.getLogger(__name__) -logging.basicConfig(level=logging.NOTSET) +logging.basicConfig(level=logging.NOTSET, format='%(name)s %(levelname)-8s %(message)s') imr_inst = cctrusted.get_measurement([2, None]) diff --git a/vmsdk/python/cctrusted/api.py b/vmsdk/python/cctrusted/api.py index 3e70d73b..1fc6ec96 100644 --- a/vmsdk/python/cctrusted/api.py +++ b/vmsdk/python/cctrusted/api.py @@ -6,8 +6,8 @@ # pylint: disable=unused-import from cctrusted_base.imr import TcgIMR from cctrusted_base.tcg import TcgAlgorithmRegistry - -from .cvm import ConfidentialVM +from cctrusted_base.tdx.report import TdReport +from .cvm import ConfidentialVM, TdxVM LOG = logging.getLogger(__name__) @@ -16,6 +16,8 @@ def get_measurement(imr_select:[int, int]) -> TcgIMR: Get IMR register value according to given index """ cvm_inst = ConfidentialVM.inst() + cvm_inst.dump() + imr_index = imr_select[0] algo_id = imr_select[1] @@ -27,3 +29,13 @@ def get_measurement(imr_select:[int, int]) -> TcgIMR: algo_id = cvm_inst.default_algo_id return cvm_inst.imrs[imr_index].digest(algo_id) + +def get_tdx_report() -> TdReport: + """ + Get TDX Report + """ + cvm_inst = ConfidentialVM.inst() + cvm_inst.dump() + + assert isinstance(cvm_inst, TdxVM) + return cvm_inst.tdreport diff --git a/vmsdk/python/cctrusted/cvm.py b/vmsdk/python/cctrusted/cvm.py index 9a4c6ef3..dabf0feb 100644 --- a/vmsdk/python/cctrusted/cvm.py +++ b/vmsdk/python/cctrusted/cvm.py @@ -6,11 +6,15 @@ 3. IMR (integrated measurement register) """ - +import os import logging +import struct +import fcntl from abc import ABC, abstractmethod from cctrusted_base.imr import TcgIMR from cctrusted_base.tcg import TcgAlgorithmRegistry +from cctrusted_base.tdx.common import TDX_VERSION_1_0, TDX_VERSION_1_5 +from cctrusted_base.tdx.report import TdxReportReq10, TdxReportReq15 LOG = logging.getLogger(__name__) @@ -40,11 +44,17 @@ class ConfidentialVM: TYPE_CC_SEV = 1 TYPE_CC_CCA = 1 + TYPE_CC_STRING = { + TYPE_CC_TDX: "TDX", + TYPE_CC_SEV: "SEV", + TYPE_CC_CCA: "CCA" + } + _inst = None - def __init__(self): + def __init__(self, cctype): self._device_node:CcDeviceNode = None - self._cc_type:int = ConfidentialVM.TYPE_CC_NONE + self._cc_type:int = cctype self._is_init:bool = False self._imrs:dict[int, TcgIMR] = {} @@ -63,6 +73,14 @@ def default_algo_id(self): """ raise NotImplementedError("Should be implemented by inherited class") + @property + @abstractmethod + def version(self): + """ + Version of CC VM + """ + raise NotImplementedError("Should be implemented by inherited class") + @property def imrs(self) -> list[TcgIMR]: """ @@ -70,6 +88,13 @@ def imrs(self) -> list[TcgIMR]: """ return self._imrs + @property + def cc_type_str(self): + """ + Return the CC type string + """ + return ConfidentialVM.TYPE_CC_STRING[self.cc_type] + def init(self) -> bool: """ Initialize the CC stub and environment @@ -91,9 +116,11 @@ def detect_cc_type(): """ Detect the type of current confidential VM """ - # TODO: add code to detect CC type - LOG.info("Detect the CC type") - return ConfidentialVM.TYPE_CC_TDX + # TODO: refine the justification + for devpath in TdxVM.DEVICE_NODE_PATH.values(): + if os.path.exists(devpath): + return ConfidentialVM.TYPE_CC_TDX + return ConfidentialVM.TYPE_CC_NONE @abstractmethod def process_cc_report(self): @@ -109,6 +136,15 @@ def process_eventlog(self): """ raise NotImplementedError("Should be implemented by inherited class") + def dump(self): + """ + Dump confidential VM information + """ + LOG.info("======================================") + LOG.info("CVM type = %s", self.cc_type_str) + LOG.info("CVM version = %s", self.version) + LOG.info("======================================") + @staticmethod def inst(): """ @@ -126,18 +162,81 @@ def inst(): class TdxVM(ConfidentialVM): - def process_cc_report(self): + DEVICE_NODE_PATH = { + TDX_VERSION_1_0: "/dev/tdx-guest", + TDX_VERSION_1_5: "/dev/tdx_guest" + } + + IOCTL_GET_REPORT = { + TDX_VERSION_1_0: int.from_bytes(struct.pack('Hcb', 0x08c0, b'T', 1), 'big'), + TDX_VERSION_1_5: int.from_bytes(struct.pack('Hcb', 0x40c4, b'T', 1),'big') + } + + def __init__(self): + ConfidentialVM.__init__(self, ConfidentialVM.TYPE_CC_TDX) + self._version:str = None + self._tdreport = None + + @property + def version(self): + if self._version is None: + for key, value in TdxVM.DEVICE_NODE_PATH.items(): + if os.path.exists(value): + self._version = key + return self._version + + @property + def default_algo_id(self): + return TcgAlgorithmRegistry.TPM_ALG_SHA384 + + @property + def tdreport(self): + """ + return TDREPORT structure + """ + return self._tdreport + + def process_cc_report(self) -> bool: """ Process the confidential computing REPORT. """ + dev_path = self.DEVICE_NODE_PATH[self.version] + try: + tdx_dev = os.open(dev_path, os.O_RDWR) + except (PermissionError, IOError, OSError): + LOG.error("Fail to open device node %s", dev_path) + return False + + LOG.debug("Successful open device node %s", dev_path) + + if self.version is TDX_VERSION_1_0: + tdreport_req = TdxReportReq10() + elif self.version is TDX_VERSION_1_5: + tdreport_req = TdxReportReq15() + + # pylint: disable=E1111 + reqbuf = tdreport_req.prepare_reqbuf() + try: + fcntl.ioctl(tdx_dev, self.IOCTL_GET_REPORT[self.version], reqbuf) + except OSError: + LOG.error("Fail to execute ioctl for file %s", dev_path) + os.close(tdx_dev) + return False + + LOG.debug("Successful read TDREPORT from %s.", dev_path) + os.close(tdx_dev) + + # pylint: disable=E1111 + tdreport = tdreport_req.process_output(reqbuf) + if tdreport is not None: + LOG.debug("Successful parse TDREPORT.") + + # process IMR + self._tdreport = tdreport return True - def process_eventlog(self): + def process_eventlog(self) -> bool: """ Process the event log """ return True - - @property - def default_algo_id(self): - return TcgAlgorithmRegistry.TPM_ALG_SHA384 diff --git a/vmsdk/python/td_report_cli.py b/vmsdk/python/td_report_cli.py new file mode 100644 index 00000000..92454f0d --- /dev/null +++ b/vmsdk/python/td_report_cli.py @@ -0,0 +1,13 @@ + +""" +Command line to dump the integrated measurement register +""" +import logging +import cctrusted + +LOG = logging.getLogger(__name__) + +logging.basicConfig(level=logging.NOTSET, format='%(message)s') + +tdreport = cctrusted.get_tdx_report() +tdreport.dump()