From ad84578f8f8801b435d0c99067bc09872b2132e3 Mon Sep 17 00:00:00 2001
From: Andrew Fasano
Date: Thu, 10 Aug 2023 16:18:16 -0400
Subject: [PATCH 1/2] Add panda target and test

---
 angr_targets/__init__.py             |   5 +
 angr_targets/targets/panda_target.py | 149 ++++++++++++++++++++++
 tests/test_panda_not_packed_elf64.py | 180 +++++++++++++++++++++++++++
 3 files changed, 334 insertions(+)
 create mode 100644 angr_targets/targets/panda_target.py
 create mode 100644 tests/test_panda_not_packed_elf64.py

diff --git a/angr_targets/__init__.py b/angr_targets/__init__.py
index a05347a..ace394e 100644
--- a/angr_targets/__init__.py
+++ b/angr_targets/__init__.py
@@ -14,6 +14,11 @@ except Exception as e:
     l.error("Impossible to load R2ConcreteTarget exception %s"%(e))
 
 
+try:
+    from .targets.panda_target import PandaConcreteTarget
+except Exception as e:
+    l.error("Impossible to load PandaConcreteTarget exception %s"%(e))
+
 '''
 try:
     from .targets.ida_target import IDAConcreteTarget
diff --git a/angr_targets/targets/panda_target.py b/angr_targets/targets/panda_target.py
new file mode 100644
index 0000000..537cd1e
--- /dev/null
+++ b/angr_targets/targets/panda_target.py
@@ -0,0 +1,149 @@
+import logging
+from angr.errors import SimConcreteMemoryError, SimConcreteRegisterError
+
+from ..concrete import ConcreteTarget
+from ..memory_map import MemoryMap
+
+l = logging.getLogger("angr_targets.panda")
+
+class PandaConcreteTarget(ConcreteTarget):
+    '''
+    Unlike other concrete targets, the PandaConcreteTarget is not initialized with a binary.
+    Instead, a user controls the PANDA instance directly and uses PANDA callbacks to pause
+    execution when they wish to initialize and run a symbolic execution using Symbion.
+
+    Note we could expand this interface to also support a mode where a binary is passed in
+    and concretely executed to a breakpoint. However, this is pretty much what the
+    AvatarConcreteTarget does, so maybe there's no need. If we wanted to do that, we'd need
+    to implement the various NYI methods at the end of this file.
+    '''
+    def __init__(self, panda, *args, **kwargs):
+        self.panda = panda
+        self.architecture = panda.arch_name
+        super().__init__(*args, **kwargs)
+
+    def _panda_register_name(self, register):
+        """Translate an angr register name into the name passed to PANDA's arch helpers."""
+        if self.architecture == 'x86_64' and register.endswith('_seg'):
+            return register.split('_seg')[0]
+        if self.architecture in ['mips', 'mipsel'] and register == 's8':
+            return 'R30'
+        return register
+
+    def read_memory(self, address, nbytes, **kwargs):
+        """Read nbytes of guest virtual memory at address through PANDA."""
+        try:
+            l.debug("PandaConcreteTarget read_memory at %x", address)
+            res = self.panda.virtual_memory_read(self.panda.get_cpu(), address, nbytes)
+            return res
+        except Exception as exn:
+            l.debug("PandaConcreteTarget can't read_memory at address %x exception" \
+                    " %s", address, exn)
+
+            raise SimConcreteMemoryError("PandaConcreteTarget can't read_memory at" \
+                                         f" address {address:x}") from exn
+
+    def write_memory(self,address, value, **kwargs):
+        """Write value into guest virtual memory at address through PANDA."""
+        l.debug("PandaConcreteTarget write_memory at %x value %s", address, value)
+        try:
+            self.panda.virtual_memory_write(self.panda.get_cpu(), address, value)
+        except Exception as exn:
+            l.warning("PandaConcreteTarget write_memory at %x value %s exception %s",
+                      address, value, exn)
+            raise SimConcreteMemoryError(f"PandaConcreteTarget write_memory at {address:x}" \
+                                         f" value {value}") from exn
+
+    def read_register(self, register, **kwargs):
+        """Return the value of register; raise SimConcreteRegisterError on failure."""
+        # TODO: doesn't support xmm/ymm registers
+        try:
+            register = self._panda_register_name(register)
+
+            register_value = self.panda.arch.get_reg(self.panda.get_cpu(), register)
+
+            l.debug("PandaConcreteTarget read_register %s value %x", register, register_value)
+            return register_value
+        except Exception as exn:
+            l.debug("PandaConcreteTarget read_register %s exception %s %s",
+                    register, type(exn).__name__, exn)
+            raise SimConcreteRegisterError("PandaConcreteTarget can't read register" \
+                                           f" {register}") from exn
+
+    def write_register(self, register, value, **kwargs):
+        """Set register to value; raise SimConcreteRegisterError on failure."""
+        l.debug("PandaConcreteTarget write_register at %s value %x ", register,value)
+        try:
+            # Registers are set through panda.arch (the counterpart of the get_reg call
+            # in read_register), using the same register-name translation.
+            self.panda.arch.set_reg(self.panda.get_cpu(),
+                                    self._panda_register_name(register), value)
+        except Exception as exn:
+            l.warning("PandaConcreteTarget write_register exception write reg %s value %x: %s",
+                      register, value, exn)
+            raise SimConcreteRegisterError(f"PandaConcreteTarget write_register exception write" \
+                                           f" reg {register} value {value:x}") from exn
+
+
+    def get_mappings(self):
+        """
+        Returns the memory mappings of the currently-running process using PANDA's
+        operating system introspection.
+        """
+        l.debug("getting the vmmap of the concrete process")
+        mapping_output = self.panda.get_mappings(self.panda.get_cpu())
+
+        vmmap = []
+        for mapping in mapping_output:
+            if mapping.file == self.panda.ffi.NULL:
+                continue # Unknown name
+            filename = self.panda.ffi.string(mapping.file).decode()
+            vmmap.append(MemoryMap(mapping.base, mapping.base + mapping.size, mapping.offset,
+                                   filename))
+
+        return vmmap
+
+    def execute_shellcode(self, shellcode, result_register):
+        # We don't support executing shellcode. But SimLinux wants to read some registers
+        # using shellcode. So if we detect one of these requests, just return the value
+        # from the concrete panda state.
+        if self.architecture == "x86_64":
+            read_gs0_x64 = b"\x65\xA1\x00\x00\x00\x00\x90\x90\x90\x90" # mov eax, gs:[0]
+            read_fs0_x64 = b"\x64\x48\x8B\x04\x25\x00\x00\x00\x00\x90\x90\x90\x90" # mov rax, fs:[0]
+
+            if shellcode == read_fs0_x64:
+                return self.panda.get_cpu().env_ptr.segs[4].base # FS
+
+            if shellcode == read_gs0_x64:
+                return self.panda.get_cpu().env_ptr.segs[5].base # GS
+
+        raise NotImplementedError("execute_shellcode not implemented for panda target")
+
+    # If we want this class to be more like the standard concrete targets, we should implement
+    # the following methods.
+    def is_running(self):
+        raise NotImplementedError("is_running not implemented for panda target")
+
+    def add_breakpoint(self, address):
+        raise NotImplementedError("add_breakpoint not implemented for panda target")
+
+    def remove_breakpoint(self, address, **kwargs):
+        raise NotImplementedError("remove_breakpoint not implemented for panda target")
+
+    def wait_for_breakpoint(self, which=None):
+        raise NotImplementedError("wait_for_breakpoint not implemented for panda target")
+
+    def set_watchpoint(self, address, **kwargs):
+        raise NotImplementedError("set_watchpoint not implemented for panda target")
+
+    def remove_watchpoint(self, address, **kwargs):
+        raise NotImplementedError("remove_watchpoint not implemented for panda target")
+
+    def run(self, **kwargs):
+        raise NotImplementedError("run not implemented for panda target")
+
+    def step(self, **kwargs):
+        raise NotImplementedError("step not implemented for panda target")
+
+    def stop(self, **kwargs):
+        raise NotImplementedError("stop not implemented for panda target")
diff --git a/tests/test_panda_not_packed_elf64.py b/tests/test_panda_not_packed_elf64.py
new file mode 100644
index 0000000..8ca4fec
--- /dev/null
+++ b/tests/test_panda_not_packed_elf64.py
@@ -0,0 +1,180 @@
+import os
+import unittest
+import angr
+import claripy
+
+try:
+    import pandare
+except ImportError:
+    pandare = None
+
+try:
+    from angr_targets import PandaConcreteTarget
+except ImportError:
+    PandaConcreteTarget = None
+
+
+binary_x64 = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                          os.path.join('..', '..', 'binaries', 'tests', 'x86_64',
+                                       'not_packed_elf64'))
+
+
+# Relative addresses just to prove we can
+BINARY_OEP = 0x9B2
+BINARY_DECISION_ADDRESS = 0xAF3
+DROP_STAGE2_V1 = 0xB87
+DROP_STAGE2_V2 = 0xBB6
+VENV_DETECTED = 0xBC2
+FAKE_CC = 0xBD6
+BINARY_EXECUTION_END = 0xC03
+
+@unittest.skipUnless(pandare is not None, "requires pandare")
+class TestPanda(unittest.TestCase):
+    '''
+    Test the PandaConcreteTarget by running a PANDA guest, then switching
+    to a symbolic execution with angr when we hit BINARY_DECISION_ADDRESS.
+    '''
+    def test_concrete_engine_linux_x64_simprocedures(self):
+        '''
+        Create a PANDA object and run an x86_64 guest system. Inside the guest,
+        copy our target binary and run it until it reaches the decision point.
+        At the decision point, run a symbolic execution to find the path
+        we're looking for. Then use that info to change concrete state
+        and resume the PANDA execution.
+        '''
+        panda = pandare.Panda(generic="x86_64")
+        panda_target = PandaConcreteTarget(panda)
+
+        # Register function to drive the PANDA guest once it starts
+        @panda.queue_blocking
+        def driver():
+            '''
+            Drive the PANDA guest during emulation.
+            First revert to a snapshot, then copy our binary in,
+            and finally run it. Assert if we don't see the "stage 2" output
+            that we should see if the symex finds the right path.
+            '''
+            panda.revert_sync("root")
+            panda.copy_to_guest(binary_x64)
+            # Run the command
+            output = panda.run_serial_cmd("./not_packed_elf64/not_packed_elf64")
+            assert "Executing stage 2" in output, f"Unexpected output: {output}"
+            panda.end_analysis()
+
+        @panda.ppp("proc_start_linux", "on_rec_auxv")
+        def proc_start(cpu, _, auxv):
+            '''
+            Use PANDA's proc_start_linux plugin to detect the start of every
+            process. When we see the target process start, use its base load
+            address to register a hook on the target function. When that hook
+            triggers, we'll switch to angr for symex.
+            '''
+            name = panda.ffi.string(auxv.argv[0]).decode()
+
+            if name.split("/")[-1] != 'not_packed_elf64':
+                return # Not our target
+
+            # Get memory maps and find where executable is loaded
+            # This might be handled by target.get_mappings() automatically?
+            code_base = None
+            for mapping in panda.get_mappings(cpu):
+                map_name = panda.ffi.string(mapping.name).decode()
+                # First map with matching name is the one we want
+                if map_name == 'not_packed_elf64':
+                    print(f"Found target {map_name} with base {mapping.base:x}")
+                    code_base = mapping.base
+                    break
+            else:
+                raise RuntimeError("Could not find target binary in maps")
+
+            print(f"Registering hook at {BINARY_DECISION_ADDRESS+code_base:x}")
+            @panda.hook(BINARY_DECISION_ADDRESS+code_base)
+            def decision_hook(_cpu, _tb, hook):
+                '''
+                This hook will be called when the target binary hits the
+                specified address. In here, we'll launch our symex.
+                When this returns, the concrete guest will resume.
+                '''
+                # Craft our angr project while panda guest is stopped here
+                proj = angr.Project(binary_x64,
+                                    concrete_target=panda_target,
+                                    use_sim_procedures=True)
+
+                entry_state = proj.factory.entry_state()
+                entry_state.options.add(angr.options.SYMBION_SYNC_CLE)
+                entry_state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC)
+
+                # Sync state from panda into angr
+                entry_state.concrete.sync()
+
+                # Run a symex to find a valid solution
+                soln_addr, soln = self.solv_concrete_engine_linux_x64(proj, entry_state, code_base)
+
+                # Write the solution back into the concrete guest's memory
+                panda_target.write_memory(soln_addr, soln)
+
+                # Disable this hook so it doesn't trigger again
+                hook.enabled = False
+
+        # Start the emulation
+        panda.run()
+
+    @staticmethod
+    def solv_concrete_engine_linux_x64(proj, new_concrete_state, base_address):
+        '''
+        Run a symbolic execution from the decision point with the stack set
+        to unconstrained symbolic data. Find a path to the DROP_STAGE2_V2
+        address while avoiding the DROP_STAGE2_V1, VENV_DETECTED, and
+        FAKE_CC addresses. Return the address of the solution and the
+        solution itself. Also assert that we hit malloc and memcpy as
+        we'd expect to during the symex.
+
+        '''
+        # Read the stack and make sure it's concrete
+        the_sp = new_concrete_state.solver.eval(new_concrete_state.regs.sp)
+        assert not new_concrete_state.memory.load(the_sp,20).symbolic
+
+        # Ensure the original stack buffer is concrete, then replace it with symbolic data
+        arg0 = claripy.BVS('arg0', 8*32)
+        symbolic_buffer_address = new_concrete_state.regs.rbp-0xc0
+        assert not new_concrete_state.memory.load(symbolic_buffer_address, 36).symbolic
+        new_concrete_state.memory.store(symbolic_buffer_address, arg0)
+
+        # Ensure that the new buffer is symbolic
+        assert new_concrete_state.memory.load(symbolic_buffer_address, 36).symbolic
+
+        # Run our symbolic execution
+        simgr = proj.factory.simgr(new_concrete_state)
+
+        find_addr=DROP_STAGE2_V2+base_address
+        avoid_addrs=[x + base_address for x in [DROP_STAGE2_V1, VENV_DETECTED, FAKE_CC]]
+
+        simgr.use_technique(angr.exploration_techniques.DFS())
+        simgr.use_technique(angr.exploration_techniques.Explorer(find=find_addr, avoid=avoid_addrs))
+
+        new_concrete_state.globals["hit_malloc_sim_proc"] = False
+        new_concrete_state.globals["hit_memcpy_sim_proc"] = False
+
+        def check_hooked_simproc(state):
+            sim_proc_name = state.inspect.simprocedure_name
+            if sim_proc_name == "malloc":
+                state.globals["hit_malloc_sim_proc"] = True
+            elif sim_proc_name == "memcpy":
+                state.globals["hit_memcpy_sim_proc"] = True
+
+        new_concrete_state.inspect.b('simprocedure', action=check_hooked_simproc)
+        simgr.explore()
+
+        new_symbolic_state = simgr.stashes['found'][0]
+
+        # Assert we hit the re-hooked SimProc.
+        assert new_symbolic_state.globals["hit_malloc_sim_proc"]
+        assert new_symbolic_state.globals["hit_memcpy_sim_proc"]
+
+        # Return a concrete address (int) and buffer (bytes) that will reach our goal
+        conc_buffer_address = new_symbolic_state.solver.eval(symbolic_buffer_address)
+        binary_configuration = new_symbolic_state.solver.eval(arg0, cast_to=bytes)
+        return (conc_buffer_address, binary_configuration)
+
+if __name__ == "__main__":
+    unittest.main()
From cef0e9a6685cba89c08708bc40ffa097b752b6bc Mon Sep 17 00:00:00 2001
From: Andrew Fasano
Date: Mon, 14 Aug 2023 12:19:49 -0400
Subject: [PATCH 2/2] Add PANDA to docs

---
 Readme.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/Readme.md b/Readme.md
index a60c140..73aa6f7 100644
--- a/Readme.md
+++ b/Readme.md
@@ -19,8 +19,9 @@ The angr concrete target needs to implement the ConcreteTarget interface which m
 In the ConcreteTarget class docstrings you can find the detailed definition of the methods and the types of arguments/return values
 
 
-Currently we support only 1 target:
+Currently we support 2 targets:
 - `AvatarGDBTarget`: Connects to a gdbserver instance.
+- `PandaConcreteTarget`: Connects to an emulated guest system running with [PANDA](https://panda.re).
 
 
 ## Install