forked from flymedllva/iCloud-bypass-via-USB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bypass.py
executable file
·150 lines (122 loc) · 4.36 KB
/
bypass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import socketserver
import paramiko
import usbmux
import select
from threading import Thread
from socket import socket
class SocketRelay(object):
    """Shuttle bytes in both directions between two connected sockets.

    Data read from ``a`` is queued in ``atob`` until it can be written to
    ``b``; data read from ``b`` is queued in ``btoa`` until it can be written
    to ``a``. Each queue is capped at ``max_buffer`` bytes: a socket is only
    polled for reading while its outgoing queue has room.
    """

    def __init__(self, a: socket, b: socket, max_buffer: int = 65535):
        """Store both endpoints and start with empty queues.

        Args:
            a: First endpoint of the relay.
            b: Second endpoint of the relay.
            max_buffer: Maximum number of bytes queued per direction.
        """
        self.a = a
        self.b = b
        self.atob = bytes()
        self.btoa = bytes()
        self.max_buffer = max_buffer

    @staticmethod
    def _flush(dst: socket, pending: bytes) -> None:
        """Best-effort delivery of bytes still queued for ``dst``."""
        if not pending:
            return
        try:
            dst.sendall(pending)
        except OSError:
            # The receiving side is already gone; nothing more can be done
            # and the relay is shutting down anyway.
            pass

    def handle(self) -> None:
        """Relay data until either socket reports EOF or an exceptional state.

        The sockets are not closed here; that is left to the caller.
        """
        while True:
            rlist = []
            wlist = []
            xlist = [self.a, self.b]
            # Only wait for writability on a socket that has data queued.
            if self.atob:
                wlist.append(self.b)
            if self.btoa:
                wlist.append(self.a)
            # Only wait for readability while the matching queue has room,
            # which also keeps the recv() size below strictly positive.
            if len(self.atob) < self.max_buffer:
                rlist.append(self.a)
            if len(self.btoa) < self.max_buffer:
                rlist.append(self.b)
            rlo, wlo, xlo = select.select(rlist, wlist, xlist)
            if xlo:
                return
            # send() may accept only part of the queue; keep the remainder.
            if self.a in wlo:
                n = self.a.send(self.btoa)
                self.btoa = self.btoa[n:]
            if self.b in wlo:
                n = self.b.send(self.atob)
                self.atob = self.atob[n:]
            if self.a in rlo:
                s = self.a.recv(self.max_buffer - len(self.atob))
                if not s:
                    # EOF from a: deliver what a already sent before stopping,
                    # otherwise the tail of the stream would be dropped.
                    self._flush(self.b, self.atob)
                    self.atob = bytes()
                    return
                self.atob += s
            if self.b in rlo:
                s = self.b.recv(self.max_buffer - len(self.btoa))
                if not s:
                    # EOF from b: same reasoning, opposite direction.
                    self._flush(self.a, self.btoa)
                    self.btoa = bytes()
                    return
                self.btoa += s
class TCPRelay(socketserver.BaseRequestHandler):
    """Request handler that bridges one accepted TCP client to a USB device.

    The owning server is expected to carry ``r_port`` (port to open on the
    device) and ``buffer_size`` (relay buffer size, multiplied by 1024 here).
    """

    def handle(self) -> None:
        """Connect to the first device seen by usbmux and relay until done."""
        print("Incoming connection to %d" % self.server.server_address[1])
        mux = usbmux.USBMux(None)
        print("Waiting for devices...")
        if not mux.devices:
            # Nothing known yet: give the mux one processing pass of 1.0
            # before giving up.
            mux.process(1.0)
            if not mux.devices:
                print("No device found")
                self.request.close()
                return

        device = mux.devices[0]
        print("Connecting to device %s" % str(device))
        device_sock = mux.connect(device, self.server.r_port)
        client_sock = self.request
        print("Connection established, relaying data")
        try:
            relay = SocketRelay(device_sock, client_sock, self.server.buffer_size * 1024)
            relay.handle()
        finally:
            # Both ends are released even if the relay raised.
            device_sock.close()
            client_sock.close()
        print("Connection closed")
class TCPServer(socketserver.TCPServer):
    """``socketserver.TCPServer`` with ``allow_reuse_address`` switched on."""

    # Lets the listening port be re-bound right after a previous run.
    allow_reuse_address = True
class ThreadedTCPServer(socketserver.ThreadingMixIn, TCPServer):
    """Threading variant of ``TCPServer`` (via ``socketserver.ThreadingMixIn``)."""

    # Stated explicitly here as well, matching the base class.
    allow_reuse_address = True
class PhoneConnect:
    """Forward a local TCP port to a port on the USB-attached device.

    ``start`` opens a ``ThreadedTCPServer`` on ``(host, computer_port)`` whose
    connections are handled by ``TCPRelay`` and relayed to ``mobile_port``.
    """

    def __init__(self,
                 host: str = "localhost", mobile_port: int = 44, computer_port: int = 2222,
                 buffer_size: int = 128):
        """Record the forwarding parameters; no sockets are opened here.

        Args:
            host: Local address the forwarding server binds to.
            mobile_port: Port to connect to on the device.
            computer_port: Local port to listen on.
            buffer_size: Relay buffer size handed to the server; the request
                handler multiplies it by 1024.
        """
        self.host = host
        self.mobile_port = mobile_port
        self.computer_port = computer_port
        self.buffer_size = buffer_size

    def start(self) -> None:
        """Serve forwarding requests until interrupted or an error occurs.

        Blocks the calling thread. A ``KeyboardInterrupt`` or any other
        exception raised while serving ends the loop. Listening sockets are
        always closed before returning, including when binding fails (that
        error still propagates to the caller).
        """
        servers = []
        ports = [(self.mobile_port, self.computer_port)]
        try:
            for r_port, l_port in ports:
                print(f"Forwarding local port {l_port} to remote port {r_port}")
                server = ThreadedTCPServer((self.host, l_port), TCPRelay)
                # Extra attributes read by the request handler.
                server.r_port = r_port
                server.buffer_size = self.buffer_size
                servers.append(server)

            while True:
                try:
                    ready, _, _ = select.select(servers, [], [])
                    for server in ready:
                        server.handle_request()
                except KeyboardInterrupt:
                    # Actually stop: previously the message was printed but
                    # the loop kept running.
                    print("Server stopped")
                    break
                except Exception as exc:
                    # Report the reason instead of exiting silently.
                    print(f"Server loop aborted: {exc!r}")
                    break
        finally:
            # Release the listening sockets so the port can be reused.
            for server in servers:
                server.server_close()
if __name__ == '__main__':
    # NOTE(review): this entry point logs in to the attached device as root
    # and modifies its system partition. Confirm it is only ever run against
    # hardware the operator owns or is authorised to service.

    # Run the USB port forwarder on a background thread so this thread can
    # use the forwarded local port.
    server = PhoneConnect()
    thread = Thread(target=server.start)
    thread.start()

    # SSH endpoint exposed locally by the forwarder, and login credentials.
    host, user, secret, port = "localhost", "root", "alpine", 2222

    # One shell line: remount / read-write, rename Setup.app inside
    # /Applications, run uicache and kill SpringBoard.
    command = (
        "cd /;mount -o rw,union,update /;cd /Applications;mv Setup.app Setup.app.bak;uicache -a;"
        "killall -9 SpringBoard"
    )

    client = paramiko.SSHClient()
    # Unknown host keys are accepted automatically.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname=host, username=user, password=secret, port=port)
        client.exec_command(command)
    except paramiko.ssh_exception.AuthenticationException:
        print("Authentication failed")
    except paramiko.ssh_exception.NoValidConnectionsError:
        print("Failed to establish connection with the server")
    client.close()
    print("Done!")
    # Wait on the forwarder thread (it runs until its serve loop ends).
    thread.join()