9
9
import logging
10
10
import itertools
11
11
import collections
12
- from typing import Any
12
+ from dataclasses import dataclass
13
13
14
14
import capa .perf
15
15
import capa .features .freeze as frz
16
16
import capa .render .result_document as rdoc
17
17
from capa .rules import Scope , RuleSet
18
18
from capa .engine import FeatureSet , MatchResults
19
- from capa .capabilities .common import find_file_capabilities
19
+ from capa .capabilities .common import Capabilities , find_file_capabilities
20
20
from capa .features .extractors .base_extractor import CallHandle , ThreadHandle , ProcessHandle , DynamicFeatureExtractor
21
21
22
22
logger = logging .getLogger (__name__ )
26
26
SEQUENCE_SIZE = 5
27
27
28
28
29
@dataclass
class CallCapabilities:
    """
    result of matching rules against a single call.

    returned by find_call_capabilities in place of the former
    (features, matches) tuple; field order matches that tuple.
    """

    # all features found for the call.
    features: FeatureSet
    # match results for the call.
    matches: MatchResults
33
+
34
+
29
35
def find_call_capabilities (
30
36
ruleset : RuleSet , extractor : DynamicFeatureExtractor , ph : ProcessHandle , th : ThreadHandle , ch : CallHandle
31
- ) -> tuple [ FeatureSet , MatchResults ] :
37
+ ) -> CallCapabilities :
32
38
"""
33
39
find matches for the given rules for the given call.
34
-
35
- returns: tuple containing (features for call, match results for call)
36
40
"""
37
41
# all features found for the call.
38
42
features : FeatureSet = collections .defaultdict (set )
@@ -50,16 +54,22 @@ def find_call_capabilities(
50
54
for addr , _ in res :
51
55
capa .engine .index_rule_matches (features , rule , [addr ])
52
56
53
- return features , matches
57
+ return CallCapabilities (features , matches )
58
+
59
+
60
@dataclass
class ThreadCapabilities:
    """
    result of matching rules within a single thread.

    returned by find_thread_capabilities in place of the former 4-tuple;
    field order matches that tuple.
    """

    # all features found within the thread,
    # includes features found within calls.
    features: FeatureSet
    # match results for the thread.
    thread_matches: MatchResults
    # match results for sequences, accumulated across the thread's calls.
    sequence_matches: MatchResults
    # match results for calls, accumulated across the thread's calls.
    call_matches: MatchResults
54
66
55
67
56
68
def find_thread_capabilities (
57
69
ruleset : RuleSet , extractor : DynamicFeatureExtractor , ph : ProcessHandle , th : ThreadHandle
58
- ) -> tuple [ FeatureSet , MatchResults , MatchResults , MatchResults ] :
70
+ ) -> ThreadCapabilities :
59
71
"""
60
72
find matches for the given rules within the given thread.
61
-
62
- returns: tuple containing (features for thread, match results for thread, match results for sequences, match results for calls)
63
73
"""
64
74
# all features found within this thread,
65
75
# includes features found within calls.
@@ -75,20 +85,20 @@ def find_thread_capabilities(
75
85
sequence : collections .deque [FeatureSet ] = collections .deque (maxlen = SEQUENCE_SIZE )
76
86
77
87
for ch in extractor .get_calls (ph , th ):
78
- cfeatures , cmatches = find_call_capabilities (ruleset , extractor , ph , th , ch )
79
- for feature , vas in cfeatures .items ():
88
+ call_capabilities = find_call_capabilities (ruleset , extractor , ph , th , ch )
89
+ for feature , vas in call_capabilities . features .items ():
80
90
features [feature ].update (vas )
81
91
82
- for rule_name , res in cmatches .items ():
92
+ for rule_name , res in call_capabilities . matches .items ():
83
93
call_matches [rule_name ].extend (res )
84
94
85
- sequence .append (cfeatures )
86
- sfeatures : FeatureSet = collections .defaultdict (set )
95
+ sequence .append (call_capabilities . features )
96
+ sequence_features : FeatureSet = collections .defaultdict (set )
87
97
for call in sequence :
88
98
for feature , vas in call .items ():
89
- sfeatures [feature ].update (vas )
99
+ sequence_features [feature ].update (vas )
90
100
91
- _ , smatches = ruleset .match (Scope .SEQUENCE , sfeatures , ch .address )
101
+ _ , smatches = ruleset .match (Scope .SEQUENCE , sequence_features , ch .address )
92
102
for rule_name , res in smatches .items ():
93
103
sequence_matches [rule_name ].extend (res )
94
104
@@ -103,16 +113,23 @@ def find_thread_capabilities(
103
113
for va , _ in res :
104
114
capa .engine .index_rule_matches (features , rule , [va ])
105
115
106
- return features , matches , sequence_matches , call_matches
116
+ return ThreadCapabilities (features , matches , sequence_matches , call_matches )
117
+
118
+
119
@dataclass
class ProcessCapabilities:
    """
    result of matching rules within a single process.

    returned by find_process_capabilities in place of the former 5-tuple;
    field order matches that tuple.
    """

    # match results for the process itself.
    process_matches: MatchResults
    # match results for threads, merged across the process's threads.
    thread_matches: MatchResults
    # match results for sequences, merged across the process's threads.
    sequence_matches: MatchResults
    # match results for calls, merged across the process's threads.
    call_matches: MatchResults
    # number of entries in the process's feature set.
    feature_count: int
107
126
108
127
109
128
def find_process_capabilities (
110
129
ruleset : RuleSet , extractor : DynamicFeatureExtractor , ph : ProcessHandle
111
- ) -> tuple [ MatchResults , MatchResults , MatchResults , MatchResults , int ] :
130
+ ) -> ProcessCapabilities :
112
131
"""
113
132
find matches for the given rules within the given process.
114
-
115
- returns: tuple containing (match results for process, match results for threads, match results for calls, number of features)
116
133
"""
117
134
# all features found within this process,
118
135
# includes features found within threads (and calls).
@@ -131,29 +148,29 @@ def find_process_capabilities(
131
148
call_matches : MatchResults = collections .defaultdict (list )
132
149
133
150
for th in extractor .get_threads (ph ):
134
- features , tmatches , smatches , cmatches = find_thread_capabilities (ruleset , extractor , ph , th )
135
- for feature , vas in features .items ():
151
+ thread_capabilities = find_thread_capabilities (ruleset , extractor , ph , th )
152
+ for feature , vas in thread_capabilities . features .items ():
136
153
process_features [feature ].update (vas )
137
154
138
- for rule_name , res in tmatches .items ():
155
+ for rule_name , res in thread_capabilities . thread_matches .items ():
139
156
thread_matches [rule_name ].extend (res )
140
157
141
- for rule_name , res in smatches .items ():
158
+ for rule_name , res in thread_capabilities . sequence_matches .items ():
142
159
sequence_matches [rule_name ].extend (res )
143
160
144
- for rule_name , res in cmatches .items ():
161
+ for rule_name , res in thread_capabilities . call_matches .items ():
145
162
call_matches [rule_name ].extend (res )
146
163
147
164
for feature , va in itertools .chain (extractor .extract_process_features (ph ), extractor .extract_global_features ()):
148
165
process_features [feature ].add (va )
149
166
150
167
_ , process_matches = ruleset .match (Scope .PROCESS , process_features , ph .address )
151
- return process_matches , thread_matches , sequence_matches , call_matches , len (process_features )
168
+ return ProcessCapabilities ( process_matches , thread_matches , sequence_matches , call_matches , len (process_features ) )
152
169
153
170
154
171
def find_dynamic_capabilities (
155
172
ruleset : RuleSet , extractor : DynamicFeatureExtractor , disable_progress = None
156
- ) -> tuple [ MatchResults , Any ] :
173
+ ) -> Capabilities :
157
174
all_process_matches : MatchResults = collections .defaultdict (list )
158
175
all_thread_matches : MatchResults = collections .defaultdict (list )
159
176
all_sequence_matches : MatchResults = collections .defaultdict (list )
@@ -170,21 +187,21 @@ def find_dynamic_capabilities(
170
187
) as pbar :
171
188
task = pbar .add_task ("matching" , total = n_processes , unit = "processes" )
172
189
for p in processes :
173
- process_matches , thread_matches , sequence_matches , call_matches , feature_count = find_process_capabilities (
174
- ruleset , extractor , p
175
- )
190
+ process_capabilities = find_process_capabilities (ruleset , extractor , p )
176
191
feature_counts .processes += (
177
- rdoc .ProcessFeatureCount (address = frz .Address .from_capa (p .address ), count = feature_count ),
192
+ rdoc .ProcessFeatureCount (
193
+ address = frz .Address .from_capa (p .address ), count = process_capabilities .feature_count
194
+ ),
178
195
)
179
- logger .debug ("analyzed %s and extracted %d features" , p .address , feature_count )
196
+ logger .debug ("analyzed %s and extracted %d features" , p .address , process_capabilities . feature_count )
180
197
181
- for rule_name , res in process_matches .items ():
198
+ for rule_name , res in process_capabilities . process_matches .items ():
182
199
all_process_matches [rule_name ].extend (res )
183
- for rule_name , res in thread_matches .items ():
200
+ for rule_name , res in process_capabilities . thread_matches .items ():
184
201
all_thread_matches [rule_name ].extend (res )
185
- for rule_name , res in sequence_matches .items ():
202
+ for rule_name , res in process_capabilities . sequence_matches .items ():
186
203
all_sequence_matches [rule_name ].extend (res )
187
- for rule_name , res in call_matches .items ():
204
+ for rule_name , res in process_capabilities . call_matches .items ():
188
205
all_call_matches [rule_name ].extend (res )
189
206
190
207
pbar .advance (task )
@@ -199,8 +216,8 @@ def find_dynamic_capabilities(
199
216
rule = ruleset [rule_name ]
200
217
capa .engine .index_rule_matches (process_and_lower_features , rule , locations )
201
218
202
- all_file_matches , feature_count = find_file_capabilities (ruleset , extractor , process_and_lower_features )
203
- feature_counts .file = feature_count
219
+ all_file_capabilities = find_file_capabilities (ruleset , extractor , process_and_lower_features )
220
+ feature_counts .file = all_file_capabilities . feature_count
204
221
205
222
matches = dict (
206
223
itertools .chain (
@@ -211,12 +228,8 @@ def find_dynamic_capabilities(
211
228
all_sequence_matches .items (),
212
229
all_thread_matches .items (),
213
230
all_process_matches .items (),
214
- all_file_matches .items (),
231
+ all_file_capabilities . matches .items (),
215
232
)
216
233
)
217
234
218
- meta = {
219
- "feature_counts" : feature_counts ,
220
- }
221
-
222
- return matches , meta
235
+ return Capabilities (matches , feature_counts )
0 commit comments