Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeouts and VM exit for index building process. #8504

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 42 additions & 32 deletions app/lib/search/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class SearchBackend {
// update or remove the document
await retry(() async {
try {
final doc = await loadDocument(package);
final doc = await loadDocument(package).timeout(Duration(minutes: 2));
snapshot.add(doc);
} on RemovedPackageException catch (_) {
snapshot.remove(package);
Expand All @@ -158,29 +158,34 @@ class SearchBackend {
}

// initial scan of packages
final pool = Pool(concurrency);
final futures = <Future>[];
await for (final package in dbService.query<Package>().run()) {
if (package.isNotVisible) {
continue;
await withVmTerminationTimeout(timeout: Duration(hours: 12), () async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just add .timeout(Duration(hours: 12)) when calling doCreateAndUpdateSnapshot?

If such a timeout does happen there, isn't that a good place to just not catch the timeout and let it bubble all the way up? Do we need to explicitly kill the VM?

final pool = Pool(concurrency);
final futures = <Future>[];
await for (final package in dbService.query<Package>().run()) {
if (package.isNotVisible) {
continue;
}
if (!claim.valid) {
break;
}
// This is the first scan, there isn't any existing document that we
// can compare to, ignoring the updated field.
final f = pool.withResource(() => updatePackage(package.name!, null));
futures.add(f);
}
await Future.wait(futures);
await pool.close();
if (!claim.valid) {
break;
return;
}
// This is the first scan, there isn't any existing document that we
// can compare to, ignoring the updated field.
final f = pool.withResource(() => updatePackage(package.name!, null));
futures.add(f);
}
await Future.wait(futures);
futures.clear();
if (!claim.valid) {
return;
}
snapshot.updateAllScores();
snapshot.updateAllScores();

// first complete snapshot, uploading it
await _snapshotStorage
.uploadDataAsJsonMap(snapshot.toJson())
.timeout(Duration(minutes: 10));
});

// first complete snapshot, uploading it
await _snapshotStorage.uploadDataAsJsonMap(snapshot.toJson());
var lastUploadedSnapshotTimestamp = snapshot.updated!;

// start monitoring
Expand All @@ -193,29 +198,34 @@ class SearchBackend {

lastQueryStarted = now;

// query updates
final recentlyUpdated = await _queryRecentlyUpdated(lastQueryStarted);
for (final e in recentlyUpdated.entries) {
if (!claim.valid) {
break;
await withVmTerminationTimeout(timeout: Duration(hours: 1), () async {
final pool = Pool(concurrency);
final futures = <Future>[];
// query updates
final recentlyUpdated = await _queryRecentlyUpdated(lastQueryStarted);
for (final e in recentlyUpdated.entries) {
if (!claim.valid) {
break;
}
final f = pool.withResource(() => updatePackage(e.key, e.value));
futures.add(f);
}
final f = pool.withResource(() => updatePackage(e.key, e.value));
futures.add(f);
}
await Future.wait(futures);
futures.clear();
await Future.wait(futures);
await pool.close();
});

if (claim.valid && lastUploadedSnapshotTimestamp != snapshot.updated) {
// Updates the normalized scores across all the packages.
snapshot.updateAllScores();

await _snapshotStorage.uploadDataAsJsonMap(snapshot.toJson());
await _snapshotStorage
.uploadDataAsJsonMap(snapshot.toJson())
.timeout(Duration(minutes: 10));
lastUploadedSnapshotTimestamp = snapshot.updated!;
}

await Future.delayed(sleepDuration);
}
await pool.close();
}

Future<Map<String, DateTime>> _queryRecentlyUpdated(
Expand Down
31 changes: 31 additions & 0 deletions app/lib/shared/utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import 'dart:typed_data';

import 'package:appengine/appengine.dart';
import 'package:intl/intl.dart';
import 'package:logging/logging.dart';
// ignore: implementation_imports
import 'package:mime/src/default_extension_map.dart' as mime;
import 'package:path/path.dart' as p;
import 'package:pub_dev/shared/monitoring.dart';
import 'package:pub_semver/pub_semver.dart' as semver;

export 'package:pana/pana.dart' show exampleFileCandidates;
Expand All @@ -28,6 +30,7 @@ final Duration twoYears = const Duration(days: 2 * 365);
const _cloudTraceContextHeader = 'X-Cloud-Trace-Context';

final _random = Random.secure();
final _logger = Logger('pub.utils');

final DateFormat shortDateFormat = DateFormat.yMMMd();

Expand Down Expand Up @@ -305,3 +308,31 @@ extension ByteFolderExt on Stream<List<int>> {
return buffer.toBytes();
}
}

/// Executes [fn] returning its results, but terminating the Dart VM if that
/// execution takes longer than [timeout].
Future<T> withVmTerminationTimeout<T>(
Future<T> Function() fn, {
required Duration timeout,
}) async {
final trace = StackTrace.current;
final timer = Timer(timeout, () {
// Give the logging a short time to be stored outside of the machine.
Timer(Duration(seconds: 10), () async {
exit(-1);
});

stderr.writeln('Timeout triggering VM termination\n$trace');
_logger.pubNoticeShout(
'vm-termination',
'Timeout triggering VM termination',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this terminate the VM or does AppEngine just restart the process?

Exception(),
trace,
);
});
try {
return await fn();
} finally {
timer.cancel();
}
}
5 changes: 4 additions & 1 deletion app/test/shared/timer_import_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ void main() {
// TODO: consider refactor/redesign
'lib/shared/redis_cache.dart',

// Uses timer to auto-kill worker isolates.
// Uses timer to kill long-running processing.
'lib/shared/utils.dart',

// Uses timer to prevent long GCs.
'lib/service/entrypoint/_isolate.dart',

// Uses timer to send stats periodically to the frontend isolate.
Expand Down
Loading