-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathLegion.py
executable file
·1701 lines (1484 loc) · 66.1 KB
/
Legion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
import sys
import argparse
import cProfile
import datetime
import enum
import gc
import logging
import os
# import pdb
import random
import signal
import struct
import subprocess as sp
import time
from contextlib import closing
from math import sqrt, log, ceil, inf
# from memory_profiler import profile
from multiprocessing import Pool, cpu_count
from typing import Dict, List, Tuple
from types import GeneratorType
from angr import Project
from angr.errors import SimUnsatError
from angr.sim_state import SimState as State
from angr.storage.file import SimFileStream
from angr.sim_options import LAZY_SOLVES
from z3.z3types import Z3Exception
VERSION = "0.1-testcomp2020"

# Answer a lone `--version` argument straight away, before any of the
# heavier module set-up below is executed.
if __name__ == '__main__' and sys.argv[1:] == ['--version']:
    print(VERSION)
    sys.exit(0)
# Hyper-parameters
# Number of inputs random_fuzzing() produces, and the number of results
# app_fuzzing() insists on before it stops at a constraint-solving boundary.
MIN_SAMPLES = 3
# Upper bound on the results gathered by a single app_fuzzing() call.
MAX_SAMPLES = 100
# Weight of the time penalisation in TreeNode.score(); 0 disables it.
TIME_COEFF = 0
Cp = 1 / sqrt(2)
# Exploration ratio used by TreeNode.explore_score(); 0 disables exploration.
RHO = 2 * Cp
RAN_SEED = None
# A falsy SYMEX_TIMEOUT means selection() never reports a symex timeout.
SYMEX_TIMEOUT = None # in secs
CONEX_TIMEOUT = None # in secs
MAX_BYTES = 1000 # Max bytes per input
TREE_DEPTH_LIMIT = 100000000 # INT_MAX is 2147483647, a large value will cause a compilation error
# Budget
# has_budget() stops the search once ROOT.sim_win reaches MAX_PATHS
# or CUR_ROUND reaches MAX_ROUNDS.
MAX_PATHS = float('inf')
MAX_ROUNDS = float('inf')
CORE = 1
MAX_TIME = 0
FOUND_BUG = False # type: bool
COVERAGE_ONLY = True
# When True the tree is never considered fully explored
# (see consider_tree_fully_explored()).
PERSISTENT = False
# Statistics
CUR_ROUND = 0
TIME_START = time.time()
SEED_IN_COUNT = 0
SOL_GEN_COUNT = 0
FUZ_GEN_COUNT = 0
RND_GEN_COUNT = 0
SYMEX_TIMEOUT_COUNT = 0
CONEX_TIMEOUT_COUNT = 0
SYMEX_TIME = 0
CONEX_TIME = 0
SYMEX_SUCCESS_COUNT = 0
CONEX_SUCCESS_COUNT = 0
MIN_TREE_DEPTH = inf
MAX_TREE_DEPTH = 0
SUM_TREE_DEPTH = 0
# Accumulated time / call count per input generation method:
# SOLV = constraint solving, APPF = fuzzing, RAND = random generation.
SOLV_TIME = 0
APPF_TIME = 0
RAND_TIME = 0
SOLV_COUNT = 0
APPF_COUNT = 0
RAND_COUNT = 0
# Number of new paths credited to each generation method (updated in mcts()).
SOLV_NEW = 0
APPF_NEW = 0
RAND_NEW = 0
# Number of Z3 exceptions seen while sampling with each method.
SOLV_EXP = 0
APPF_EXP = 0
APPF_EQL = 0
PROFILE = False
# Gates the timing bookkeeping and the statistics print() calls.
COLLECT_STATISTICS = False
# Execution
INSTR_BIN = None
DIR_NAME = None
SEEDS = []
BUG_RET = 100 # the return code when finding a bug
SAVE_TESTINPUTS = []
SAVE_TESTCASES = []
# Address of a TreeNode that has not been given a real address;
# initialisation() also treats it as "no meaningful address found".
DEFAULT_ADDR = -1
INPUTS = [] # type: List
MSGS = [] # type: List
TIMES = [] # type: List
# cache Node
# ROOT = TreeNode() # type: TreeNode or None
# Logging
# "Legion" logger: only errors by default, bare messages on a stream handler.
LOGGER = logging.getLogger("Legion")
LOGGER.setLevel(logging.ERROR)
sthl = logging.StreamHandler()
sthl.setFormatter(fmt=logging.Formatter('%(message)s'))
LOGGER.addHandler(sthl)
logging.getLogger('angr').setLevel('ERROR')
# Scoring function used by TreeNode.score(): 'uct' or 'random'.
SCORE_FUN = 'uct'
# Colour of tree nodes
class Colour(enum.Enum):
    """Classification of a tree node; see the table in TreeNode's docstring."""

    W = 'White'   # not yet dyed
    R = 'Red'     # keeps its symbolic state in a 'Simulation' child
    G = 'Gold'    # simulation child, holds its parent's state
    B = 'Black'   # holds its own state, has no simulation child
    P = 'Purple'
# TreeNode:
class TreeNode:
"""
Colour | TraceJump | ANGR | Symex state
White | True | Undetermined | Undetermined
Red | True | True | False, stored in its simulation child
Gold | False | False | True, stores its parent's state
Black | True | no sibling | True if is intermediate, False if is leaf
Purple | False | True | True, only shows up in ANGR, not found by TraceJump
"""
def __init__(self, addr: int = DEFAULT_ADDR, parent: 'TreeNode' = None,
             colour: Colour = Colour.W,
             state: State = None, samples: iter = None):
    """
    Create a node; with the defaults it is a white, stateless node
    :param addr: the address identifying the node
    :param parent: the parent node, None for a root
    :param colour: the initial colour
    :param state: the symbolic state attached to the node, if any
    :param samples: the input sample source attached to the node, if any
    """
    # Identity and tree relations
    self.addr = addr  # type: int
    self.parent = parent  # type: TreeNode
    self.children = {}  # type: Dict[int or str, TreeNode]

    # Classification
    self.colour = colour  # type: Colour
    self.phantom = False  # type: bool

    # Concolic execution
    self.state = state  # type: State
    self.samples = samples  # type: iter

    # Selection / simulation statistics
    self.sel_try = self.sel_win = 0
    self.sim_try = self.sim_win = 0

    # Accumulated time spent on APPFuzzing the node
    self.accumulated_time = 0

    # fully_explored: the subtree beneath the node has been fully explored
    self.fully_explored = False
    self.exhausted = False
def child(self, name) -> 'TreeNode' or None:
    """
    Look up a child by the tail of its hexadecimal address
    :param name: the trailing characters of hex(addr) to look for
    :return: the first child whose hex(addr) ends with name, else None
    """
    tail = len(name)
    matches = (kid for kid in self.children.values()
               if hex(kid.addr)[-tail:] == name)
    return next(matches, None)
def sim_state(self) -> State or None:
    """
    Fetch the symbolic state that belongs to this node:
    a red node keeps it in its 'Simulation' child,
    every other node keeps it (possibly None) in its own `state`
    :return: the symbolic state of the node
    """
    holder = self.children['Simulation'] if self.colour is Colour.R else self
    return holder.state
def constraints(self) -> List:
    """
    :return: the path constraints of the node's symbolic state,
             or the string "No SimState" when the node has no state
    """
    state = self.sim_state()
    if not state:
        return "No SimState"
    return state.solver.constraints
def exploit_score(self) -> float:
    """
    :return: sim_win / sel_try, or inf for a node never selected before
    """
    return self.sim_win / self.sel_try if self.sel_try else inf
def explore_score(self) -> float:
    """
    Exploration term of the UCT score
    :return: 0 when the exploration ratio RHO is 0,
             inf for the root or a node never selected before,
             otherwise RHO * sqrt(2 * log(parent.sel_try) / sel_try)
    """
    # Checked first so that a zero ratio never meets an infinite score
    # (0 * inf would be nan).
    if RHO == 0:
        return 0
    # The root and untried nodes always get the maximum value
    if self.is_root() or not self.sel_try:
        return inf
    return RHO * sqrt(2 * log(self.parent.sel_try) / self.sel_try)
def score(self) -> float:
    """
    Score of the node used by the tree policy
    :return: -inf for a fully explored node or a simulation child ruled out
             by the fish bone optimisation; otherwise a random value when
             SCORE_FUN is 'random', or the UCT score (minus the weighted
             time penalisation when TIME_COEFF is set)
    """
    def expected_sample_num() -> float:
        """
        The first time it should return at least MIN_SAMPLES,
        the rest doubles the number of all solutions
        :return: MIN_SAMPLES * 2 ^ number of times sampled, capped at MAX_SAMPLES
        """
        return min(MAX_SAMPLES, MIN_SAMPLES * pow(2, self.sel_try))

    def average_constraint_solving_time() -> float:
        """
        :return: Accumulated con-sol time / accumulated con-sol count
        """
        # For the first time selected, it takes ceil(log(MIN_SAMPLES, 2))
        # to gather MIN_SAMPLES samples
        # For the rest, it takes 1 (estimated value)
        count = ceil(log(MIN_SAMPLES, 2) + self.sel_try - 1)
        return self.accumulated_time / count

    def time_penalisation() -> float:
        """
        :return: Average constraint solving time / Expected sample number
        """
        return average_constraint_solving_time() / expected_sample_num()

    # Evaluate to minimum value if fully explored
    if self.is_fully_explored():
        return -inf

    # Fish bone optimisation: if a simulation child
    # has only one sibling X who is not fully explored,
    # and X is not white (so that all siblings are found)
    # then do not simulate from that simulation child but only from X
    # as all new paths can only come from X
    if self.colour is Colour.G and len(self.parent.children) > 1:
        open_siblings = [sibling for sibling in self.parent.children.values()
                         if sibling is not self and sibling.score() > -inf
                         and sibling.colour is not Colour.W]
        if len(open_siblings) == 1:
            return -inf

    if SCORE_FUN == 'random':
        return random.uniform(0, 100)

    debug_assertion(SCORE_FUN == 'uct')
    uct_score = self.exploit_score() + self.explore_score()
    if TIME_COEFF:
        return uct_score - TIME_COEFF * time_penalisation()
    return uct_score
def is_fully_explored(self):
    """
    :return: the node's fully_explored flag; in PERSISTENT mode, once ROOT
             is flagged fully explored, every other node instead counts as
             fully explored only if it is a non-gold leaf or is exhausted
    """
    persistent_override = PERSISTENT and ROOT.fully_explored and self is not ROOT
    if persistent_override:
        non_gold_leaf = self.is_leaf() and self.colour is not Colour.G
        return non_gold_leaf or self.exhausted
    return self.fully_explored
def mark_fully_explored(self):
    """
    Flag this node fully explored if it qualifies, then try its parent.
    Nothing is flagged when the node is white, is a phantom, or still has
    a non-gold child that is not fully explored.
    A red node (other than ROOT) also flags its simulation child.
    :return:
    """
    # White node might have an unrevealed sibling
    # which can only be found by symex it
    if self.colour is Colour.W:
        return

    # Every child except the simulation (gold) child must be done first
    block_children_done = all(kid.is_fully_explored()
                              for kid in self.children.values()
                              if kid.colour is not Colour.G)
    if not block_children_done:
        return

    # This makes sure that we will simulate on every phantom node
    # at least once to discover the path beneath them:
    # 1. Black nodes cannot be a phantom, cause phantoms must have
    #    a sibling (phantoms are found when symex to their siblings).
    # 2. Gold nodes do not have any sibling before the first
    #    execution, it will be picked even if it is fully explored.
    # 3. Red nodes should not be marked fully explored before
    #    testing out at once, in case it is a phantom
    if self.phantom:
        return

    LOGGER.info("Mark fully explored {}".format(self))
    self.fully_explored = True

    if self.colour is Colour.R and self is not ROOT:
        simulation_child = self.children['Simulation']
        LOGGER.info("Red parent Fully explored {}".format(simulation_child))
        simulation_child.fully_explored = True

    if self.parent:
        self.parent.mark_fully_explored()
def best_child(self) -> 'TreeNode':
    """
    Select the child of the highest score, break tie uniformly
    :return: a tree node, or None if there is no candidate
    """
    LOGGER.info("Selecting from children: {}".format(self.children.values()))

    top_score, contenders = -inf, []  # type: float, List[TreeNode]
    for kid in self.children.values():
        # score() is evaluated exactly once per child
        kid_score = kid.score()
        if kid_score > top_score:
            top_score, contenders = kid_score, [kid]
        elif kid_score == top_score:
            contenders.append(kid)

    if not contenders:
        return None
    return random.choice(contenders)
def is_root(self) -> bool:
    """
    A node counts as the root exactly when it has no parent
    :return: True if the node has no parent
    """
    has_parent = bool(self.parent)
    return not has_parent
def is_leaf(self) -> bool:
    """
    A leaf is a non-phantom node whose children (if any) are all gold,
    i.e. it has no child other than a simulation node
    :return: whether the node is a leaf
    """
    if self.phantom:
        return False
    # all() over no children is True, which covers the childless case
    return all(kid.colour == Colour.G for kid in self.children.values())
def dye(self, colour: Colour,
        state: State = None, samples: iter = None) -> None:
    """
    Give a white node its colour.
    Dyeing red creates a gold 'Simulation' child that receives the state
    and samples; any other colour stores them on the node itself.
    :param colour: the colour to dye to
    :param state: the state to be attached
    :param samples: the samples to be attached
    :return:
    """
    # Don't double dye a node
    debug_assertion(self.colour is Colour.W)
    # All colours should come with a state, except black
    debug_assertion(bool(colour is Colour.B) or bool(state))

    self.colour = colour
    if colour is not Colour.R:
        # Black, Gold, or Purple
        self.state, self.samples = state, samples
        return

    # No pre-existing simulation child
    debug_assertion('Simulation' not in self.children)
    self.add_child(key='Simulation',
                   new_child=TreeNode(addr=self.addr, parent=self))
    simulation_child = self.children['Simulation']
    simulation_child.dye(colour=Colour.G, state=state, samples=samples)
def is_diverging(self) -> bool:
    """
    A node diverges when it has more than one child,
    not counting the 'Simulation' child
    :return: True if there are more than one such child
    """
    block_child_count = len(self.children) - ('Simulation' in self.children)
    return block_child_count > 1
def mutate(self):
    """
    Generate inputs for this node: by app_fuzzing() when the node has a
    state with constraints, by random_fuzzing() otherwise
    :return: the inputs produced by the chosen method
    """
    has_constraints = self.state and self.state.solver.constraints
    if not has_constraints:
        return self.random_fuzzing()
    return self.app_fuzzing()
def app_fuzzing(self) -> List[Tuple[bytes, str]]:
    """
    Draw up to MAX_SAMPLES inputs from the node's sample generator.
    Each input is tagged with its generation method: "S" for the first
    value and for a value that follows a None from the generator
    (constraint solving), "F" for every other value (fuzzing).
    Marks the node fully explored and exhausted when the generator is
    unusable or runs dry.
    :return: a list of (input bytes, method tag) pairs, possibly empty
    """
    def byte_len() -> int:
        """
        The number of bytes in the input
        :return: byte length
        """
        return (target.size() + 7) // 8

    global SOLV_EXP, SOLV_TIME, SOLV_COUNT, APPF_EXP, APPF_TIME, APPF_COUNT

    # Note: Once we fuzz a simulation child,
    #  its parent is no longer a phantom
    #  This is important as we do not mark phantom fully explored
    self.parent.phantom = False

    target = self.state.posix.stdin.load(0, self.state.posix.stdin.size)
    results = []

    if not self.samples:
        self.samples = self.state.solver.iterate(target)

    if not isinstance(self.samples, GeneratorType):
        # Note: self.samples might not be an iterator in some cases
        #  e.g. when solving for the wrong thing
        #  which happened before when the constraint is solving for the
        #  number of args
        self.exhausted = True
        self.fully_explored = True
        return results

    # Denotes the generation method for each input
    # S: constraint solving; F: fuzzing; R: random generation
    method = "S"
    while len(results) < MAX_SAMPLES:
        try:
            if COLLECT_STATISTICS:
                start = time.time()
            val = next(self.samples)
            if COLLECT_STATISTICS:
                end = time.time()
                if method == "S":
                    SOLV_COUNT += 1
                    SOLV_TIME += (end - start)
                    print("AVG_SOLV_TIME: {}".format(SOLV_TIME/SOLV_COUNT))
                    print("SOLV_COUNT: {}".format(SOLV_COUNT))
                elif method == "F":
                    APPF_COUNT += 1
                    APPF_TIME += (end - start)
                    print("AVG_APPF_TIME: {}".format(APPF_TIME/APPF_COUNT))
                    print("APPF_COUNT: {}".format(APPF_COUNT))

            if val is None:
                if len(results) >= MIN_SAMPLES:
                    # next val requires constraint solving and enough results
                    break
                # requires constraint solving but not enough results
                method = "S"
                continue

            results.append((val.to_bytes(byte_len(), 'big'), method))
            method = "F"
        except Z3Exception as e:
            # NOTE: In case of z3.z3types.Z3Exception: b'maximization suspended'
            # TODO: May have a better way to solve this, e.g. redo sampling?
            # The loop is not left here: the next iteration samples again.
            LOGGER.warning("Z3Exception in APPF: {}".format(e))
            LOGGER.info("Redo APPF sampling")
            if COLLECT_STATISTICS:
                end = time.time()
                if method == "S":
                    SOLV_EXP += 1
                    SOLV_TIME += (end-start)
                    print("AVG_SOLV_TIME: {}".format(SOLV_TIME/max(1, SOLV_COUNT) ))
                    print("SOLV_EXCPTION: {}".format(SOLV_EXP))
                elif method == "F":
                    APPF_EXP += 1
                    APPF_TIME += (end-start)
                    print("AVG_APPF_TIME: {}".format(APPF_TIME/max(1, APPF_COUNT)))
                    print("APPF_EXCPTION: {}".format(APPF_EXP))
        except StopIteration:
            # NOTE: Insufficient results from APPFuzzing:
            #  Case 1: break in the outside while:
            #      Not more input available from constraint solving
            #      Implies no more undiscovered path in its subtree
            #      should break
            #  Case 2: break in the inside while:
            #      No more solution available from the current sigma
            #      Needs to restart from a new sigma
            #      should continue, and may trigger case 1 next.
            #      even if not, the next constraint solving will take long
            #      as it has to exclude all past solutions
            #  Assume Case 1 for simplicity
            if COLLECT_STATISTICS:
                end = time.time()
                if method == "S":
                    SOLV_COUNT += 1
                    SOLV_TIME += (end-start)
                    print("AVG_SOLV_TIME: {}".format(SOLV_TIME/max(1, SOLV_COUNT)))
                    print("SOLV_COUNT: {}".format(SOLV_COUNT))
                elif method == "F":
                    APPF_COUNT += 1
                    APPF_TIME += (end-start)
                    print("AVG_APPF_TIME: {}".format(APPF_TIME/max(1, APPF_COUNT)))
                    # Report the fuzzing counter under the APPF_COUNT label
                    print("APPF_COUNT: {}".format(APPF_COUNT))

            # Note: If the state of the simulation node is unsatisfiable
            #   then this will occur in the first time the node is selected
            LOGGER.info("Exhausted {}".format(self))
            LOGGER.info("Fully explored {}".format(self))
            self.fully_explored = True
            self.exhausted = True
            self.parent.exhausted = True
            # Note: Should not mark parent fully explored
            #   as 1) there may be a path although no input was found
            #      2) this exception occurs when NO ENOUGH inputs were found
            #         which does not imply no input was found
            #         here there could be a child to be selected in the
            #         next iteration
            break
    return results
@staticmethod
def random_fuzzing() -> List[Tuple[bytes, str]]:
    """
    Generate MIN_SAMPLES random inputs of MAX_BYTES bytes each
    :return: a list of (random bytes, "R") pairs
    """
    def random_bytes():
        global RAND_TIME, RAND_COUNT
        LOGGER.debug("Generating random {} bytes".format(MAX_BYTES))
        if not COLLECT_STATISTICS:
            return os.urandom(MAX_BYTES)

        # Same generation, with the timing bookkeeping around it
        start = time.time()
        random_input_bytes = os.urandom(MAX_BYTES)
        end = time.time()
        RAND_COUNT += 1
        RAND_TIME += (end - start)
        print("AVG_RAND_TIME: {}".format(RAND_TIME / RAND_COUNT))
        print("RAND_COUNT: {}".format(RAND_COUNT))
        return random_input_bytes

    inputs = []
    for _ in range(MIN_SAMPLES):
        inputs.append((random_bytes(), "R"))
    return inputs
def add_child(self, key: str or int, new_child: 'TreeNode') -> None:
    """
    Register new_child under key
    :param key: either 'Simulation' or the address of new_child
    :param new_child: the node to add
    """
    is_simulation_key = key == 'Simulation'
    is_address_key = key == new_child.addr
    # Exactly one of the two kinds of key is expected
    debug_assertion(is_simulation_key ^ is_address_key)
    self.children[key] = new_child
def match_child(self, addr: int) -> Tuple[bool, 'TreeNode']:
    """
    Find the child at addr, creating it if it does not exist yet.
    An addr is new in two cases:
      1. it belongs to a phantom child (which stops being a phantom here)
      2. it is not a child of self yet
    :param addr: the address to check
    :return: (whether the addr corresponds to a new path, the child)
    """
    known = self.children.get(addr)
    if not known:
        created = TreeNode(addr=addr, parent=self)
        self.add_child(key=addr, new_child=created)
        return True, created

    was_phantom = known.phantom
    known.phantom = False
    return was_phantom, known
def print_path(self) -> List[str]:
    """
    Collect the addresses on the way from the root down to this node
    :return: a list of addresses, root first
    """
    addrs = []
    node = self
    while node:
        addrs.append(node.addr)
        node = node.parent
    addrs.reverse()
    return addrs
def pp(self, indent: int = 0,
       mark: 'TreeNode' = None, found: int = 0, forced: bool = False):
    """
    Log the subtree rooted at this node, one node per line
    :param indent: the depth of this node in the printout
    :param mark: the node to highlight
    :param found: the number shown next to the highlighted node
    :param forced: do not return early when the logger level is above INFO
    """
    if LOGGER.level > logging.INFO and not forced:
        return

    # Deep chains beneath a white parent are abbreviated
    if indent > 15 and self.parent and self.parent.colour is Colour.W:
        LOGGER.info("...")
        return

    pieces = ["| " * (indent - 1)]
    if indent:
        pieces.append("|-- ")
    pieces.append(str(self))
    if self == mark:
        pieces.append("\033[1;32m <=< found {}\033[0;m".format(found))
    LOGGER.info("".join(pieces))

    child_indent = indent + 1 if self.children else indent
    for _, kid in sorted(self.children.items(), key=str):
        kid.pp(indent=child_indent, mark=mark, found=found, forced=forced)
def repr_node_name(self) -> str:
    """
    :return: the last four characters of hex(addr),
             or "None" when addr is falsy
    """
    if not self.addr:
        return "None"
    return hex(self.addr)[-4:]
def repr_node_data(self) -> str:
    """
    Render the score of the node together with its two terms:
    score = exploit_score (sim_win / sel_try)
          + explore_score (sqrt(log(parent.sel_try) / sel_try) part)
    :return: the formatted score breakdown
    """
    # The placeholders are named after the value they carry;
    # the rendered text is the same as before.
    return "{uct:.2f} = {exploit:.2f}({simw}/{selt}) " \
           "+ {explore:.2f}(sqrt(log({pselt})/{selt})" \
        .format(uct=self.score(),
                exploit=self.exploit_score(),
                explore=self.explore_score(),
                simw=self.sim_win,
                selt=self.sel_try,
                pselt=self.parent.sel_try if self.parent else None)
def repr_node_state(self) -> str:
    """
    :return: the node's symbolic state as text, or "NoState" without one
    """
    state = self.sim_state()
    if not state:
        return "NoState"
    return "{}".format(state)
def __repr__(self) -> str:
    """
    :return: "name: data, state" wrapped in an ANSI colour escape that
             depends on the node's colour and phantom flag
    """
    # Black is tested before the phantom flag, which in turn takes
    # precedence over the remaining colours.
    if self.colour is Colour.B:
        ansi_code = 30
    elif self.phantom:
        ansi_code = 35
    elif self.colour is Colour.R:
        ansi_code = 31
    elif self.colour is Colour.G:
        ansi_code = 33
    elif self.colour is Colour.W:
        ansi_code = 37
    else:
        ansi_code = 32

    template = '\033[1;{colour}m{name}: {data}, {state}\033[0m'
    return template.format(colour=ansi_code,
                           name=self.repr_node_name(),
                           state=self.repr_node_state(),
                           data=self.repr_node_data())
# Module-level root of the search tree: a default (white, stateless) node
# until initialisation() rebinds ROOT to the node built by init_root().
ROOT = TreeNode()
def consider_tree_fully_explored() -> bool:
    """
    :return: whether ROOT is fully explored; never True in PERSISTENT mode
    """
    root_done = ROOT.is_fully_explored()
    return root_done and not PERSISTENT
def run() -> None:
    """
    The main function: initialise the tree, print it,
    then run MCTS rounds for as long as has_budget() allows
    """
    initialisation()
    ROOT.pp()
    while True:
        if not has_budget():
            return
        mcts()
def initialisation():
    """
    Build the initial tree from the seeded inputs: simulate the seeds,
    create ROOT, expand the tree with the seed traces, propagate the
    results and save them to files
    """
    def init_angr():
        LOGGER.info("Initialising ANGR Project")
        ignored = ['printf',
                   '__trace_jump',
                   '__trace_jump_set'
                   ]
        return Project(thing=INSTR_BIN, ignore_functions=ignored)

    def init_root() -> TreeNode:
        """
        NOTE: prepare the root (dye red, add simulation child)
            otherwise the data in simulation stage of SEEDs
            cannot be recorded without building another special case
            recorded in the simulation child of it.
            Cannot dye root with dye_to_the_next_red() as usual, as:
            1. The addr of root will not be known before simulation
            2. The function requires a red node
            in the previous line of the node to dye,
            which does not exist for root
        """
        # The first address of every trace; all traces are expected to
        # start with the same address (i.e. main())
        first_addrs = list(zip(*traces))[0]

        # Keep the last first-address that is meaningful: DEFAULT_ADDR means
        # binary execution did not find a meaningful address
        main_addr = -1
        for candidate in first_addrs:
            if candidate != DEFAULT_ADDR:
                main_addr = candidate

        if main_addr == DEFAULT_ADDR:
            # Switch to random fuzzing
            main_state = None
        else:
            # NOTE: a meaningful address for root has been found
            # Jump to the state of main_addr
            project = init_angr()
            main_state = project.factory.blank_state(
                addr=main_addr,
                stdin=SimFileStream,
                add_options={LAZY_SOLVES}
            )

        root = TreeNode(addr=main_addr)
        root.dye(colour=Colour.R, state=main_state)
        LOGGER.info("ROOT created")
        return root

    global ROOT

    LOGGER.info("Simulating on the seeded inputs")
    traces, test_cases, test_inputs = simulation(node=None)

    LOGGER.info("Initialising the ROOT")
    ROOT = init_root()

    LOGGER.info("Expanding the tree with paths taken by seeded inputs")
    are_new = expansion(traces=traces)

    LOGGER.info("Propagating the first results")
    simulation_child = ROOT.children['Simulation']
    propagation(node=simulation_child, traces=traces, are_new=are_new)

    save_results_to_files(test_cases, test_inputs, are_new)
def has_budget() -> bool:
    """
    Control whether mcts keeps running
    :return: True while the search should continue: no bug found, the tree
             not considered fully explored, and both the path and the
             round limits not reached
    """
    if FOUND_BUG or consider_tree_fully_explored():
        return False
    return ROOT.sim_win < MAX_PATHS and CUR_ROUND < MAX_ROUNDS
def mcts():
"""
One round of MCTS: selection, simulation, expansion, propagation,
followed by printing the tree and saving the results to files.
"""
# NOTE(review): this copy of the file has lost its indentation, so the
# nesting of the statements below is not visible here; the code lines are
# left exactly as found. Restore the nesting from the upstream file.
global SOLV_NEW, APPF_NEW, RAND_NEW, APPF_EQL
node = selection()
# selection() hands back ROOT when the selection has to be restarted
# (presumably the `return` below is the body of this check — confirm)
if node is ROOT:
return
traces, test_cases, test_inputs = simulation(node=node)
are_new = expansion(traces=traces)
# Statistics: count the new paths credited to each generation method
# (presumably everything down to the *_NEW prints is gated by this flag — confirm)
if COLLECT_STATISTICS:
# Addresses from the selected node up to the root
get_trace = lambda cur_node: [cur_node.addr] + get_trace(cur_node.parent) if cur_node else []
# Drop the last element, then reverse the order
selection_path = get_trace(node)[:-1][::-1]
# NOTE(review): this compares a slice of the list `traces` with one path;
# confirm whether each individual trace was meant to be compared instead
if traces[:len(selection_path)] == selection_path:
APPF_EQL += 1
print("APPF_EQL: {}".format(APPF_EQL))
for i in range(len(are_new)):
# The last element of the last item of a test case is compared with the
# method tags "R" / "S" / "F"
if test_cases[i][-1][-1] == "R":
RAND_NEW += are_new[i]
if test_cases[i][-1][-1] == "S":
SOLV_NEW += are_new[i]
if test_cases[i][-1][-1] == "F":
APPF_NEW += are_new[i]
print("RAND_NEW: {}".format(RAND_NEW))
print("SOLV_NEW: {}".format(SOLV_NEW))
print("APPF_NEW: {}".format(APPF_NEW))
# One are_new entry is expected per trace
debug_assertion(len(traces) == len(are_new))
propagation(node=node, traces=traces, are_new=are_new)
ROOT.pp(mark=node, found=sum(are_new))
save_results_to_files(test_cases, test_inputs, are_new)
def selection() -> TreeNode:
"""
Repeatedly apply tree policy, starting from ROOT,
until a simulation (gold) node is selected
:return: the selected simulation node, or ROOT to signal that
the selection has to be restarted
"""
# NOTE(review): this copy of the file has lost its indentation, so the
# nesting of the statements below (in particular whether the timeout and
# leaf checks sit inside the white-node branch) is not visible here; the
# code lines are left exactly as found. Restore the nesting from the
# upstream file.
def reach_symex_timeout() -> bool:
# Logs the symex time used so far; a falsy SYMEX_TIMEOUT never times out
LOGGER.info("symex time available: {}/{}".format(symex_time, SYMEX_TIMEOUT))
return SYMEX_TIMEOUT and symex_time >= SYMEX_TIMEOUT
global SYMEX_TIME, SYMEX_TIMEOUT_COUNT, SYMEX_SUCCESS_COUNT
# Time spent in dye_siblings() during this selection
symex_time = 0
# The most recent red node on the selection path, used on symex timeout
last_red = ROOT
node = ROOT
selection_start_time = time.time()
while node.colour is not Colour.G:
if node.colour is Colour.R:
last_red = node
# Note: Must check this before dyeing,
# otherwise the phantom red nodes which are added when
# dyeing their sibling will be wrongly marked as fully explored
if node.is_leaf() and node.colour is Colour.B:
# NOTE: a red/white leaf may have unknown siblings
node.mark_fully_explored()
# If the node is white, dye it
if node.colour is Colour.W:
dye_start_time = time.time()
try:
dye_siblings(child=node) # Upon an exception, mark this node fully explored
except Z3Exception:
LOGGER.info("Z3 exception occurred in symex, any type casting in program")
node.fully_explored = True
node.exhausted = True
node.parent.mark_fully_explored()
except SimUnsatError:
# NOTE: angr.errors.SimUnsatError: Got an unsat result, caused by
# claripy.errors.UnsatError: CompositeSolver is already unsat
LOGGER.info("claripy.errors.UnsatError: CompositeSolver is already unsat")
node.fully_explored = True
node.exhausted = True
node.parent.mark_fully_explored()
symex_time += time.time() - dye_start_time
# On symex timeout, fall back to the simulation child of the last red node
if reach_symex_timeout():
LOGGER.info(
"Symex timeout, choose the simulation child of the last red {}".format(last_red))
node = last_red.children['Simulation']
SYMEX_TIMEOUT_COUNT += 1
if COLLECT_STATISTICS:
print("SYMEX_TIMEOUT count: {}".format(SYMEX_TIMEOUT_COUNT))
break
if node.is_leaf():
LOGGER.info("Leaf reached before tree policy: {}".format(node))
LOGGER.info("Fully explored {}".format(node))
node.fully_explored = True
if node.parent:
# NOTE: the if condition above makes sure there is parent to set
# the check is trivial in most cases
# but handles the case when the ROOT is a leaf
# e.g. the program crashes right after entry because of allocating too much memory
node.parent.mark_fully_explored()
# If the node's score is the minimum, return ROOT to restart
if node.is_fully_explored() and node is not ROOT:
return ROOT
node = tree_policy(node=node)
if node.is_leaf() and node.colour is Colour.R:
# Note: There is no point fuzzing a red leaf,
# as it will not have any child
# (assuming no trace is a prefix of another)
# Mark the red leaf fully explored and check its parent
# restart the selection from ROOT
LOGGER.info("Leaf reached after tree policy: {}".format(node))
LOGGER.info("Fully explored {}".format(node))
node.fully_explored = True
node.parent.mark_fully_explored()
if node.is_fully_explored():
# NOTE: If, for some reason, the node selected is fully explored
# then we ASSUME its parent is fully explored
# but not correctly marked as fully explored
# return ROOT to re-launch selection stage
if ROOT.fully_explored and node.parent:
node.parent.exhausted = True
node.parent.mark_fully_explored()
return ROOT
# the node selected by tree policy should not be None
debug_assertion(node is not None)
LOGGER.info("Select: {}".format(node))
debug_assertion(node.colour is Colour.G)
# Account the whole selection as symex time and count it as a success
selection_end_time = time.time()
SYMEX_TIME += (selection_end_time - selection_start_time)
SYMEX_SUCCESS_COUNT += 1
if COLLECT_STATISTICS:
print("SYMEX_TIME: {:.4f}".format(SYMEX_TIME))
print("SYMEX_SUCCESS count: {}".format(SYMEX_SUCCESS_COUNT))
print("SYMEX_TIME_AVG: {:.4f}".format(SYMEX_TIME / SYMEX_SUCCESS_COUNT))
return node
def tree_policy(node: TreeNode) -> TreeNode:
    """
    The tree policy: pick the best child of the node
    :param node: the node to select child from
    :return: the child selected
    """
    chosen = node.best_child()
    return chosen
def dye_siblings(child: TreeNode) -> None:
"""
If a child of the parent is found white,
then all children of the parent must also be white;
This function dyes them all.
:param child: the node to match
:return: a list of states that
do not match with any of the existing children of the parent,
they correspond to the child nodes of the parent who
have not been found by TraceJump
"""
#
# # Case 1: parent is red, then execute parent's state to find states of sibs
# # Case 2: parent is black, use the states left to dye siblings
# # Either 1 or 2, not both
# debug_assertion((parent.colour is Colour.R) ^ bool(target_states))
# if child.parent.colour is Colour.R:
# debug_assertion(not target_states)
# parent_state = parent.children['Simulation'].state
# target_states = symex_to_match(state=parent_state, addr=child.addr)
#
# # NOTE: Empty target states implies
# # the symbolic execution has reached the end of program
# # without seeing any divergence after the parent's state
# # hence the parent is fully explored
# # Note: a single target state does not mean the parent is fully explored
# # It may be the case where the target is the only feasible child,
# # but the target has other diverging child states
# if not target_states:
# pdb.set_trace()
# parent.is_fully_explored() = True
sibling_states = symex_to_match(target=child)
# Note: dye child according to the len of sibling_states:
# 1. len is 0, the child's parent must have been fully explored.
# 2. len is 1, the child should be dyed black, as it is the only feasible child
# 3. len >= 2, the child should be dyed red, add phantom if needed
if not sibling_states:
# No state is found, no way to explore deeper on this path
# Ideally, no diverging tree node should exist beneath the parent of the child.
# hence mark the child fully explored, and trace back to ancestors
LOGGER.info("No state found: {}".format(child))
# if hex(child.addr)[-4:] == '0731':
# pdb.set_trace()
child.fully_explored = True
child.parent.mark_fully_explored()
if len(sibling_states) == 1:
state = sibling_states.pop()
# This should only happen if the only state in sibling_states matches with the child
debug_assertion(child.addr == state.addr)
# No gold node for black nodes, hence no simulation will start from black ones
# No sibling node for black ndoes, hence they will never be compared with other nodes,