-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_server_to_shards
149 lines (122 loc) · 4.67 KB
/
get_server_to_shards
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
import json
import struct
from typing import List, Tuple
import fdb
fdb.api_version(710)
KEY_SERVERS_PREFIX = b"\xff/keyServers/"
KEY_SERVERS_END = b"\xff/keyServers0"
SERVER_LIST_PREFIX = b"\xff/serverList/"
SERVER_LIST_END = b"\xff/serverList\x00"
class UID:
"""Represents a unique identifier used in FoundationDB."""
def __init__(self, first: int, second: int):
self.first = first # 64-bit integer
self.second = second # 64-bit integer
def __repr__(self):
return f"UID({self.first:#018x}, {self.second:#018x})"
def get_process_ids_from_status_json(db):
"""
Return a map storageID(SS) -> processID (ID that we show in the dashboad)
"""
# Get the status JSON from the special key \xff\xff/status/json
status_json = db[b"\xff\xff/status/json"]
status_data = json.loads(status_json.decode("utf-8"))
# Build mapping from storage server IDs to process IDs
server_list_map = {}
processes = status_data["cluster"]["processes"]
for proc_id, proc_info in processes.items():
for role in proc_info["roles"]:
if role["role"] == "storage":
# The storage server ID is the 'id' field in the role
server_id = role["id"]
process_id = proc_id
server_list_map[server_id] = process_id
return server_list_map
def decode_key_servers_value(value_bytes: bytes) -> Tuple[int, List[UID], List[UID]]:
"""
Decodes the value associated with a key starting with \xff/keyServers.
Args:
value_bytes (bytes): The binary value to decode.
Returns:
Tuple[int, List[UID], List[UID]]: A tuple containing the protocol version,
list of source UIDs, and list of destination UIDs.
"""
offset = 0
data_len = len(value_bytes)
# Read protocol version (8 bytes)
if data_len - offset < 8:
raise ValueError("Value too short to contain protocol version")
(protocol_version,) = struct.unpack_from("<Q", value_bytes, offset)
offset += 8
# Read src vector length (4 bytes)
if data_len - offset < 4:
raise ValueError("Value too short to contain src vector length")
(src_len,) = struct.unpack_from("<I", value_bytes, offset)
offset += 4
src = []
for _ in range(src_len):
if data_len - offset < 16:
raise ValueError("Value too short to contain src UIDs")
first, second = struct.unpack_from("<QQ", value_bytes, offset)
offset += 16
src.append(UID(first, second))
# Read dest vector length (4 bytes)
if data_len - offset < 4:
raise ValueError("Value too short to contain dest vector length")
(dest_len,) = struct.unpack_from("<I", value_bytes, offset)
offset += 4
dest = []
for _ in range(dest_len):
if data_len - offset < 16:
raise ValueError("Value too short to contain dest UIDs")
first, second = struct.unpack_from("<QQ", value_bytes, offset)
offset += 16
dest.append(UID(first, second))
return protocol_version, src, dest
def get_key_range_to_storage_servers(db):
"""
Return an array of dict, each dict has 2 or 3 keys:
* begin
* end
* storage_servers an array of the SS ID, beware that's not what we(rockset) call SS ID, which is actually is the process ID
"""
# Get the mapping from key ranges to storage servers
key_range_map = []
transaction = db.create_transaction()
transaction.options.set_access_system_keys()
# Read from \xff/keyServers/ to \xff/keyServers\x00
kvs = transaction.get_range(begin=KEY_SERVERS_PREFIX, end=KEY_SERVERS_END)
end_key = None
for kv in kvs:
key = kv.key
value = kv.value
begin_key = key[len(KEY_SERVERS_PREFIX) :]
if len(value) < 1:
continue
(version, src, dst) = decode_key_servers_value(value)
storage_server_ids = [f"{s.first:#018x}"[2:] for s in src]
key_range_map.append(
{
"begin": begin_key,
"storage_servers": storage_server_ids,
}
)
if end_key is not None:
key_range_map[-1]["end"] = end_key
end_key = begin_key
return key_range_map
def main():
db = fdb.open("/fdb-cluster-config/cluster-file")
# Can have issues with multi version
storage_server_to_process_id = get_process_ids_from_status_json(db)
# print(storage_server_to_process_id)
try:
v = get_key_range_to_storage_servers(db)
except:
v = get_key_range_to_storage_servers(db)
for val in v:
print(
f"{val['begin']} {[storage_server_to_process_id[s] for s in val['storage_servers']]}"
)
main()