Workaround for AZDO artifact throttling (#198545)

* download and extract in main thread, spin off other threads for rest of IO

* remove limiters: everything runs on workers now

* add docs
João Moreno 2023-11-17 18:54:55 +01:00 committed by GitHub
parent 74d6858c83
commit fc6b857956
2 changed files with 131 additions and 203 deletions

File diff suppressed because one or more lines are too long


@@ -17,6 +17,7 @@ import { CosmosClient } from '@azure/cosmos';
import { ClientSecretCredential } from '@azure/identity';
import * as cp from 'child_process';
import * as os from 'os';
import { Worker, isMainThread, workerData } from 'node:worker_threads';
function e(name: string): string {
const result = process.env[name];
@@ -48,49 +49,6 @@ class Temp {
}
}
export class Limiter {
private _size = 0;
private runningPromises: number;
private readonly maxDegreeOfParalellism: number;
private readonly outstandingPromises: { factory: () => Promise<any>; c: (v: any) => void; e: (err: Error) => void }[];
constructor(maxDegreeOfParalellism: number) {
this.maxDegreeOfParalellism = maxDegreeOfParalellism;
this.outstandingPromises = [];
this.runningPromises = 0;
}
queue<T>(factory: () => Promise<T>): Promise<T> {
this._size++;
return new Promise<T>((c, e) => {
this.outstandingPromises.push({ factory, c, e });
this.consume();
});
}
private consume(): void {
while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
const iLimitedTask = this.outstandingPromises.shift()!;
this.runningPromises++;
const promise = iLimitedTask.factory();
promise.then(iLimitedTask.c, iLimitedTask.e);
promise.then(() => this.consumed(), () => this.consumed());
}
}
private consumed(): void {
this._size--;
this.runningPromises--;
if (this.outstandingPromises.length > 0) {
this.consume();
}
}
}
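
For context on the deleted class: with maxDegreeOfParalellism set to 1, the Limiter serializes every queued factory, which is how the old code spaced out its AZDO, Azure storage, and Cosmos DB calls. A minimal usage sketch (upload is an illustrative placeholder, not a function from this file):

// Serialize three uploads through a Limiter of width 1:
// 'b' starts only after 'a' settles, 'c' only after 'b'.
const limiter = new Limiter(1);
const results = await Promise.all([
	limiter.queue(() => upload('a')),
	limiter.queue(() => upload('b')),
	limiter.queue(() => upload('c')),
]);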
interface RequestOptions {
readonly body?: string;
}
@@ -196,8 +154,6 @@ interface ReleaseDetailsResult {
class ESRPClient {
private static Limiter = new Limiter(1);
private readonly authPath: string;
constructor(
@@ -232,10 +188,8 @@ class ESRPClient {
version: string,
filePath: string
): Promise<Release> {
const submitReleaseResult = await ESRPClient.Limiter.queue(async () => {
this.log(`Submitting release for ${version}: ${filePath}`);
return await this.SubmitRelease(version, filePath);
});
this.log(`Submitting release for ${version}: ${filePath}`);
const submitReleaseResult = await this.SubmitRelease(version, filePath);
if (submitReleaseResult.submissionResponse.statusCode !== 'pass') {
throw new Error(`Unexpected status code: ${submitReleaseResult.submissionResponse.statusCode}`);
@@ -674,9 +628,6 @@ function getRealType(type: string) {
}
}
const azureLimiter = new Limiter(1);
const mooncakeLimiter = new Limiter(1);
async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string, commit: string, filePath: string): Promise<{ assetUrl: string; mooncakeUrl: string }> {
const fileName = path.basename(filePath);
const blobName = commit + '/' + fileName;
@@ -704,11 +655,11 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
if (await retry(() => blobClient.exists())) {
throw new Error(`Blob ${quality}, ${blobName} already exists, not publishing again.`);
} else {
await retry(attempt => azureLimiter.queue(async () => {
await retry(async attempt => {
log(`Uploading blobs to Azure storage (attempt ${attempt})...`);
await blobClient.uploadFile(filePath, blobOptions);
log('Blob successfully uploaded to Azure storage.');
}));
});
}
})());
@@ -726,11 +677,11 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
if (await retry(() => mooncakeBlobClient.exists())) {
throw new Error(`Mooncake Blob ${quality}, ${blobName} already exists, not publishing again.`);
} else {
await retry(attempt => mooncakeLimiter.queue(async () => {
await retry(async attempt => {
log(`Uploading blobs to Mooncake Azure storage (attempt ${attempt})...`);
await mooncakeBlobClient.uploadFile(filePath, blobOptions);
log('Blob successfully uploaded to Mooncake Azure storage.');
}));
});
}
})());
}
@@ -748,69 +699,33 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
throw rejectedPromiseResults[0]?.reason;
}
const assetUrl = `${e('AZURE_CDN_URL')}/${quality}/${blobName}`;
const blobPath = new URL(assetUrl).pathname;
const mooncakeUrl = `${e('MOONCAKE_CDN_URL')}${blobPath}`;
return { assetUrl, mooncakeUrl };
}
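
The retry helper wrapping these uploads is defined elsewhere in the file; only its call sites appear in this diff. A plausible sketch consistent with those call sites, where the callback receives a 1-based attempt number and its result is passed through; the attempt cap and backoff policy are assumptions:

async function retry<T>(fn: (attempt: number) => Promise<T>): Promise<T> {
	let lastError: unknown;
	for (let attempt = 1; attempt <= 5; attempt++) { // cap of 5 attempts is an assumption
		try {
			return await fn(attempt);
		} catch (err) {
			lastError = err;
			// Linear backoff between attempts; the real delay policy may differ.
			await new Promise(resolve => setTimeout(resolve, attempt * 3000));
		}
	}
	throw lastError;
}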
const downloadLimiter = new Limiter(5);
const cosmosLimiter = new Limiter(1);
async function processArtifact(artifact: Artifact): Promise<void> {
async function processArtifact(artifact: Artifact, artifactFilePath: string): Promise<void> {
const log = (...args: any[]) => console.log(`[${artifact.name}]`, ...args);
const match = /^vscode_(?<product>[^_]+)_(?<os>[^_]+)_(?<arch>[^_]+)_(?<unprocessedType>[^_]+)$/.exec(artifact.name);
if (!match) {
throw new Error(`Invalid artifact name: ${artifact.name}`);
}
const { product, os, arch, unprocessedType } = match.groups!;
const log = (...args: any[]) => console.log(`[${product} ${os} ${arch} ${unprocessedType}]`, ...args);
const start = Date.now();
const filePath = await retry(async attempt => {
const artifactZipPath = path.join(e('AGENT_TEMPDIRECTORY'), `${artifact.name}.zip`);
const start = Date.now();
log(`Downloading ${artifact.resource.downloadUrl} (attempt ${attempt})...`);
try {
await downloadLimiter.queue(() => downloadArtifact(artifact, artifactZipPath));
} catch (err) {
log(`Download failed: ${err.message}`);
throw err;
}
const archiveSize = fs.statSync(artifactZipPath).size;
const downloadDurationS = (Date.now() - start) / 1000;
const downloadSpeedKBS = Math.round((archiveSize / 1024) / downloadDurationS);
log(`Successfully downloaded ${artifact.resource.downloadUrl} after ${Math.floor(downloadDurationS)} seconds (${downloadSpeedKBS} KB/s).`);
const filePath = await unzip(artifactZipPath, e('AGENT_TEMPDIRECTORY'));
const artifactSize = fs.statSync(filePath).size;
if (artifactSize !== Number(artifact.resource.properties.artifactsize)) {
log(`Artifact size mismatch. Expected ${artifact.resource.properties.artifactsize}. Actual ${artifactSize}`);
throw new Error(`Artifact size mismatch.`);
}
return filePath;
});
log(`Successfully downloaded and extracted after ${(Date.now() - start) / 1000} seconds.`);
// getPlatform needs the unprocessedType
const quality = e('VSCODE_QUALITY');
const commit = e('BUILD_SOURCEVERSION');
const { product, os, arch, unprocessedType } = match.groups!;
const platform = getPlatform(product, os, arch, unprocessedType);
const type = getRealType(unprocessedType);
const size = fs.statSync(filePath).size;
const stream = fs.createReadStream(filePath);
const size = fs.statSync(artifactFilePath).size;
const stream = fs.createReadStream(artifactFilePath);
const [sha1hash, sha256hash] = await Promise.all([hashStream('sha1', stream), hashStream('sha256', stream)]);
const [{ assetUrl, mooncakeUrl }, prssUrl] = await Promise.all([
uploadAssetLegacy(log, quality, commit, filePath),
uploadAssetLegacy(log, quality, commit, artifactFilePath),
releaseAndProvision(
log,
e('RELEASE_TENANT_ID'),
@@ -822,7 +737,7 @@ async function processArtifact(artifact: Artifact): Promise<void> {
e('PROVISION_AAD_PASSWORD'),
commit,
quality,
filePath
artifactFilePath
)
]);
@@ -830,19 +745,29 @@ async function processArtifact(artifact: Artifact): Promise<void> {
log('Creating asset...', JSON.stringify(asset));
await retry(async (attempt) => {
await cosmosLimiter.queue(async () => {
log(`Creating asset in Cosmos DB (attempt ${attempt})...`);
const aadCredentials = new ClientSecretCredential(e('AZURE_TENANT_ID'), e('AZURE_CLIENT_ID'), e('AZURE_CLIENT_SECRET'));
const client = new CosmosClient({ endpoint: e('AZURE_DOCUMENTDB_ENDPOINT'), aadCredentials });
const scripts = client.database('builds').container(quality).scripts;
await scripts.storedProcedure('createAsset').execute('', [commit, asset, true]);
});
log(`Creating asset in Cosmos DB (attempt ${attempt})...`);
const aadCredentials = new ClientSecretCredential(e('AZURE_TENANT_ID'), e('AZURE_CLIENT_ID'), e('AZURE_CLIENT_SECRET'));
const client = new CosmosClient({ endpoint: e('AZURE_DOCUMENTDB_ENDPOINT'), aadCredentials });
const scripts = client.database('builds').container(quality).scripts;
await scripts.storedProcedure('createAsset').execute('', [commit, asset, true]);
});
log('Asset successfully created');
}
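
hashStream is likewise outside this diff. A minimal sketch matching its call sites above, taking an algorithm name and a readable stream and resolving once the stream ends; the hex encoding is an assumption. Note that the single read stream in processArtifact can feed both hash computations, since each 'data' listener receives every chunk:

import { createHash } from 'crypto';
import { Readable } from 'stream';

function hashStream(hashName: string, stream: Readable): Promise<string> {
	return new Promise<string>((resolve, reject) => {
		const hash = createHash(hashName);
		stream
			.on('data', chunk => hash.update(chunk))
			.on('error', reject)
			.on('end', () => resolve(hash.digest('hex')));
	});
}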
// It is VERY important that we don't download artifacts too much too fast from AZDO.
// AZDO throttles us SEVERELY if we do. Not just that, but they also close open
// sockets, so the whole thing grinds to a halt. So, downloading and extracting
// happen serially in the main thread, which keeps the downloads properly spaced
// out. For each extracted artifact, we spawn a worker thread to upload it to
// the CDN and finally update the build in Cosmos DB.
async function main() {
if (!isMainThread) {
const { artifact, artifactFilePath } = workerData;
await processArtifact(artifact, artifactFilePath);
return;
}
const done = new State();
const processing = new Set<string>();
@@ -880,12 +805,45 @@ async function main() {
continue;
}
console.log(`Found new artifact: ${artifact.name}`);
console.log(`[${artifact.name}] Found new artifact`);
const artifactZipPath = path.join(e('AGENT_TEMPDIRECTORY'), `${artifact.name}.zip`);
await retry(async (attempt) => {
const start = Date.now();
console.log(`[${artifact.name}] Downloading (attempt ${attempt})...`);
await downloadArtifact(artifact, artifactZipPath);
const archiveSize = fs.statSync(artifactZipPath).size;
const downloadDurationS = (Date.now() - start) / 1000;
const downloadSpeedKBS = Math.round((archiveSize / 1024) / downloadDurationS);
console.log(`[${artifact.name}] Successfully downloaded after ${Math.floor(downloadDurationS)} seconds (${downloadSpeedKBS} KB/s).`);
});
const artifactFilePath = await unzip(artifactZipPath, e('AGENT_TEMPDIRECTORY'));
const artifactSize = fs.statSync(artifactFilePath).size;
if (artifactSize !== Number(artifact.resource.properties.artifactsize)) {
console.log(`[${artifact.name}] Artifact size mismatch. Expected ${artifact.resource.properties.artifactsize}. Actual ${artifactSize}`);
throw new Error(`Artifact size mismatch.`);
}
processing.add(artifact.name);
const operation = processArtifact(artifact).then(() => {
const promise = new Promise<void>((resolve, reject) => {
const worker = new Worker(__filename, { workerData: { artifact, artifactFilePath } });
worker.on('error', reject);
worker.on('exit', code => {
if (code === 0) {
resolve();
} else {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
const operation = promise.then(() => {
processing.delete(artifact.name);
done.add(artifact.name);
console.log(`\u2705 ${artifact.name}`);
});
operations.push({ name: artifact.name, operation });
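
The diff excerpt ends here, before the collected operations are awaited. A plausible continuation under the same design (an assumption; the actual tail of main() is not shown):

// Assumed continuation: wait for every worker-backed operation to settle,
// then fail the build if any artifact failed to publish.
const results = await Promise.allSettled(operations.map(o => o.operation));
const failures = results.filter(r => r.status === 'rejected');
if (failures.length > 0) {
	throw new Error(`${failures.length} artifact(s) failed to publish`);
}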