diff --git a/app/lib/search/backend.dart b/app/lib/search/backend.dart index f7a577547..8d407cf18 100644 --- a/app/lib/search/backend.dart +++ b/app/lib/search/backend.dart @@ -149,7 +149,7 @@ class SearchBackend { // update or remove the document await retry(() async { try { - final doc = await loadDocument(package); + final doc = await loadDocument(package).timeout(Duration(minutes: 2)); snapshot.add(doc); } on RemovedPackageException catch (_) { snapshot.remove(package); @@ -158,29 +158,34 @@ class SearchBackend { } // initial scan of packages - final pool = Pool(concurrency); - final futures = []; - await for (final package in dbService.query().run()) { - if (package.isNotVisible) { - continue; + await withVmTerminationTimeout(timeout: Duration(hours: 12), () async { + final pool = Pool(concurrency); + final futures = []; + await for (final package in dbService.query().run()) { + if (package.isNotVisible) { + continue; + } + if (!claim.valid) { + break; + } + // This is the first scan, there isn't any existing document that we + // can compare to, ignoring the updated field. + final f = pool.withResource(() => updatePackage(package.name!, null)); + futures.add(f); } + await Future.wait(futures); + await pool.close(); if (!claim.valid) { - break; + return; } - // This is the first scan, there isn't any existing document that we - // can compare to, ignoring the updated field. - final f = pool.withResource(() => updatePackage(package.name!, null)); - futures.add(f); - } - await Future.wait(futures); - futures.clear(); - if (!claim.valid) { - return; - } - snapshot.updateAllScores(); + snapshot.updateAllScores(); + + // first complete snapshot, uploading it + await _snapshotStorage + .uploadDataAsJsonMap(snapshot.toJson()) + .timeout(Duration(minutes: 10)); + }); - // first complete snapshot, uploading it - await _snapshotStorage.uploadDataAsJsonMap(snapshot.toJson()); var lastUploadedSnapshotTimestamp = snapshot.updated!; // start monitoring @@ -193,29 +198,34 @@ class SearchBackend { lastQueryStarted = now; - // query updates - final recentlyUpdated = await _queryRecentlyUpdated(lastQueryStarted); - for (final e in recentlyUpdated.entries) { - if (!claim.valid) { - break; + await withVmTerminationTimeout(timeout: Duration(hours: 1), () async { + final pool = Pool(concurrency); + final futures = []; + // query updates + final recentlyUpdated = await _queryRecentlyUpdated(lastQueryStarted); + for (final e in recentlyUpdated.entries) { + if (!claim.valid) { + break; + } + final f = pool.withResource(() => updatePackage(e.key, e.value)); + futures.add(f); } - final f = pool.withResource(() => updatePackage(e.key, e.value)); - futures.add(f); - } - await Future.wait(futures); - futures.clear(); + await Future.wait(futures); + await pool.close(); + }); if (claim.valid && lastUploadedSnapshotTimestamp != snapshot.updated) { // Updates the normalized scores across all the packages. snapshot.updateAllScores(); - await _snapshotStorage.uploadDataAsJsonMap(snapshot.toJson()); + await _snapshotStorage + .uploadDataAsJsonMap(snapshot.toJson()) + .timeout(Duration(minutes: 10)); lastUploadedSnapshotTimestamp = snapshot.updated!; } await Future.delayed(sleepDuration); } - await pool.close(); } Future> _queryRecentlyUpdated( diff --git a/app/lib/shared/utils.dart b/app/lib/shared/utils.dart index 6990c243f..18f0af3b6 100644 --- a/app/lib/shared/utils.dart +++ b/app/lib/shared/utils.dart @@ -11,9 +11,11 @@ import 'dart:typed_data'; import 'package:appengine/appengine.dart'; import 'package:intl/intl.dart'; +import 'package:logging/logging.dart'; // ignore: implementation_imports import 'package:mime/src/default_extension_map.dart' as mime; import 'package:path/path.dart' as p; +import 'package:pub_dev/shared/monitoring.dart'; import 'package:pub_semver/pub_semver.dart' as semver; export 'package:pana/pana.dart' show exampleFileCandidates; @@ -28,6 +30,7 @@ final Duration twoYears = const Duration(days: 2 * 365); const _cloudTraceContextHeader = 'X-Cloud-Trace-Context'; final _random = Random.secure(); +final _logger = Logger('pub.utils'); final DateFormat shortDateFormat = DateFormat.yMMMd(); @@ -305,3 +308,31 @@ extension ByteFolderExt on Stream> { return buffer.toBytes(); } } + +/// Executes [fn] returning its results, but terminating the Dart VM if that +/// execution takes longer than [timeout]. +Future withVmTerminationTimeout( + Future Function() fn, { + required Duration timeout, +}) async { + final trace = StackTrace.current; + final timer = Timer(timeout, () { + // Give the logging a short time to be stored outside of the machine. + Timer(Duration(seconds: 10), () async { + exit(-1); + }); + + stderr.writeln('Timeout triggering VM termination\n$trace'); + _logger.pubNoticeShout( + 'vm-termination', + 'Timeout triggering VM termination', + Exception(), + trace, + ); + }); + try { + return await fn(); + } finally { + timer.cancel(); + } +} diff --git a/app/test/shared/timer_import_test.dart b/app/test/shared/timer_import_test.dart index 704a0866a..e319099cb 100644 --- a/app/test/shared/timer_import_test.dart +++ b/app/test/shared/timer_import_test.dart @@ -26,7 +26,10 @@ void main() { // TODO: consider refactor/redesign 'lib/shared/redis_cache.dart', - // Uses timer to auto-kill worker isolates. + // Uses timer to kill long-running processing. + 'lib/shared/utils.dart', + + // Uses timer to prevent long GCs. 'lib/service/entrypoint/_isolate.dart', // Uses timer to send stats periodically to the frontend isolate.