-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbitwardenclient.py
118 lines (102 loc) · 4.28 KB
/
bitwardenclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python
import json
import os
import pathlib
import random
import secrets
import socket
import string
import tempfile

import encryption
class BitwardenClient:
    """TCP client that sends encrypted commands to the Bitwarden daemon.

    Every request is JSON-serialised, encrypted with
    ``encryption.encrypt_aes_256`` and sent over a fresh TCP connection.
    The key is the fixed passphrase, followed by the configured
    ``encryption`` secret, followed by a per-request random salt that is
    written to a file under the system temp directory.
    NOTE(review): presumably the daemon reads that same salt file to
    rebuild the key -- confirm against the server implementation.
    """

    # The misspelling ("Comunication") is part of the key material; changing
    # it would break compatibility with whatever derives the same key.
    _PASSPHRASE = "Bitwarden+Cli~Autofiller:Script-Comunication*Passphrase"
    _SALT_DIR_NAME = "Bitwarden+Cli-Autofiller+Script-Directory"
    _SALT_FILE_NAME = "Bitwarden+Cli-Autofiller+Script-Salt+File"
    _SALT_ALPHABET = string.ascii_uppercase + string.digits
    _SALT_LENGTH = 32
    _RECV_SIZE = 4096

    def __init__(self, defaults):
        """Copy the connection settings from *defaults*.

        *defaults* must expose the attributes ``host``, ``port``,
        ``do_raise``, ``saltfolder`` and ``encryption``.
        """
        self.host = defaults.host
        self.port = defaults.port
        # When true, fatal protocol errors raise instead of exiting.
        self.do_raise = defaults.do_raise
        # When true, the salt file lives in a dedicated sub-directory.
        self.saltfolder = defaults.saltfolder
        # User-supplied secret; concatenated into the key as a string.
        self.encryption = defaults.encryption

    def send_request(self, request_data):
        """Send an encrypted request to the Bitwarden daemon and receive the response.

        Returns the decoded JSON response. Returns ``None`` for the
        ``"exit"`` command (no reply is read), and also when the
        connection, the send, the receive or the JSON decoding fails (the
        error is printed).

        If the daemon reports a failure, or the reply cannot be decrypted,
        the message is printed and then either ``ValueError`` /
        ``RuntimeError`` is raised (``do_raise`` true) or the process exits
        with status 1 (``do_raise`` false).
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.connect((self.host, self.port))
                # The salt is only (re)written once the daemon is reachable.
                key = self._PASSPHRASE + self.encryption + self._write_salt()
                sock.sendall(
                    encryption.encrypt_aes_256(json.dumps(request_data), key)
                )
                if request_data.get("command", "") == "exit":
                    return None
                encrypted_response = self.receive_data(sock)
            except Exception as e:
                # Best-effort boundary: the exception types raised by the
                # encryption helper are not known here, so stay broad.
                print(f"An error occurred: {e}")
                return None

        # Fatal protocol errors are handled outside the broad handler above
        # so that the do_raise contract is honoured instead of swallowed.
        self._check_server_error(encrypted_response)
        try:
            decrypted_response = encryption.decrypt_aes_256(
                encrypted_response, key
            )
        except Exception as e:
            self._fail(
                ValueError,
                "Failed to decrypt incoming data "
                f"(encryption password might not match): {e}",
                cause=e,
            )

        try:
            return json.loads(decrypted_response)
        except (TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"An error occurred: {e}")
            return None

    def receive_data(self, sock):
        """Receive data from the socket.

        Performs exactly one ``recv`` of up to 4096 bytes and returns the
        raw bytes (``b""`` if the peer closed the connection).
        NOTE(review): a reply larger than 4096 bytes, or one delivered in
        several TCP segments, is truncated. Reading until EOF would need
        the daemon to close or frame its replies -- confirm the protocol
        before changing this.
        """
        return sock.recv(self._RECV_SIZE)

    def send_command(self, command, args=None, dictfilter=None):
        """Send a command to the Bitwarden daemon and return the result.

        *args* defaults to an empty list and *dictfilter* to an empty dict;
        a fresh container is created per call so no state leaks between
        calls. Returns whatever ``send_request`` returns.
        """
        request_data = {
            "command": command,
            "args": [] if args is None else args,
            "filter": {} if dictfilter is None else dictfilter,
        }
        return self.send_request(request_data)

    def _write_salt(self):
        """Generate a fresh salt, persist it to the salt file and return it."""
        salt_path = pathlib.Path(tempfile.gettempdir())
        if self.saltfolder:
            salt_path /= self._SALT_DIR_NAME
            os.makedirs(salt_path, exist_ok=True)
        salt_path /= self._SALT_FILE_NAME
        if salt_path.is_file():
            salt_path.unlink()
        # The salt becomes part of the encryption key, so draw it from the
        # OS CSPRNG rather than the predictable `random` generator.
        salt = "".join(
            secrets.choice(self._SALT_ALPHABET)
            for _ in range(self._SALT_LENGTH)
        )
        with open(salt_path, "w") as salt_file:
            salt_file.write(salt)
        return salt

    def _check_server_error(self, response):
        """Abort if *response* is one of the daemon's plain-text error markers."""
        # The reply is raw bytes, so compare against bytes literals;
        # str(response) would yield "b'...'" and never match.
        if response == b"Decryption failed":
            self._fail(
                ValueError,
                "Server failed to decrypt incoming data "
                "(encryption password might not match)",
            )
        elif response == b"Encryption failed":
            self._fail(RuntimeError, "Server failed to encrypt data")

    def _fail(self, exc_type, message, cause=None):
        """Print *message*, then raise *exc_type* or exit with status 1."""
        print(f"{message}\nClosing...")
        if self.do_raise:
            raise exc_type(message) from cause
        # SystemExit directly: the `exit` builtin is injected by `site`
        # and is missing under `python -S` or in frozen builds.
        raise SystemExit(1)