-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #29 from AndrewFasano/master
Add panda target and test
- Loading branch information
Showing
4 changed files
with
325 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
import logging | ||
from angr.errors import SimConcreteMemoryError, SimConcreteRegisterError | ||
|
||
from ..concrete import ConcreteTarget | ||
from ..memory_map import MemoryMap | ||
|
||
# Module-level logger used by every PandaConcreteTarget method in this file.
l = logging.getLogger("angr_targets.panda")
|
||
class PandaConcreteTarget(ConcreteTarget):
    """
    Concrete target that reads and writes the state of a live PANDA guest.

    Unlike other concrete targets, the PandaConcreteTarget is not initialized with a binary.
    Instead, a user controls the PANDA instance directly and uses PANDA callbacks to pause
    execution when they wish to initialize and run a symbolic execution using Symbion.

    Note we could expand this interface to also support a mode where a binary is passed in
    and concretely executed to a breakpoint. However, this is pretty much what the
    AvatarConcreteTarget does, so maybe there's no need. If we wanted to do that, we'd need
    to implement the various NYI methods at the end of this class.
    """

    def __init__(self, panda, *args, **kwargs):
        """
        :param panda: the PANDA object whose guest state is exposed; its ``arch_name``
                      is stored as ``self.architecture``
        :param args: forwarded unchanged to ``ConcreteTarget.__init__``
        :param kwargs: forwarded unchanged to ``ConcreteTarget.__init__``
        """
        self.panda = panda
        self.architecture = panda.arch_name
        super().__init__(*args, **kwargs)

    def _panda_register_name(self, register):
        """
        Translate a register name as requested by the caller into the name passed to PANDA.

        On x86_64 a trailing ``_seg`` is dropped (everything from the first ``_seg`` on is
        removed); on mips/mipsel ``s8`` becomes ``R30``. Any other name is returned as is.
        """
        if self.architecture == 'x86_64' and register.endswith('_seg'):
            return register.split('_seg')[0]
        if self.architecture in ['mips', 'mipsel'] and register == 's8':
            return 'R30'
        return register

    def read_memory(self, address, nbytes, **kwargs):
        """
        Read ``nbytes`` of guest virtual memory starting at ``address``.

        :param address: guest virtual address to read from
        :param nbytes: number of bytes to read
        :return: the result of ``panda.virtual_memory_read`` for the current CPU
        :raises SimConcreteMemoryError: if anything goes wrong during the read
        """
        try:
            l.debug("PandaConcreteTarget read_memory at %x", address)
            return self.panda.virtual_memory_read(self.panda.get_cpu(), address, nbytes)
        except Exception as exn:
            l.debug("PandaConcreteTarget can't read_memory at address %x exception"
                    " %s", address, exn)
            raise SimConcreteMemoryError("PandaConcreteTarget can't read_memory at"
                                         f" address {address:x}") from exn

    def write_memory(self, address, value, **kwargs):
        """
        Write ``value`` into guest virtual memory at ``address``.

        :param address: guest virtual address to write to
        :param value: data handed to ``panda.virtual_memory_write``
        :raises SimConcreteMemoryError: if anything goes wrong during the write
        """
        l.debug("PandaConcreteTarget write_memory at %x value %s", address, value)
        try:
            self.panda.virtual_memory_write(self.panda.get_cpu(), address, value)
        except Exception as exn:
            l.warning("PandaConcreteTarget write_memory at %x value %s exception %s",
                      address, value, exn)
            raise SimConcreteMemoryError(f"PandaConcreteTarget write_memory at {address:x}"
                                         f" value {value}") from exn

    def read_register(self, register, **kwargs):
        """
        Read a register of the current guest CPU.

        :param register: register name; translated by ``_panda_register_name`` first
        :return: the value returned by ``panda.arch.get_reg``
        :raises SimConcreteRegisterError: if the translation or the read fails
        """
        # TODO: doesn't support xmm/ymm registers
        try:
            register = self._panda_register_name(register)
            register_value = self.panda.arch.get_reg(self.panda.get_cpu(), register)
            l.debug("PandaConcreteTarget read_register %s value %x", register, register_value)
            return register_value
        except Exception as exn:
            l.debug("PandaConcreteTarget read_register %s exception %s %s",
                    register, type(exn).__name__, exn)
            raise SimConcreteRegisterError("PandaConcreteTarget can't read register"
                                           f" {register}") from exn

    def write_register(self, register, value, **kwargs):
        """
        Write ``value`` into a register of the current guest CPU.

        :param register: register name; translated by ``_panda_register_name`` first
        :param value: integer value to store
        :raises SimConcreteRegisterError: if the translation or the write fails
        """
        l.debug("PandaConcreteTarget write_register at %s value %x ", register, value)
        try:
            # Mirror read_register: go through the architecture helper with an explicit
            # CPU and the same name translation, instead of a write_register method on
            # the Panda object itself.
            self.panda.arch.set_reg(self.panda.get_cpu(),
                                    self._panda_register_name(register), value)
        except Exception as exn:
            l.warning("PandaConcreteTarget write_register exception write reg %s value %x: %s",
                      register, value, exn)
            raise SimConcreteRegisterError("PandaConcreteTarget write_register exception write"
                                           f" reg {register} value {value:x}") from exn

    def get_mappings(self):
        """
        Returns the memory mappings of the currently-running process using PANDA's
        operating system introspection.

        :return: a list of ``MemoryMap`` objects built from (base, base + size, offset,
                 file name); mappings whose ``file`` pointer is NULL are left out
        """
        l.debug("getting the vmmap of the concrete process")
        ffi = self.panda.ffi

        vmmap = []
        for mapping in self.panda.get_mappings(self.panda.get_cpu()):
            if mapping.file == ffi.NULL:
                continue  # Unknown name
            filename = ffi.string(mapping.file).decode()
            vmmap.append(MemoryMap(mapping.base, mapping.base + mapping.size,
                                   mapping.offset, filename))
        return vmmap

    def execute_shellcode(self, shellcode, result_register):
        """
        Answer the few shellcode requests we can serve without executing anything.

        We don't support executing shellcode. But SimLinux wants to read some registers
        using shellcode. So if we detect one of these requests, just return the value
        from the concrete panda state.

        :param shellcode: raw bytes of the shellcode the caller wanted to run
        :param result_register: unused here; kept for interface compatibility
        :return: the FS or GS segment base of the current CPU for the two recognised
                 x86_64 shellcodes
        :raises NotImplementedError: for every other shellcode or architecture
        """
        if self.architecture == "x86_64":
            read_gs0_x64 = b"\x65\xA1\x00\x00\x00\x00\x90\x90\x90\x90"  # mov eax, gs:[0]
            read_fs0_x64 = b"\x64\x48\x8B\x04\x25\x00\x00\x00\x00\x90\x90\x90\x90"  # mov rax, fs:[0]

            # NOTE(review): the segment *base* is returned rather than the memory word at
            # offset 0 of the segment -- presumably equivalent for the guest; confirm.
            if shellcode == read_fs0_x64:
                return self.panda.get_cpu().env_ptr.segs[4].base  # FS

            if shellcode == read_gs0_x64:
                return self.panda.get_cpu().env_ptr.segs[5].base  # GS

        raise NotImplementedError("execute_shellcode not implemented for panda target")

    # If we want this class to be more like the standard concrete targets, we should implement
    # the following methods. Until then each of them unconditionally raises.
    def is_running(self):
        raise NotImplementedError("is_running not implemented for panda target")

    def add_breakpoint(self, address):
        raise NotImplementedError("add_breakpoint not implemented for panda target")

    def remove_breakpoint(self, address, **kwargs):
        raise NotImplementedError("remove_breakpoint not implemented for panda target")

    def wait_for_breakpoint(self, which=None):
        raise NotImplementedError("wait_for_breakpoint not implemented for panda target")

    def set_watchpoint(self, address, **kwargs):
        raise NotImplementedError("set_watchpoint not implemented for panda target")

    def remove_watchpoint(self, address, **kwargs):
        raise NotImplementedError("remove_watchpoint not implemented for panda target")

    def run(self, **kwargs):
        raise NotImplementedError("run not implemented for panda target")

    def step(self, **kwargs):
        raise NotImplementedError("step not implemented for panda target")

    def stop(self, **kwargs):
        raise NotImplementedError("stop not implemented for panda target")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
import os | ||
import unittest | ||
import angr | ||
import claripy | ||
|
||
try: | ||
import pandare | ||
except ImportError: | ||
pandare = None | ||
|
||
try: | ||
from angr_targets import PandaConcreteTarget | ||
except ImportError: | ||
PandaConcreteTarget = None | ||
|
||
|
||
# Location of the x86_64 test binary, resolved relative to this test file.
binary_x64 = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    '..', '..', 'binaries', 'tests', 'x86_64', 'not_packed_elf64',
)

# Offsets inside the binary. The test adds the runtime load base to the ones it
# uses, so these are deliberately relative rather than absolute addresses.
BINARY_OEP = 0x9b2
BINARY_DECISION_ADDRESS = 0xaf3
DROP_STAGE2_V1 = 0xb87
DROP_STAGE2_V2 = 0xbb6
VENV_DETECTED = 0xbc2
FAKE_CC = 0xbd6
BINARY_EXECUTION_END = 0xc03
|
||
@unittest.skipUnless(pandare is not None, "requires pandare")
@unittest.skipUnless(PandaConcreteTarget is not None,
                     "requires angr_targets.PandaConcreteTarget")
class TestPanda(unittest.TestCase):
    '''
    Test the PandaConcreteTarget by running a PANDA guest, then switching
    to a symbolic execution with angr when we hit BINARY_DECISION_ADDRESS.

    The test is skipped when either optional import (pandare or
    PandaConcreteTarget) is unavailable.
    '''
    def test_concrete_engine_linux_x64_simprocedures(self):
        '''
        Create a PANDA object and run an x86_64 guest system. Inside the guest,
        copy our target binary and run it until it reaches the decision point.
        At the decision point, run a symbolic execution to find the path
        we're looking for. Then use that info to change concrete state
        and resume the PANDA execution.
        '''
        panda = pandare.Panda(generic="x86_64")
        panda_target = PandaConcreteTarget(panda)

        # Filled in by driver(); checked once panda.run() has returned.
        # NOTE(review): queue_blocking presumably runs driver() outside the thread that
        # calls panda.run(), so a failing assert inside it may not fail the test -- confirm.
        guest_output = []

        # Register function to drive the PANDA guest once it starts
        @panda.queue_blocking
        def driver():
            '''
            Drive the PANDA guest during emulation.
            First revert to a snapshot, then copy our binary in,
            and finally run it, recording its output for the test to check.
            '''
            try:
                panda.revert_sync("root")
                panda.copy_to_guest(binary_x64)
                # Run the command
                guest_output.append(
                    panda.run_serial_cmd("./not_packed_elf64/not_packed_elf64"))
            finally:
                # Always end the analysis, even if driving the guest failed.
                panda.end_analysis()

        @panda.ppp("proc_start_linux", "on_rec_auxv")
        def proc_start(cpu, _, auxv):
            '''
            Use PANDA's proc_start_linux plugin to detect the start of every
            process. When we see the target process start, use its base load
            address to register a hook on the target function. When that hook
            triggers, we'll switch to angr for symex.
            '''
            name = panda.ffi.string(auxv.argv[0]).decode()

            if name.split("/")[-1] != 'not_packed_elf64':
                return  # Not our target

            # Get memory maps and find where executable is loaded
            # This might be handled by target.get_mappings() automatically?
            code_base = None
            for mapping in panda.get_mappings(cpu):
                if mapping.name == panda.ffi.NULL:
                    continue  # Unnamed mapping, cannot be our binary
                map_name = panda.ffi.string(mapping.name).decode()
                # First map with matching name is the one we want
                if map_name == 'not_packed_elf64':
                    print(f"Found target {map_name} with base {mapping.base:x}")
                    code_base = mapping.base
                    break
            else:
                # Loop finished without a break: no mapping matched.
                raise RuntimeError("Could not find target binary in maps")

            print(f"Registering hook at {BINARY_DECISION_ADDRESS+code_base:x}")

            @panda.hook(BINARY_DECISION_ADDRESS+code_base)
            def decision_hook(_cpu, _tb, hook):
                '''
                This hook will be called when the target binary hits the
                specified address. In here, we'll launch our symex.
                When this returns, the concrete guest will resume.
                '''
                # Craft our angr project while panda guest is stopped here
                proj = angr.Project(binary_x64,
                                    concrete_target=panda_target,
                                    use_sim_procedures=True)

                entry_state = proj.factory.entry_state()
                entry_state.options.add(angr.options.SYMBION_SYNC_CLE)
                entry_state.options.add(angr.options.SYMBION_KEEP_STUBS_ON_SYNC)

                # Sync state from panda into angr
                entry_state.concrete.sync()

                # Run a symex to find a valid solution
                soln_addr, soln = self.solv_concrete_engine_linux_x64(proj, entry_state,
                                                                      code_base)

                # Write the solution back into the concrete guest's memory
                panda_target.write_memory(soln_addr, soln)

                # Disable this hook so it doesn't trigger again
                hook.enabled = False

        # Start the emulation
        panda.run()

        # Check that the guest printed the "stage 2" output we should see if the
        # symex found the right path.
        self.assertTrue(guest_output, "PANDA driver finished without capturing guest output")
        output = guest_output[0]
        self.assertIn("Executing stage 2", output, f"Unexpected output: {output}")

    @staticmethod
    def solv_concrete_engine_linux_x64(proj, new_concrete_state, base_address):
        '''
        Run a symbolic execution from the decision point with a stack buffer
        replaced by unconstrained symbolic data. Find a path to the
        DROP_STAGE2_V2 address while avoiding the DROP_STAGE2_V1,
        VENV_DETECTED, and FAKE_CC addresses. Also assert that we hit malloc
        and memcpy as we'd expect to during the symex.

        :param proj: angr project created for the target binary
        :param new_concrete_state: state already synced from the concrete guest
        :param base_address: load base added to the relative find/avoid offsets
        :return: tuple (buffer address as int, buffer contents as bytes) that
                 reaches the goal
        '''
        # Read the stack and make sure it's concrete
        the_sp = new_concrete_state.solver.eval(new_concrete_state.regs.sp)
        assert not new_concrete_state.memory.load(the_sp, 20).symbolic

        # Ensure the original stack buffer is concrete, then replace it with symbolic data
        arg0 = claripy.BVS('arg0', 8*32)
        symbolic_buffer_address = new_concrete_state.regs.rbp-0xc0
        assert not new_concrete_state.memory.load(symbolic_buffer_address, 36).symbolic
        new_concrete_state.memory.store(symbolic_buffer_address, arg0)

        # Ensure that the new buffer is symbolic
        assert new_concrete_state.memory.load(symbolic_buffer_address, 36).symbolic

        # Run our symbolic execution
        simgr = proj.factory.simgr(new_concrete_state)

        find_addr = DROP_STAGE2_V2 + base_address
        avoid_addrs = [x + base_address for x in [DROP_STAGE2_V1, VENV_DETECTED, FAKE_CC]]

        simgr.use_technique(angr.exploration_techniques.DFS())
        simgr.use_technique(angr.exploration_techniques.Explorer(find=find_addr,
                                                                 avoid=avoid_addrs))

        new_concrete_state.globals["hit_malloc_sim_proc"] = False
        new_concrete_state.globals["hit_memcpy_sim_proc"] = False

        def check_hooked_simproc(state):
            # Record in the state's globals which SimProcedures were reached.
            sim_proc_name = state.inspect.simprocedure_name
            if sim_proc_name == "malloc":
                state.globals["hit_malloc_sim_proc"] = True
            elif sim_proc_name == "memcpy":
                state.globals["hit_memcpy_sim_proc"] = True

        new_concrete_state.inspect.b('simprocedure', action=check_hooked_simproc)
        simgr.explore()

        # Fail with a readable message instead of an IndexError when nothing was found.
        found_states = simgr.stashes['found']
        assert found_states, f"symbolic execution found no path to {find_addr:x}"
        new_symbolic_state = found_states[0]

        # Assert we hit the re-hooked SimProc.
        assert new_symbolic_state.globals["hit_malloc_sim_proc"]
        assert new_symbolic_state.globals["hit_memcpy_sim_proc"]

        # Return a concrete address (int) and buffer (bytes) that will reach our goal
        conc_buffer_address = new_symbolic_state.solver.eval(symbolic_buffer_address)
        binary_configuration = new_symbolic_state.solver.eval(arg0, cast_to=bytes)
        return (conc_buffer_address, binary_configuration)
|
||
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()