Skip to content

Commit

Permalink
plugins/device_tree: Refactor and use AnalysisPluginV0
Browse files Browse the repository at this point in the history
Version bumped because unknown-model is now also part of the summary.
Defining the schema allowed us to clean up much of the logic.
While there is some arguable code left this is a step in the right
direction.
Also required a change in the architecture_detection plugin to
accomodate that the result can be None.
  • Loading branch information
maringuu committed Oct 12, 2023
1 parent 04e334a commit 02638ec
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 162 deletions.
6 changes: 5 additions & 1 deletion src/plugins/analysis/architecture_detection/internal/dt.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ def _get_compatible_entry(dts: str) -> str | None:


def construct_result(file_object):
device_tree_result = file_object.processed_analysis['device_tree'].get('result', {})
if not device_tree_result:
return {}

result = {}
for dt_dict in file_object.processed_analysis['device_tree'].get('result', {}).get('device_trees', []):
for dt_dict in device_tree_result.get('device_trees', []):
dt = dt_dict['device_tree']

compatible_entry = _get_compatible_entry(dt)
Expand Down
105 changes: 71 additions & 34 deletions src/plugins/analysis/device_tree/code/device_tree.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,75 @@
from analysis.PluginBase import AnalysisBasePlugin
from __future__ import annotations

from helperFunctions.tag import TagColor
from objects.file import FileObject
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from analysis.plugin import AnalysisPluginV0, Tag
from typing import Optional, Dict, TYPE_CHECKING

from ..internal.device_tree_utils import dump_device_trees


class AnalysisPlugin(AnalysisBasePlugin):
"""
Device Tree Plug-in
"""

NAME = 'device_tree'
DESCRIPTION = 'get the device tree in text from the device tree blob'
VERSION = '1.0.1'
MIME_BLACKLIST = [*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'] # noqa: RUF012
FILE = __file__

def process_object(self, file_object: FileObject):
file_object.processed_analysis[self.NAME] = {'summary': []}

device_trees = dump_device_trees(file_object.binary)
if device_trees:
file_object.processed_analysis[self.NAME]['device_trees'] = device_trees
for result in device_trees:
model = result.get('model')
if model:
file_object.processed_analysis[self.NAME]['summary'].append(model)
self.add_analysis_tag(
file_object=file_object,
tag_name=self.NAME,
value=self.NAME.replace('_', ' '),
color=TagColor.ORANGE,
propagate=False,
)
from ..internal.schema import Schema
from ..internal.schema import DeviceTree, IllegalDeviceTreeError

if TYPE_CHECKING:
import io


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
def __init__(self):
metadata = AnalysisPluginV0.MetaData(
name='device_tree',
description='get the device tree in text from the device tree blob',
version='1.1.0',
system_version=None,
mime_blacklist=[*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'],
timeout=10,
Schema=Schema,
)
super().__init__(metadata=metadata)

def summarize(self, result: Schema) -> list[str]:
models = [device_tree.model for device_tree in result.device_trees if device_tree.model]

if not models:
return ['unknown-model']

return models

return file_object
def analyze(
self,
file_handle: io.FileIO,
virtual_file_path: dict,
analyses: Dict[str, dict],
) -> Optional[Schema]:
del virtual_file_path, analyses

binary = file_handle.readall()

offset = binary.find(DeviceTree.Header.MAGIC, 0)
device_trees = []

while offset >= 0:
try:
device_tree = DeviceTree.from_binary(binary, offset=offset)
# We found a valid device tree.
# Skip only the header because device trees may contain devicetrees tehmselves.
offset += DeviceTree.Header.SIZE
device_trees.append(device_tree)
except IllegalDeviceTreeError:
offset += 1

offset = binary.find(DeviceTree.Header.MAGIC, offset)

if len(device_trees) == 0:
return None

return Schema(device_trees=device_trees)

def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
del result, summary
return [
Tag(
name=self.metadata.name,
value='device tree',
color=TagColor.ORANGE,
),
]
95 changes: 4 additions & 91 deletions src/plugins/analysis/device_tree/internal/device_tree_utils.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,17 @@
from __future__ import annotations

import logging
from pathlib import Path
from subprocess import run
from tempfile import NamedTemporaryFile
from typing import NamedTuple
from typing import TYPE_CHECKING

from more_itertools import chunked

MAGIC = bytes.fromhex('D00DFEED')

HEADER_SIZE = 40
if TYPE_CHECKING:
from pathlib import Path


def _bytes_to_int(byte_str: list[int]) -> int:
return int.from_bytes(bytes(byte_str), byteorder='big')


class DeviceTreeHeader(NamedTuple):
# Based on https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#header
size: int
struct_block_offset: int
strings_block_offset: int
memory_map_offset: int
version: int
oldest_compatible_version: int
boot_cpu_id: int
strings_block_size: int
struct_block_size: int


class Property:
def __init__(self, raw: bytes, strings_by_offset: dict[int, bytes]):
# a property consists of a struct {uint32_t len; uint32_t nameoff;} followed by the value
Expand Down Expand Up @@ -61,24 +43,6 @@ def __iter__(self):
self.raw = self.raw[next_property_offset + prop.get_size() :]


def parse_dtb_header(raw: bytes) -> DeviceTreeHeader:
return DeviceTreeHeader(*[_bytes_to_int(chunk) for chunk in chunked(raw[4:HEADER_SIZE], 4)])


def header_has_illegal_values(header: DeviceTreeHeader, max_size: int) -> bool:
values = [
header.struct_block_offset,
header.strings_block_offset,
header.struct_block_size,
header.strings_block_size,
]
return (
header.version > 20 # noqa: PLR2004
or any(n > max_size or n > header.size for n in values)
or header.size > max_size
)


def convert_device_tree_to_str(file_path: str | Path) -> str | None:
process = run(f'dtc -I dtb -O dts {file_path}', shell=True, capture_output=True)
if process.returncode != 0:
Expand All @@ -89,62 +53,11 @@ def convert_device_tree_to_str(file_path: str | Path) -> str | None:
return process.stdout.decode(errors='replace').strip()


def dump_device_trees(raw: bytes) -> list[dict]:
total_offset = 0
dumped_device_trees = []

while MAGIC in raw:
offset = raw.find(MAGIC)
raw = raw[offset:]
total_offset += offset

json_result = analyze_device_tree(raw)
if json_result:
json_result['offset'] = total_offset
dumped_device_trees.append(json_result)

# only skip HEADER_SIZE ahead because device trees might be inside other device trees
raw = raw[HEADER_SIZE:]
total_offset += HEADER_SIZE

return dumped_device_trees


def analyze_device_tree(raw: bytes) -> dict | None:
header = parse_dtb_header(raw)
if header_has_illegal_values(header, len(raw)):
return None # probably false positive

device_tree = raw[: header.size]
strings_block = device_tree[header.strings_block_offset : header.strings_block_offset + header.strings_block_size]
structure_block = device_tree[header.struct_block_offset : header.struct_block_offset + header.struct_block_size]
strings_by_offset = {strings_block.find(s): s for s in strings_block.split(b'\0') if s}
description, model = _get_model_or_description(StructureBlock(structure_block, strings_by_offset))

with NamedTemporaryFile(mode='wb') as temp_file:
Path(temp_file.name).write_bytes(device_tree)
string_representation = convert_device_tree_to_str(temp_file.name)
if string_representation:
return _result_to_json(header, string_representation, model, description)
return None


def _get_model_or_description(structure_block: StructureBlock):
def get_model_or_description(structure_block: StructureBlock):
model, description = None, None
for prop in structure_block:
if prop.name == b'model':
model = prop.value.decode(errors='replace')
if not description and prop.name == b'description':
description = prop.value.decode(errors='replace')
return description, model


def _result_to_json(
header: DeviceTreeHeader, string_representation: str, model: str | None, description: str | None
) -> dict:
return {
'header': header._asdict(),
'device_tree': string_representation,
'model': model,
'description': description,
}
Loading

0 comments on commit 02638ec

Please sign in to comment.