Skip to content

Commit

Permalink
plugins/device_tree: Refactor and use AnalysisPluginV0
Browse files Browse the repository at this point in the history
Major Version bumped because the schema changed.
Defining the schema allowed us to clean up much of the logic.
While there is some arguable code left, this is a step in the right
direction.
Also required a change in the architecture_detection plugin to
accommodate that the result can be None.
  • Loading branch information
maringuu committed Feb 22, 2024
1 parent 0fa2ed3 commit aa772bd
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 171 deletions.
6 changes: 5 additions & 1 deletion src/plugins/analysis/architecture_detection/internal/dt.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ def _get_compatible_entry(dts: str) -> str | None:


def construct_result(file_object):
device_tree_result = file_object.processed_analysis['device_tree'].get('result', {})
if not device_tree_result:
return {}

result = {}
for dt_dict in file_object.processed_analysis['device_tree'].get('result', {}).get('device_trees', []):
for dt_dict in device_tree_result.get('device_trees', []):
dt = dt_dict['device_tree']

compatible_entry = _get_compatible_entry(dt)
Expand Down
102 changes: 68 additions & 34 deletions src/plugins/analysis/device_tree/code/device_tree.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,72 @@
from analysis.PluginBase import AnalysisBasePlugin
from __future__ import annotations

from helperFunctions.tag import TagColor
from objects.file import FileObject
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from analysis.plugin import AnalysisPluginV0, Tag
from typing import Optional, Dict, TYPE_CHECKING

from ..internal.device_tree_utils import dump_device_trees


class AnalysisPlugin(AnalysisBasePlugin):
"""
Device Tree Plug-in
"""

NAME = 'device_tree'
DESCRIPTION = 'get the device tree in text from the device tree blob'
VERSION = '1.0.1'
MIME_BLACKLIST = [*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'] # noqa: RUF012
FILE = __file__

def process_object(self, file_object: FileObject):
file_object.processed_analysis[self.NAME] = {'summary': []}

device_trees = dump_device_trees(file_object.binary)
if device_trees:
file_object.processed_analysis[self.NAME]['device_trees'] = device_trees
for result in device_trees:
model = result.get('model')
if model:
file_object.processed_analysis[self.NAME]['summary'].append(model)
self.add_analysis_tag(
file_object=file_object,
tag_name=self.NAME,
value=self.NAME.replace('_', ' '),
color=TagColor.ORANGE,
propagate=False,
)
from ..internal.schema import Schema
from ..internal.schema import DeviceTree, IllegalDeviceTreeError

if TYPE_CHECKING:
import io


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
    """Analysis plugin that searches a file for embedded device trees."""

    def __init__(self):
        metadata = AnalysisPluginV0.MetaData(
            name='device_tree',
            description='get the device tree in text from the device tree blob',
            version='2.0.0',
            system_version=None,
            mime_blacklist=[*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'],
            timeout=10,
            Schema=Schema,
        )
        super().__init__(metadata=metadata)

    def summarize(self, result: Schema) -> list[str]:
        """Return the model names of all found device trees.

        Device trees without a model are left out; if none of them has a model,
        the single placeholder entry ``'unknown-model'`` is returned instead.
        """
        models = [device_tree.model for device_tree in result.device_trees if device_tree.model]

        if not models:
            return ['unknown-model']

        return models

    def analyze(
        self,
        file_handle: io.FileIO,
        virtual_file_path: dict,
        analyses: Dict[str, dict],
    ) -> Optional[Schema]:
        """Scan the whole file for device tree magic values and parse every hit.

        :param file_handle: Handle of the file to analyze; it is read completely.
        :param virtual_file_path: Unused.
        :param analyses: Unused.
        :return: A ``Schema`` holding every device tree that could be parsed,
            or ``None`` if the file contains none.
        """
        del virtual_file_path, analyses

        binary = file_handle.readall()

        device_trees = []
        offset = 0
        while (offset := binary.find(DeviceTree.Header.MAGIC, offset)) >= 0:
            try:
                device_tree = DeviceTree.from_binary(binary, offset=offset)
            except IllegalDeviceTreeError:
                # False positive: continue the search right behind this magic candidate.
                offset += 1
            else:
                # We found a valid device tree.
                # Skip only the header because device trees may contain device trees themselves.
                offset += DeviceTree.Header.SIZE
                device_trees.append(device_tree)

        if not device_trees:
            return None

        return Schema(device_trees=device_trees)

    def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
        """Return the single static 'device tree' tag; *result* and *summary* are not used."""
        del result, summary
        return [
            Tag(
                name=self.metadata.name,
                value='device tree',
                color=TagColor.ORANGE,
            ),
        ]
103 changes: 8 additions & 95 deletions src/plugins/analysis/device_tree/internal/device_tree_utils.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,24 @@
from __future__ import annotations

import logging
from pathlib import Path
from subprocess import run
from tempfile import NamedTemporaryFile
from typing import NamedTuple
from typing import TYPE_CHECKING

from more_itertools import chunked
if TYPE_CHECKING:
from pathlib import Path

MAGIC = bytes.fromhex('D00DFEED')

HEADER_SIZE = 40


def _bytes_to_int(byte_str: list[int]) -> int:
    """Interpret a sequence of byte values as one big-endian unsigned integer.

    An empty sequence yields ``0``.
    """
    return int.from_bytes(bytes(byte_str), byteorder='big')


class DeviceTreeHeader(NamedTuple):
    """Header fields of a flattened device tree that follow the magic value, in on-disk order."""

    # Based on https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#header
    size: int  # total size of the device tree blob (`totalsize` in the spec)
    struct_block_offset: int  # offset of the structure block from the start of the header
    strings_block_offset: int  # offset of the strings block from the start of the header
    memory_map_offset: int  # offset of the memory reservation block
    version: int  # version of the device tree data structure
    oldest_compatible_version: int  # lowest version this blob is backwards compatible with
    boot_cpu_id: int  # physical ID of the boot CPU
    strings_block_size: int  # length of the strings block in bytes
    struct_block_size: int  # length of the structure block in bytes
def int_from_buf(buf: bytes, offset: int) -> int:
    """Read the big-endian unsigned integer stored in the four bytes of *buf* starting at *offset*.

    If fewer than four bytes remain behind *offset*, only the remaining bytes
    are interpreted (an empty slice yields ``0``).
    """
    return int.from_bytes(buf[offset : offset + 4], byteorder='big')


class Property:
def __init__(self, raw: bytes, strings_by_offset: dict[int, bytes]):
# a property consists of a struct {uint32_t len; uint32_t nameoff;} followed by the value
# nameoff is an offset of the string in the strings block
# see also: https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#lexical-structure
self.length = _bytes_to_int(list(raw[4:8]))
self.name_offset = _bytes_to_int(list(raw[8:12]))
self.length = int_from_buf(raw, 4)
self.name_offset = int_from_buf(raw, 8)
self.name = strings_by_offset.get(self.name_offset, None)
self.value = raw[12 : 12 + self.length].strip(b'\0')

Expand All @@ -61,24 +43,6 @@ def __iter__(self):
self.raw = self.raw[next_property_offset + prop.get_size() :]


def parse_dtb_header(raw: bytes) -> DeviceTreeHeader:
    """Decode the header fields that follow the 4-byte magic value at the start of *raw*.

    The bytes from index 4 up to ``HEADER_SIZE`` are split into 4-byte words and
    each word is decoded as a big-endian integer, one per ``DeviceTreeHeader`` field.

    :raises TypeError: If *raw* is too short to provide a value for every header field.
    """
    header_words = chunked(raw[4:HEADER_SIZE], 4)
    return DeviceTreeHeader(*(_bytes_to_int(word) for word in header_words))


def header_has_illegal_values(header: DeviceTreeHeader, max_size: int) -> bool:
    """Tell whether *header* is implausible and the magic match is likely a false positive.

    A header is rejected if its version is greater than 20, if its total size
    exceeds *max_size*, or if any block offset or block size exceeds *max_size*
    or the total size stated in the header.

    :param header: The parsed header to check.
    :param max_size: Number of bytes that are actually available for the device tree.
    """
    max_plausible_version = 20
    if header.version > max_plausible_version or header.size > max_size:
        return True

    block_values = (
        header.struct_block_offset,
        header.strings_block_offset,
        header.struct_block_size,
        header.strings_block_size,
    )
    return any(n > max_size or n > header.size for n in block_values)


def convert_device_tree_to_str(file_path: str | Path) -> str | None:
process = run(f'dtc -I dtb -O dts {file_path}', shell=True, capture_output=True)
if process.returncode != 0:
Expand All @@ -89,62 +53,11 @@ def convert_device_tree_to_str(file_path: str | Path) -> str | None:
return process.stdout.decode(errors='replace').strip()


def dump_device_trees(raw: bytes) -> list[dict]:
    """Find and analyze every device tree embedded in *raw*.

    Each occurrence of ``MAGIC`` is handed to ``analyze_device_tree``; every
    successful result gets the absolute position of its magic value in *raw*
    stored under the key ``'offset'``.

    :param raw: The binary content to search.
    :return: The analysis results of all device trees found (possibly empty).
    """
    dumped_device_trees = []

    position = 0
    # A single find() from a moving start offset avoids scanning the buffer twice
    # per iteration and re-slicing the remainder after every hit.
    while (position := raw.find(MAGIC, position)) >= 0:
        json_result = analyze_device_tree(raw[position:])
        if json_result:
            json_result['offset'] = position
            dumped_device_trees.append(json_result)

        # only skip HEADER_SIZE ahead because device trees might be inside other device trees
        position += HEADER_SIZE

    return dumped_device_trees


def analyze_device_tree(raw: bytes) -> dict | None:
    """Analyze the device tree candidate that starts at the beginning of *raw*.

    The header is parsed and checked for plausibility, model and description are
    read from the structure block, and the blob is written to a temporary file
    that is converted to its textual form.

    :param raw: Binary data beginning with the device tree magic value.
    :return: The result dict built by ``_result_to_json`` or ``None`` if the
        candidate is truncated, has an implausible header, or could not be
        converted to text.
    """
    if len(raw) < HEADER_SIZE:
        # A magic value close to the end of the data cannot be followed by a complete
        # header; parsing it would fail because header fields are missing.
        return None

    header = parse_dtb_header(raw)
    if header_has_illegal_values(header, len(raw)):
        return None  # probably false positive

    device_tree = raw[: header.size]
    strings_block = device_tree[header.strings_block_offset : header.strings_block_offset + header.strings_block_size]
    structure_block = device_tree[header.struct_block_offset : header.struct_block_offset + header.struct_block_size]
    strings_by_offset = {strings_block.find(s): s for s in strings_block.split(b'\0') if s}
    description, model = _get_model_or_description(StructureBlock(structure_block, strings_by_offset))

    with NamedTemporaryFile(mode='wb') as temp_file:
        Path(temp_file.name).write_bytes(device_tree)
        string_representation = convert_device_tree_to_str(temp_file.name)

    if string_representation:
        return _result_to_json(header, string_representation, model, description)
    return None


def get_model_or_description(structure_block: StructureBlock):
    """Extract the description and the model from the properties of *structure_block*.

    If several ``model`` properties exist the last one wins, while for
    ``description`` the first non-empty value is kept. Values are decoded with
    undecodable bytes replaced.

    :param structure_block: Iterable of properties that offer ``name`` and ``value``.
    :return: The tuple ``(description, model)``; an entry is ``None`` if the
        corresponding property was not found.
    """
    model, description = None, None
    for prop in structure_block:
        if prop.name == b'model':
            model = prop.value.decode(errors='replace')
        if not description and prop.name == b'description':
            description = prop.value.decode(errors='replace')
    return description, model


# Kept so that code written against the pre-rename private name continues to work.
_get_model_or_description = get_model_or_description


def _result_to_json(
    header: DeviceTreeHeader, string_representation: str, model: str | None, description: str | None
) -> dict:
    """Bundle the analysis results of one device tree into a plain dict.

    :param header: The parsed header; it is stored as a dict of its fields.
    :param string_representation: The textual form of the device tree.
    :param model: The model of the device tree, if any.
    :param description: The description of the device tree, if any.
    :return: A dict with the keys ``'header'``, ``'device_tree'``, ``'model'`` and ``'description'``.
    """
    return {
        'header': header._asdict(),
        'device_tree': string_representation,
        'model': model,
        'description': description,
    }
Loading

0 comments on commit aa772bd

Please sign in to comment.