Skip to content

Commit

Permalink
Adds Poetry as dependency manager
Browse files Browse the repository at this point in the history
Signed-off-by: Phineas09 <[email protected]>
  • Loading branch information
Phineas09 committed Nov 1, 2022
0 parents commit 17c0569
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 0 deletions.
13 changes: 13 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Poetry

/.venv

# Python 3

__pycache__
*.pyc
/poetry.lock

# Visual Studio Code symlink

.vscode
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# vulnerability_analytics

TODO: Define requirements, examples and explanations.
Binary file added hammer_controller.bin
Binary file not shown.
52 changes: 52 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[tool.poetry]
name = "vulnerability_analytics"
description = "Module for discovering further details about a crash"
authors = ["OpenCRS"]
version = "0.1.0"

[tool.poetry.dependencies]
python = "^3.8"
angr = "^9.2.24"
docker = "^5.0.3"
click = "^8.1.3"
rich = "^12.5.1"
nclib = "^1.0.2"
pathlib = "^1.0.1"
povsim = { git = "https://github.com/mechaphish/povsim.git" }
compilerex = { git = "https://github.com/mechaphish/compilerex.git" }
rex = { git = "https://github.com/angr/rex.git" }
tracer = { git = "https://github.com/angr/tracer.git" }
# archinfo = "^9.2.24" # = { git = "https://github.com/angr/archinfo.git", branch = "master" }
# pyvex = { git = "https://github.com/angr/pyvex.git" }
# cle = { git = "https://github.com/angr/cle.git" }
# archr = { git = "https://github.com/angr/archr.git" }
setuptools = "^65.5.0"
keystone = "^22.0.0"
keystone-engine = "^0.9.2"

[tool.poetry.dev-dependencies]
black = "^22.6.0"
isort = "^5.10.1"
pylint = "^2.14.4"
pyproject-flake8 = "^0.0.1-alpha.5"
flake8-annotations = "^2.9.1"

[tool.poetry.scripts]
vulnerability_analytics = "vulnerability_analytics.cli:main"

[tool.pylint.MASTER]
init-hook="import sys; sys.path.append('.')"

[tool.pylint.MESSAGES_CONTROL]
disable="missing-function-docstring, missing-class-docstring, missing-module-docstring"

[tool.flake8]
extend-ignore = "ANN101,"
max-line-length=88
per-file-ignores = """
./**/__init__.py: F401
"""

[tool.black]
line-length = 79
preview = true
4 changes: 4 additions & 0 deletions vulnerability_analytics/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from vulnerability_analytics.cli import main

if __name__ == "__main__":
main()
51 changes: 51 additions & 0 deletions vulnerability_analytics/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python3

from array import array
import os
import click
from vulnerability_analytics.rex_api.DockerBuilder import *
from vulnerability_analytics.rex_api.VunlerabilityAnalysis import *
from archr.analyzers.rr import *


@click.group("cli")
def cli() -> None:
"""Displays information about a binary crash for given input."""


@cli.command("get", help="Displays information about the crash.")
@click.option("--binary-path", required=True, type=str, help="Desired binary path.")
@click.option("--other-arguments", type=str)
@click.option("--crash-input", required=True, type=str)
def get(
binary_path: str = None,
other_arguments: array = [],
crash_input: str = None
) -> None:

image = DockerBuilder()

# Maybe encode the crash_input into base64?

other_arguments = other_arguments.split(',')

binary_name = os.path.basename(binary_path).split('.')[0]

print(binary_path, other_arguments, crash_input)

# Parse other_arguments

image.build_custom_image(
binary_path, binary_name + ":crash", other_arguments)
analyzer = VulnerabilityAnalysis(
image, bytes(crash_input, 'utf-8')).start()
input("Press Enter to continue...")
print(analyzer.export2JSON())


def main() -> None:
cli(prog_name="vulnerability_analytics")


if __name__ == "__main__":
main()
7 changes: 7 additions & 0 deletions vulnerability_analytics/res/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM ubuntu:latest

COPY libc.so.6.binary /libc.so.6
COPY ld-linux-x86-64.so.2.binary /ld-linux-x86-64.so.2



Binary file not shown.
Binary file added vulnerability_analytics/res/libc.so.6.binary
Binary file not shown.
80 changes: 80 additions & 0 deletions vulnerability_analytics/rex_api/DockerBuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os
import tempfile
import shutil
import typing
import docker

class DockerBuilder:

tmp_docker_folder: tempfile.TemporaryDirectory
tmp_dockerfile: str
client: docker.DockerClient
image: docker.models.images.Image

def __init__(self) -> None:
# Create a temp file for current config
self.tmp_docker_folder = tempfile.TemporaryDirectory()
self.tmp_dockerfile = None
self.image = None
self.client = docker.from_env()


def build_custom_image(self, target_path: str, image_tag: str, *argv: str) -> None:

self.__prepare_dockerfile(target_path, *argv)

images = self.client.images.build(
path = self.tmp_docker_folder.name,
pull=True,
tag = image_tag
)
image = images[0]
# print(type(image))
self.image = image

# print(self.tmp_dockerfile)
# import time
# time.sleep(100000)

return image

def __prepare_dockerfile(self, target_path: str, argument_list: typing.List) -> None:

# Copy all resources to the new temp folder
shutil.copytree(os.path.dirname(os.path.realpath(__file__)) + '/../res/', self.tmp_docker_folder.name, dirs_exist_ok=True)
# print(self.tmp_docker_folder.name)
# Initialize the docker client
self.tmp_dockerfile = self.tmp_docker_folder.name + '/Dockerfile'

# Copy the binary to the temp folder
target_path = os.path.abspath(target_path)
# print(target_path)
if not os.path.exists(target_path):
print("Binary does not exist!")
exit(0)

# Copy the binary
shutil.copy2(target_path, self.tmp_docker_folder.name)

# Create the ENTRYPOINT command using the followint template
entry_point = 'ENTRYPOINT ["/ld-linux-x86-64.so.2", "--library-path", "/", ' + '"' + os.path.basename(target_path) + '"'
# If we have some additional arguments, add them

if(len(argument_list)):
for arg in argument_list:
entry_point = entry_point + ', "' + arg + '"'

# Then close the statement
entry_point = entry_point + ']'

# Open temp file in append mode and add copy command (used to copy the binary) and the ENTRYPOINT command
with open(self.tmp_dockerfile, 'a') as dockerfile:
dockerfile.write("\nCOPY {} /\n".format(os.path.basename(target_path)))
dockerfile.write("\n" + entry_point + "\n")







161 changes: 161 additions & 0 deletions vulnerability_analytics/rex_api/VunlerabilityAnalysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from xmlrpc.client import boolean
from vulnerability_analytics.rex_api.DockerBuilder import *
import archr
import rex
import logging
import typing
from archr.analyzers.rr import *
import json

class VulnerabilityAnalysis:

crash : rex.Crash
dockerImage : DockerBuilder
crashInput : str
crashTarget : archr.targets.DockerImageTarget
localLogger : logging.Logger
crashOk : boolean

def __init__(self, dockerImage : DockerBuilder, crashInput : str) -> None:

# Configure loggers for angr and rex
logging.getLogger("angr.exploration_techniques.tracer").setLevel(logging.CRITICAL)
logging.getLogger("rex").setLevel(logging.INFO)

# Configure logger for our program

# Get custom logger for out app
self.localLogger = logging.getLogger('crs.vulnerability_analysis')
# By using this we can log everything
# self.localLogger = logging.getLogger()
self.localLogger.setLevel(logging.DEBUG)

# Initialize components
self.crashOk = False
self.dockerImage = dockerImage
self.crashInput = crashInput

if(not self.dockerImage.image):
self.localLogger.error('Image does not exist or some error has occured!')
exit(0)

# Do not contrain the crash input as it could be anything...

def start(self): #-> typing.VulnerabilityAnalysis:

self.crashTarget = archr.targets.DockerImageTarget(self.dockerImage.image.tags[0]).build().start()
self.crash = rex.Crash(self.crashTarget, self.crashInput, aslr=False, use_rop=False)
self.localLogger.info(self.crash.crash_types)

self.crashOk = True

return self


def getCrashTypes(self) -> typing.List:

'''List containing identified crash types'''

if self.crashOk:
return self.crash.crash_types
self.localLogger.error('Crash not triggered!')
return None

def getFunctionAddress(self) -> str:

'''Current function hex addres as string'''

if self.crashOk:
return hex(self.crash.state.callstack.func_addr)
self.localLogger.error('Crash not triggered!')
return None

def getBasicBlockAddress(self) -> str:

'''Address of the basic block that called the current function as hex string'''

if self.crashOk:
return hex(self.crash.state.callstack.func_addr)
self.localLogger.error('Crash not triggered!')
return None

def getReturnAddress(self) -> str:

'''Return address of the current function as hex string'''

if self.crashOk:
return hex(self.crash.state.callstack.ret_addr)
self.localLogger.error('Crash not triggered!')
return None

def getStackPointer(self) -> str:

'''Return address of the current stack pointer as hex string'''

if self.crashOk:
return hex(self.crash.state.callstack.stack_ptr)
self.localLogger.error('Crash not triggered!')
return None


def getJumpKind(self) -> str:

'''Type of executed jump
Source here: https://github.com/angr/fidget/blob/0f255bbd11c6721d39581c5d3d2863fce5fad785/fidget/structures.py'''

if self.crashOk:
return self.crash.state.callstack.jumpkind
self.localLogger.error('Crash not triggered!')
return None

def getCoreRegisters(self) -> typing.Dict:

'''Returns a dictionary containing all core registers'''

if self.crashOk:
return self.crash.core_registers
self.localLogger.error('Crash not triggered!')
return None

def export2JSON(self) -> str:

'''Export crash fields into JSON format'''

if self.crashOk:
return CrashExport(
self.getCrashTypes(),
self.getFunctionAddress(),
self.getBasicBlockAddress(),
self.getReturnAddress(),
self.getStackPointer(),
self.getJumpKind(),
self.getCoreRegisters()
).toJson()
self.localLogger.error('Crash not triggered!')
return None


class CrashExport:

'''Object used to export the following information to JSON object'''

def __init__(self,
crashTypes : typing.List,
functionAddress : str,
basicBlockAddress : str,
returnAddress : str,
stackPointer : str,
jumpKind : str,
coreRegisters: typing.Dict) -> None:

self.crashTypes = crashTypes
self.functionAddress = functionAddress
self.basicBlockAddress = basicBlockAddress
self.returnAddress = returnAddress
self.stackPointer = stackPointer
self.jumpKind = jumpKind
self.coreRegisters = coreRegisters

def toJson(self):
return json.dumps(self, default=lambda o: o.__dict__)

0 comments on commit 17c0569

Please sign in to comment.