-
Notifications
You must be signed in to change notification settings - Fork 388
/
WdtResourceController.h
294 lines (229 loc) · 9.76 KB
/
WdtResourceController.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/ErrorCodes.h>
#include <wdt/Receiver.h>
#include <wdt/Sender.h>
#include <unordered_map>
#include <vector>
namespace facebook {
namespace wdt {
typedef std::shared_ptr<Receiver> ReceiverPtr;
typedef std::shared_ptr<Sender> SenderPtr;
/**
* Base class for both wdt global and namespace controller
*/
class WdtControllerBase {
public:
/// Constructor with a name for the controller
explicit WdtControllerBase(const std::string &controllerName);
/// Destructor
virtual ~WdtControllerBase() {
}
/// Update max receivers limit
virtual void updateMaxReceiversLimit(int64_t maxNumReceivers);
/// Update max senders limit
virtual void updateMaxSendersLimit(int64_t maxNumSenders);
protected:
using GuardLock = std::unique_lock<std::recursive_mutex>;
/// Number of active receivers
int64_t numReceivers_{0};
/// Number of active senders
int64_t numSenders_{0};
/// Maximum number of senders allowed for this namespace
int64_t maxNumSenders_{0};
/// Maximum number of receivers allowed for this namespace
int64_t maxNumReceivers_{0};
/// Mutex that protects all the private members of this class
mutable std::recursive_mutex controllerMutex_;
/// Name of the resource controller
std::string controllerName_;
};
class WdtResourceController;
/**
* Controller defined per namespace if the user wants to divide
* things between different namespaces (ex db shards)
*/
class WdtNamespaceController : public WdtControllerBase {
public:
/// Constructor with a name for namespace
WdtNamespaceController(const std::string &wdtNamespace,
const WdtResourceController *const parent);
/// Is free to create sender.
bool hasSenderQuota() const;
/// Add a receiver for this namespace with identifier
ErrorCode createReceiver(const WdtTransferRequest &request,
const std::string &identifier,
ReceiverPtr &receiver);
/// Is free to create receiver.
bool hasReceiverQuota() const;
/// Add a sender for this namespace with identifier
ErrorCode createSender(const WdtTransferRequest &request,
const std::string &identifier, SenderPtr &sender);
/// Delete a receiver from this namespace
ErrorCode releaseReceiver(const std::string &identifier);
/// Delete a sender from this namespace
ErrorCode releaseSender(const std::string &identifier);
/// Releases all senders in this namespace
int64_t releaseAllSenders();
/// Releases all receivers in this namespace
int64_t releaseAllReceivers();
/**
* Get the sender you created by the createSender API
* using the same identifier you mentioned before
*/
SenderPtr getSender(const std::string &identifier) const;
/**
* Get the receiver you created by the createReceiver API
* using the same identifier you mentioned before
*/
ReceiverPtr getReceiver(const std::string &identifier) const;
/// Get all senders
std::vector<SenderPtr> getSenders() const;
/// Get all receivers
std::vector<ReceiverPtr> getReceivers() const;
// Get all senders ids
std::vector<std::string> getSendersIds() const;
/// Clear the senders that are not active anymore
std::vector<std::string> releaseStaleSenders();
/// Clear the receivers that are not active anymore
std::vector<std::string> releaseStaleReceivers();
/// Destructor, clears the senders and receivers
~WdtNamespaceController() override;
private:
/// Map of receivers associated with identifier
std::unordered_map<std::string, ReceiverPtr> receiversMap_;
/// Map of senders associated with identifier
std::unordered_map<std::string, SenderPtr> sendersMap_;
/// Throttler for this namespace
const WdtResourceController *const parent_;
};
/**
* A generic resource controller for wdt objects
* User can set the maximum limit for receiver/sender
* and organize them in different namespace
*/
class WdtResourceController : public WdtControllerBase {
public:
/// resource controller should take the option as reference so that it can be
/// changed later from the parent object
WdtResourceController(const WdtOptions &options,
std::shared_ptr<Throttler> throttler);
explicit WdtResourceController(const WdtOptions &options);
WdtResourceController();
/// Is free to create sender specified by namespace.
bool hasSenderQuota(const std::string &wdtNamespace) const;
/**
* Add a sender specified by namespace and a identifier.
* You can get this sender back by using the same identifier
*/
ErrorCode createSender(const std::string &wdtNamespace,
const std::string &identifier,
const WdtTransferRequest &request, SenderPtr &sender);
/// Is free to create receiver specified by namespace.
bool hasReceiverQuota(const std::string &wdtNamespace) const;
/// Add a receiver specified with namespace and identifier
ErrorCode createReceiver(const std::string &wdtNamespace,
const std::string &identifier,
const WdtTransferRequest &request,
ReceiverPtr &receiver);
/// Release a sender specified with namespace and identifier
ErrorCode releaseSender(const std::string &wdtNamespace,
const std::string &identifier);
/// Release a receiver specified with namespace and identifier
ErrorCode releaseReceiver(const std::string &wdtNamespace,
const std::string &identifier);
/// Register a wdt namespace (if strict mode)
ErrorCode registerWdtNamespace(const std::string &wdtNamespace);
/// De register a wdt namespace
ErrorCode deRegisterWdtNamespace(const std::string &wdtNamespace);
/// Use the base class methods for global limits
using WdtControllerBase::updateMaxReceiversLimit;
using WdtControllerBase::updateMaxSendersLimit;
/// Update max receivers limit of namespace
void updateMaxReceiversLimit(const std::string &wdtNamespace,
int64_t maxNumReceivers);
/// Update max senders limit of namespace
void updateMaxSendersLimit(const std::string &wdtNamespace,
int64_t maxNumSenders);
/// Release all senders in the specified namespace
ErrorCode releaseAllSenders(const std::string &wdtNamespace);
/// Releases all receivers in specified namespace
ErrorCode releaseAllReceivers(const std::string &wdtNamespace);
/// Get a particular sender from a wdt namespace
SenderPtr getSender(const std::string &wdtNamespace,
const std::string &identifier) const;
/// Get a particular receiver from a wdt namespace
ReceiverPtr getReceiver(const std::string &wdtNamespace,
const std::string &identifier) const;
/// Get all senders in a namespace
std::vector<SenderPtr> getAllSenders(const std::string &wdtNamespace) const;
/// Get all senders ids in a namespace
std::vector<std::string> getAllSendersIds(
const std::string &wdtNamespace) const;
/// Get all receivers in a namespace
std::vector<ReceiverPtr> getAllReceivers(
const std::string &wdtNamespace) const;
/// Clear the senders that are no longer active.
ErrorCode releaseStaleSenders(const std::string &wdtNamespace,
std::vector<std::string> &erasedIds);
/// Clear the receivers that are no longer active
ErrorCode releaseStaleReceivers(const std::string &wdtNamespace,
std::vector<std::string> &erasedIds);
/**
* Call with true to require registerWdtNameSpace() to be called
* before requesting sender/receiver for that namespace.
*/
void requireRegistration(bool isStrict);
/// Cleanly shuts down the controller
void shutdown();
/// @return Singleton instance of the controller
static WdtResourceController *get();
/// Destructor for the global resource controller
~WdtResourceController() override;
/// Default global namespace
static const char *const kGlobalNamespace;
/// Return current counts
ErrorCode getCounts(int32_t &numNamespaces, int32_t &numSenders,
int32_t &numReceivers);
/**
* getter for throttler.
* setThrottlerRates to this throttler may not take effect. Instead, update
* WdtOptions accordingly.
* Applications have to register transfers with the throttler, and at the end
* de-register it. For example-
* throttler->startTransfer();
* ...
* throttler->limit(numBytes);
* ...
* throttler->endTransfer();
*/
std::shared_ptr<Throttler> getWdtThrottler() const;
const WdtOptions &getOptions() const;
protected:
typedef std::shared_ptr<WdtNamespaceController> NamespaceControllerPtr;
/// Get the namespace controller
NamespaceControllerPtr getNamespaceController(
const std::string &wdtNamespace) const;
private:
NamespaceControllerPtr createNamespaceController(const std::string &name);
/// Map containing the resource controller per namespace
std::unordered_map<std::string, NamespaceControllerPtr> namespaceMap_;
/// Whether namespace need to be created explictly
bool strictRegistration_{false};
/// Throttler for all the namespaces
std::shared_ptr<Throttler> throttler_{nullptr};
const WdtOptions &options_;
/// Internal method for checking hasSenderQuota & hasReceiverQuota
bool hasSenderQuotaInternal(const std::shared_ptr<WdtNamespaceController>
&controller = nullptr) const;
bool hasReceiverQuotaInternal(const std::shared_ptr<WdtNamespaceController>
&controller = nullptr) const;
};
} // namespace wdt
} // namespace facebook