-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflow_monitor.py
159 lines (148 loc) · 5.16 KB
/
flow_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: utf-8 -*-
import logging
import socket
from threading import Event, Thread

from psutil import Error as PsutilError
from psutil import process_iter
from scapy.sendrecv import sniff
class Monitor:
    """Traffic monitor: lists per-process connections and captures packets.

    Attributes:
        window: UI object supplied by the caller. This class calls its
            ``alert``, ``stop``, ``refresh_process`` methods and its
            ``conList.addItem`` / ``conList.clear`` methods.
        process_ports: Ports (local and remote) currently used by the
            monitored process; rewritten by ``getPortList``.
        start_flag: ``threading.Event`` with inverted meaning: it is *set*
            while monitoring is stopped and *cleared* while it is running.
    """

    # Seconds to pause between two scans of the monitored process's ports.
    PORT_REFRESH_INTERVAL = 0.5

    _log = logging.getLogger(__name__)

    def __init__(self, window):
        """Create a stopped monitor bound to *window*."""
        self.window = window
        # Per-instance state: a class-level list/Event would be shared by
        # every Monitor instance.
        self.process_ports = []
        self.start_flag = Event()
        self.start_flag.set()

    @staticmethod
    def _iter_process_connections():
        """Yield ``(process name, connections)`` for processes with connections.

        Processes that cannot be inspected (psutil raises for access denied,
        vanished or zombie processes) are skipped instead of aborting the scan.
        """
        for proc in process_iter():
            try:
                connections = proc.connections()
                if not connections:
                    continue
                name = proc.name()
            except PsutilError:
                continue
            yield name, connections

    @staticmethod
    def _describe_connection(con):
        """Return the multi-line text describing one psutil connection."""
        if con.type == socket.SOCK_STREAM:
            protocol = 'TCP'
        elif con.type == socket.SOCK_DGRAM:
            protocol = 'UDP'
        else:
            # Any other socket type: label it rather than leaving the
            # protocol undefined.
            protocol = getattr(con.type, 'name', str(con.type))

        # Local IP and port in use.
        laddr = f"{con.laddr[0]}:{con.laddr[1]}" if con.laddr else "*:*"
        if con.raddr:
            raddr = f"{con.raddr[0]}:{con.raddr[1]}"
        elif con.family == socket.AF_INET:
            # IPv4, no remote endpoint
            raddr = "0.0.0.0:0"
        elif con.family == socket.AF_INET6:
            # IPv6, no remote endpoint
            raddr = "[::]:0"
        else:
            raddr = "*:*"
        return f"{protocol}\t{con.status}\nLocal: {laddr}\nRemote: {raddr}\n"

    def getProcessList(self):
        """Return the names of the processes that have network connections.

        Returns:
            A list of unique process names (order is not defined).
        """
        return list({name for name, _ in self._iter_process_connections()})

    def getProcessConnections(self):
        """Collect the network connections used by each process.

        Returns:
            A tuple ``(names, connections)`` where ``names`` is a list of
            unique process names and ``connections`` maps each process name
            to a set of formatted connection descriptions.
        """
        process_names = set()
        process_conn = {}
        for name, connections in self._iter_process_connections():
            process_names.add(name)
            described = process_conn.setdefault(name, set())
            for con in connections:
                described.add(self._describe_connection(con))
        return list(process_names), process_conn

    def getPortList(self, process_name):
        """Keep ``self.process_ports`` up to date for *process_name*.

        Runs until ``start_flag`` is set. When no port is found for the
        process it is reported as stopped through the window and the loop
        ends.

        Args:
            process_name: Name of the process whose ports are tracked.
        """
        while not self.start_flag.is_set():
            ports = set()
            for name, connections in self._iter_process_connections():
                if name != process_name:
                    continue
                for con in connections:
                    if con.laddr:
                        ports.add(con.laddr[1])
                    if con.raddr:
                        ports.add(con.raddr[1])

            if not ports:
                # The process no longer exists (or has no connection left).
                self.window.stop()
                self.window.refresh_process()
                self.window.alert(
                    f"进程 {process_name} 已停止运行!"
                )
                # Report once and leave, even if window.stop() did not set
                # the flag.
                return

            self.process_ports = list(ports)
            # Pause between scans instead of spinning; wakes up immediately
            # when stop() sets the flag.
            self.start_flag.wait(self.PORT_REFRESH_INTERVAL)

    def getConnections(self, pak):
        """Handle one captured packet: alert on suspicious ones, list the rest.

        Packets that lack the expected fields are ignored (best effort), so
        that one odd packet never stops the capture.

        Args:
            pak: Captured packet; ``pak.payload`` is read as the IP layer and
                ``pak.payload.payload`` as the transport layer.
        """
        # NOTE(review): self.process_ports is not consulted here, so packets
        # of every process are listed - confirm whether filtering by the
        # monitored process's ports was intended.
        try:
            ip_layer = pak.payload
            src = ip_layer.src
            dst = ip_layer.dst
            length = len(pak)
            if src == dst:
                # Same source and destination address: possible Land attack.
                self.window.alert(
                    "数据包源地址与目的地址相同, 疑为Land攻击!"
                )
            elif len(ip_layer) > 65535:
                # An IP packet longer than 64KB (65535 bytes) is treated as a
                # possible Ping of Death attack.
                self.window.alert(
                    "收到IP数据包长度大于64KB, 疑为Ping拒绝服务攻击!"
                )
            else:
                transport = ip_layer.payload
                protocol = transport.name
                if protocol == 'ICMP':
                    # ICMP messages carry no ports.
                    info = f"{protocol:7}{src} -> {dst}{length:>7}"
                else:
                    sport = transport.sport
                    dport = transport.dport
                    info = f"{protocol:7}{src}:{sport} -> {dst}:{dport}{length:>7}"
                    if protocol == 'TCP':
                        info += f'{str(transport.flags):>5}'
                self.window.conList.addItem(info)
        except Exception:
            # Deliberately best effort, but no longer a bare ``except`` that
            # would also swallow KeyboardInterrupt / SystemExit.
            self._log.debug("Ignoring unparseable packet", exc_info=True)

    def cap_packet(self):
        """Capture packets until monitoring is stopped.

        The capture filter only accepts TCP, UDP and ICMP carried over IP or
        IPv6. The stop condition is evaluated by ``sniff`` through
        ``stop_filter``.
        """
        sniff(store=False,
              filter="(tcp or udp or icmp) and (ip6 or ip)",
              prn=self.getConnections,
              stop_filter=lambda _pak: self.start_flag.is_set())

    def start(self, process_name):
        """Start monitoring the traffic of one process.

        Clears the connection list in the window, then launches two daemon
        threads: one refreshing the process's port list and one capturing
        packets.

        Args:
            process_name: Name of the process to monitor.
        """
        self.start_flag.clear()
        self.window.conList.clear()
        # Thread refreshing the ports used by the process.
        Thread(
            target=self.getPortList,
            daemon=True,
            args=(process_name, )
        ).start()
        # Thread capturing packets.
        Thread(
            target=self.cap_packet,
            daemon=True
        ).start()

    def stop(self):
        """Stop monitoring by setting ``start_flag``."""
        self.start_flag.set()