diff --git a/src/plugins/analysis/architecture_detection/internal/dt.py b/src/plugins/analysis/architecture_detection/internal/dt.py index b234ad825..81de3e698 100644 --- a/src/plugins/analysis/architecture_detection/internal/dt.py +++ b/src/plugins/analysis/architecture_detection/internal/dt.py @@ -72,8 +72,12 @@ def _get_compatible_entry(dts: str) -> str | None: def construct_result(file_object): + device_tree_result = file_object.processed_analysis['device_tree'].get('result', {}) + if not device_tree_result: + return {} + result = {} - for dt_dict in file_object.processed_analysis['device_tree'].get('result', {}).get('device_trees', []): + for dt_dict in device_tree_result.get('device_trees', []): dt = dt_dict['device_tree'] compatible_entry = _get_compatible_entry(dt) diff --git a/src/plugins/analysis/device_tree/code/device_tree.py b/src/plugins/analysis/device_tree/code/device_tree.py index dc6bf47ef..ff9103b46 100644 --- a/src/plugins/analysis/device_tree/code/device_tree.py +++ b/src/plugins/analysis/device_tree/code/device_tree.py @@ -1,38 +1,72 @@ -from analysis.PluginBase import AnalysisBasePlugin +from __future__ import annotations + from helperFunctions.tag import TagColor -from objects.file import FileObject from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED +from analysis.plugin.compat import AnalysisBasePluginAdapterMixin +from analysis.plugin import AnalysisPluginV0, Tag +from typing import Optional, Dict, TYPE_CHECKING -from ..internal.device_tree_utils import dump_device_trees - - -class AnalysisPlugin(AnalysisBasePlugin): - """ - Device Tree Plug-in - """ - - NAME = 'device_tree' - DESCRIPTION = 'get the device tree in text from the device tree blob' - VERSION = '1.0.1' - MIME_BLACKLIST = [*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'] # noqa: RUF012 - FILE = __file__ - - def process_object(self, file_object: FileObject): - file_object.processed_analysis[self.NAME] = {'summary': []} - - device_trees = dump_device_trees(file_object.binary) - if device_trees: - file_object.processed_analysis[self.NAME]['device_trees'] = device_trees - for result in device_trees: - model = result.get('model') - if model: - file_object.processed_analysis[self.NAME]['summary'].append(model) - self.add_analysis_tag( - file_object=file_object, - tag_name=self.NAME, - value=self.NAME.replace('_', ' '), - color=TagColor.ORANGE, - propagate=False, - ) +from ..internal.schema import Schema +from ..internal.schema import DeviceTree, IllegalDeviceTreeError + +if TYPE_CHECKING: + import io + + +class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): + def __init__(self): + metadata = AnalysisPluginV0.MetaData( + name='device_tree', + description='get the device tree in text from the device tree blob', + version='2.0.0', + system_version=None, + mime_blacklist=[*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'], + timeout=10, + Schema=Schema, + ) + super().__init__(metadata=metadata) + + def summarize(self, result: Schema) -> list[str]: + models = [device_tree.model for device_tree in result.device_trees if device_tree.model] + + if not models: + return ['unknown-model'] - return file_object + return models + + def analyze( + self, + file_handle: io.FileIO, + virtual_file_path: dict, + analyses: Dict[str, dict], + ) -> Optional[Schema]: + del virtual_file_path, analyses + + binary = file_handle.readall() + + device_trees = [] + offset = 0 + while (offset := binary.find(DeviceTree.Header.MAGIC, offset)) >= 0: + try: + device_tree = DeviceTree.from_binary(binary, offset=offset) + # We found a valid device tree. + # Skip only the header because device trees may contain device trees themselves. + offset += DeviceTree.Header.SIZE + device_trees.append(device_tree) + except IllegalDeviceTreeError: + offset += 1 + + if len(device_trees) == 0: + return None + + return Schema(device_trees=device_trees) + + def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]: + del result, summary + return [ + Tag( + name=self.metadata.name, + value='device tree', + color=TagColor.ORANGE, + ), + ] diff --git a/src/plugins/analysis/device_tree/internal/device_tree_utils.py b/src/plugins/analysis/device_tree/internal/device_tree_utils.py index 9bedd5f65..0b737d9ce 100644 --- a/src/plugins/analysis/device_tree/internal/device_tree_utils.py +++ b/src/plugins/analysis/device_tree/internal/device_tree_utils.py @@ -1,33 +1,15 @@ from __future__ import annotations import logging -from pathlib import Path from subprocess import run -from tempfile import NamedTemporaryFile -from typing import NamedTuple +from typing import TYPE_CHECKING -from more_itertools import chunked +if TYPE_CHECKING: + from pathlib import Path -MAGIC = bytes.fromhex('D00DFEED') -HEADER_SIZE = 40 - - -def _bytes_to_int(byte_str: list[int]) -> int: - return int.from_bytes(bytes(byte_str), byteorder='big') - - -class DeviceTreeHeader(NamedTuple): - # Based on https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#header - size: int - struct_block_offset: int - strings_block_offset: int - memory_map_offset: int - version: int - oldest_compatible_version: int - boot_cpu_id: int - strings_block_size: int - struct_block_size: int +def int_from_buf(buf: bytes, offset: int) -> int: + return int.from_bytes(buf[offset : offset + 4], byteorder='big') class Property: @@ -35,8 +17,8 @@ def __init__(self, raw: bytes, strings_by_offset: dict[int, bytes]): # a property consists of a struct {uint32_t len; uint32_t nameoff;} followed by the value # nameoff is an offset of the string in the strings block # see also: https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#lexical-structure - self.length = _bytes_to_int(list(raw[4:8])) - self.name_offset = _bytes_to_int(list(raw[8:12])) + self.length = int_from_buf(raw, 4) + self.name_offset = int_from_buf(raw, 8) self.name = strings_by_offset.get(self.name_offset, None) self.value = raw[12 : 12 + self.length].strip(b'\0') @@ -61,24 +43,6 @@ def __iter__(self): self.raw = self.raw[next_property_offset + prop.get_size() :] -def parse_dtb_header(raw: bytes) -> DeviceTreeHeader: - return DeviceTreeHeader(*[_bytes_to_int(chunk) for chunk in chunked(raw[4:HEADER_SIZE], 4)]) - - -def header_has_illegal_values(header: DeviceTreeHeader, max_size: int) -> bool: - values = [ - header.struct_block_offset, - header.strings_block_offset, - header.struct_block_size, - header.strings_block_size, - ] - return ( - header.version > 20 # noqa: PLR2004 - or any(n > max_size or n > header.size for n in values) - or header.size > max_size - ) - - def convert_device_tree_to_str(file_path: str | Path) -> str | None: process = run(f'dtc -I dtb -O dts {file_path}', shell=True, capture_output=True) if process.returncode != 0: @@ -89,47 +53,7 @@ def convert_device_tree_to_str(file_path: str | Path) -> str | None: return process.stdout.decode(errors='replace').strip() -def dump_device_trees(raw: bytes) -> list[dict]: - total_offset = 0 - dumped_device_trees = [] - - while MAGIC in raw: - offset = raw.find(MAGIC) - raw = raw[offset:] - total_offset += offset - - json_result = analyze_device_tree(raw) - if json_result: - json_result['offset'] = total_offset - dumped_device_trees.append(json_result) - - # only skip HEADER_SIZE ahead because device trees might be inside other device trees - raw = raw[HEADER_SIZE:] - total_offset += HEADER_SIZE - - return dumped_device_trees - - -def analyze_device_tree(raw: bytes) -> dict | None: - header = parse_dtb_header(raw) - if header_has_illegal_values(header, len(raw)): - return None # probably false positive - - device_tree = raw[: header.size] - strings_block = device_tree[header.strings_block_offset : header.strings_block_offset + header.strings_block_size] - structure_block = device_tree[header.struct_block_offset : header.struct_block_offset + header.struct_block_size] - strings_by_offset = {strings_block.find(s): s for s in strings_block.split(b'\0') if s} - description, model = _get_model_or_description(StructureBlock(structure_block, strings_by_offset)) - - with NamedTemporaryFile(mode='wb') as temp_file: - Path(temp_file.name).write_bytes(device_tree) - string_representation = convert_device_tree_to_str(temp_file.name) - if string_representation: - return _result_to_json(header, string_representation, model, description) - return None - - -def _get_model_or_description(structure_block: StructureBlock): +def get_model_or_description(structure_block: StructureBlock): model, description = None, None for prop in structure_block: if prop.name == b'model': @@ -137,14 +61,3 @@ def _get_model_or_description(structure_block: StructureBlock): if not description and prop.name == b'description': description = prop.value.decode(errors='replace') return description, model - - -def _result_to_json( - header: DeviceTreeHeader, string_representation: str, model: str | None, description: str | None -) -> dict: - return { - 'header': header._asdict(), - 'device_tree': string_representation, - 'model': model, - 'description': description, - } diff --git a/src/plugins/analysis/device_tree/internal/schema.py b/src/plugins/analysis/device_tree/internal/schema.py new file mode 100644 index 000000000..c43daf66c --- /dev/null +++ b/src/plugins/analysis/device_tree/internal/schema.py @@ -0,0 +1,143 @@ +import pydantic +from pydantic import Field +from typing import Optional, List, ClassVar +from tempfile import NamedTemporaryFile +import pathlib as pl + +from .device_tree_utils import StructureBlock, convert_device_tree_to_str, get_model_or_description, int_from_buf + + +class IllegalDeviceTreeError(ValueError): + pass + + +class IllegalHeaderError(IllegalDeviceTreeError): + pass + + +class DeviceTree(pydantic.BaseModel): + class Header(pydantic.BaseModel): + """The devicetree header as described in [1]. + + [1]: https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#header + """ + + SIZE: ClassVar[int] = 40 + MAX_VERSION: ClassVar[int] = 20 + MAGIC: ClassVar[bytes] = bytes.fromhex('D00DFEED') + + magic: int + totalsize: int + off_dt_struct: int + off_dt_strings: int + off_mem_rsvmap: int + version: int + last_comp_version: int + boot_cpuid_phys: int + size_dt_strings: int + size_dt_struct: int + + @classmethod + def from_binary(cls, binary: bytes): + """Given the whole device tree binary parses the header and does some sanity checks.""" + if len(binary) < cls.SIZE: + raise IllegalHeaderError( + f'Given header has size {len(binary)} but it should be at least {cls.SIZE}', + ) + header = cls( + magic=int_from_buf(binary, 0), + totalsize=int_from_buf(binary, 4), + off_dt_struct=int_from_buf(binary, 8), + off_dt_strings=int_from_buf(binary, 12), + off_mem_rsvmap=int_from_buf(binary, 16), + version=int_from_buf(binary, 20), + last_comp_version=int_from_buf(binary, 24), + boot_cpuid_phys=int_from_buf(binary, 28), + size_dt_strings=int_from_buf(binary, 32), + size_dt_struct=int_from_buf(binary, 36), + ) + + if header.version > cls.MAX_VERSION: + raise IllegalHeaderError(f'Version may not exceed {cls.MAX_VERSION} but is {header.version}.') + + dt_len = len(binary) + if header.totalsize > dt_len: + raise IllegalHeaderError( + f'Value {header.totalsize} for totalsize is larger than the whole device tree.' + ) + if header.size_dt_strings > dt_len: + raise IllegalHeaderError( + f'Value {header.size_dt_strings} for size_dt_strings is larger than the whole device tree.' + ) + if header.off_dt_strings > dt_len: + raise IllegalHeaderError( + f'Value {header.off_dt_strings} for off_dt_strings is larger than the whole device tree.' + ) + if header.size_dt_struct > dt_len: + raise IllegalHeaderError( + f'Value {header.size_dt_struct} for size_dt_struct is larger than the whole device tree.' + ) + if header.off_dt_struct > dt_len: + raise IllegalHeaderError( + f'Value {header.off_dt_struct} for off_dt_struct is larger than the whole device tree.' + ) + + return header + + offset: int = Field( + description='The offset where the device tree is located in the file.', + ) + header: Header = Field( + description=( + 'The struct as described in ' + 'https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#header ' + 'except it is missing the magic field.' + ), + ) + string: str = Field( + description='The whole device tree in string format.', + ) + model: Optional[str] = Field( + description=( + 'The model as described in the spec.\n' + 'https://devicetree-specification.readthedocs.io/en/latest/chapter2-devicetree-basics.html?highlight=model#model' + ), + ) + description: Optional[str] = Field() + + @classmethod + def from_binary(cls, binary: bytes, offset: int = 0): + """Given a binary and an offset into that binary constructs an instance of DeviceTree. + Raises IllegalDeviceTreeError for nonsensical device trees. + """ + binary = binary[offset:] + if not binary.startswith(DeviceTree.Header.MAGIC): + raise IllegalDeviceTreeError('Binary does not start with the right magic.') + + header = DeviceTree.Header.from_binary(binary) + + device_tree = binary[: header.totalsize] + strings_block = device_tree[header.off_dt_strings :][: header.size_dt_strings] + structure_block = device_tree[header.off_dt_struct :][: header.size_dt_struct] + + strings_by_offset = {strings_block.find(s): s for s in strings_block.split(b'\0') if s} + description, model = get_model_or_description(StructureBlock(structure_block, strings_by_offset)) + + with NamedTemporaryFile(mode='wb') as temp_file: + pl.Path(temp_file.name).write_bytes(device_tree) + string_representation = convert_device_tree_to_str(temp_file.name) + + if not string_representation: + raise IllegalDeviceTreeError('dtc could not parse the device tree') + + return cls( + header=header, + string=string_representation, + model=model, + description=description, + offset=offset, + ) + + +class Schema(pydantic.BaseModel): + device_trees: List[DeviceTree] diff --git a/src/plugins/analysis/device_tree/requirements.txt b/src/plugins/analysis/device_tree/requirements.txt deleted file mode 100644 index 3b92c43a5..000000000 --- a/src/plugins/analysis/device_tree/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -more_itertools==9.0.0 diff --git a/src/plugins/analysis/device_tree/test/test_device_tree.py b/src/plugins/analysis/device_tree/test/test_device_tree.py index 6121ba4fb..2357f3fcc 100644 --- a/src/plugins/analysis/device_tree/test/test_device_tree.py +++ b/src/plugins/analysis/device_tree/test/test_device_tree.py @@ -1,11 +1,10 @@ +import io from pathlib import Path import pytest -from objects.file import FileObject from ..code.device_tree import AnalysisPlugin -from ..internal.device_tree_utils import dump_device_trees TEST_DATA = Path(__file__).parent.parent / 'test/data' TEST_FILE = TEST_DATA / 'device_tree.dtb' @@ -16,29 +15,28 @@ @pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) -def test_process_object(analysis_plugin): - test_object = FileObject() - test_object.binary = TEST_FILE.read_bytes() - test_object.file_path = str(TEST_FILE) - processed_object = analysis_plugin.process_object(test_object) - result = processed_object.processed_analysis[analysis_plugin.NAME] +def test_analyze(analysis_plugin): + result = analysis_plugin.analyze(io.FileIO(TEST_FILE), {}, {}) + summary = analysis_plugin.summarize(result) - assert len(result['device_trees']) == 1 - assert result['device_trees'][0]['model'] == 'Manufac XYZ1234ABC' - assert result['summary'] == ['Manufac XYZ1234ABC'] + assert len(result.device_trees) == 1 + assert result.device_trees[0].model == 'Manufac XYZ1234ABC' + assert summary == ['Manufac XYZ1234ABC'] +@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) @pytest.mark.parametrize('file', [TEST_EMBEDDED, TEST_IMAGE]) -def test_dump_device_trees(file): - result = dump_device_trees(file.read_bytes()) - assert len(result) == 2 # noqa: PLR2004 - for dt_dict in result: - assert 'foo = "bar";' in dt_dict['device_tree'] - assert dt_dict['header']['version'] == 17 # noqa: PLR2004 - assert dt_dict['model'] in ['DeviceTreeTest-1', 'FooBar 1.0'] +def test_multiple_device_trees(file, analysis_plugin): + result = analysis_plugin.analyze(io.FileIO(file), {}, {}) + assert len(result.device_trees) == 2 # noqa: PLR2004 + for device_tree in result.device_trees: + assert 'foo = "bar";' in device_tree.string + assert device_tree.header.version == 17 # noqa: PLR2004 + assert device_tree.model in ['DeviceTreeTest-1', 'FooBar 1.0'] +@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) @pytest.mark.parametrize('file', [TEST_FP, TEST_BROKEN]) -def test_no_results(file): - result = dump_device_trees(file.read_bytes()) - assert len(result) == 0 +def test_no_device_trees(file, analysis_plugin): + result = analysis_plugin.analyze(io.FileIO(file), {}, {}) + assert result is None diff --git a/src/plugins/analysis/device_tree/view/device_tree.html b/src/plugins/analysis/device_tree/view/device_tree.html index 34b8bab4e..2c4975875 100644 --- a/src/plugins/analysis/device_tree/view/device_tree.html +++ b/src/plugins/analysis/device_tree/view/device_tree.html @@ -40,31 +40,18 @@ {% block analysis_result_details %} - -{# Don't break on legacy result #} -{% if "device_tree" in analysis_result %} -
{{ key | replace_underscore }} | {{ value | nice_number }} | @@ -72,7 +59,7 @@