-
Notifications
You must be signed in to change notification settings - Fork 388
/
Reporting.h
603 lines (523 loc) · 17.8 KB
/
Reporting.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <folly/synchronization/RWSpinLock.h>
#include <wdt/AbortChecker.h>
#include <wdt/ErrorCodes.h>
#include <wdt/WdtOptions.h>
#include <wdt/WdtTransferRequest.h>
#include <wdt/util/EncryptionUtils.h>
#include <algorithm>
#include <array>
#include <chrono>
#include <cstdint>
#include <iterator>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>
namespace facebook {
namespace wdt {
const double kMbToB = 1024 * 1024;
const double kMicroToMilli = 1000;
const double kMicroToSec = 1000 * 1000;
const double kMilliToSec = 1000;
typedef std::chrono::high_resolution_clock Clock;
/**
 * @param d   a std::chrono duration
 *
 * @return    d converted (truncating) to a whole number of microseconds
 */
template <typename T>
int64_t durationMicros(T d) {
  using std::chrono::microseconds;
  const auto asMicros = std::chrono::duration_cast<microseconds>(d);
  return asMicros.count();
}
/**
 * @param d   a std::chrono duration
 *
 * @return    d converted (truncating) to a whole number of milliseconds.
 *            The count is narrowed to int, so very long durations do not fit
 */
template <typename T>
int durationMillis(T d) {
  using std::chrono::milliseconds;
  const auto asMillis = std::chrono::duration_cast<milliseconds>(d);
  return static_cast<int>(asMillis.count());
}
/**
 * @param d   a std::chrono duration
 *
 * @return    d expressed in seconds as a double (fractions are kept)
 */
template <typename T>
double durationSeconds(T d) {
  using FractionalSeconds = std::chrono::duration<double>;
  const auto asSeconds = std::chrono::duration_cast<FractionalSeconds>(d);
  return asSeconds.count();
}
/**
 * Streams every element of a vector, each one followed by a single space
 * (the last element included).
 *
 * @param os  stream to write to
 * @param v   vector whose elements are written in order
 *
 * @return    os, to allow chaining
 */
template <typename T>
std::ostream &operator<<(std::ostream &os, const std::vector<T> &v) {
  for (const T &element : v) {
    os << element << " ";
  }
  return os;
}
// TODO rename to ThreadResult
/// class representing statistics related to file transfer
class TransferStats {
private:
/// number of header bytes transferred
int64_t headerBytes_ = 0;
/// number of data bytes transferred
int64_t dataBytes_ = 0;
/// number of header bytes transferred as part of successful file transfer
int64_t effectiveHeaderBytes_ = 0;
/// number of data bytes transferred as part of successful file transfer
int64_t effectiveDataBytes_ = 0;
/// number of files successfully transferred
int64_t numFiles_ = 0;
/// number of blocks successfully transferred
int64_t numBlocks_ = 0;
/// number of failed transfers
int64_t failedAttempts_ = 0;
/// Total number of blocks sent by sender
int64_t numBlocksSend_{-1};
/// Total number of bytes sent by sender
int64_t totalSenderBytes_{-1};
/// status of the transfer
ErrorCode localErrCode_ = OK;
/// status of the remote
ErrorCode remoteErrCode_ = OK;
/// id of the owner object
std::string id_;
/// encryption type used
EncryptionType encryptionType_{ENC_NONE};
/// is tls enabled?
bool tls_{false};
/// mutex to support synchronized access
std::unique_ptr<folly::RWSpinLock> mutex_{nullptr};
std::unique_lock<folly::RWSpinLock> getUniqueLock() {
using lock = std::unique_lock<folly::RWSpinLock>;
return mutex_ ? lock{*mutex_} : lock{};
}
std::shared_lock<folly::RWSpinLock> getSharedLock() const {
using lock = std::shared_lock<folly::RWSpinLock>;
return mutex_ ? lock{*mutex_} : lock{};
}
public:
// making the object noncopyable
TransferStats(const TransferStats &stats) = delete;
TransferStats &operator=(const TransferStats &stats) = delete;
TransferStats(TransferStats &&stats) = default;
TransferStats &operator=(TransferStats &&stats) = default;
explicit TransferStats(bool isLocked = false) {
if (isLocked) {
mutex_ = std::make_unique<folly::RWSpinLock>();
}
}
explicit TransferStats(const std::string &id, bool isLocked = false)
: TransferStats(isLocked) {
id_ = id;
}
void reset() {
std::unique_lock lock(getUniqueLock());
headerBytes_ = dataBytes_ = 0;
effectiveHeaderBytes_ = effectiveDataBytes_ = 0;
numFiles_ = numBlocks_ = 0;
failedAttempts_ = 0;
localErrCode_ = remoteErrCode_ = OK;
}
/// @return the number of blocks sent by sender
int64_t getNumBlocksSend() const {
std::shared_lock lock(getSharedLock());
return numBlocksSend_;
}
/// @return the total sender bytes
int64_t getTotalSenderBytes() const {
std::shared_lock lock(getSharedLock());
return totalSenderBytes_;
}
/// @return number of header bytes transferred
int64_t getHeaderBytes() const {
std::shared_lock lock(getSharedLock());
return headerBytes_;
}
/// @return number of data bytes transferred
int64_t getDataBytes() const {
std::shared_lock lock(getSharedLock());
return dataBytes_;
}
/**
* @param needLocking specifies whether we need to lock or not. this is
* for performance improvement. in sender, we do not
* need locking for this call, even though the other
* calls have to be locked
*
* @return number of total bytes transferred
*/
int64_t getTotalBytes(bool needLocking = true) const {
if (needLocking) {
std::shared_lock lock(getSharedLock());
return headerBytes_ + dataBytes_;
}
return headerBytes_ + dataBytes_;
}
/**
* @return number of header bytes transferred as part of successful file
* transfer
*/
int64_t getEffectiveHeaderBytes() const {
std::shared_lock lock(getSharedLock());
return effectiveHeaderBytes_;
}
/**
* @return number of data bytes transferred as part of successful file
* transfer
*/
int64_t getEffectiveDataBytes() const {
std::shared_lock lock(getSharedLock());
return effectiveDataBytes_;
}
/**
* @return number of total bytes transferred as part of successful file
* transfer
*/
int64_t getEffectiveTotalBytes() const {
std::shared_lock lock(getSharedLock());
return effectiveHeaderBytes_ + effectiveDataBytes_;
}
/// @return number of files successfully transferred
int64_t getNumFiles() const {
std::shared_lock lock(getSharedLock());
return numFiles_;
}
/// @return number of blocks successfully transferred
int64_t getNumBlocks() const {
std::shared_lock lock(getSharedLock());
return numBlocks_;
}
/// @return number of failed transfers
int64_t getFailedAttempts() const {
std::shared_lock lock(getSharedLock());
return failedAttempts_;
}
/// @return error code based on combinator of local and remote error
ErrorCode getErrorCode() const {
std::shared_lock lock(getSharedLock());
return getMoreInterestingError(localErrCode_, remoteErrCode_);
}
/// @return status of the transfer on this side
ErrorCode getLocalErrorCode() const {
std::shared_lock lock(getSharedLock());
return localErrCode_;
}
/// @return status of the transfer on the remote end
ErrorCode getRemoteErrorCode() const {
std::shared_lock lock(getSharedLock());
return remoteErrCode_;
}
const std::string &getId() const {
std::shared_lock lock(getSharedLock());
return id_;
}
/// @param number of additional data bytes transferred
void addDataBytes(int64_t count) {
std::unique_lock lock(getUniqueLock());
dataBytes_ += count;
}
/// @param number of additional header bytes transferred
void addHeaderBytes(int64_t count) {
std::unique_lock lock(getUniqueLock());
headerBytes_ += count;
}
/// @param set num blocks send
void setNumBlocksSend(int64_t numBlocksSend) {
std::unique_lock lock(getUniqueLock());
numBlocksSend_ = numBlocksSend;
}
/// @param set total sender bytes
void setTotalSenderBytes(int64_t totalSenderBytes) {
std::unique_lock lock(getUniqueLock());
totalSenderBytes_ = totalSenderBytes;
}
/// one more file transfer failed
void incrFailedAttempts() {
std::unique_lock lock(getUniqueLock());
failedAttempts_++;
}
/// @param status of the transfer
void setLocalErrorCode(ErrorCode errCode) {
std::unique_lock lock(getUniqueLock());
localErrCode_ = errCode;
}
/// @param status of the transfer on the remote end
void setRemoteErrorCode(ErrorCode remoteErrCode) {
std::unique_lock lock(getUniqueLock());
remoteErrCode_ = remoteErrCode;
}
/// @param id of the corresponding entity
void setId(const std::string &id) {
std::unique_lock lock(getUniqueLock());
id_ = id;
}
/// @param numFiles number of files successfully send
void setNumFiles(int64_t numFiles) {
std::unique_lock lock(getUniqueLock());
numFiles_ = numFiles;
}
/// one more block successfully transferred
void incrNumBlocks() {
std::unique_lock lock(getUniqueLock());
numBlocks_++;
}
void decrNumBlocks() {
std::unique_lock lock(getUniqueLock());
numBlocks_--;
}
/**
* @param headerBytes header bytes transfered part of a successful file
* transfer
* @param dataBytes data bytes transferred part of a successful file
* transfer
*/
void addEffectiveBytes(int64_t headerBytes, int64_t dataBytes) {
std::unique_lock lock(getUniqueLock());
effectiveHeaderBytes_ += headerBytes;
effectiveDataBytes_ += dataBytes;
}
void subtractEffectiveBytes(int64_t headerBytes, int64_t dataBytes) {
std::unique_lock lock(getUniqueLock());
effectiveHeaderBytes_ -= headerBytes;
effectiveDataBytes_ -= dataBytes;
}
void setEncryptionType(EncryptionType encryptionType) {
std::unique_lock lock(getUniqueLock());
encryptionType_ = encryptionType;
}
EncryptionType getEncryptionType() const {
std::shared_lock lock(getSharedLock());
return encryptionType_;
}
void setTls(bool tls) {
std::unique_lock lock(getUniqueLock());
tls_ = tls;
}
bool getTls() const {
std::shared_lock lock(getSharedLock());
return tls_;
}
TransferStats &operator+=(const TransferStats &stats);
friend std::ostream &operator<<(std::ostream &os, const TransferStats &stats);
};
/**
 * Report covering an entire client transfer: a summary TransferStats plus
 * per-source and per-thread stats, timing, and file discovery state.
 * Throughput units are mebibytes (MiB, i.e. 1048576 bytes), which are called
 * "Mbytes" for familiarity.
 */
class TransferReport {
 public:
  // TODO: too many constructor parameters, needs to clean-up
  /**
   * Moves all the stat objects passed in into member variables. Only called
   * at the end of transfer by the sender.
   */
  TransferReport(std::vector<TransferStats> &transferredSourceStats,
                 std::vector<TransferStats> &failedSourceStats,
                 std::vector<TransferStats> &threadStats,
                 std::vector<std::string> &failedDirectories, double totalTime,
                 int64_t totalFileSize, int64_t numDiscoveredFiles,
                 int64_t previouslySentBytes, bool fileDiscoveryFinished);

  /**
   * Does not move the thread stats passed to it. Called by the progress
   * reporter thread.
   */
  TransferReport(const std::vector<TransferStats> &threadStats,
                 double totalTime, int64_t totalFileSize,
                 int64_t numDiscoveredFiles, bool fileDiscoveryFinished);

  /// variant taking a single stats object by rvalue reference
  TransferReport(TransferStats &&stats, double totalTime, int64_t totalFileSize,
                 int64_t numDiscoveredFiles, bool fileDiscoveryFinished);

  /// constructor used by receiver, does move the stats
  explicit TransferReport(TransferStats &&globalStats);

  /// @return   summary of the report
  const TransferStats &getSummary() const {
    return summary_;
  }

  /**
   * @return    transfer throughput in Mbytes/sec, computed from the summary's
   *            effective total bytes and the total time. NOTE: the total time
   *            is used as a divisor without a check, so a zero total time
   *            yields inf or NaN
   */
  double getThroughputMBps() const {
    const double bytesPerSec = summary_.getEffectiveTotalBytes() / totalTime_;
    return bytesPerSec / kMbToB;
  }

  /// @return   total time taken in transfer
  double getTotalTime() const {
    return totalTime_;
  }

  /// @return   stats for successfully transferred sources
  const std::vector<TransferStats> &getTransferredSourceStats() const {
    return transferredSourceStats_;
  }

  /// @return   stats for failed sources
  const std::vector<TransferStats> &getFailedSourceStats() const {
    return failedSourceStats_;
  }

  /// @return   stats for threads
  const std::vector<TransferStats> &getThreadStats() const {
    return threadStats_;
  }

  /// @return   directories which could not be opened
  const std::vector<std::string> &getFailedDirectories() const {
    return failedDirectories_;
  }

  /// @return   sum of all the file sizes
  int64_t getTotalFileSize() const {
    return totalFileSize_;
  }

  /// @return   recent throughput in Mbytes/sec
  double getCurrentThroughputMBps() const {
    return currentThroughput_ / kMbToB;
  }

  /// @param stats    stats to be added to the summary
  void addTransferStats(const TransferStats &stats) {
    summary_ += stats;
  }

  /// @param currentThroughput    recent throughput in bytes/sec
  void setCurrentThroughput(double currentThroughput) {
    currentThroughput_ = currentThroughput;
  }

  /// @param totalTime    total transfer time
  void setTotalTime(double totalTime) {
    totalTime_ = totalTime;
  }

  /// @param totalFileSize    sum of all the file sizes
  void setTotalFileSize(int64_t totalFileSize) {
    totalFileSize_ = totalFileSize;
  }

  /// @param errCode    code stored as both the local and the remote error
  ///                   code of the summary
  void setErrorCode(const ErrorCode errCode) {
    summary_.setLocalErrorCode(errCode);
    summary_.setRemoteErrorCode(errCode);
  }

  /// @return   count of all files discovered so far
  int64_t getNumDiscoveredFiles() const {
    return numDiscoveredFiles_;
  }

  /// @return   whether file discovery is finished
  bool fileDiscoveryFinished() const {
    return fileDiscoveryFinished_;
  }

  /// @return   number of bytes sent in previous transfers
  int64_t getPreviouslySentBytes() const {
    return previouslySentBytes_;
  }

  friend std::ostream &operator<<(std::ostream &os,
                                  const TransferReport &report);

 private:
  /// summary stats for the report
  TransferStats summary_;
  /// stats for successfully transferred sources
  std::vector<TransferStats> transferredSourceStats_;
  /// stats for failed sources
  std::vector<TransferStats> failedSourceStats_;
  /// stats for client threads
  std::vector<TransferStats> threadStats_;
  /// directories which could not be opened
  std::vector<std::string> failedDirectories_;
  /// total transfer time
  double totalTime_{0};
  /// sum of all the file sizes
  int64_t totalFileSize_{0};
  /// recent throughput in bytes/sec
  double currentThroughput_{0};
  /// Count of all files discovered so far
  int64_t numDiscoveredFiles_{0};
  /// Number of bytes sent in previous transfers
  int64_t previouslySentBytes_{0};
  /// Is file discovery finished?
  bool fileDiscoveryFinished_{false};
};
/**
 * Interface for progress reporting, together with its default
 * implementation.
 */
class ProgressReporter {
 public:
  /**
   * Keeps a reference to the transfer request (it is not copied) and records
   * whether stdout is a terminal.
   *
   * @param transferRequest   request of the wdt object using this reporter
   */
  explicit ProgressReporter(const WdtTransferRequest &transferRequest)
      : transferRequest_(transferRequest), isTty_(isatty(STDOUT_FILENO)) {
  }

  /// called before the transfer starts; does nothing by default
  virtual void start() {
  }

  /**
   * Called repeatedly, at the interval defined by progress_report_interval.
   * If stdout is a terminal, transfer progress is displayed in stdout.
   * Example output [===>    ] 30% 5.00 Mbytes/sec. Else, progress details
   * are printed in stdout.
   *
   * @param report    current transfer report
   */
  virtual void progress(const std::unique_ptr<TransferReport> &report);

  /**
   * Called after the transfer ends
   *
   * @param report    final transfer report
   */
  virtual void end(const std::unique_ptr<TransferReport> &report);

  virtual ~ProgressReporter() = default;

 protected:
  /// Reference to the wdt transfer request for the wdt base
  /// object using the progress reporter
  const WdtTransferRequest &transferRequest_;

 private:
  /**
   * Displays progress of the transfer in stdout
   *
   * @param progress                progress percentage
   * @param averageThroughput       average throughput
   * @param currentThroughput       recent throughput
   * @param numDiscoveredFiles      number of files discovered so far
   * @param fileDiscoveryFinished   true once file discovery has completed
   */
  void displayProgress(int progress, double averageThroughput,
                       double currentThroughput, int64_t numDiscoveredFiles,
                       bool fileDiscoveryFinished);

  /**
   * Logs progress details
   *
   * @param effectiveDataBytes      number of bytes sent
   * @param progress                progress percentage
   * @param averageThroughput       average throughput
   * @param currentThroughput       recent throughput
   * @param numDiscoveredFiles      number of files discovered so far
   * @param fileDiscoveryFinished   true once file discovery has completed
   */
  void logProgress(int64_t effectiveDataBytes, int progress,
                   double averageThroughput, double currentThroughput,
                   int64_t numDiscoveredFiles, bool fileDiscoveryFinished);

  /// whether stdout is a terminal (result of isatty at construction)
  bool isTty_;
};
/**
 * Collection of perf stats. For every StatType it keeps a map from time to
 * number of entries, plus the minimum, maximum, count and sum of the
 * recorded times.
 */
class PerfStatReport {
 public:
  /// kinds of operations that are timed; END is not a real type, it is the
  /// number of types
  enum StatType {
    SOCKET_READ,
    SOCKET_WRITE,
    FILE_OPEN,
    FILE_CLOSE,
    FILE_READ,
    FILE_WRITE,
    SYNC_FILE_RANGE,
    FSYNC_STATS,  // just 'FSYNC' is defined on Windows/conflicts
    FILE_SEEK,
    THROTTLER_SLEEP,
    RECEIVER_WAIT_SLEEP,  // receiver sleep duration between sending wait cmd to
                          // sender. A high sum for this suggests threads
                          // were not properly load balanced
    DIRECTORY_CREATE,
    IOCTL,
    UNLINK,
    FADVISE,
    END
  };

  /// @param options    wdt options used to configure the report
  explicit PerfStatReport(const WdtOptions &options);

  /**
   * @param statType      stat-type
   * @param timeInMicros  time taken by the operation in microseconds
   */
  void addPerfStat(StatType statType, int64_t timeInMicros);

  friend std::ostream &operator<<(std::ostream &os,
                                  const PerfStatReport &statReport);

  /// @param statReport   report to combine into this one
  PerfStatReport &operator+=(const PerfStatReport &statReport);

 private:
  /// number of stat types, i.e. the size of the per-type arrays below
  const static int kNumTypes_ = PerfStatReport::END;
  /// per stat-type descriptions (defined out of line)
  const static std::string statTypeDescription_[];
  /// histogram buckets (defined out of line)
  const static int32_t kHistogramBuckets[];

  /// @return   a per-type array with every slot set to value
  static std::array<int64_t, kNumTypes_> filledWith(int64_t value) {
    std::array<int64_t, kNumTypes_> slots;
    slots.fill(value);
    return slots;
  }

  /// mapping from time to number of entries
  std::unordered_map<int64_t, int64_t> perfStats_[kNumTypes_];
  /// max time for different stat types
  int64_t maxValueMicros_[kNumTypes_] = {0};
  /// min time for different stat types. Every slot has to start at the
  /// largest int64_t. A braced initializer such as `= {max}` sets only the
  /// first element and zero-initializes the rest, which would leave the
  /// other stat types starting at 0, so the array is filled explicitly
  std::array<int64_t, kNumTypes_> minValueMicros_ =
      filledWith(std::numeric_limits<int64_t>::max());
  /// number of records for different stat types
  int64_t count_[kNumTypes_] = {0};
  /// sum of all records for different stat types
  int64_t sumMicros_[kNumTypes_] = {0};
  /// network timeout in milliseconds
  int networkTimeoutMillis_;
  /// mutex to support synchronized access
  mutable folly::RWSpinLock mutex_;
};
} // namespace wdt
} // namespace facebook