@@ -7,11 +7,12 @@ const { Logger } = require('werelogs');
77
88const BackbeatClient = require ( './BackbeatClient' ) ;
99
10- const log = new Logger ( 's3utils::emptyBucket ' ) ;
10+ const log = new Logger ( 's3utils::crrExistingObjects ' ) ;
1111const BUCKETS = process . argv [ 2 ] ? process . argv [ 2 ] . split ( ',' ) : null ;
1212const ACCESS_KEY = process . env . ACCESS_KEY ;
1313const SECRET_KEY = process . env . SECRET_KEY ;
1414const ENDPOINT = process . env . ENDPOINT ;
15+ const SITE_NAME = process . env . SITE_NAME ;
1516const LISTING_LIMIT = 1000 ;
1617
1718if ( ! BUCKETS || BUCKETS . length === 0 ) {
@@ -48,11 +49,11 @@ const options = {
4849 } ,
4950} ;
5051const s3 = new AWS . S3 ( options ) ;
51-
52-
5352const bb = new BackbeatClient ( options ) ;
5453
55- function _markObjectPending ( bucket , key , versionId , repConfig , cb ) {
54+ function _markObjectPending ( bucket , key , versionId , storageClass ,
55+ repConfig , cb ) {
56+ let objMD ;
5657 return waterfall ( [
5758 // get object blob
5859 next => bb . getMetadata ( {
@@ -62,14 +63,50 @@ function _markObjectPending(bucket, key, versionId, repConfig, cb) {
6263 } , next ) ,
6364 // update replication info and put back object blob
6465 ( mdRes , next ) => {
65- const objMD = JSON . parse ( mdRes . Body ) ;
66+ objMD = JSON . parse ( mdRes . Body ) ;
67+ if ( objMD . replicationInfo && objMD . replicationInfo . status !== '' ) {
68+ // skip object since it's already marked for crr
69+ return next ( ) ;
70+ }
71+ if ( objMD . versionId ) {
72+ // The object already has an *internal* versionId,
73+ // which exists when the object has been put on
74+ // versioned or versioning-suspended bucket. Even if
75+ // the listed version is "null", the object may have
76+ // an actual internal versionId, only if the bucket
77+ // was versioning-suspended when the object was put.
78+ return next ( ) ;
79+ }
80+ // The object does not have an *internal* versionId, as it
81+ // was put on a nonversioned bucket: do a first metadata
82+ // update to let cloudserver generate one, just passing on
83+ // the existing metadata blob. Note that the resulting key
84+ // will still be nonversioned, but the following update
85+ // will be able to create a versioned key for this object,
86+ // so that replication can happen. The externally visible
87+ // version will stay "null".
88+ return bb . putMetadata ( {
89+ Bucket : bucket ,
90+ Key : key ,
91+ ContentLength : Buffer . byteLength ( mdRes . Body ) ,
92+ Body : mdRes . Body ,
93+ } , ( err , putRes ) => {
94+ if ( err ) {
95+ return next ( err ) ;
96+ }
97+ // No need to fetch the whole metadata again, simply
98+ // update the one we have with the generated versionId.
99+ objMD . versionId = putRes . versionId ;
100+ return next ( ) ;
101+ } ) ;
102+ } ,
103+ next => {
66104 if ( objMD . replicationInfo && objMD . replicationInfo . status !== '' ) {
67105 // skip object since it's already marked for crr
68106 return next ( ) ;
69107 }
70108 const { Rules, Role } = repConfig ;
71109 const destination = Rules [ 0 ] . Destination . Bucket ;
72- const storageClass = Rules [ 0 ] . Destination . StorageClass ;
73110 // set replication properties
74111 const ops = objMD [ 'content-length' ] === 0 ? [ 'METADATA' ] :
75112 [ 'METADATA' , 'DATA' ] ;
@@ -119,10 +156,22 @@ function _markPending(bucket, versions, cb) {
119156 }
120157 return next ( null , res . ReplicationConfiguration ) ;
121158 } ) ,
122- ( repConfig , next ) => mapLimit ( versions , 10 , ( i , apply ) => {
123- const { Key, VersionId } = i ;
124- _markObjectPending ( bucket , Key , VersionId , repConfig , apply ) ;
125- } , next ) ,
159+ ( repConfig , next ) => {
160+ const { Rules } = repConfig ;
161+ const storageClass = Rules [ 0 ] . Destination . StorageClass || SITE_NAME ;
162+ if ( ! storageClass ) {
163+ const errMsg =
164+ 'missing SITE_NAME environment variable, must be set to' +
165+ ' the value of "site" property in the CRR configuration' ;
166+ log . error ( errMsg ) ;
167+ return next ( new Error ( errMsg ) ) ;
168+ }
169+ return mapLimit ( versions , 10 , ( i , apply ) => {
170+ const { Key, VersionId } = i ;
171+ _markObjectPending ( bucket , Key , VersionId , storageClass ,
172+ repConfig , apply ) ;
173+ } , next ) ;
174+ } ,
126175 ] , cb ) ;
127176}
128177
0 commit comments