Skip to content

Commit 83cc071

Browse files
ft: ZENKO-1363 log progress update every 10s
Add a log every 10 seconds to show how many objects have been processed, and how many errors occurred.
1 parent 84c4d51 commit 83cc071

File tree

1 file changed

+29
-6
lines changed

1 file changed

+29
-6
lines changed

crrExistingObjects.js

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
const http = require('http');
22

33
const AWS = require('aws-sdk');
4-
const { doWhilst, mapLimit, waterfall } = require('async');
4+
const { doWhilst, eachSeries, eachLimit, waterfall } = require('async');
55

66
const { Logger } = require('werelogs');
77

@@ -14,6 +14,7 @@ const SECRET_KEY = process.env.SECRET_KEY;
1414
const ENDPOINT = process.env.ENDPOINT;
1515
const SITE_NAME = process.env.SITE_NAME;
1616
const LISTING_LIMIT = 1000;
17+
const LOG_PROGRESS_INTERVAL_MS = 10000;
1718

1819
if (!BUCKETS || BUCKETS.length === 0) {
1920
log.fatal('No buckets given as input! Please provide ' +
@@ -51,6 +52,17 @@ const options = {
5152
const s3 = new AWS.S3(options);
5253
const bb = new BackbeatClient(options);
5354

55+
let nProcessed = 0;
56+
let nErrors = 0;
57+
let bucketInProgress = null;
58+
59+
function _logProgress() {
60+
log.info(`progress update: ${nProcessed} processed, ${nErrors} errors, ` +
61+
`bucket in progress: ${bucketInProgress || '(none)'}`);
62+
}
63+
64+
const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS);
65+
5466
function _markObjectPending(bucket, key, versionId, storageClass,
5567
repConfig, cb) {
5668
let objMD;
@@ -166,19 +178,27 @@ function _markPending(bucket, versions, cb) {
166178
log.error(errMsg);
167179
return next(new Error(errMsg));
168180
}
169-
return mapLimit(versions, 10, (i, apply) => {
181+
return eachLimit(versions, 10, (i, apply) => {
170182
const { Key, VersionId } = i;
171-
_markObjectPending(bucket, Key, VersionId, storageClass,
172-
repConfig, apply);
183+
_markObjectPending(
184+
bucket, Key, VersionId, storageClass, repConfig, err => {
185+
++nProcessed;
186+
if (err) {
187+
++nErrors;
188+
return apply(err);
189+
}
190+
return apply();
191+
});
173192
}, next);
174193
},
175194
], cb);
176195
}
177196

178-
function triggerCRR(bucketName, cb) {
197+
function triggerCRROnBucket(bucketName, cb) {
179198
const bucket = bucketName.trim();
180199
let VersionIdMarker = null;
181200
let KeyMarker = null;
201+
bucketInProgress = bucket;
182202
doWhilst(
183203
done => _listObjectVersions(bucket, VersionIdMarker, KeyMarker,
184204
(err, data) => {
@@ -197,17 +217,20 @@ function triggerCRR(bucketName, cb) {
197217
return false;
198218
},
199219
err => {
220+
bucketInProgress = null;
200221
if (err) {
201222
log.error('error marking objects for crr', { bucket });
202223
return cb(err);
203224
}
225+
_logProgress();
204226
log.info(`completed task for bucket: ${bucket}`);
205227
return cb();
206228
});
207229
}
208230

209231
// trigger the calls to list objects and mark them for crr
210-
mapLimit(BUCKETS, 1, triggerCRR, err => {
232+
eachSeries(BUCKETS, triggerCRROnBucket, err => {
233+
clearInterval(logProgressInterval);
211234
if (err) {
212235
return log.error('error during task execution', { error: err });
213236
}

0 commit comments

Comments
 (0)