[infra] Remove old pub/sub processing of results

The results are now updated by the builder, so the builder status
does not need to poll the database for the results to finish processing.

The script is also migrated to null safety.

Change-Id: I7913f7fb0a8eedc074a4b79eb75c1c2813525c17
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/221780
Commit-Queue: William Hesse <whesse@google.com>
Reviewed-by: Alexander Thomas <athom@google.com>
This commit is contained in:
William Hesse 2021-12-02 13:14:27 +00:00 committed by Commit Bot
parent aa27868c84
commit bc9f2bdc88
2 changed files with 58 additions and 209 deletions

View file

@ -8,8 +8,6 @@
// These cloud functions write a success/failure result to the
// builder table based on the approvals in Firestore.
// @dart = 2.9
import 'dart:async';
import 'dart:convert';
import 'dart:io';
@ -20,27 +18,26 @@ import 'package:http/http.dart' as http;
const numAttempts = 20;
const failuresPerConfiguration = 20;
/*late*/ bool useStagingDatabase;
late bool useStagingDatabase;
Uri get _queryUrl {
var project = useStagingDatabase ? 'dart-ci-staging' : 'dart-ci';
final project = useStagingDatabase ? 'dart-ci-staging' : 'dart-ci';
return Uri.https('firestore.googleapis.com',
'/v1/projects/$project/databases/(default)/documents:runQuery');
}
/*late*/ String builder;
/*late*/ String builderBase;
/*late*/ int buildNumber;
/*late*/ String token;
/*late*/ http.Client client;
late String builder;
late String builderBase;
late int buildNumber;
late String token;
late http.Client client;
String get buildTable => builder.endsWith('-try') ? 'try_builds' : 'builds';
String get resultsTable => builder.endsWith('-try') ? 'try_results' : 'results';
bool booleanFieldOrFalse(Map<String, dynamic> document, String field) {
Map<String, dynamic> fieldObject = document['fields'][field];
if (fieldObject == null) return false;
return fieldObject['booleanValue'] ?? false;
final fieldObject = document['fields'][field];
return fieldObject?['booleanValue'] ?? false;
}
void usage(ArgParser parser) {
@ -56,12 +53,12 @@ ${parser.usage}''');
}
Future<String> readGcloudAuthToken(String path) async {
String token = await File(path).readAsString();
return token.split("\n").first;
final token = await File(path).readAsString();
return token.split('\n').first;
}
main(List<String> args) async {
final parser = new ArgParser();
void main(List<String> args) async {
final parser = ArgParser();
parser.addFlag('help', help: 'Show the program usage.', negatable: false);
parser.addOption('auth_token',
abbr: 'a', help: 'Authorization token with cloud-platform scope');
@ -75,9 +72,9 @@ main(List<String> args) async {
usage(parser);
}
useStagingDatabase = options['staging'] /*!*/;
builder = options['builder'] /*!*/;
buildNumber = int.parse(options['build_number'] /*!*/);
useStagingDatabase = options['staging'];
builder = options['builder'];
buildNumber = int.parse(options['build_number']);
builderBase = builder.replaceFirst(RegExp('-try\$'), '');
if (options['auth_token'] == null) {
print('Option "--auth_token (-a)" is required\n');
@ -85,83 +82,60 @@ main(List<String> args) async {
}
token = await readGcloudAuthToken(options['auth_token']);
client = http.Client();
for (int count = 0; count < numAttempts; ++count) {
if (count > 0) {
await Future.delayed(Duration(seconds: 10));
}
final response = await runFirestoreQuery(buildQuery());
if (response.statusCode == HttpStatus.ok) {
final documents = jsonDecode(response.body);
final document = documents.first['document'];
if (document != null) {
bool success = booleanFieldOrFalse(document, 'success');
bool completed = booleanFieldOrFalse(document, 'completed');
if (completed) {
print(success
? 'No new unapproved failures'
: 'There are new unapproved failures on this build');
if (builder.endsWith('-try')) exit(success ? 0 : 1);
final configurations = await getConfigurations();
final failures = await fetchActiveFailures(configurations);
if (failures.isNotEmpty) {
print('There are unapproved failures');
printActiveFailures(failures);
exit(1);
} else {
print('There are no unapproved failures');
exit(0);
}
}
String chunks =
(document['fields']['num_chunks'] ?? const {})['integerValue'];
String processedChunks = (document['fields']['processed_chunks'] ??
const {})['integerValue'];
if (processedChunks != null) {
print([
'Received',
processedChunks,
if (chunks != null) ...['out of', chunks],
'chunks.'
].join(' '));
}
} else {
print('No results received for build $buildNumber of $builder');
}
} else {
print('HTTP status ${response.statusCode} received '
'when fetching build data');
}
final response = await runFirestoreQuery(buildQuery());
if (response.statusCode != HttpStatus.ok) {
print('HTTP status ${response.statusCode} received '
'when fetching build data');
exit(2);
}
final documents = jsonDecode(response.body);
final document = documents.first['document'];
if (document == null) {
print('No results received for build $buildNumber of $builder');
exit(2);
}
final success = booleanFieldOrFalse(document, 'success');
print(success
? 'No new unapproved failures'
: 'There are new unapproved failures on this build');
if (builder.endsWith('-try')) exit(success ? 0 : 1);
final configurations = await getConfigurations();
final failures = await fetchActiveFailures(configurations);
if (failures.isNotEmpty) {
print('There are unapproved failures');
printActiveFailures(failures);
exit(1);
} else {
print('There are no unapproved failures');
exit(0);
}
print('No status received for build $buildNumber of $builder '
'after $numAttempts attempts, with 10 second waits.');
exit(2);
}
Future<List<String>> getConfigurations() async {
final response = await runFirestoreQuery(configurationsQuery());
if (response.statusCode == HttpStatus.ok) {
final documents = jsonDecode(response.body);
final groups = <String /*!*/ >{
for (Map document in documents)
if (document.containsKey('document'))
document['document']['name'].split('/').last
};
return groups.toList();
if (response.statusCode != HttpStatus.ok) {
print('Could not fetch configurations for $builderBase');
return [];
}
print('Could not fetch configurations for $builderBase');
return [];
final documents = jsonDecode(response.body);
final groups = <String>{
for (Map document in documents)
if (document.containsKey('document'))
document['document']['name'].split('/').last
};
return groups.toList();
}
Map<int, Future<String>> commitHashes = {};
Future<String> commitHash(int index) =>
commitHashes.putIfAbsent(index, () => fetchCommitHash(index));
Future<String /*!*/ > fetchCommitHash(int index) async {
Future<String> fetchCommitHash(int index) async {
final response = await runFirestoreQuery(commitQuery(index));
if (response.statusCode == HttpStatus.ok) {
final document = jsonDecode(response.body).first['document'];
if (document != null) {
return document['name'] /*!*/ .split('/').last;
return document['name'].split('/').last;
}
}
print('Could not fetch commit with index $index');
@ -169,7 +143,7 @@ Future<String /*!*/ > fetchCommitHash(int index) async {
}
Future<Map<String, List<Map<String, dynamic>>>> fetchActiveFailures(
List<String /*!*/ > configurations) async {
List<String> configurations) async {
final failures = <String, List<Map<String, dynamic>>>{};
for (final configuration in configurations) {
final response =
@ -197,9 +171,9 @@ Future<Map<String, List<Map<String, dynamic>>>> fetchActiveFailures(
}
void printActiveFailures(Map<String, List<Map<String, dynamic>>> failures) {
for (final configuration in failures.keys) {
failures.forEach((configuration, failureList) {
print('($configuration):');
for (final failure in failures[configuration]) {
for (final failure in failureList) {
print([
' ',
failure['name'],
@ -217,7 +191,7 @@ void printActiveFailures(Map<String, List<Map<String, dynamic>>> failures) {
]
].join(''));
}
}
});
}
Future<http.Response> runFirestoreQuery(String query) {

View file

@ -1,125 +0,0 @@
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// Post results from Dart tryjobs and CI builders to Cloud Pub/Sub.
//
// Reads a results.json input file, sends only the changed results from
// that file to the Pub/Sub channel 'results' in the 'dart-ci' project.
// Multiple messages are sent if there are more than 100 changed results,
// so the cloud function only needs to process 100 records within its time
// limit of 60s. Because of this, we never approach the limit of 10 MB
// base64-encoded data bytes per message.
import 'dart:convert';
import 'dart:io';
import 'package:args/args.dart';
import 'package:http/http.dart' as http;
void usage(ArgParser parser, {exitCode = 0}) {
print('''
Usage: post_results_to_pubsub.dart [OPTIONS]
Posts Dart CI results as messages to Google Cloud Pub/Sub
The options are as follows:
${parser.usage}''');
exit(exitCode);
}
const resultsPerMessage = 50;
Uri _postUrl(String project) => Uri.https(
'pubsub.googleapis.com', 'v1/projects/$project/topics/results:publish');
main(List<String> args) async {
final parser = new ArgParser();
parser.addFlag('help', help: 'Show the program usage.', negatable: false);
parser.addOption('auth_token',
abbr: 'a',
help: 'Authorization token with a scope including pubsub publish.');
parser.addOption('result_file',
abbr: 'f', help: 'File containing the results to send');
parser.addOption('id', abbr: 'i', help: 'Buildbucket ID of this build');
parser.addOption('base_revision', help: 'A try build\'s patch base');
parser.addFlag('staging',
abbr: 's', help: 'Publish to the staging system', defaultsTo: false);
final options = parser.parse(args);
if (options['help']) {
usage(parser);
}
if (options['result_file'] == null) {
print('Error: option "result_file" is required.\n');
usage(parser, exitCode: 1);
}
if (options['auth_token'] == null) {
print('Error: option "auth_token" is required.\n');
usage(parser, exitCode: 1);
}
final project = options['staging'] ? "dart-ci-staging" : "dart-ci";
final client = http.Client();
final lines = await File(options['result_file']).readAsLines();
final token = await File(options['auth_token']).readAsString();
final buildbucketID = options['id'];
final baseRevision = options['base_revision'];
if (lines.isEmpty) {
print('No results in input file');
return;
}
// TODO(karlklose): parse and validate data before sending it.
final changedPattern = '"changed":true';
List<String> changedResults =
lines.where((change) => change.contains(changedPattern)).toList();
// We need to send at least one result, to save build metadata to Firestore.
// Send an unchanged result - the cloud function filters these out.
if (changedResults.isEmpty) changedResults = lines.sublist(0, 1);
final chunks = <List<String>>[];
var position = 0;
final lastFullChunkStart = changedResults.length - resultsPerMessage;
while (position <= lastFullChunkStart) {
chunks.add(changedResults.sublist(position, position += resultsPerMessage));
}
if (position < changedResults.length)
chunks.add(changedResults.sublist(position));
// Send pubsub messages.
for (final chunk in chunks) {
// Space messages out to reduce scaling problems
const chunkDelay = Duration(seconds: 2);
if (chunk != chunks.first) {
await Future.delayed(chunkDelay);
}
final message = '[\n${chunk.join(",\n")}\n]';
final base64data = base64Encode(utf8.encode(message.toString()));
final attributes = {
if (chunk == chunks.last) 'num_chunks': chunks.length.toString(),
if (buildbucketID != null) 'buildbucket_id': buildbucketID,
if (baseRevision != null) 'base_revision': baseRevision,
};
final jsonMessage = jsonEncode({
'messages': [
{'attributes': attributes, 'data': base64data}
]
});
final headers = {'Authorization': 'Bearer $token'};
final postUrl = _postUrl(project);
final response =
await client.post(postUrl, headers: headers, body: jsonMessage);
print('Sent pubsub message containing ${chunk.length} results');
print('Status ${response.statusCode}');
print('Response: ${response.body}');
}
print('Number of Pub/Sub messages sent: ${chunks.length}');
client.close();
}