Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

device_tree: Use PluginV1 #1028

Merged
merged 2 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/plugins/analysis/architecture_detection/internal/dt.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ def _get_compatible_entry(dts: str) -> str | None:


def construct_result(file_object):
device_tree_result = file_object.processed_analysis['device_tree'].get('result', {})
if not device_tree_result:
return {}

result = {}
for dt_dict in file_object.processed_analysis['device_tree'].get('result', {}).get('device_trees', []):
for dt_dict in device_tree_result.get('device_trees', []):
dt = dt_dict['device_tree']

compatible_entry = _get_compatible_entry(dt)
Expand Down
102 changes: 68 additions & 34 deletions src/plugins/analysis/device_tree/code/device_tree.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,72 @@
from analysis.PluginBase import AnalysisBasePlugin
from __future__ import annotations

from helperFunctions.tag import TagColor
from objects.file import FileObject
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from analysis.plugin import AnalysisPluginV0, Tag
from typing import Optional, Dict, TYPE_CHECKING

from ..internal.device_tree_utils import dump_device_trees


class AnalysisPlugin(AnalysisBasePlugin):
"""
Device Tree Plug-in
"""

NAME = 'device_tree'
DESCRIPTION = 'get the device tree in text from the device tree blob'
VERSION = '1.0.1'
MIME_BLACKLIST = [*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'] # noqa: RUF012
FILE = __file__

def process_object(self, file_object: FileObject):
file_object.processed_analysis[self.NAME] = {'summary': []}

device_trees = dump_device_trees(file_object.binary)
if device_trees:
file_object.processed_analysis[self.NAME]['device_trees'] = device_trees
for result in device_trees:
model = result.get('model')
if model:
file_object.processed_analysis[self.NAME]['summary'].append(model)
self.add_analysis_tag(
file_object=file_object,
tag_name=self.NAME,
value=self.NAME.replace('_', ' '),
color=TagColor.ORANGE,
propagate=False,
)
from ..internal.schema import Schema
from ..internal.schema import DeviceTree, IllegalDeviceTreeError

if TYPE_CHECKING:
import io


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
def __init__(self):
metadata = AnalysisPluginV0.MetaData(
name='device_tree',
description='get the device tree in text from the device tree blob',
version='2.0.0',
system_version=None,
mime_blacklist=[*MIME_BLACKLIST_COMPRESSED, 'audio', 'image', 'video'],
timeout=10,
Schema=Schema,
maringuu marked this conversation as resolved.
Show resolved Hide resolved
)
super().__init__(metadata=metadata)

def summarize(self, result: Schema) -> list[str]:
models = [device_tree.model for device_tree in result.device_trees if device_tree.model]

if not models:
return ['unknown-model']

return file_object
return models

def analyze(
self,
file_handle: io.FileIO,
virtual_file_path: dict,
analyses: Dict[str, dict],
) -> Optional[Schema]:
del virtual_file_path, analyses

binary = file_handle.readall()

device_trees = []
offset = 0
while (offset := binary.find(DeviceTree.Header.MAGIC, offset)) >= 0:
try:
device_tree = DeviceTree.from_binary(binary, offset=offset)
# We found a valid device tree.
# Skip only the header because device trees may contain device trees themselves.
offset += DeviceTree.Header.SIZE
device_trees.append(device_tree)
except IllegalDeviceTreeError:
offset += 1

if len(device_trees) == 0:
return None

return Schema(device_trees=device_trees)

def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
del result, summary
return [
Tag(
name=self.metadata.name,
value='device tree',
color=TagColor.ORANGE,
),
]
103 changes: 8 additions & 95 deletions src/plugins/analysis/device_tree/internal/device_tree_utils.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,24 @@
from __future__ import annotations

import logging
from pathlib import Path
from subprocess import run
from tempfile import NamedTemporaryFile
from typing import NamedTuple
from typing import TYPE_CHECKING

from more_itertools import chunked
if TYPE_CHECKING:
from pathlib import Path

MAGIC = bytes.fromhex('D00DFEED')

HEADER_SIZE = 40


def _bytes_to_int(byte_str: list[int]) -> int:
return int.from_bytes(bytes(byte_str), byteorder='big')


class DeviceTreeHeader(NamedTuple):
# Based on https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#header
size: int
struct_block_offset: int
strings_block_offset: int
memory_map_offset: int
version: int
oldest_compatible_version: int
boot_cpu_id: int
strings_block_size: int
struct_block_size: int
def int_from_buf(buf: bytes, offset: int) -> int:
return int.from_bytes(buf[offset : offset + 4], byteorder='big')


class Property:
def __init__(self, raw: bytes, strings_by_offset: dict[int, bytes]):
# a property consists of a struct {uint32_t len; uint32_t nameoff;} followed by the value
# nameoff is an offset of the string in the strings block
# see also: https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html#lexical-structure
self.length = _bytes_to_int(list(raw[4:8]))
self.name_offset = _bytes_to_int(list(raw[8:12]))
self.length = int_from_buf(raw, 4)
self.name_offset = int_from_buf(raw, 8)
self.name = strings_by_offset.get(self.name_offset, None)
self.value = raw[12 : 12 + self.length].strip(b'\0')

Expand All @@ -61,24 +43,6 @@ def __iter__(self):
self.raw = self.raw[next_property_offset + prop.get_size() :]


def parse_dtb_header(raw: bytes) -> DeviceTreeHeader:
return DeviceTreeHeader(*[_bytes_to_int(chunk) for chunk in chunked(raw[4:HEADER_SIZE], 4)])


def header_has_illegal_values(header: DeviceTreeHeader, max_size: int) -> bool:
values = [
header.struct_block_offset,
header.strings_block_offset,
header.struct_block_size,
header.strings_block_size,
]
return (
header.version > 20 # noqa: PLR2004
or any(n > max_size or n > header.size for n in values)
or header.size > max_size
)


def convert_device_tree_to_str(file_path: str | Path) -> str | None:
process = run(f'dtc -I dtb -O dts {file_path}', shell=True, capture_output=True)
if process.returncode != 0:
Expand All @@ -89,62 +53,11 @@ def convert_device_tree_to_str(file_path: str | Path) -> str | None:
return process.stdout.decode(errors='replace').strip()


def dump_device_trees(raw: bytes) -> list[dict]:
total_offset = 0
dumped_device_trees = []

while MAGIC in raw:
offset = raw.find(MAGIC)
raw = raw[offset:]
total_offset += offset

json_result = analyze_device_tree(raw)
if json_result:
json_result['offset'] = total_offset
dumped_device_trees.append(json_result)

# only skip HEADER_SIZE ahead because device trees might be inside other device trees
raw = raw[HEADER_SIZE:]
total_offset += HEADER_SIZE

return dumped_device_trees


def analyze_device_tree(raw: bytes) -> dict | None:
header = parse_dtb_header(raw)
if header_has_illegal_values(header, len(raw)):
return None # probably false positive

device_tree = raw[: header.size]
strings_block = device_tree[header.strings_block_offset : header.strings_block_offset + header.strings_block_size]
structure_block = device_tree[header.struct_block_offset : header.struct_block_offset + header.struct_block_size]
strings_by_offset = {strings_block.find(s): s for s in strings_block.split(b'\0') if s}
description, model = _get_model_or_description(StructureBlock(structure_block, strings_by_offset))

with NamedTemporaryFile(mode='wb') as temp_file:
Path(temp_file.name).write_bytes(device_tree)
string_representation = convert_device_tree_to_str(temp_file.name)
if string_representation:
return _result_to_json(header, string_representation, model, description)
return None


def _get_model_or_description(structure_block: StructureBlock):
def get_model_or_description(structure_block: StructureBlock):
model, description = None, None
for prop in structure_block:
if prop.name == b'model':
model = prop.value.decode(errors='replace')
if not description and prop.name == b'description':
description = prop.value.decode(errors='replace')
return description, model


def _result_to_json(
header: DeviceTreeHeader, string_representation: str, model: str | None, description: str | None
) -> dict:
return {
'header': header._asdict(),
'device_tree': string_representation,
'model': model,
'description': description,
}
Loading
Loading