Fix issue in counting artifacts on retries, improve parallelization (#198488)

* try to parallelize downloads

* improved logs

* allow 5 parallel downloads

* fake browser requests

* fix artifacts processed file in retries
João Moreno 2023-11-17 15:03:32 +01:00 committed by GitHub
parent f3dfca443d
commit aa61b56229
3 changed files with 172 additions and 68 deletions

File diff suppressed because one or more lines are too long


@@ -48,12 +48,46 @@ class Temp {
}
}
class Sequencer {
export class Limiter {
private current: Promise<unknown> = Promise.resolve(null);
private _size = 0;
private runningPromises: number;
private readonly maxDegreeOfParalellism: number;
private readonly outstandingPromises: { factory: () => Promise<any>; c: (v: any) => void; e: (err: Error) => void }[];
queue<T>(promiseTask: () => Promise<T>): Promise<T> {
return this.current = this.current.then(() => promiseTask(), () => promiseTask());
constructor(maxDegreeOfParalellism: number) {
this.maxDegreeOfParalellism = maxDegreeOfParalellism;
this.outstandingPromises = [];
this.runningPromises = 0;
}
queue<T>(factory: () => Promise<T>): Promise<T> {
this._size++;
return new Promise<T>((c, e) => {
this.outstandingPromises.push({ factory, c, e });
this.consume();
});
}
private consume(): void {
while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
const iLimitedTask = this.outstandingPromises.shift()!;
this.runningPromises++;
const promise = iLimitedTask.factory();
promise.then(iLimitedTask.c, iLimitedTask.e);
promise.then(() => this.consumed(), () => this.consumed());
}
}
private consumed(): void {
this._size--;
this.runningPromises--;
if (this.outstandingPromises.length > 0) {
this.consume();
}
}
}
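
A minimal usage sketch of the new Limiter (the URLs and variable names are illustrative, not part of this diff): queue() takes a factory, at most maxDegreeOfParalellism factories run at once, and the rest wait in outstandingPromises until consume() picks them up.

const limiter = new Limiter(5);
const urls = ['https://example.com/a.zip', 'https://example.com/b.zip', 'https://example.com/c.zip'];
// However long the list is, only five fetches are in flight at any moment
const results = await Promise.all(urls.map(url => limiter.queue(() => fetch(url))));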
@@ -162,7 +196,7 @@ interface ReleaseDetailsResult {
class ESRPClient {
private static Sequencer = new Sequencer();
private static Limiter = new Limiter(1);
private readonly authPath: string;
@@ -198,7 +232,7 @@ class ESRPClient {
version: string,
filePath: string
): Promise<Release> {
const submitReleaseResult = await ESRPClient.Sequencer.queue(async () => {
const submitReleaseResult = await ESRPClient.Limiter.queue(async () => {
this.log(`Submitting release for ${version}: ${filePath}`);
return await this.SubmitRelease(version, filePath);
});
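
A Limiter with a degree of parallelism of 1 is a drop-in replacement for the removed Sequencer: queued tasks still start strictly one after another, whether the previous task resolved or rejected. An illustrative snippet (taskA and taskB are hypothetical):

const limiter = new Limiter(1);
// Runs taskA to completion before taskB starts, like Sequencer.queue() did
await Promise.all([limiter.queue(() => taskA()), limiter.queue(() => taskB())]);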
@@ -392,7 +426,7 @@ class State {
const stageAttempt = e('SYSTEM_STAGEATTEMPT');
this.statePath = path.join(pipelineWorkspacePath, `artifacts_processed_${stageAttempt}`, `artifacts_processed_${stageAttempt}.txt`);
fs.mkdirSync(path.dirname(this.statePath), { recursive: true });
fs.writeFileSync(this.statePath, [...this.set.values()].join('\n'));
fs.writeFileSync(this.statePath, [...this.set.values()].map(name => `${name}\n`).join(''));
}
get size(): number {
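
The artifacts_processed fix comes down to how the set of processed names is serialized. A quick illustration with made-up names:

const names = ['a', 'b'];
names.join('\n');                          // 'a\nb'   (no newline after the last entry)
names.map(name => `${name}\n`).join('');   // 'a\nb\n' (every entry newline-terminated)

With every entry newline-terminated, the last name forms a complete line as well, which is the behaviour the retry counting fix appears to rely on.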
@@ -413,7 +447,17 @@ class State {
}
}
const azdoFetchOptions = { headers: { Authorization: `Bearer ${e('SYSTEM_ACCESSTOKEN')}` } };
const azdoFetchOptions = {
headers: {
// Pretend we're a web browser to avoid download rate limits
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.9',
'Referer': 'https://dev.azure.com',
Authorization: `Bearer ${e('SYSTEM_ACCESSTOKEN')}`
}
};
async function requestAZDOAPI<T>(path: string): Promise<T> {
const abortController = new AbortController();
@@ -461,7 +505,7 @@ async function getPipelineTimeline(): Promise<Timeline> {
async function downloadArtifact(artifact: Artifact, downloadPath: string): Promise<void> {
const abortController = new AbortController();
const timeout = setTimeout(() => abortController.abort(), 6 * 60 * 1000);
const timeout = setTimeout(() => abortController.abort(), 4 * 60 * 1000);
try {
const res = await fetch(artifact.resource.downloadUrl, { ...azdoFetchOptions, signal: abortController.signal });
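
The per-download timeout drops from six to four minutes. A minimal sketch of the abort-on-timeout pattern around this fetch (the clearTimeout cleanup and the streaming of the response body are assumed, since the diff only shows the start of the try block):

const abortController = new AbortController();
const timeout = setTimeout(() => abortController.abort(), 4 * 60 * 1000);
try {
	// azdoFetchOptions contributes the browser-style headers and the Bearer token
	const res = await fetch(artifact.resource.downloadUrl, { ...azdoFetchOptions, signal: abortController.signal });
	// ...stream res.body to downloadPath...
} finally {
	clearTimeout(timeout); // assumed cleanup so the timer cannot outlive the download
}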
@@ -630,8 +674,8 @@ function getRealType(type: string) {
}
}
const azureSequencer = new Sequencer();
const mooncakeSequencer = new Sequencer();
const azureLimiter = new Limiter(1);
const mooncakeLimiter = new Limiter(1);
async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string, commit: string, filePath: string): Promise<{ assetUrl: string; mooncakeUrl: string }> {
const fileName = path.basename(filePath);
@@ -660,7 +704,7 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
if (await retry(() => blobClient.exists())) {
throw new Error(`Blob ${quality}, ${blobName} already exists, not publishing again.`);
} else {
await retry(attempt => azureSequencer.queue(async () => {
await retry(attempt => azureLimiter.queue(async () => {
log(`Uploading blobs to Azure storage (attempt ${attempt})...`);
await blobClient.uploadFile(filePath, blobOptions);
log('Blob successfully uploaded to Azure storage.');
@@ -682,7 +726,7 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
if (await retry(() => mooncakeBlobClient.exists())) {
throw new Error(`Mooncake Blob ${quality}, ${blobName} already exists, not publishing again.`);
} else {
await retry(attempt => mooncakeSequencer.queue(async () => {
await retry(attempt => mooncakeLimiter.queue(async () => {
log(`Uploading blobs to Mooncake Azure storage (attempt ${attempt})...`);
await mooncakeBlobClient.uploadFile(filePath, blobOptions);
log('Blob successfully uploaded to Mooncake Azure storage.');
@@ -711,8 +755,8 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
return { assetUrl, mooncakeUrl };
}
const downloadSequencer = new Sequencer();
const cosmosSequencer = new Sequencer();
const downloadLimiter = new Limiter(5);
const cosmosLimiter = new Limiter(1);
async function processArtifact(artifact: Artifact): Promise<void> {
const match = /^vscode_(?<product>[^_]+)_(?<os>[^_]+)_(?<arch>[^_]+)_(?<unprocessedType>[^_]+)$/.exec(artifact.name);
@@ -723,25 +767,39 @@ async function processArtifact(artifact: Artifact): Promise<void> {
const { product, os, arch, unprocessedType } = match.groups!;
const log = (...args: any[]) => console.log(`[${product} ${os} ${arch} ${unprocessedType}]`, ...args);
const start = Date.now();
const filePath = await retry(async attempt => {
const artifactZipPath = path.join(e('AGENT_TEMPDIRECTORY'), `${artifact.name}.zip`);
await downloadSequencer.queue(async () => {
log(`Downloading ${artifact.resource.downloadUrl} (attempt ${attempt})...`);
await downloadArtifact(artifact, artifactZipPath);
});
log(`Extracting (attempt ${attempt}) ...`);
const start = Date.now();
log(`Downloading ${artifact.resource.downloadUrl} (attempt ${attempt})...`);
try {
await downloadLimiter.queue(() => downloadArtifact(artifact, artifactZipPath));
} catch (err) {
log(`Download failed: ${err.message}`);
throw err;
}
const archiveSize = fs.statSync(artifactZipPath).size;
const downloadDurationS = (Date.now() - start) / 1000;
const downloadSpeedKBS = Math.round((archiveSize / 1024) / downloadDurationS);
log(`Successfully downloaded ${artifact.resource.downloadUrl} after ${Math.floor(downloadDurationS)} seconds (${downloadSpeedKBS} KB/s).`);
const filePath = await unzip(artifactZipPath, e('AGENT_TEMPDIRECTORY'));
const artifactSize = fs.statSync(filePath).size;
if (artifactSize !== Number(artifact.resource.properties.artifactsize)) {
throw new Error(`Artifact size mismatch. Expected ${artifact.resource.properties.artifactsize}. Actual ${artifactSize}`);
log(`Artifact size mismatch. Expected ${artifact.resource.properties.artifactsize}. Actual ${artifactSize}`);
throw new Error(`Artifact size mismatch.`);
}
return filePath;
});
log(`Successfully downloaded and extracted after ${(Date.now() - start) / 1000} seconds.`);
// getPlatform needs the unprocessedType
const quality = e('VSCODE_QUALITY');
const commit = e('BUILD_SOURCEVERSION');
@@ -751,8 +809,6 @@ async function processArtifact(artifact: Artifact): Promise<void> {
const stream = fs.createReadStream(filePath);
const [sha1hash, sha256hash] = await Promise.all([hashStream('sha1', stream), hashStream('sha256', stream)]);
log(`Publishing (size = ${size}, SHA1 = ${sha1hash}, SHA256 = ${sha256hash})...`);
const [{ assetUrl, mooncakeUrl }, prssUrl] = await Promise.all([
uploadAssetLegacy(log, quality, commit, filePath),
releaseAndProvision(
@@ -774,7 +830,7 @@ async function processArtifact(artifact: Artifact): Promise<void> {
log('Creating asset...', JSON.stringify(asset));
await retry(async (attempt) => {
await cosmosSequencer.queue(async () => {
await cosmosLimiter.queue(async () => {
log(`Creating asset in Cosmos DB (attempt ${attempt})...`);
const aadCredentials = new ClientSecretCredential(e('AZURE_TENANT_ID'), e('AZURE_CLIENT_ID'), e('AZURE_CLIENT_SECRET'));
const client = new CosmosClient({ endpoint: e('AZURE_DOCUMENTDB_ENDPOINT'), aadCredentials });
@@ -794,7 +850,7 @@ async function main() {
console.log(`\u2705 ${name}`);
}
const stages = new Set<string>();
const stages = new Set<string>(['Compile', 'CompileCLI']);
if (e('VSCODE_BUILD_STAGE_WINDOWS') === 'True') { stages.add('Windows'); }
if (e('VSCODE_BUILD_STAGE_LINUX') === 'True') { stages.add('Linux'); }
if (e('VSCODE_BUILD_STAGE_ALPINE') === 'True') { stages.add('Alpine'); }
@@ -806,21 +862,17 @@
while (true) {
const [timeline, artifacts] = await Promise.all([retry(() => getPipelineTimeline()), retry(() => getPipelineArtifacts())]);
const stagesCompleted = new Set<string>(timeline.records.filter(r => r.type === 'Stage' && r.state === 'completed' && stages.has(r.name)).map(r => r.name));
const stagesInProgress = [...stages].filter(s => !stagesCompleted.has(s));
if (stagesInProgress.length > 0) {
console.log('Stages in progress:', stagesInProgress.join(', '));
}
const artifactsInProgress = artifacts.filter(a => processing.has(a.name));
if (artifactsInProgress.length > 0) {
console.log('Artifacts in progress:', artifactsInProgress.map(a => a.name).join(', '));
}
if (stagesCompleted.size === stages.size && artifacts.length === done.size + processing.size) {
if (stagesInProgress.length === 0 && artifacts.length === done.size + processing.size) {
break;
} else if (stagesInProgress.length > 0) {
console.log('Stages in progress:', stagesInProgress.join(', '));
} else if (artifactsInProgress.length > 0) {
console.log('Artifacts in progress:', artifactsInProgress.map(a => a.name).join(', '));
} else {
console.log(`Waiting for a total of ${artifacts.length}, ${done.size} done, ${processing.size} in progress...`);
}
for (const artifact of artifacts) {
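
The polling loop now reports in-progress stages and artifacts on every iteration and exits only once no watched stage is still running and every published artifact has been picked up. The new break condition, restated as a hypothetical helper:

// Hypothetical helper, equivalent to the break condition above
function isPublishComplete(stagesInProgress: string[], artifacts: Artifact[], done: Set<string>, processing: Set<string>): boolean {
	return stagesInProgress.length === 0 && artifacts.length === done.size + processing.size;
}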


@@ -633,8 +633,7 @@ stages:
- ${{ if eq(variables['VSCODE_PUBLISH'], 'true') }}:
- stage: Publish
dependsOn:
- Compile
dependsOn: []
pool: 1es-windows-2019-x64
variables:
- name: BUILDS_API_URL