Skip to content

Commit

Permalink
Updated retry handling in storage access.
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos committed Jan 27, 2025
1 parent a5dc0d2 commit 45de682
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 91 deletions.
14 changes: 7 additions & 7 deletions app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ final class ExportedApi {
// Only delete the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
// TODO: Conditionally deletion API from package:gcloud would be better!
await _bucket.tryDelete(item.name);
await _bucket.tryDeleteWithRetry(item.name);
}
});

Expand All @@ -137,7 +137,7 @@ final class ExportedApi {
item.updated.isBefore(gcFilesBefore)) {
// Only delete the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
await _bucket.tryDelete(item.name);
await _bucket.tryDeleteWithRetry(item.name);
}
});
}
Expand Down Expand Up @@ -184,7 +184,7 @@ final class ExportedApi {
await _listBucket(
prefix: entry.name,
delimiter: '',
(entry) async => await _bucket.tryDelete(entry.name),
(entry) async => await _bucket.tryDeleteWithRetry(entry.name),
);
}
}));
Expand Down Expand Up @@ -336,7 +336,7 @@ final class ExportedPackage {
item.updated.isBefore(clock.agoBy(_minGarbageAge))) {
// Only delete if the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
await _owner._bucket.tryDelete(item.name);
await _owner._bucket.tryDeleteWithRetry(item.name);
}
});

Expand Down Expand Up @@ -380,7 +380,7 @@ final class ExportedPackage {
if (info.updated.isBefore(clock.agoBy(_minGarbageAge))) {
// Only delete if the item if it's older than _minGarbageAge
// This avoids any races where we delete files we've just created
await _owner._bucket.tryDelete(item.name);
await _owner._bucket.tryDeleteWithRetry(item.name);
}
}
// Ignore cases where tryInfo fails, assuming the object has been
Expand All @@ -399,7 +399,7 @@ final class ExportedPackage {
await _owner._listBucket(
prefix: prefix + '/api/archives/$_package-',
delimiter: '',
(item) async => await _owner._bucket.tryDelete(item.name),
(item) async => await _owner._bucket.tryDeleteWithRetry(item.name),
);
}),
]);
Expand Down Expand Up @@ -442,7 +442,7 @@ sealed class ExportedObject {
Future<void> delete() async {
await Future.wait(_owner._prefixes.map((prefix) async {
await _owner._pool.withResource(() async {
await _owner._bucket.tryDelete(prefix + _objectName);
await _owner._bucket.tryDeleteWithRetry(prefix + _objectName);
});
}));
}
Expand Down
4 changes: 2 additions & 2 deletions app/lib/package/tarball_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ class TarballStorage {
Future<void> deleteArchiveFromAllBuckets(
String package, String version) async {
final objectName = tarballObjectName(package, version);
await deleteFromBucket(_canonicalBucket, objectName);
await deleteFromBucket(_publicBucket, objectName);
await _canonicalBucket.deleteWithRetry(objectName);
await _publicBucket.deleteWithRetry(objectName);
}

/// Deletes the package archive file from the canonical bucket.
Expand Down
83 changes: 27 additions & 56 deletions app/lib/shared/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,12 @@ import 'package:retry/retry.dart';

import 'configuration.dart';
import 'utils.dart'
show
contentType,
jsonUtf8Encoder,
retryAsync,
ByteArrayEqualsExt,
DeleteCounts;
show contentType, jsonUtf8Encoder, ByteArrayEqualsExt, DeleteCounts;
import 'versions.dart' as versions;

final _gzip = GZipCodec();
final _logger = Logger('shared.storage');

const _retryStatusCodes = <int>{502, 503, 504};

/// Additional methods on the storage service.
extension StorageExt on Storage {
/// Verifies bucket existence and access.
Expand Down Expand Up @@ -115,18 +108,19 @@ extension BucketExt on Bucket {
}

/// Deletes [name] if it exists, ignores 404 otherwise.
Future<void> tryDelete(String name) async {
return await retry(
Future<bool> tryDeleteWithRetry(String name) async {
return await _retry(
() async {
try {
return await delete(name);
await delete(name);
return true;
} on DetailedApiRequestError catch (e) {
if (e.status == 404) return null;
if (e.status == 404) {
return false;
}
rethrow;
}
},
maxAttempts: 3,
retryIf: _retryIf,
);
}

Expand Down Expand Up @@ -158,7 +152,7 @@ extension BucketExt on Bucket {
if (maxSize != null && length != null && maxSize < length) {
throw MaximumSizeExceeded(maxSize);
}
return retry(
return _retry(
() async {
final timeout = Duration(seconds: 30);
final deadline = clock.now().add(timeout);
Expand All @@ -175,8 +169,6 @@ extension BucketExt on Bucket {
}
return builder.toBytes();
},
maxAttempts: 3,
retryIf: _retryIf,
);
}

Expand Down Expand Up @@ -270,8 +262,17 @@ extension PageExt<T> on Page<T> {
}
}

Future<R> _retry<R>(Future<R> Function() fn) async {
return await retry(fn, maxAttempts: 3, retryIf: _retryIf);
Future<R> _retry<R>(
Future<R> Function() fn, {
FutureOr<void> Function(Exception)? onRetry,
}) async {
return await retry(
fn,
maxAttempts: 3,
delayFactor: Duration(seconds: 2),
retryIf: _retryIf,
onRetry: onRetry,
);
}

bool _retryIf(Exception e) {
Expand All @@ -295,32 +296,6 @@ bool _retryIf(Exception e) {
String bucketUri(Bucket bucket, String path) =>
'gs://${bucket.bucketName}/$path';

/// Deletes a single object from the [bucket].
///
/// Returns `true` if the object was deleted by this operation, `false` if it
/// didn't exist at the time of the operation.
Future<bool> deleteFromBucket(Bucket bucket, String objectName) async {
Future<bool> delete() async {
try {
await bucket.delete(objectName);
return true;
} on DetailedApiRequestError catch (e) {
if (e.status != 404) {
rethrow;
}
return false;
}
}

return await retry(
delete,
delayFactor: Duration(seconds: 10),
maxAttempts: 3,
retryIf: (e) =>
e is DetailedApiRequestError && _retryStatusCodes.contains(e.status),
);
}

Future<void> updateContentDispositionToAttachment(
ObjectInfo info, Bucket bucket) async {
if (info.metadata.contentDisposition != 'attachment') {
Expand Down Expand Up @@ -351,23 +326,19 @@ Future<int> deleteBucketFolderRecursively(
var count = 0;
Page<BucketEntry>? page;
while (page == null || !page.isLast) {
page = await retry(
page = await _retry(
() async {
return page == null
? await bucket.pageWithRetry(
prefix: folder, delimiter: '', pageSize: 100)
: await page.nextWithRetry(pageSize: 100);
},
delayFactor: Duration(seconds: 10),
maxAttempts: 3,
retryIf: (e) =>
e is DetailedApiRequestError && _retryStatusCodes.contains(e.status),
);
final futures = <Future>[];
final pool = Pool(concurrency ?? 1);
for (final entry in page!.items) {
final f = pool.withResource(() async {
final deleted = await deleteFromBucket(bucket, entry.name);
final deleted = await bucket.tryDeleteWithRetry(entry.name);
if (deleted) count++;
});
futures.add(f);
Expand All @@ -382,7 +353,7 @@ Future<int> deleteBucketFolderRecursively(
Future uploadWithRetry(Bucket bucket, String objectName, int length,
Stream<List<int>> Function() openStream,
{ObjectMetadata? metadata}) async {
await retryAsync(
await _retry(
() async {
final sink = bucket.write(objectName,
length: length,
Expand All @@ -391,9 +362,9 @@ Future uploadWithRetry(Bucket bucket, String objectName, int length,
await sink.addStream(openStream());
await sink.close();
},
description: 'Upload to $objectName',
shouldRetryOnError: _retryIf,
sleep: Duration(seconds: 10),
onRetry: (e) {
_logger.info('Upload to $objectName failed.', e, StackTrace.current);
},
);
}

Expand Down Expand Up @@ -506,7 +477,7 @@ class VersionedJsonStorage {
final age = clock.now().difference(info.updated);
if (minAgeThreshold == null || age > minAgeThreshold) {
deleted++;
await deleteFromBucket(_bucket, entry.name);
await _bucket.tryDeleteWithRetry(entry.name);
}
}
});
Expand Down
26 changes: 0 additions & 26 deletions app/lib/shared/utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import 'dart:typed_data';

import 'package:appengine/appengine.dart';
import 'package:intl/intl.dart';
import 'package:logging/logging.dart';
// ignore: implementation_imports
import 'package:mime/src/default_extension_map.dart' as mime;
import 'package:path/path.dart' as p;
Expand All @@ -28,7 +27,6 @@ final Duration twoYears = const Duration(days: 2 * 365);
/// Appengine.
const _cloudTraceContextHeader = 'X-Cloud-Trace-Context';

final Logger _logger = Logger('pub.utils');
final _random = Random.secure();

final DateFormat shortDateFormat = DateFormat.yMMMd();
Expand Down Expand Up @@ -171,30 +169,6 @@ List<T> boundedList<T>(List<T> list, {int? offset, int? limit}) {
return iterable.toList();
}

/// Executes [body] and returns with the same result.
/// When it throws an exception, it will be re-run until [maxAttempt] is reached.
Future<R> retryAsync<R>(
Future<R> Function() body, {
int maxAttempt = 3,
bool Function(Exception)? shouldRetryOnError,
String description = 'Async operation',
Duration sleep = const Duration(seconds: 1),
}) async {
for (int i = 1;; i++) {
try {
return await body();
} on Exception catch (e, st) {
_logger.info('$description failed (attempt: $i of $maxAttempt).', e, st);
if (i < maxAttempt &&
(shouldRetryOnError == null || shouldRetryOnError(e))) {
await Future.delayed(sleep);
continue;
}
rethrow;
}
}
}

/// Returns a UUID in v4 format as a `String`.
///
/// If [bytes] is provided, it must be length 16 and have values between `0` and
Expand Down

0 comments on commit 45de682

Please sign in to comment.