Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify download counts processing #7977

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 76 additions & 74 deletions app/lib/service/download_counts/sync_download_counts.dart
Original file line number Diff line number Diff line change
Expand Up @@ -74,113 +74,115 @@ Future<bool> processDownloadCounts(
}

bool hasFailedLines = false;
bool hasPartiallyFailedLines = false;
// Before '2024-05-03' the query generating the download count data had a bug
// in the reg exp causing incorrect versions to be stored.
final regExpQueryFixDate = DateTime.parse('2024-05-03');
final processedPackages = <String>{};
var lines = <String>[];

try {
lines = utf8.decode(bytes).split('\n');
} on FormatException catch (e) {
_logger.severe('Failed to utf8 decode bytes of $downloadCountsFileName/n'
'$e');
return false;
}

final pool = Pool(10);
await Future.wait(lines.map((line) async {
return await pool.withResource(() async {
if (line.isBlank) {
return;
}
String package;
final String package;
final Map<String, int> dayCounts;
try {
final data = json.decode(line) as Map<String, dynamic>;
final dayCounts = _extractDayCounts(data);
final data = json.decode(line);
if (data is! Map<String, dynamic>) {
throw FormatException('Download counts data is not valid json');
}

if (data['package'] is! String) {
throw FormatException('"package" must be a String');
}
package = data['package'] as String;
List<String> versions;
try {
// Validate that the package and version exist.
// First do it via the cached data, fall back to query for invisible
// and moderated packages.
versions = (await packageBackend.listVersionsCached(package))
.versions
.map((e) => e.version)
.toList();

final nonExistingVersions = <String>[];
dayCounts.keys.forEach((version) {
if (!versions.contains(version)) {
nonExistingVersions.add(version);
if (date.isBefore(regExpQueryFixDate)) {
// If the data is generated before the fix of the query, we
// ignore versions that do not exist.
_logger.warning(
'$package-$version appeared in download counts data but does'
' not exist');
} else {
hasPartiallyFailedLines = true;
_logger.severe(
'$package-$version appeared in download counts data but does'
' not exist');
}
}
});

nonExistingVersions.forEach((v) => dayCounts.remove(v));
} on NotFoundException catch (e) {
final pkg = await packageBackend.lookupPackage(package);
// The package is neither invisible or tombstoned, hence there is
// probably an error in the generated data.
if (pkg == null &&
(await packageBackend.lookupModeratedPackage(package)) == null) {
_logger.severe(
'Package $package appeared in download counts data for file '
'$downloadCountsFileName but does not exist.\n'
'Error: $e');
hasPartiallyFailedLines = true;
return;
} // else {
// The package is either invisible, tombstoned or has no versions.
// }
}
await downloadCountsBackend.updateDownloadCounts(
package,
dayCounts,
date,
);
processedPackages.add(package);

dayCounts = _extractDayCounts(data);
} on FormatException catch (e) {
_logger.severe(
'Failed to proccess line $line of file $downloadCountsFileName \n'
'$e');
hasFailedLines = true;
return;
}

List<String> versions;
try {
// Validate that the package and version exist and ignore the
// non-existing packages and versions.
// First do it via the cached data, fall back to query for invisible
// and moderated packages.
versions = (await packageBackend.listVersionsCached(package))
.versions
.map((e) => e.version)
.toList();

final nonExistingVersions = <String>[];
dayCounts.keys.forEach((version) {
if (!versions.contains(version)) {
nonExistingVersions.add(version);
if (date.isBefore(regExpQueryFixDate)) {
// If the data is generated before the fix of the query, we
// ignore versions that do not exist.
_logger.warning(
'$package-$version appeared in download counts data but does'
' not exist');
} else {
_logger.severe(
'$package-$version appeared in download counts data but does'
' not exist');
}
}
});

nonExistingVersions.forEach((v) => dayCounts.remove(v));
} on NotFoundException catch (e) {
final pkg = await packageBackend.lookupPackage(package);
// The package is neither invisible or tombstoned, hence there is
// probably an error in the generated data.
if (pkg == null &&
(await packageBackend.lookupModeratedPackage(package)) == null) {
_logger.severe(
'Package $package appeared in download counts data for file '
'$downloadCountsFileName but does not exist.\n'
'Error: $e');
return;
} // else {
// The package is either invisible, tombstoned or has no versions.
// }
}

await downloadCountsBackend.updateDownloadCounts(
package,
dayCounts,
date,
);
});
}));

if (hasFailedLines) {
return false;
} else {
// We only add '0's for unmentioned packages if all lines have been
// succesfully processed. Otherwise we risk adding '0' for a package that
// has no data due to some error during processing.
final allPackageNames = await packageBackend.allPackageNames().toSet();
final missingPackages =
allPackageNames.difference(processedPackages.toSet());

await Future.wait(missingPackages.map((package) async {
return await pool.withResource(() async {
// Calling 'updateDownloadCounts' for 'package' with an empty dataset
// causes '0' to be added for all versions, hereby indicating 0 downloads.
await downloadCountsBackend.updateDownloadCounts(package, {}, date);
});
}));
return !hasPartiallyFailedLines;
}
// Record zero downloads for this date for packages not mentioned in the
// query output.
final allPackageNames = await packageBackend.allPackageNames().toSet();
final missingPackages = allPackageNames.difference(processedPackages.toSet());
await Future.wait(missingPackages.map((package) async {
return await pool.withResource(() async {
// Calling 'updateDownloadCounts' for 'package' with an empty dataset
// causes '0' to be added for all versions, hereby indicating 0 downloads.
await downloadCountsBackend.updateDownloadCounts(package, {}, date);
});
}));
return !hasFailedLines;
}

const defaultNumberOfSyncDays = 5;
Expand Down
2 changes: 1 addition & 1 deletion app/test/service/download_counts/download_counts_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void main() {
} finally {
await subscription.cancel();
}
expect(succeeded, false);
expect(succeeded, true);
expect(messages.first, contains('Could not find `package "hest"`.'));
// We still process the lines that are possible
final countData =
Expand Down
Loading