Skip to content

Commit

Permalink
sequence: add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
williballenthin committed Dec 17, 2024
1 parent 69f4728 commit 8fe6cb2
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 8 deletions.
2 changes: 1 addition & 1 deletion capa/capabilities/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def find_process_capabilities(


def find_dynamic_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress: bool = False
) -> Capabilities:
all_process_matches: MatchResults = collections.defaultdict(list)
all_thread_matches: MatchResults = collections.defaultdict(list)
Expand Down
6 changes: 3 additions & 3 deletions capa/features/extractors/base_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,11 @@ def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) ->
raise NotImplementedError()


def ProcessFilter(extractor: DynamicFeatureExtractor, processes: set) -> DynamicFeatureExtractor:
def ProcessFilter(extractor: DynamicFeatureExtractor, pids: set[int]) -> DynamicFeatureExtractor:
original_get_processes = extractor.get_processes

def filtered_get_processes(self):
yield from (f for f in original_get_processes() if f.address.pid in processes)
yield from (f for f in original_get_processes() if f.address.pid in pids)

# we make a copy of the original extractor object and then update its get_processes() method with the decorated filter one.
# this is in order to preserve the original extractor object's get_processes() method, in case it is used elsewhere in the code.
Expand All @@ -497,7 +497,7 @@ def filtered_get_processes(self):
return new_extractor


def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set) -> DynamicFeatureExtractor:
def ThreadFilter(extractor: DynamicFeatureExtractor, threads: set[Address]) -> DynamicFeatureExtractor:
original_get_threads = extractor.get_threads

def filtered_get_threads(self, ph: ProcessHandle):
Expand Down
2 changes: 1 addition & 1 deletion capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ def build_statements(d, scopes: Scopes):
)

elif key == "sequence":
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD)):
if all(s not in scopes for s in (Scope.FILE, Scope.PROCESS, Scope.THREAD, Scope.SEQUENCE)):
raise InvalidRule("sequence subscope supported only for the process and thread scopes")

if len(d[key]) != 1:
Expand Down
111 changes: 108 additions & 3 deletions tests/test_dynamic_sequence_scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
# ...

import textwrap
from typing import Iterator
from functools import lru_cache

import pytest
import fixtures

import capa.main
import capa.rules
import capa.capabilities.dynamic
from capa.features.extractors.base_extractor import ThreadFilter, DynamicFeatureExtractor

Expand Down Expand Up @@ -62,7 +65,7 @@ def get_0000a657_thread3064():
return extractor


def get_call_ids(matches):
def get_call_ids(matches) -> Iterator[int]:
for address, _ in matches:
yield address.id

Expand Down Expand Up @@ -96,7 +99,7 @@ def test_dynamic_call_scope():
assert 8 in get_call_ids(capabilities.matches[r.name])


# match the first 5-tuple sequence.
# match the first sequence.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
Expand Down Expand Up @@ -147,7 +150,7 @@ def test_dynamic_sequence_scope():
# call 14: RtlAddVectoredExceptionHandler(1921490089, 0)
# call 15: GetSystemTime()
# call 16: NtAllocateVirtualMemory(no, 4, 786432, 4784128, 4294967295)
def test_dynamic_sequence_scope2():
def test_dynamic_sequence_scope_length():
extractor = get_0000a657_thread3064()

rule = textwrap.dedent(
Expand Down Expand Up @@ -178,6 +181,108 @@ def test_dynamic_sequence_scope2():
assert r.name not in capabilities.matches


# show that you can use a call subscope in sequence rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
# ...
# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_call_subscope():
extractor = get_0000a657_thread3064()

rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- call:
- and:
- api: LdrGetProcedureAddress
- string: AddVectoredExceptionHandler
"""
)

r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])

capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in capabilities.matches
assert 11 in get_call_ids(capabilities.matches[r.name])


# show that you can use a sequence subscope in sequence rules.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
# ...
# call 10: LdrGetDllHandle(1974337536, kernel32.dll)
# call 11: LdrGetProcedureAddress(2010595649, 0, AddVectoredExceptionHandler, 1974337536, kernel32.dll)
# call 12: LdrGetDllHandle(1974337536, kernel32.dll)
# call 13: LdrGetProcedureAddress(2010595072, 0, RemoveVectoredExceptionHandler, 1974337536, kernel32.dll)
# ...
def test_dynamic_sequence_scope_sequence_subscope():
extractor = get_0000a657_thread3064()

rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- sequence:
- description: resolve add VEH # should match at 11
- and:
- api: LdrGetDllHandle
- api: LdrGetProcedureAddress
- string: AddVectoredExceptionHandler
- sequence:
- description: resolve remove VEH # should match at 13
- and:
- api: LdrGetDllHandle
- api: LdrGetProcedureAddress
- string: RemoveVectoredExceptionHandler
"""
)

r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])

capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in capabilities.matches
assert 13 in get_call_ids(capabilities.matches[r.name])


# show that you can't use thread subscope in sequence rules.
def test_dynamic_sequence_scope_thread_subscope():
rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
scopes:
static: unsupported
dynamic: sequence
features:
- and:
- thread:
- string: "foo"
"""
)

with pytest.raises(capa.rules.InvalidRule):
capa.rules.Rule.from_yaml(rule)


# show how you might use a sequence rule: to match a small window for a collection of features.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
Expand Down

1 comment on commit 8fe6cb2

@mr-tz
Copy link
Collaborator

@mr-tz mr-tz commented on 8fe6cb2 Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏

Please sign in to comment.