Skip to content

Commit

Permalink
initial commit for VM SDK ccc meeting sample
Browse files Browse the repository at this point in the history
  • Loading branch information
hairongchen committed Dec 6, 2023
1 parent 3374839 commit cf0ebcb
Show file tree
Hide file tree
Showing 15 changed files with 1,750 additions and 0 deletions.
8 changes: 8 additions & 0 deletions samples/CCTrustedInspect/get_eventlog_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import sys
sys.path.append("../..")

from vmsdk.python.cctrusted.eventlog import Eventlog

eventlog = Eventlog.get_eventlog()

eventlog.dump_td_event_log_by_index(89)
9 changes: 9 additions & 0 deletions samples/CCTrustedInspect/get_quote_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import sys
sys.path.append("../..")

from vmsdk.python.cctrusted.quote import Quote

nonce = bytes("test_nonce",'utf-8')
data = bytes("test_data",'utf-8')
tdquote = Quote.get_quote(nonce,data)
tdquote.dump()
10 changes: 10 additions & 0 deletions samples/CCTrustedInspect/get_rtmr_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import sys
sys.path.append("../..")

from vmsdk.python.cctrusted.measurement import Measurement

rtmrs = Measurement.get_measurement()

rtmrs.dump_rtmrs()
rtmrs.dump_rtmrs_by_index(2)

Empty file added utils/tdx/__init__.py
Empty file.
245 changes: 245 additions & 0 deletions utils/tdx/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
"""
Actors package, the bussiness logic layer.
"""

import os
import logging
from typing import Dict, List
from hashlib import sha384

from .rtmr import RTMR
from .tdreport import TdReport
from .tdeventlog import TDEventLogEntry, TDEventLogType, TDEventLogSpecIdHeader
from .ccel import CCEL
from .binaryblob import BinaryBlob

__author__ = ""

LOG = logging.getLogger(__name__)


# pylint: disable=too-few-public-methods
class VerifyActor:
"""
Actor to verify the RTMR
"""

def _verify_single_rtmr(self, rtmr_index: int, rtmr_value_1: RTMR,
rtmr_value_2: RTMR) -> None:

if rtmr_value_1 == rtmr_value_2:
LOG.info("RTMR[%d] passed the verification.", rtmr_index)
else:
LOG.error("RTMR[%d] did not pass the verification", rtmr_index)

def verify_rtmr(self) -> None:
"""
Get TD report and RTMR replayed by event log to do verification.
"""
# 1. Read CCEL from ACPI table at /sys/firmware/acpi/tables/CCEL
ccelobj = CCEL.create_from_acpi_file()
if ccelobj is None:
return

# 2. Get the start address and length for event log area
td_event_log_actor = TDEventLogActor(
ccelobj.log_area_start_address,
ccelobj.log_area_minimum_length)

# 3. Collect event log and replay the RTMR value according to event log
td_event_log_actor.replay()

# 4. Read TD REPORT via TDCALL.GET_TDREPORT
td_report = TdReport.get_td_report()

# 5. Verify individual RTMR value from TDREPORT and recalculated from
# event log
self._verify_single_rtmr(
0,
td_event_log_actor.get_rtmr_by_index(0),
RTMR(bytearray(td_report.td_info.rtmr_0)))

self._verify_single_rtmr(
1,
td_event_log_actor.get_rtmr_by_index(1),
RTMR(bytearray(td_report.td_info.rtmr_1)))

self._verify_single_rtmr(
2,
td_event_log_actor.get_rtmr_by_index(2),
RTMR(bytearray(td_report.td_info.rtmr_2)))

self._verify_single_rtmr(
3,
td_event_log_actor.get_rtmr_by_index(3),
RTMR(bytearray(td_report.td_info.rtmr_3)))


# pylint: disable=too-few-public-methods
class TDEventLogActor:
"""
Event log actor
"""

def __init__(self, base, length):
self._data = None
self._log_base = base
self._log_length = length
self._specid_header = None
self._event_logs = []
self._rtmrs = {}

def _read(self, ccel_file="/sys/firmware/acpi/tables/data/CCEL"):
assert os.path.exists(ccel_file), f"Could not find the CCEL file {ccel_file}"
try:
with open(ccel_file, "rb") as fobj:
self._data = fobj.read()
assert len(self._data) > 0
return self._data
except (PermissionError, OSError):
LOG.error("Need root permission to open file %s", ccel_file)
return None

@staticmethod
def _replay_single_rtmr(event_logs: List[TDEventLogEntry]) -> RTMR:
rtmr = bytearray(RTMR.RTMR_LENGTH_BY_BYTES)

for event_log in event_logs:
digest = event_log.digests[0]
sha384_algo = sha384()
sha384_algo.update(rtmr + digest)
rtmr = sha384_algo.digest()

return RTMR(rtmr)

def get_rtmr_by_index(self, index: int) -> RTMR:
"""
Get RTMR by TD register index
"""
return self._rtmrs[index]

def process(self) -> None:
"""
Factory process raw data and generate entries
"""
if self._specid_header is not None:
return

if self._read() is None:
return

index = 0
count = 0
blob = BinaryBlob(self._data, self._log_base)

while index < self._log_length:
start = index
rtmr, index = blob.get_uint32(index)
etype, index = blob.get_uint32(index)

if rtmr == 0xFFFFFFFF:
break

if etype == TDEventLogType.EV_NO_ACTION:
self._specid_header = TDEventLogSpecIdHeader(
self._log_base + start)
self._specid_header.parse(self._data[start:])
index = start + self._specid_header.length
else:
event_log_obj = TDEventLogEntry(self._log_base + start,
self._specid_header)
event_log_obj.parse(self._data[start:])
index = start + event_log_obj.length
self._event_logs.append(event_log_obj)

count += 1

def replay(self) -> Dict[int, RTMR]:
"""
Replay event logs to generate RTMR value, which will be used during
verification
"""
self.process()

# result dictionary for classifying event logs by rtmr index
# the key is a integer, which represents rtmr index
# the value is a list of event log entries whose rtmr index is equal to
# its related key
event_logs_by_index = {}
for index in range(RTMR.RTMR_COUNT):
event_logs_by_index[index] = []

for event_log in self._event_logs:
event_logs_by_index[event_log.rtmr].append(event_log)

rtmr_by_index = {}
for rtmr_index, event_logs in event_logs_by_index.items():
rtmr_value = TDEventLogActor._replay_single_rtmr(event_logs)
rtmr_by_index[rtmr_index] = rtmr_value

self._rtmrs = rtmr_by_index

def dump_td_event_logs(self) -> None:
"""
Dump all TD event logs.
"""
self.process()

count, start = 0, 0

LOG.info("==== TDX Event Log Entry - %d [0x%X] ====",
count, self._log_base + start)
self._specid_header.dump()
count += 1
start += self._specid_header.length

for event_log in self._event_logs:
LOG.info("==== TDX Event Log Entry - %d [0x%X] ====",
count, self._log_base + start)
event_log.dump()
count += 1
start += event_log.length

def dump_td_event_log_by_index(self,index) -> None:
"""
Dump all TD event logs.
"""
self.process()

count, start = 0, 0

LOG.info("==== TDX Event Log Entry - %d [0x%X] ====",
count, self._log_base + start)
self._specid_header.dump()
count += 1
start += self._specid_header.length

event_log = self._event_logs[index]:
LOG.info("==== TDX Event Log Entry - %d [0x%X] ====",
count, self._log_base + start)
event_log.dump()
count += 1
start += event_log.length

def dump_rtmrs_by_index(self,index) -> None:
"""
Dump RTMRs replayed by event log.
"""
self.replay()

for rtmr_index, rtmr in self._rtmrs.items():
if index == rtmr_index:
LOG.info("==== RTMR[%d] ====", rtmr_index)
rtmr.dump()
LOG.info("")

def dump_rtmrs(self) -> None:
"""
Dump RTMRs replayed by event log.
"""
self.replay()

for rtmr_index, rtmr in self._rtmrs.items():
LOG.info("==== RTMR[%d] ====", rtmr_index)
rtmr.dump()
LOG.info("")
113 changes: 113 additions & 0 deletions utils/tdx/binaryblob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""
Manage the binary blob
"""
import logging
import string
import struct

LOG = logging.getLogger(__name__)

__author__ = ""


class BinaryBlob:
"""
Manage the binary blob.
"""

def __init__(self, data, base=0):
self._data = data
self._base_address = base

@property
def length(self):
"""
Length of binary in bytes
"""
return len(self._data)

@property
def data(self):
"""
Raw data of binary blob
"""
return self._data

def to_hex_string(self):
"""
To hex string
"""
return "".join(f"{b:02x}" % b for b in self._data)

def get_uint16(self, pos):
"""
Get UINT16 integer
"""
assert pos + 2 <= self.length
return (struct.unpack("<H", self.data[pos:pos + 2])[0], pos + 2)

def get_uint8(self, pos):
"""
Get UINT8 integer
"""
assert pos + 1 <= self.length
return (self.data[pos], pos + 1)

def get_uint32(self, pos):
"""
Get UINT32 integer
"""
assert pos + 4 <= self.length
return (struct.unpack("<L", self.data[pos:pos + 4])[0], pos + 4)

def get_uint64(self, pos):
"""
Get UINT64 integer
"""
assert pos + 8 <= self.length
return (struct.unpack("<Q", self.data[pos:pos + 8])[0], pos + 8)

def get_bytes(self, pos, count):
"""
Get bytes
"""
if count == 0:
return None
assert pos + count <= self.length
return (self.data[pos:pos + count], pos + count)

def dump(self):
"""
Dump Hex value
"""
index = 0
linestr = ""
printstr = ""

while index < self.length:
if (index % 16) == 0:
if len(linestr) != 0:
LOG.info("%s %s", linestr, printstr)
printstr = ''
# line prefix string
# pylint: disable=consider-using-f-string
linestr = "{0:08X} ".format(int(index / 16) * 16 + \
self._base_address)

# pylint: disable=consider-using-f-string
linestr += "{0:02X} ".format(self._data[index])
if chr(self._data[index]) in set(string.printable) and \
self._data[index] not in [0xC, 0xB, 0xA, 0xD, 0x9]:
printstr += chr(self._data[index])
else:
printstr += '.'

index += 1

if (index % 16) != 0:
blank = ""
for _ in range(16 - index % 16):
blank = blank + " "
LOG.info("%s%s %s", linestr, blank, printstr)
elif index == self.length:
LOG.info("%s %s", linestr, printstr)
Loading

0 comments on commit cf0ebcb

Please sign in to comment.