11const http = require ( 'http' ) ;
22
33const AWS = require ( 'aws-sdk' ) ;
4- const { doWhilst, mapLimit , waterfall } = require ( 'async' ) ;
4+ const { doWhilst, eachSeries , eachLimit , waterfall } = require ( 'async' ) ;
55
66const { Logger } = require ( 'werelogs' ) ;
77
@@ -14,6 +14,7 @@ const SECRET_KEY = process.env.SECRET_KEY;
1414const ENDPOINT = process . env . ENDPOINT ;
1515const SITE_NAME = process . env . SITE_NAME ;
1616const LISTING_LIMIT = 1000 ;
17+ const LOG_PROGRESS_INTERVAL_MS = 10000 ;
1718
1819if ( ! BUCKETS || BUCKETS . length === 0 ) {
1920 log . fatal ( 'No buckets given as input! Please provide ' +
@@ -51,6 +52,17 @@ const options = {
5152const s3 = new AWS . S3 ( options ) ;
5253const bb = new BackbeatClient ( options ) ;
5354
55+ let nProcessed = 0 ;
56+ let nErrors = 0 ;
57+ let bucketInProgress = null ;
58+
59+ function _logProgress ( ) {
60+ log . info ( `progress update: ${ nProcessed } processed, ${ nErrors } errors, ` +
61+ `bucket in progress: ${ bucketInProgress || '(none)' } ` ) ;
62+ }
63+
64+ const logProgressInterval = setInterval ( _logProgress , LOG_PROGRESS_INTERVAL_MS ) ;
65+
5466function _markObjectPending ( bucket , key , versionId , storageClass ,
5567 repConfig , cb ) {
5668 let objMD ;
@@ -166,19 +178,27 @@ function _markPending(bucket, versions, cb) {
166178 log . error ( errMsg ) ;
167179 return next ( new Error ( errMsg ) ) ;
168180 }
169- return mapLimit ( versions , 10 , ( i , apply ) => {
181+ return eachLimit ( versions , 10 , ( i , apply ) => {
170182 const { Key, VersionId } = i ;
171- _markObjectPending ( bucket , Key , VersionId , storageClass ,
172- repConfig , apply ) ;
183+ _markObjectPending (
184+ bucket , Key , VersionId , storageClass , repConfig , err => {
185+ ++ nProcessed ;
186+ if ( err ) {
187+ ++ nErrors ;
188+ return apply ( err ) ;
189+ }
190+ return apply ( ) ;
191+ } ) ;
173192 } , next ) ;
174193 } ,
175194 ] , cb ) ;
176195}
177196
178- function triggerCRR ( bucketName , cb ) {
197+ function triggerCRROnBucket ( bucketName , cb ) {
179198 const bucket = bucketName . trim ( ) ;
180199 let VersionIdMarker = null ;
181200 let KeyMarker = null ;
201+ bucketInProgress = bucket ;
182202 doWhilst (
183203 done => _listObjectVersions ( bucket , VersionIdMarker , KeyMarker ,
184204 ( err , data ) => {
@@ -197,17 +217,20 @@ function triggerCRR(bucketName, cb) {
197217 return false ;
198218 } ,
199219 err => {
220+ bucketInProgress = null ;
200221 if ( err ) {
201222 log . error ( 'error marking objects for crr' , { bucket } ) ;
202223 return cb ( err ) ;
203224 }
225+ _logProgress ( ) ;
204226 log . info ( `completed task for bucket: ${ bucket } ` ) ;
205227 return cb ( ) ;
206228 } ) ;
207229}
208230
209231// trigger the calls to list objects and mark them for crr
210- mapLimit ( BUCKETS , 1 , triggerCRR , err => {
232+ eachSeries ( BUCKETS , triggerCRROnBucket , err => {
233+ clearInterval ( logProgressInterval ) ;
211234 if ( err ) {
212235 return log . error ( 'error during task execution' , { error : err } ) ;
213236 }
0 commit comments