-
Notifications
You must be signed in to change notification settings - Fork 10
/
MainPresenter.py
104 lines (92 loc) · 3.42 KB
/
MainPresenter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import CoreModel
import Parser
import threading
import queue
import datetime
from PyQt5.Qt import *
from netaddr import IPNetwork
class MainPresenter:
    """Presenter that connects the scanner UI to a pool of ScanThread workers.

    It fills a shared queue with host addresses, starts the workers that
    consume it, and reflects their progress in the view's widgets.
    """

    def __init__(self, ui):
        """Remember the view and start in the idle state.

        Args:
            ui: View object. This class uses its ``startButton``,
                ``currentThreadsLabel`` and ``dataText`` widgets.
        """
        self.ui = ui
        # Workers created by startScan; emptied by stopScan.
        self.threads = []
        # Cleared when a scan stops or finishes; this class never sets it to
        # True itself.
        self.isScanEnabled = False
        # Host addresses waiting to be scanned, shared by all workers.
        self.queue = queue.Queue()

    def startScan(self, ipRanges, portsStr, threadNumber, timeout):
        """Queue every address in the given ranges and start the workers.

        Args:
            ipRanges: Passed unchanged to ``Parser.getCIDRFromRanges``.
            portsStr: Passed unchanged to ``Parser.getPortsFromString``.
            threadNumber: Number of workers to create; converted with
                ``int()``.
            timeout: Handed to every ``ScanThread``. An empty string is
                replaced by ``'3'``.

        Raises:
            ValueError: If ``threadNumber`` cannot be converted by ``int()``.
        """
        # Validate before touching any state so that a bad value does not
        # leave a filled queue behind.
        worker_count = int(threadNumber)
        if timeout == '':
            timeout = '3'
        cidrIPRanges = Parser.getCIDRFromRanges(ipRanges)
        ports = Parser.getPortsFromString(portsStr)

        # NOTE(review): only element 0 of the parser result is used, as in
        # the original code - presumably it holds the CIDR blocks; confirm
        # against Parser.
        for cidr in cidrIPRanges[0]:
            for ip in IPNetwork(cidr):
                # Queue addresses as they are produced instead of building a
                # second, full-size list first.
                self.queue.put(str(ip))

        # Forget workers that Qt reports as finished. Keeping them around
        # would restart them below and inflate the thread count.
        self.threads[:] = [
            worker for worker in self.threads if not worker.isFinished()
        ]

        new_workers = [
            ScanThread(self.queue, ports, timeout, self)
            for _ in range(worker_count)
        ]
        self.threads.extend(new_workers)
        self.setCurrentThreadsLabel(len(self.threads))

        # Wire up and start only the workers created by this call; connecting
        # an already-connected worker again would duplicate every log line.
        for thread in new_workers:
            thread.signal.connect(self.setLogText)
            thread.exit_signal.connect(self.on_thread_exit)
            thread.start()

    def on_thread_exit(self, is_last):
        """Update the UI after a worker reported that it is done.

        Args:
            is_last: True when the reporting worker was the last active one.
        """
        # Refresh the counter in every case so it does not keep showing a
        # stale value once the final worker has exited.
        running = sum(1 for worker in self.threads if worker.is_running)
        self.setCurrentThreadsLabel(running)
        if is_last:
            self.isScanEnabled = False
            self.ui.startButton.setText("Start")

    def stopScan(self):
        """Flag every worker as stopped and reset the scan state.

        Each worker's ``exit_signal`` is emitted once; the payload is True
        from the point at which no worker in the list is flagged as running.
        Afterwards the worker list is emptied, the thread counter shows "0"
        and pending addresses are dropped by replacing the queue.
        """
        self.isScanEnabled = False
        # Count once up front and decrement, rather than rescanning the whole
        # list for every worker.
        still_running = sum(1 for worker in self.threads if worker.is_running)
        for thread in self.threads:
            thread.exit()
            if thread.is_running:
                still_running -= 1
            thread.is_running = False
            thread.exit_signal.emit(still_running == 0)
        self.threads.clear()
        self.ui.currentThreadsLabel.setText("0")
        # A fresh queue discards whatever was still waiting to be scanned.
        self.queue = queue.Queue()

    def setLogText(self, string):
        """Append ``string`` to the log widget, prefixed with a timestamp."""
        self.ui.dataText.append(
            "[{}] {}".format(datetime.datetime.now(), string)
        )

    def setCurrentThreadsLabel(self, threadNumber):
        """Show ``threadNumber`` in the current-threads label."""
        self.ui.currentThreadsLabel.setText(str(threadNumber))
class ScanThread(QThread):
    """Worker thread that takes hosts from a shared queue and scans them.

    Signals:
        signal (str): Emitted with a printable line for every host that has
            at least one open port.
        exit_signal (bool): Emitted when this worker finds the queue empty.
            The payload is True when no worker registered with the presenter
            is flagged as running any more.
    """

    signal = pyqtSignal(str)
    exit_signal = pyqtSignal(bool)

    def __init__(self, scanQueue, ports, timeout, presenter, parent=None):
        """Create a worker bound to a queue of hosts.

        Args:
            scanQueue: Queue the hosts to scan are taken from.
            ports: Ports handed to ``CoreModel.scanIP`` for every host.
            timeout: Passed to the ``CoreModel`` constructor.
            presenter: Object whose ``threads`` list holds all workers of the
                current scan.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.scanQueue = scanQueue
        self.coreModel = CoreModel.CoreModel(timeout)
        self.ports = ports
        # Not consulted by this class; kept so existing references to the
        # attribute keep working.
        self._stop_event = threading.Event()
        self.timeout = timeout
        self.presenter = presenter
        # Cleared by this worker when the queue runs dry, or from outside to
        # ask the worker to stop after its current host.
        self.is_running = True

    def run(self):
        """Scan hosts until the queue is empty or the worker is told to stop.

        Returning from this method is what ends the thread. QThread.exit()
        only stops an event loop, and this worker never starts one, so the
        loop has to terminate by itself.
        """
        while self.is_running:
            try:
                # Non-blocking: with several consumers, an empty() check
                # followed by a blocking get() can wait forever when another
                # worker takes the last item in between.
                hostObject = self.scanQueue.get_nowait()
            except queue.Empty:
                self._announce_drained()
                return
            host = str(hostObject)
            open_ports = self.coreModel.scanIP(host, self.ports)
            signalStr = ', '.join(open_ports)
            if signalStr != '':
                self.signal.emit(host + ' has open ports: ' + signalStr)
            self.scanQueue.task_done()
        # Reaching this point means is_running was cleared from outside; no
        # exit_signal is emitted here because this worker did not drain the
        # queue itself.

    def _announce_drained(self):
        """Flag this worker as done and report whether it was the last one."""
        self.is_running = False
        # Iterate over a snapshot because the presenter may change the list
        # from another thread. The is_running flags are used rather than
        # QThread.isRunning(), which is still True for the calling thread and
        # could therefore never report "last".
        workers = list(self.presenter.threads)
        is_last_thread = all(not worker.is_running for worker in workers)
        self.exit_signal.emit(is_last_thread)