diff --git a/app/lib/package/api_export/exported_api.dart b/app/lib/package/api_export/exported_api.dart index 475ce5777..fd4448239 100644 --- a/app/lib/package/api_export/exported_api.dart +++ b/app/lib/package/api_export/exported_api.dart @@ -229,7 +229,7 @@ final class ExportedApi { required String prefix, required String delimiter, }) async { - var p = await _pool.withResource(() async => await _bucket.page( + var p = await _pool.withResource(() async => await _bucket.pageWithRetry( prefix: prefix, delimiter: delimiter, pageSize: 1000, @@ -240,7 +240,8 @@ final class ExportedApi { })); if (p.isLast) break; - p = await _pool.withResource(() async => await p.next(pageSize: 1000)); + p = await _pool + .withResource(() async => await p.nextWithRetry(pageSize: 1000)); } } } @@ -632,7 +633,7 @@ final class ExportedBlob extends ExportedObject { final retouchDeadline = clock.agoBy(_updateValidatedAfter); if (destinationInfo.metadata.validated.isBefore(retouchDeadline)) { try { - await _owner._bucket.updateMetadata(dst, _metadata()); + await _owner._bucket.updateMetadataWithRetry(dst, _metadata()); } catch (e, st) { // This shouldn't happen, but if a metadata update does fail, it's // hardly the end of the world. @@ -646,7 +647,7 @@ final class ExportedBlob extends ExportedObject { // If dst or source doesn't exist, then we shall attempt to make a copy. // (if source doesn't exist we'll consistently get an error from here!) - await _owner._storage.copyObject( + await _owner._storage.copyObjectWithRetry( source.absoluteObjectName, _owner._bucket.absoluteObjectName(dst), metadata: _metadata(), @@ -667,7 +668,7 @@ extension on Bucket { if (info.metadata.validated .isBefore(clock.agoBy(_updateValidatedAfter))) { try { - await updateMetadata(name, metadata); + await updateMetadataWithRetry(name, metadata); } catch (e, st) { // This shouldn't happen, but if a metadata update does fail, it's // hardly the end of the world. diff --git a/app/lib/package/backend.dart b/app/lib/package/backend.dart index 5aabe52ec..f328a4aba 100644 --- a/app/lib/package/backend.dart +++ b/app/lib/package/backend.dart @@ -996,7 +996,7 @@ class PackageBackend { _logger.info('Removing temporary object $guid.'); sw.reset(); - await _incomingBucket.delete(tmpObjectName(guid)); + await _incomingBucket.deleteWithRetry(tmpObjectName(guid)); _logger.info('Temporary object removed in ${sw.elapsed}.'); return version; }); diff --git a/app/lib/service/download_counts/backend.dart b/app/lib/service/download_counts/backend.dart index 1ddb97600..e891e0891 100644 --- a/app/lib/service/download_counts/backend.dart +++ b/app/lib/service/download_counts/backend.dart @@ -15,6 +15,7 @@ import 'package:pub_dev/shared/cached_value.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/redis_cache.dart'; +import 'package:pub_dev/shared/storage.dart'; /// Sets the download counts backend service. void registerDownloadCountsBackend(DownloadCountsBackend backend) => @@ -42,7 +43,7 @@ class DownloadCountsBackend { try { final info = await storageService .bucket(activeConfiguration.reportsBucketName!) - .info(downloadCounts30DaysTotalsFileName); + .infoWithRetry(downloadCounts30DaysTotalsFileName); if (_lastData.etag == info.etag) { return _lastData.data; diff --git a/app/lib/service/services.dart b/app/lib/service/services.dart index 5cc872b18..075b5a983 100644 --- a/app/lib/service/services.dart +++ b/app/lib/service/services.dart @@ -9,6 +9,7 @@ import 'package:appengine/appengine.dart'; import 'package:clock/clock.dart'; import 'package:fake_gcloud/mem_datastore.dart'; import 'package:fake_gcloud/mem_storage.dart'; +import 'package:fake_gcloud/retry_enforcer_storage.dart'; import 'package:gcloud/service_scope.dart'; import 'package:gcloud/storage.dart'; import 'package:googleapis_auth/auth_io.dart' as auth; @@ -148,7 +149,7 @@ Future withFakeServices({ return await fork(() async { register(#appengine.context, FakeClientContext()); registerDbService(DatastoreDB(datastore!)); - registerStorageService(storage!); + registerStorageService(RetryEnforcerStorage(storage!)); IOServer? frontendServer; IOServer? searchServer; if (configuration == null) { diff --git a/app/lib/shared/storage.dart b/app/lib/shared/storage.dart index e9e936109..ef320c729 100644 --- a/app/lib/shared/storage.dart +++ b/app/lib/shared/storage.dart @@ -44,7 +44,7 @@ extension StorageExt on Storage { /// local environment. Future verifyBucketExistenceAndAccess(String bucketName) async { // check bucket existence - if (!await bucketExists(bucketName)) { + if (!await bucketExistsWithRetry(bucketName)) { final message = 'Bucket "$bucketName" does not exists!'; _logger.shout(message); if (envConfig.isRunningLocally) { @@ -69,10 +69,29 @@ extension StorageExt on Storage { return; } } + + /// Check whether a cloud storage bucket exists with the default retry. + Future bucketExistsWithRetry(String bucketName) async { + return await _retry(() => bucketExists(bucketName)); + } + + /// Copy an object with the default retry. + Future copyObjectWithRetry( + String src, + String dest, { + ObjectMetadata? metadata, + }) async { + return await _retry(() async => await copyObject(src, dest)); + } } /// Additional methods on buckets. extension BucketExt on Bucket { + /// Lookup object metadata with default retry. + Future infoWithRetry(String name) async { + return await _retry(() => info(name)); + } + /// Returns an [ObjectInfo] if [name] exists, `null` otherwise. Future tryInfo(String name) async { return await retry( @@ -89,6 +108,11 @@ extension BucketExt on Bucket { ); } + /// Delete an object with default retry. + Future deleteWithRetry(String name) async { + return await _retry(() => delete(name)); + } + /// Deletes [name] if it exists, ignores 404 otherwise. Future tryDelete(String name) async { return await retry( @@ -159,6 +183,35 @@ extension BucketExt on Bucket { String objectUrl(String objectName) { return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName'; } + + /// Update object metadata with default retry rules. + Future updateMetadataWithRetry( + String objectName, ObjectMetadata metadata) async { + return await _retry(() async => await updateMetadata(objectName, metadata)); + } + + /// Start paging through objects in the bucket with the default retry. + Future> pageWithRetry( + {String? prefix, String? delimiter, int pageSize = 50}) async { + return await _retry( + () async => await page( + prefix: prefix, + delimiter: delimiter, + pageSize: pageSize, + ), + ); + } +} + +extension PageExt on Page { + /// Move to the next page with default retry. + Future> nextWithRetry({int pageSize = 50}) async { + return await _retry(() => next(pageSize: pageSize)); + } +} + +Future _retry(Future Function() fn) async { + return await retry(fn, maxAttempts: 3, retryIf: _retryIf); } bool _retryIf(Exception e) { @@ -212,7 +265,7 @@ Future updateContentDispositionToAttachment( ObjectInfo info, Bucket bucket) async { if (info.metadata.contentDisposition != 'attachment') { try { - await bucket.updateMetadata( + await bucket.updateMetadataWithRetry( info.name, info.metadata.replace(contentDisposition: 'attachment')); } on Exception catch (e, st) { _logger.warning( diff --git a/app/test/admin/exported_api_sync_test.dart b/app/test/admin/exported_api_sync_test.dart index 91a40bb51..0d68a5f87 100644 --- a/app/test/admin/exported_api_sync_test.dart +++ b/app/test/admin/exported_api_sync_test.dart @@ -181,7 +181,7 @@ void main() { final oldData = oldRoot[path] as Map; final bucket = storageService.bucket(activeConfiguration.exportedApiBucketName!); - await bucket.updateMetadata( + await bucket.updateMetadataWithRetry( path, ObjectMetadata( contentType: 'text/plain', diff --git a/pkg/fake_gcloud/lib/retry_enforcer_storage.dart b/pkg/fake_gcloud/lib/retry_enforcer_storage.dart new file mode 100644 index 000000000..38a9d3e53 --- /dev/null +++ b/pkg/fake_gcloud/lib/retry_enforcer_storage.dart @@ -0,0 +1,247 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:gcloud/storage.dart'; + +void _verifyRetryOnStack() { + final st = StackTrace.current.toString(); + if (st.contains('package:retry/')) return; + if (st.contains('retryAsync')) return; // lib/shared/utils.dart + + // detect direct test calls + final linesWithoutThisFile = st + .split('\n') + .where((l) => !l.contains('retry_enforcer_storage.dart')) + .toList(); + if (linesWithoutThisFile.isNotEmpty && + linesWithoutThisFile.first.contains('_test.dart')) { + return; + } + + print('Missing retry detected:\n$st\n'); + throw AssertionError('retry is not present in stacktrace: $st'); +} + +Future _verifyRetry( + Future Function() fn, { + bool ignore = false, +}) async { + if (!ignore) { + _verifyRetryOnStack(); + } + return await fn(); +} + +/// A storage implementation that enforces (or just report) retry wrapper +/// on [Storage] API calls. +class RetryEnforcerStorage implements Storage { + final Storage _storage; + + RetryEnforcerStorage(this._storage); + + @override + Bucket bucket( + String bucketName, { + PredefinedAcl? defaultPredefinedObjectAcl, + Acl? defaultObjectAcl, + }) { + return _RetryEnforcerBucket(_storage.bucket( + bucketName, + defaultObjectAcl: defaultObjectAcl, + defaultPredefinedObjectAcl: defaultPredefinedObjectAcl, + )); + } + + @override + Future bucketExists(String bucketName) async { + return await _verifyRetry( + () => _storage.bucketExists(bucketName), + ignore: true, + ); + } + + @override + Future bucketInfo(String bucketName) async { + return await _verifyRetry( + () async => await _storage.bucketInfo(bucketName), + ignore: true, + ); + } + + @override + Future copyObject(String src, String dest, {ObjectMetadata? metadata}) async { + return await _verifyRetry( + () => _storage.copyObject(src, dest, metadata: metadata), + ignore: true, + ); + } + + @override + Future createBucket( + String bucketName, { + PredefinedAcl? predefinedAcl, + Acl? acl, + }) async { + return await _verifyRetry( + () async => await _storage.createBucket( + bucketName, + predefinedAcl: predefinedAcl, + acl: acl, + ), + ignore: true, + ); + } + + @override + Future deleteBucket(String bucketName) async { + return await _verifyRetry( + () => _storage.deleteBucket(bucketName), + ignore: true, + ); + } + + @override + Stream listBucketNames() { + return _storage.listBucketNames(); + } + + @override + Future> pageBucketNames({int pageSize = 50}) async { + return await _verifyRetry( + () => _storage.pageBucketNames(pageSize: pageSize), + ignore: true, + ); + } +} + +class _RetryEnforcerBucket implements Bucket { + final Bucket _bucket; + + _RetryEnforcerBucket(this._bucket); + + @override + String absoluteObjectName(String objectName) { + return _bucket.absoluteObjectName(objectName); + } + + @override + String get bucketName => _bucket.bucketName; + + @override + Future delete(String name) async { + return await _verifyRetry( + () async => await _bucket.delete(name), + ignore: true, + ); + } + + @override + Future info(String name) async { + return await _verifyRetry( + () async => await _bucket.info(name), + ignore: true, + ); + } + + @override + Stream list({String? prefix, String? delimiter}) { + // TODO: verify retry wrapper here + return _bucket.list( + prefix: prefix, + delimiter: delimiter, + ); + } + + @override + Future> page({ + String? prefix, + String? delimiter, + int pageSize = 50, + }) async { + return await _verifyRetry( + () async => _RetryEnforcerPage(await _bucket.page( + prefix: prefix, + delimiter: delimiter, + pageSize: pageSize, + )), + ignore: true, + ); + } + + @override + Stream> read(String objectName, {int? offset, int? length}) { + // TODO: verify retry wrapper here + return _bucket.read(objectName, offset: offset, length: length); + } + + @override + Future updateMetadata(String objectName, ObjectMetadata metadata) async { + return await _verifyRetry( + () async => await _bucket.updateMetadata(objectName, metadata), + ); + } + + @override + StreamSink> write( + String objectName, { + int? length, + ObjectMetadata? metadata, + Acl? acl, + PredefinedAcl? predefinedAcl, + String? contentType, + }) { + _verifyRetryOnStack(); + return _bucket.write( + objectName, + length: length, + metadata: metadata, + acl: acl, + predefinedAcl: predefinedAcl, + contentType: contentType, + ); + } + + @override + Future writeBytes( + String name, + List bytes, { + ObjectMetadata? metadata, + Acl? acl, + PredefinedAcl? predefinedAcl, + String? contentType, + }) async { + return await _verifyRetry( + () async => await _bucket.writeBytes( + name, + bytes, + metadata: metadata, + acl: acl, + predefinedAcl: predefinedAcl, + contentType: contentType, + ), + ignore: true, + ); + } +} + +class _RetryEnforcerPage implements Page { + final Page _page; + _RetryEnforcerPage(this._page); + + @override + bool get isLast => _page.isLast; + + @override + List get items => _page.items; + + @override + Future> next({int pageSize = 50}) async { + return await _verifyRetry( + () async => _RetryEnforcerPage(await _page.next(pageSize: pageSize)), + ignore: true, + ); + } +}