Commit 7953670

Merge remote-tracking branch 'origin/feature/ZENKO-1379-crrPendingAndBatchLimit' into w/8.0/feature/ZENKO-1379-crrPendingAndBatchLimit
2 parents c8b0580 + c98ffea commit 7953670

File tree

2 files changed: +241 −31 lines


README.md

Lines changed: 173 additions & 22 deletions
@@ -1,51 +1,202 @@
 # s3utils
 S3 Connector and Zenko Utilities
 
-Run the docker container as
+Run the Docker container as
 ```
-docker run --net=host -e 'ACCESS_KEY=accessKey' -e 'SECRET_KEY=secretkey' -e 'ENDPOINT=http://127.0.0.1:8000' zenko/s3utils node scriptName bucket1[,bucket2...]
+docker run --net=host -e 'ACCESS_KEY=accessKey' -e 'SECRET_KEY=secretKey' -e 'ENDPOINT=http://127.0.0.1:8000' zenko/s3utils node scriptName bucket1[,bucket2...]
 ```
 
-Optionally, the environment variable "WORKERS" may be set to specify
-how many parallel workers should run, otherwise a default of 10
-workers will be used.
+## Trigger CRR on existing objects
 
-## Trigger CRR on objects that were put before replication was enabled on the bucket
-
-1. Enable versioning and setup replication on the bucket
+1. Enable versioning and set up replication on the bucket.
 2. Run script as
 ```
 node crrExistingObjects.js testbucket1,testbucket2
 ```
 
-## Trigger CRR on *all* objects of a bucket
+### Extra environment variables
+
+Additionally, the following extra environment variables can be passed
+to the script to modify its behavior:
+
+#### TARGET_REPLICATION_STATUS
+
+Comma-separated list of replication statuses to target for CRR
+requeueing. The recognized statuses are:
+
+* **NEW**: No replication status is attached to the object. This is
+  the state of objects written without any CRR policy attached to
+  the bucket that would have triggered CRR on them.
+
+* **PENDING**: The object replication status is PENDING.
+
+* **COMPLETED**: The object replication status is COMPLETED.
+
+* **FAILED**: The object replication status is FAILED.
+
+* **REPLICA**: The object replication status is REPLICA (objects that
+  were put to a target site via CRR have this status).
+
+The default script behavior is to affect objects that have no
+replication status attached (so equivalent to
+`TARGET_REPLICATION_STATUS=NEW`).
+
+Examples:
+
+`TARGET_REPLICATION_STATUS=PENDING,COMPLETED`
+
+Requeue objects that either have a replication status of PENDING or
+COMPLETED for CRR, do not requeue the others.
+
+`TARGET_REPLICATION_STATUS=NEW,PENDING,COMPLETED,FAILED`
+
+Trigger CRR on all original source objects (not replicas) in a bucket.
+
+`TARGET_REPLICATION_STATUS=REPLICA`
+
+For disaster recovery notably, it may be useful to reprocess REPLICA
+objects to re-sync a backup bucket to the primary site.
+
+#### WORKERS
+
+Specify how many parallel workers should run to update object
+metadata. The default is 10 parallel workers.
+
+Example:
+
+`WORKERS=50`
+
+#### MAX_UPDATES
+
+Specify a maximum number of metadata updates to execute before
+stopping the script.
+
+If the script reaches this limit, it outputs a log line containing
+the KeyMarker and VersionIdMarker to pass to the next invocation (as
+environment variables `KEY_MARKER` and `VERSION_ID_MARKER`) and the
+updated bucket list without the already completed buckets. At the next
+invocation of the script, those two environment variables must be
+set and the updated bucket list passed on the command line to resume
+where the script stopped.
+
+The default is unlimited (will process the complete listing of buckets
+passed on the command line).
+
+**If the script queues too many objects and Backbeat cannot
+process them quickly enough, Kafka may drop the oldest entries**,
+and the associated objects will stay in the **PENDING** state
+permanently without being replicated. When the number of objects
+is large, it is a good idea to limit the batch size and wait
+for CRR to complete between invocations.
+
+Example:
+
+`MAX_UPDATES=10000`
+
+This limits the number of updates to 10,000 objects, which requeues
+a maximum of 10,000 objects to replicate before the script stops.
+
+#### KEY_MARKER
+
+Set to resume from where an earlier invocation stopped (see
+[MAX_UPDATES](#MAX_UPDATES)).
+
+Example:
 
-This mode includes the objects that have already been replicated or
-that have a replication status attached.
+`KEY_MARKER="some/key"`
 
-For disaster recovery notably, to re-sync a backup bucket to the
-primary site, it may be useful to reprocess all objects regardless of
-the existence of a current replication status (e.g. "REPLICA").
+#### VERSION_ID_MARKER
 
-Follow the above steps for using "crrExistingObjects" script, and
-specify an extra environment variable `-e "PROCESS_ALL=true"` to force
-the script to reset the replication status of all objects in the
-bucket to "pending", which will force a replication for all objects.
+Set to resume from where an earlier invocation stopped (see
+[MAX_UPDATES](#MAX_UPDATES)).
+
+Example:
+
+`VERSION_ID_MARKER="123456789"`
+
+
+### Example use cases
+
+#### CRR existing objects after setting a replication policy for the first time
+
+For this use case, it's not necessary to pass any extra environment
+variable, because the default behavior is to process objects without a
+replication status attached.
+
+To avoid requeuing too many entries at once, pass this value:
+
+```
+export MAX_UPDATES=10000
+```
+
+#### Re-queue objects stuck in PENDING state
+
+If Kafka has dropped replication entries, leaving objects stuck in a
+PENDING state without being replicated, pass the following extra
+environment variables to reprocess them:
+
+```
+export TARGET_REPLICATION_STATUS=PENDING
+export MAX_UPDATES=10000
+```
+
+**Warning**: This may cause replication of objects already in the
+Kafka queue to repeat. To avoid this, set the backbeat consumer
+offsets of "backbeat-replication" Kafka topic to the latest topic
+offsets before launching the script, to skip over the existing
+consumer log.
+
+#### Replicate entries that failed a previous replication
+
+If entries have permanently failed to replicate with a FAILED
+replication status and were lost in the failed CRR API, it's still
+possible to re-attempt replication later with the following
+extra environment variables:
+
+```
+export TARGET_REPLICATION_STATUS=FAILED
+export MAX_UPDATES=10000
+```
+
+#### Re-sync a primary site completely to a new DR site
+
+To re-sync objects to a new DR site (for example, when the original
+DR site is lost) force a new replication of all original objects
+with the following environment variables (after setting the proper
+replication configuration to the DR site bucket):
+
+```
+export TARGET_REPLICATION_STATUS=NEW,PENDING,COMPLETED,FAILED
+export MAX_UPDATES=10000
+```
+
+#### Re-sync a DR site back to the primary site
+
+When objects have been lost from the primary site you can re-sync
+objects from the DR site to the primary site by re-syncing the
+objects that have a REPLICA status with the following environment
+variables (after setting the proper replication configuration
+from the DR bucket to the primary bucket):
+
+```
+export TARGET_REPLICATION_STATUS=REPLICA
+export MAX_UPDATES=10000
+```
 
 # Empty a versioned bucket
 
-This script deletes all versions of objects in the bucket including delete markers,
+This script deletes all versions of objects in the bucket, including delete markers,
 and aborts any ongoing multipart uploads to prepare the bucket for deletion.
 
-**Note: This will delete data associated with the objects and it's not recoverable**
+**Note: This deletes the data associated with objects and is not recoverable**
 ```
 node cleanupBuckets.js testbucket1,testbucket2
 ```
 
 # List objects that failed replication
 
-This script can print the list of objects that failed replication to stdout by
-taking a comma-separated list of buckets. Run the command as
+This script prints the list of objects that failed replication to stdout,
+following a comma-separated list of buckets. Run the command as
 
 ````
 node listFailedObjects testbucket1,testbucket2

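The TARGET_REPLICATION_STATUS semantics documented in the README above can be sketched as a standalone function (this is an illustration, not code from the repo; `shouldRequeue` and `TARGETED` are made-up names): `NEW` matches objects with no replication status attached, while every other value matches the object's current status exactly.

```javascript
// Illustrative sketch of the documented status filter. An object is
// requeued if ANY targeted status matches its metadata.
const TARGETED = ['NEW', 'PENDING'];

function shouldRequeue(objMD, targetedStatuses) {
    return targetedStatuses.some(status => {
        if (status === 'NEW') {
            // no replicationInfo at all, or an empty status string
            return !objMD.replicationInfo ||
                objMD.replicationInfo.status === '';
        }
        return Boolean(objMD.replicationInfo) &&
            objMD.replicationInfo.status === status;
    });
}

// A freshly written object with no CRR policy applied counts as NEW:
console.log(shouldRequeue({}, TARGETED)); // true
// An already-replicated object is left alone by this filter:
console.log(shouldRequeue({ replicationInfo: { status: 'COMPLETED' } },
    TARGETED)); // false
```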
crrExistingObjects.js

Lines changed: 68 additions & 9 deletions
@@ -13,9 +13,14 @@ const ACCESS_KEY = process.env.ACCESS_KEY;
 const SECRET_KEY = process.env.SECRET_KEY;
 const ENDPOINT = process.env.ENDPOINT;
 const SITE_NAME = process.env.SITE_NAME;
-const PROCESS_ALL = process.env.PROCESS_ALL === 'true';
+let TARGET_REPLICATION_STATUS = process.env.TARGET_REPLICATION_STATUS;
 const WORKERS = (process.env.WORKERS &&
     Number.parseInt(process.env.WORKERS, 10)) || 10;
+const MAX_UPDATES = (process.env.MAX_UPDATES &&
+    Number.parseInt(process.env.MAX_UPDATES, 10));
+let KEY_MARKER = process.env.KEY_MARKER;
+let VERSION_ID_MARKER = process.env.VERSION_ID_MARKER;
+
 const LISTING_LIMIT = 1000;
 const LOG_PROGRESS_INTERVAL_MS = 10000;
 
@@ -36,11 +41,23 @@ if (!SECRET_KEY) {
     log.fatal('SECRET_KEY not defined');
     process.exit(1);
 }
-if (PROCESS_ALL) {
-    log.warn('PROCESS_ALL environment option is active: ' +
-        'ALL objects in the bucket(s) will be reprocessed for CRR!');
+if (!TARGET_REPLICATION_STATUS) {
+    TARGET_REPLICATION_STATUS = 'NEW';
 }
 
+const replicationStatusToProcess = TARGET_REPLICATION_STATUS.split(',');
+replicationStatusToProcess.forEach(state => {
+    if (!['NEW', 'PENDING', 'COMPLETED', 'FAILED', 'REPLICA'].includes(state)) {
+        log.fatal('invalid TARGET_REPLICATION_STATUS environment: must be a ' +
+            'comma-separated list of replication statuses to requeue, ' +
+            'as NEW, PENDING, COMPLETED, FAILED or REPLICA.');
+        process.exit(1);
+    }
+});
+log.info('Objects with replication status ' +
+    `${replicationStatusToProcess.join(' or ')} ` +
+    'will be reset to PENDING to trigger CRR');
+
 const options = {
     accessKeyId: ACCESS_KEY,
     secretAccessKey: SECRET_KEY,
@@ -66,13 +83,24 @@ let nErrors = 0;
 let bucketInProgress = null;
 
 function _logProgress() {
-    log.info(`progress update: ${nProcessed - nSkipped} touched, ` +
+    log.info(`progress update: ${nProcessed - nSkipped} updated, ` +
         `${nSkipped} skipped, ${nErrors} errors, ` +
         `bucket in progress: ${bucketInProgress || '(none)'}`);
 }
 
 const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS);
 
+function _objectShouldBeUpdated(objMD) {
+    return replicationStatusToProcess.some(filter => {
+        if (filter === 'NEW') {
+            return (!objMD.replicationInfo ||
+                objMD.replicationInfo.status === '');
+        }
+        return (objMD.replicationInfo &&
+            objMD.replicationInfo.status === filter);
+    });
+}
+
 function _markObjectPending(bucket, key, versionId, storageClass,
     repConfig, cb) {
     let objMD;
@@ -84,12 +112,9 @@ function _markObjectPending(bucket, key, versionId, storageClass,
             Key: key,
             VersionId: versionId,
         }, next),
-        // update replication info and put back object blob
         (mdRes, next) => {
             objMD = JSON.parse(mdRes.Body);
-            if (!PROCESS_ALL &&
-                objMD.replicationInfo && objMD.replicationInfo.status !== '') {
-                // skip object since it's already marked for crr
+            if (!_objectShouldBeUpdated(objMD)) {
                 skip = true;
                 return next();
             }
@@ -125,6 +150,7 @@ function _markObjectPending(bucket, key, versionId, storageClass,
                 return next();
             });
         },
+        // update replication info and put back object blob
         next => {
             if (skip) {
                 return next();
@@ -215,6 +241,15 @@ function triggerCRROnBucket(bucketName, cb) {
     let KeyMarker = null;
     bucketInProgress = bucket;
     log.info(`starting task for bucket: ${bucket}`);
+    if (KEY_MARKER || VERSION_ID_MARKER) {
+        // resume from where we left off in previous script launch
+        KeyMarker = KEY_MARKER;
+        VersionIdMarker = VERSION_ID_MARKER;
+        KEY_MARKER = undefined;
+        VERSION_ID_MARKER = undefined;
+        log.info(`resuming at: KeyMarker=${KeyMarker} ` +
+            `VersionIdMarker=${VersionIdMarker}`);
+    }
     doWhilst(
         done => _listObjectVersions(bucket, VersionIdMarker, KeyMarker,
             (err, data) => {
@@ -227,6 +262,30 @@ function triggerCRROnBucket(bucketName, cb) {
                 return _markPending(bucket, data.Versions, done);
             }),
         () => {
+            if (nProcessed - nSkipped >= MAX_UPDATES) {
+                _logProgress();
+                let remainingBuckets;
+                if (VersionIdMarker || KeyMarker) {
+                    // next bucket to process is still the current one
+                    remainingBuckets = BUCKETS.slice(
+                        BUCKETS.findIndex(bucket => bucket === bucketName));
+                } else {
+                    // next bucket to process is the next in bucket list
+                    remainingBuckets = BUCKETS.slice(
+                        BUCKETS.findIndex(bucket => bucket === bucketName) + 1);
+                }
+                let message =
+                    'reached update count limit, resuming from this ' +
+                    'point can be achieved by re-running the script with ' +
+                    `the bucket list "${remainingBuckets.join(',')}"`;
+                if (VersionIdMarker || KeyMarker) {
+                    message += ' and the following environment variables set: ' +
+                        `KEY_MARKER=${KeyMarker} ` +
+                        `VERSION_ID_MARKER=${VersionIdMarker}`;
+                }
+                log.info(message);
+                process.exit(0);
+            }
             if (VersionIdMarker || KeyMarker) {
                 return true;
             }
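The stop path above decides which buckets the next invocation must receive: if a listing marker is pending, the current bucket is unfinished and stays in the list; otherwise the resumed run starts at the next bucket. A standalone sketch of that bookkeeping (`remainingBuckets` is an illustrative name, not a function from the script):

```javascript
// Illustrative sketch: compute the bucket list to pass to a resumed run
// once the update limit is reached.
function remainingBuckets(buckets, currentBucket, hasMarker) {
    const idx = buckets.findIndex(b => b === currentBucket);
    // marker pending => current bucket is not finished, keep it;
    // otherwise resume from the bucket after it
    return hasMarker ? buckets.slice(idx) : buckets.slice(idx + 1);
}

const buckets = ['b1', 'b2', 'b3'];
console.log(remainingBuckets(buckets, 'b2', true).join(','));  // b2,b3
console.log(remainingBuckets(buckets, 'b2', false).join(',')); // b3
```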

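A detail worth noting about the limit check in the hunk above: when `MAX_UPDATES` is unset, the `process.env.MAX_UPDATES && Number.parseInt(...)` expression evaluates to `undefined`, and in JavaScript any `>=` comparison against `undefined` is false, so the limit branch never fires and the run is unbounded. A minimal sketch of that behavior (`reachedLimit` is an illustrative name):

```javascript
// Illustrative sketch: why an unset MAX_UPDATES means "unlimited".
// A numeric comparison against undefined coerces to NaN and is
// always false, so the stop branch is never taken.
function reachedLimit(updated, limit) {
    return updated >= limit; // false whenever limit is undefined
}

console.log(reachedLimit(1000000, undefined)); // false: no limit set
console.log(reachedLimit(10000, 10000));       // true: limit reached
```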