Skip to content

Commit

Permalink
retry more operations when accessing storage + enforcer in tests (dar…
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos authored Dec 16, 2024
1 parent b1cecfe commit 6352f10
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 11 deletions.
11 changes: 6 additions & 5 deletions app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ final class ExportedApi {
required String prefix,
required String delimiter,
}) async {
var p = await _pool.withResource(() async => await _bucket.page(
var p = await _pool.withResource(() async => await _bucket.pageWithRetry(
prefix: prefix,
delimiter: delimiter,
pageSize: 1000,
Expand All @@ -240,7 +240,8 @@ final class ExportedApi {
}));

if (p.isLast) break;
p = await _pool.withResource(() async => await p.next(pageSize: 1000));
p = await _pool
.withResource(() async => await p.nextWithRetry(pageSize: 1000));
}
}
}
Expand Down Expand Up @@ -632,7 +633,7 @@ final class ExportedBlob extends ExportedObject {
final retouchDeadline = clock.agoBy(_updateValidatedAfter);
if (destinationInfo.metadata.validated.isBefore(retouchDeadline)) {
try {
await _owner._bucket.updateMetadata(dst, _metadata());
await _owner._bucket.updateMetadataWithRetry(dst, _metadata());
} catch (e, st) {
// This shouldn't happen, but if a metadata update does fail, it's
// hardly the end of the world.
Expand All @@ -646,7 +647,7 @@ final class ExportedBlob extends ExportedObject {

// If dst or source doesn't exist, then we shall attempt to make a copy.
// (if source doesn't exist we'll consistently get an error from here!)
await _owner._storage.copyObject(
await _owner._storage.copyObjectWithRetry(
source.absoluteObjectName,
_owner._bucket.absoluteObjectName(dst),
metadata: _metadata(),
Expand All @@ -667,7 +668,7 @@ extension on Bucket {
if (info.metadata.validated
.isBefore(clock.agoBy(_updateValidatedAfter))) {
try {
await updateMetadata(name, metadata);
await updateMetadataWithRetry(name, metadata);
} catch (e, st) {
// This shouldn't happen, but if a metadata update does fail, it's
// hardly the end of the world.
Expand Down
2 changes: 1 addition & 1 deletion app/lib/package/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ class PackageBackend {
_logger.info('Removing temporary object $guid.');

sw.reset();
await _incomingBucket.delete(tmpObjectName(guid));
await _incomingBucket.deleteWithRetry(tmpObjectName(guid));
_logger.info('Temporary object removed in ${sw.elapsed}.');
return version;
});
Expand Down
3 changes: 2 additions & 1 deletion app/lib/service/download_counts/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'package:pub_dev/shared/cached_value.dart';
import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/shared/redis_cache.dart';
import 'package:pub_dev/shared/storage.dart';

/// Sets the download counts backend service.
void registerDownloadCountsBackend(DownloadCountsBackend backend) =>
Expand Down Expand Up @@ -42,7 +43,7 @@ class DownloadCountsBackend {
try {
final info = await storageService
.bucket(activeConfiguration.reportsBucketName!)
.info(downloadCounts30DaysTotalsFileName);
.infoWithRetry(downloadCounts30DaysTotalsFileName);

if (_lastData.etag == info.etag) {
return _lastData.data;
Expand Down
3 changes: 2 additions & 1 deletion app/lib/service/services.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:appengine/appengine.dart';
import 'package:clock/clock.dart';
import 'package:fake_gcloud/mem_datastore.dart';
import 'package:fake_gcloud/mem_storage.dart';
import 'package:fake_gcloud/retry_enforcer_storage.dart';
import 'package:gcloud/service_scope.dart';
import 'package:gcloud/storage.dart';
import 'package:googleapis_auth/auth_io.dart' as auth;
Expand Down Expand Up @@ -148,7 +149,7 @@ Future<R> withFakeServices<R>({
return await fork(() async {
register(#appengine.context, FakeClientContext());
registerDbService(DatastoreDB(datastore!));
registerStorageService(storage!);
registerStorageService(RetryEnforcerStorage(storage!));
IOServer? frontendServer;
IOServer? searchServer;
if (configuration == null) {
Expand Down
57 changes: 55 additions & 2 deletions app/lib/shared/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ extension StorageExt on Storage {
/// local environment.
Future<void> verifyBucketExistenceAndAccess(String bucketName) async {
// check bucket existence
if (!await bucketExists(bucketName)) {
if (!await bucketExistsWithRetry(bucketName)) {
final message = 'Bucket "$bucketName" does not exists!';
_logger.shout(message);
if (envConfig.isRunningLocally) {
Expand All @@ -69,10 +69,29 @@ extension StorageExt on Storage {
return;
}
}

/// Check whether a cloud storage bucket exists with the default retry.
Future<bool> bucketExistsWithRetry(String bucketName) async {
return await _retry(() => bucketExists(bucketName));
}

/// Copy an object with the default retry.
Future<void> copyObjectWithRetry(
String src,
String dest, {
ObjectMetadata? metadata,
}) async {
return await _retry(() async => await copyObject(src, dest));
}
}

/// Additional methods on buckets.
extension BucketExt on Bucket {
/// Lookup object metadata with default retry.
Future<ObjectInfo> infoWithRetry(String name) async {
return await _retry(() => info(name));
}

/// Returns an [ObjectInfo] if [name] exists, `null` otherwise.
Future<ObjectInfo?> tryInfo(String name) async {
return await retry(
Expand All @@ -89,6 +108,11 @@ extension BucketExt on Bucket {
);
}

/// Delete an object with default retry.
Future<void> deleteWithRetry(String name) async {
return await _retry(() => delete(name));
}

/// Deletes [name] if it exists, ignores 404 otherwise.
Future<void> tryDelete(String name) async {
return await retry(
Expand Down Expand Up @@ -159,6 +183,35 @@ extension BucketExt on Bucket {
String objectUrl(String objectName) {
return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName';
}

/// Update object metadata with default retry rules.
Future<void> updateMetadataWithRetry(
String objectName, ObjectMetadata metadata) async {
return await _retry(() async => await updateMetadata(objectName, metadata));
}

/// Start paging through objects in the bucket with the default retry.
Future<Page<BucketEntry>> pageWithRetry(
{String? prefix, String? delimiter, int pageSize = 50}) async {
return await _retry(
() async => await page(
prefix: prefix,
delimiter: delimiter,
pageSize: pageSize,
),
);
}
}

extension PageExt<T> on Page<T> {
/// Move to the next page with default retry.
Future<Page<T>> nextWithRetry({int pageSize = 50}) async {
return await _retry(() => next(pageSize: pageSize));
}
}

Future<R> _retry<R>(Future<R> Function() fn) async {
return await retry(fn, maxAttempts: 3, retryIf: _retryIf);
}

bool _retryIf(Exception e) {
Expand Down Expand Up @@ -212,7 +265,7 @@ Future<void> updateContentDispositionToAttachment(
ObjectInfo info, Bucket bucket) async {
if (info.metadata.contentDisposition != 'attachment') {
try {
await bucket.updateMetadata(
await bucket.updateMetadataWithRetry(
info.name, info.metadata.replace(contentDisposition: 'attachment'));
} on Exception catch (e, st) {
_logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion app/test/admin/exported_api_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void main() {
final oldData = oldRoot[path] as Map;
final bucket =
storageService.bucket(activeConfiguration.exportedApiBucketName!);
await bucket.updateMetadata(
await bucket.updateMetadataWithRetry(
path,
ObjectMetadata(
contentType: 'text/plain',
Expand Down
Loading

0 comments on commit 6352f10

Please sign in to comment.