-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathstatus.py
118 lines (89 loc) · 3.71 KB
/
status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#
# Store and watch all **status** values in system.
#
import sys, logging, asyncio
from pprint import pprint, pformat
from decimal import Decimal
from chrono import NOW
from objstruct import ObjectStruct
from hashlib import sha256
from copy import deepcopy
from utils import WatchableMixin
# Attach a no-op handler to this module's logger.
logging.getLogger(__name__).addHandler(logging.NullHandler())
class SystemStatus(WatchableMixin, ObjectStruct):
    """Holds every status value tracked by the system.

    Values stored under names that do not start with an underscore are
    exported by as_dict(), which exists so changes can be streamed to web
    clients as JSON; keep those values simple, small and JSON-able. Names
    starting with an underscore (e.g. the raw PSBT) are skipped by
    as_dict().
    """

    def __init__(self):
        # define all values here. keep simple, small!
        # - values must be JSON-able.
        super().__init__()

        self.connected = False
        self.serial_number = None
        #self.xfp = None
        self.hsm = dict(users=[], wallets=[])   # short for "hsm_status"
        self.is_testnet = False

        # storage locker has been read ok.
        self.sl_loaded = False

        # user doesn't want Tor regardless of other settings
        # (also disables login process)
        self.force_local_mode = False

        # we are in setup mode
        self.setup_mode = False

        # PSBT related
        self._pending_psbt = None       # raw binary
        self.psbt_hash = None           # hex digits (sha256)
        self.psbt_size = None           # size of binary
        self.local_code = None          # string of 6 digits
        self.psbt_preview = None        # text
        self.busy_signing = False

        # tor related
        self.tord_good = False          # local tord control connection good
        self.onion_addr = None          # our present onion addr, if any
        self.tor_enabled = False        # config calls for tor (ie. BP['tor_enabled'])

        # list of structs about credentials given by remote users
        self.pending_auth = []

    def reset_pending_auth(self):
        """Clear and rebuild the pending-auth list.

        The user names come from self.hsm['users'] when that is non-empty.
        Otherwise, if BP has a 'policy', the names are gathered from the
        `users` of each entry in BP['policy']['rules']. If that still
        yields nothing, five unnamed slots are provided.

        Sets self.pending_auth (one ObjectStruct per name, for UX) and
        self._auth_guess (a parallel list of None placeholders).
        """
        from persist import BP

        # make a list of users that might need to auth
        ul = self.hsm.get('users')

        if not ul and BP.get('policy'):
            names = set()
            try:
                for rule in BP['policy']['rules']:
                    # update() mutates the set in place; union() would
                    # return a new set and leave `names` empty.
                    # NOTE(review): rules are read by attribute (rule.users),
                    # so they are presumably ObjectStruct-like - confirm.
                    names.update(rule.users)
            except KeyError:
                # best-effort: keep whatever was collected so far
                pass
            ul = sorted(names)

        # they might have picked privacy over UX, so provide some "slots"
        # regardless of above.
        if not ul:
            ul = [''] * 5

        # construct an obj for UX purposes, but keep the actual secrets separate
        self.pending_auth = [
            ObjectStruct(name=n, has_name=bool(n), has_guess='', totp=0)
            for n in ul
        ]
        self._auth_guess = [None] * len(ul)

    def clear_psbt(self):
        """Wipe all knowledge of the current PSBT."""
        self._pending_psbt = None
        self.psbt_hash = None
        self.psbt_size = None
        self.local_code = None
        self.psbt_preview = None

    def import_psbt(self, psbt):
        """Replace the pending PSBT with a cleaned-up copy of `psbt`.

        Clears the previous PSBT state, stores the result of
        cleanup_psbt(psbt), and records its size and sha256 hash (as hex).
        When self.hsm has a truthy 'next_local_code', a new local code is
        computed from the hash digest and that value.
        """
        from ckcc.utils import calc_local_pincode
        from utils import cleanup_psbt

        self.clear_psbt()

        self._pending_psbt = cleanup_psbt(psbt)
        self.psbt_size = len(self._pending_psbt)

        hasher = sha256(self._pending_psbt)
        hh = hasher.digest()
        self.psbt_hash = hasher.hexdigest()

        # local PIN code will be wrong/stale now.
        # Reuse the value fetched with .get() so this works whether hsm is
        # a plain dict or supports attribute access.
        next_code = self.hsm.get('next_local_code') if self.hsm else None
        if next_code:
            self.local_code = calc_local_pincode(hh, next_code)

        logging.info("Imported PSBT with hash: %s", self.psbt_hash)

    def as_dict(self):
        """Return a dict of deep copies of the public, non-callable values.

        We stream changes to web clients, so this provides the data to be
        sent as JSON. Keys starting with an underscore are left out.
        """
        return {
            k: deepcopy(self[k])
            for k in self.keys()
            if not k.startswith('_') and not callable(self[k])
        }
# singleton: the one SystemStatus instance, created when this module is imported
STATUS = SystemStatus()
# EOF