diff --git a/src/vs/platform/files/common/diskFileSystemProvider.ts b/src/vs/platform/files/common/diskFileSystemProvider.ts index 5f5b623201b..b45744820b7 100644 --- a/src/vs/platform/files/common/diskFileSystemProvider.ts +++ b/src/vs/platform/files/common/diskFileSystemProvider.ts @@ -211,6 +211,10 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen this._onDidWatchError.fire(msg.message); } + this.logWatcherMessage(msg); + } + + protected logWatcherMessage(msg: ILogMessage): void { this.logService[msg.type](msg.message); } diff --git a/src/vs/platform/files/common/watcher.ts b/src/vs/platform/files/common/watcher.ts index 3aeba5d80ad..844eda6cac0 100644 --- a/src/vs/platform/files/common/watcher.ts +++ b/src/vs/platform/files/common/watcher.ts @@ -5,7 +5,7 @@ import { Event } from 'vs/base/common/event'; import { GLOBSTAR, IRelativePattern, parse, ParsedPattern } from 'vs/base/common/glob'; -import { Disposable, DisposableStore, MutableDisposable } from 'vs/base/common/lifecycle'; +import { Disposable, DisposableStore, IDisposable, MutableDisposable } from 'vs/base/common/lifecycle'; import { isAbsolute } from 'vs/base/common/path'; import { isLinux } from 'vs/base/common/platform'; import { URI } from 'vs/base/common/uri'; @@ -122,6 +122,20 @@ export interface IRecursiveWatcher extends IWatcher { watch(requests: IRecursiveWatchRequest[]): Promise; } +export interface IRecursiveWatcherWithSubscribe extends IRecursiveWatcher { + + /** + * Subscribe to file events for the given path. The callback is called + * whenever a file event occurs for the path. I fthe watcher failed, + * the error parameter is set to `true`. + * + * @returns an `IDisposable` to stop listening to events or `undefined` + * if no events can be watched for the path given the current set of + * recursive watch requests. + */ + subscribe(path: string, callback: (error: boolean, change?: IFileChange) => void): IDisposable | undefined; +} + export interface IRecursiveWatcherOptions { /** diff --git a/src/vs/platform/files/node/watcher/baseWatcher.ts b/src/vs/platform/files/node/watcher/baseWatcher.ts index 9b9c35ae6c3..0f78d72b729 100644 --- a/src/vs/platform/files/node/watcher/baseWatcher.ts +++ b/src/vs/platform/files/node/watcher/baseWatcher.ts @@ -5,10 +5,11 @@ import { watchFile, unwatchFile, Stats } from 'fs'; import { Disposable, DisposableMap, DisposableStore, toDisposable } from 'vs/base/common/lifecycle'; -import { ILogMessage, IUniversalWatchRequest, IWatchRequestWithCorrelation, IWatcher, isWatchRequestWithCorrelation } from 'vs/platform/files/common/watcher'; +import { ILogMessage, IRecursiveWatcherWithSubscribe, IUniversalWatchRequest, IWatchRequestWithCorrelation, IWatcher, isWatchRequestWithCorrelation } from 'vs/platform/files/common/watcher'; import { Emitter, Event } from 'vs/base/common/event'; import { FileChangeType, IFileChange } from 'vs/platform/files/common/files'; import { URI } from 'vs/base/common/uri'; +import { DeferredPromise } from 'vs/base/common/async'; export abstract class BaseWatcher extends Disposable implements IWatcher { @@ -25,9 +26,12 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { private readonly allCorrelatedWatchRequests = new Map(); private readonly suspendedWatchRequests = this._register(new DisposableMap()); + private readonly suspendedWatchRequestsWithPolling = new Set(); protected readonly suspendedWatchRequestPollingInterval: number = 5007; // node.js default + private joinWatch = new DeferredPromise(); + constructor() { super(); @@ -41,6 +45,11 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { // to experiment with this feature in a controlled way. Monitoring requests // requires us to install polling watchers (via `fs.watchFile()`) and thus // should be used sparingly. + // + // TODO@bpasero revisit this in the future to have a more general approach + // for suspend/resume and drop the `legacyMonitorRequest` in parcel. + // One issue is that we need to be able to uniquely identify a request and + // without correlation that is actually harder... return; } @@ -53,26 +62,36 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { } async watch(requests: IUniversalWatchRequest[]): Promise { - this.allCorrelatedWatchRequests.clear(); - this.allNonCorrelatedWatchRequests.clear(); - - // Figure out correlated vs. non-correlated requests - for (const request of requests) { - if (this.isCorrelated(request)) { - this.allCorrelatedWatchRequests.set(request.correlationId, request); - } else { - this.allNonCorrelatedWatchRequests.add(request); - } + if (!this.joinWatch.isSettled) { + this.joinWatch.complete(); } + this.joinWatch = new DeferredPromise(); - // Remove all suspended correlated watch requests that are no longer watched - for (const [correlationId] of this.suspendedWatchRequests) { - if (!this.allCorrelatedWatchRequests.has(correlationId)) { - this.suspendedWatchRequests.deleteAndDispose(correlationId); + try { + this.allCorrelatedWatchRequests.clear(); + this.allNonCorrelatedWatchRequests.clear(); + + // Figure out correlated vs. non-correlated requests + for (const request of requests) { + if (this.isCorrelated(request)) { + this.allCorrelatedWatchRequests.set(request.correlationId, request); + } else { + this.allNonCorrelatedWatchRequests.add(request); + } } - } - return this.updateWatchers(); + // Remove all suspended correlated watch requests that are no longer watched + for (const [correlationId] of this.suspendedWatchRequests) { + if (!this.allCorrelatedWatchRequests.has(correlationId)) { + this.suspendedWatchRequests.deleteAndDispose(correlationId); + this.suspendedWatchRequestsWithPolling.delete(correlationId); + } + } + + return await this.updateWatchers(); + } finally { + this.joinWatch.complete(); + } } private updateWatchers(): Promise { @@ -82,7 +101,15 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { ]); } - private suspendWatchRequest(request: IWatchRequestWithCorrelation): void { + isSuspended(request: IUniversalWatchRequest): 'polling' | boolean { + if (typeof request.correlationId !== 'number') { + return false; + } + + return this.suspendedWatchRequestsWithPolling.has(request.correlationId) ? 'polling' : this.suspendedWatchRequests.has(request.correlationId); + } + + private async suspendWatchRequest(request: IWatchRequestWithCorrelation): Promise { if (this.suspendedWatchRequests.has(request.correlationId)) { return; // already suspended } @@ -90,6 +117,17 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { const disposables = new DisposableStore(); this.suspendedWatchRequests.set(request.correlationId, disposables); + // It is possible that a watch request fails right during watch() + // phase while other requests succeed. To increase the chance of + // reusing another watcher for suspend/resume tracking, we await + // all watch requests having processed. + + await this.joinWatch.p; + + if (disposables.isDisposed) { + return; + } + this.monitorSuspendedWatchRequest(request, disposables); this.updateWatchers(); @@ -97,14 +135,44 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { private resumeWatchRequest(request: IWatchRequestWithCorrelation): void { this.suspendedWatchRequests.deleteAndDispose(request.correlationId); + this.suspendedWatchRequestsWithPolling.delete(request.correlationId); this.updateWatchers(); } - private monitorSuspendedWatchRequest(request: IWatchRequestWithCorrelation, disposables: DisposableStore) { - const resource = URI.file(request.path); - const that = this; + private monitorSuspendedWatchRequest(request: IWatchRequestWithCorrelation, disposables: DisposableStore): void { + if (this.doMonitorWithExistingWatcher(request, disposables)) { + this.trace(`reusing an existing recursive watcher to monitor ${request.path}`); + this.suspendedWatchRequestsWithPolling.delete(request.correlationId); + } else { + this.doMonitorWithNodeJS(request, disposables); + this.suspendedWatchRequestsWithPolling.add(request.correlationId); + } + } + private doMonitorWithExistingWatcher(request: IWatchRequestWithCorrelation, disposables: DisposableStore): boolean { + const subscription = this.recursiveWatcher?.subscribe(request.path, (error, change) => { + if (disposables.isDisposed) { + return; // return early if already disposed + } + + if (error) { + this.monitorSuspendedWatchRequest(request, disposables); + } else if (change?.type === FileChangeType.ADDED) { + this.onMonitoredPathAdded(request); + } + }); + + if (subscription) { + disposables.add(subscription); + + return true; + } + + return false; + } + + private doMonitorWithNodeJS(request: IWatchRequestWithCorrelation, disposables: DisposableStore): void { let pathNotFound = false; const watchFileCallback: (curr: Stats, prev: Stats) => void = (curr, prev) => { @@ -119,15 +187,7 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { // Watch path created: resume watching request if (!currentPathNotFound && (previousPathNotFound || oldPathNotFound)) { - this.trace(`fs.watchFile() detected ${request.path} exists again, resuming watcher (correlationId: ${request.correlationId})`); - - // Emit as event - const event: IFileChange = { resource, type: FileChangeType.ADDED, cId: request.correlationId }; - that._onDidChangeFile.fire([event]); - this.traceEvent(event, request); - - // Resume watching - this.resumeWatchRequest(request); + this.onMonitoredPathAdded(request); } }; @@ -149,12 +209,25 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { })); } + private onMonitoredPathAdded(request: IWatchRequestWithCorrelation) { + this.trace(`detected ${request.path} exists again, resuming watcher (correlationId: ${request.correlationId})`); + + // Emit as event + const event: IFileChange = { resource: URI.file(request.path), type: FileChangeType.ADDED, cId: request.correlationId }; + this._onDidChangeFile.fire([event]); + this.traceEvent(event, request); + + // Resume watching + this.resumeWatchRequest(request); + } + private isPathNotFound(stats: Stats): boolean { return stats.ctimeMs === 0 && stats.ino === 0; } async stop(): Promise { this.suspendedWatchRequests.clearAndDisposeAll(); + this.suspendedWatchRequestsWithPolling.clear(); } protected traceEvent(event: IFileChange, request: IUniversalWatchRequest): void { @@ -168,6 +241,8 @@ export abstract class BaseWatcher extends Disposable implements IWatcher { protected abstract doWatch(requests: IUniversalWatchRequest[]): Promise; + protected abstract readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined; + protected abstract trace(message: string): void; protected abstract warn(message: string): void; diff --git a/src/vs/platform/files/node/watcher/nodejs/nodejsClient.ts b/src/vs/platform/files/node/watcher/nodejs/nodejsClient.ts index 2a662eb7e05..3a2f6446996 100644 --- a/src/vs/platform/files/node/watcher/nodejs/nodejsClient.ts +++ b/src/vs/platform/files/node/watcher/nodejs/nodejsClient.ts @@ -21,6 +21,6 @@ export class NodeJSWatcherClient extends AbstractNonRecursiveWatcherClient { } protected override createWatcher(disposables: DisposableStore): INonRecursiveWatcher { - return disposables.add(new NodeJSWatcher()) satisfies INonRecursiveWatcher; + return disposables.add(new NodeJSWatcher(undefined /* no recursive watching support here */)) satisfies INonRecursiveWatcher; } } diff --git a/src/vs/platform/files/node/watcher/nodejs/nodejsWatcher.ts b/src/vs/platform/files/node/watcher/nodejs/nodejsWatcher.ts index 197c975a465..7f5ddeeb544 100644 --- a/src/vs/platform/files/node/watcher/nodejs/nodejsWatcher.ts +++ b/src/vs/platform/files/node/watcher/nodejs/nodejsWatcher.ts @@ -7,7 +7,7 @@ import { Event } from 'vs/base/common/event'; import { patternsEquals } from 'vs/base/common/glob'; import { BaseWatcher } from 'vs/platform/files/node/watcher/baseWatcher'; import { isLinux } from 'vs/base/common/platform'; -import { INonRecursiveWatchRequest, INonRecursiveWatcher } from 'vs/platform/files/common/watcher'; +import { INonRecursiveWatchRequest, INonRecursiveWatcher, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher'; import { NodeJSFileWatcherLibrary } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcherLib'; import { isEqual } from 'vs/base/common/extpath'; @@ -28,10 +28,14 @@ export class NodeJSWatcher extends BaseWatcher implements INonRecursiveWatcher { readonly onDidError = Event.None; - protected readonly watchers = new Set(); + readonly watchers = new Set(); private verboseLogging = false; + constructor(protected readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined) { + super(); + } + protected override async doWatch(requests: INonRecursiveWatchRequest[]): Promise { // Figure out duplicates to remove from the requests @@ -47,7 +51,6 @@ export class NodeJSWatcher extends BaseWatcher implements INonRecursiveWatcher { } else { requestsToStart.push(request); // start watching } - } // Logging @@ -95,7 +98,7 @@ export class NodeJSWatcher extends BaseWatcher implements INonRecursiveWatcher { private startWatching(request: INonRecursiveWatchRequest): void { // Start via node.js lib - const instance = new NodeJSFileWatcherLibrary(request, changes => this._onDidChangeFile.fire(changes), () => this._onDidWatchFail.fire(request), msg => this._onDidLogMessage.fire(msg), this.verboseLogging); + const instance = new NodeJSFileWatcherLibrary(request, this.recursiveWatcher, changes => this._onDidChangeFile.fire(changes), () => this._onDidWatchFail.fire(request), msg => this._onDidLogMessage.fire(msg), this.verboseLogging); // Remember as watcher instance const watcher: INodeJSWatcherInstance = { request, instance }; diff --git a/src/vs/platform/files/node/watcher/nodejs/nodejsWatcherLib.ts b/src/vs/platform/files/node/watcher/nodejs/nodejsWatcherLib.ts index ba9edc88f0f..f5a2f747340 100644 --- a/src/vs/platform/files/node/watcher/nodejs/nodejsWatcherLib.ts +++ b/src/vs/platform/files/node/watcher/nodejs/nodejsWatcherLib.ts @@ -16,7 +16,7 @@ import { URI } from 'vs/base/common/uri'; import { realcase } from 'vs/base/node/extpath'; import { Promises } from 'vs/base/node/pfs'; import { FileChangeType, IFileChange } from 'vs/platform/files/common/files'; -import { ILogMessage, coalesceEvents, INonRecursiveWatchRequest, parseWatcherPatterns } from 'vs/platform/files/common/watcher'; +import { ILogMessage, coalesceEvents, INonRecursiveWatchRequest, parseWatcherPatterns, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher'; export class NodeJSFileWatcherLibrary extends Disposable { @@ -56,8 +56,15 @@ export class NodeJSFileWatcherLibrary extends Disposable { readonly ready = this.watch(); + private _isReusingRecursiveWatcher = false; + get isReusingRecursiveWatcher(): boolean { return this._isReusingRecursiveWatcher; } + + private didFail = false; + get failed(): boolean { return this.didFail; } + constructor( private readonly request: INonRecursiveWatchRequest, + private readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined, private readonly onDidFilesChange: (changes: IFileChange[]) => void, private readonly onDidWatchFail?: () => void, private readonly onLogMessage?: (msg: ILogMessage) => void, @@ -88,10 +95,16 @@ export class NodeJSFileWatcherLibrary extends Disposable { this.trace(`ignoring a path for watching who's stat info failed to resolve: ${this.request.path} (error: ${error})`); } - this.onDidWatchFail?.(); + this.notifyWatchFailed(); } } + private notifyWatchFailed(): void { + this.didFail = true; + + this.onDidWatchFail?.(); + } + private async normalizePath(request: INonRecursiveWatchRequest): Promise { let realPath = request.path; @@ -117,41 +130,95 @@ export class NodeJSFileWatcherLibrary extends Disposable { return realPath; } - private async doWatch(path: string, isDirectory: boolean): Promise { + private async doWatch(realPath: string, isDirectory: boolean): Promise { + const disposables = new DisposableStore(); + + if (this.doWatchWithExistingWatcher(realPath, isDirectory, disposables)) { + this.trace(`reusing an existing recursive watcher for ${this.request.path}`); + this._isReusingRecursiveWatcher = true; + } else { + this._isReusingRecursiveWatcher = false; + await this.doWatchWithNodeJS(realPath, isDirectory, disposables); + } + + return disposables; + } + + private doWatchWithExistingWatcher(realPath: string, isDirectory: boolean, disposables: DisposableStore): boolean { + if (isDirectory) { + return false; // only supported for files where we have the full path known upfront + } + + const resource = URI.file(this.request.path); + const subscription = this.recursiveWatcher?.subscribe(this.request.path, async (error, change) => { + if (disposables.isDisposed) { + return; // return early if already disposed + } + + if (error) { + const watchDisposable = await this.doWatch(realPath, isDirectory); + if (!disposables.isDisposed) { + disposables.add(watchDisposable); + } else { + watchDisposable.dispose(); + } + } else if (change) { + if (typeof change.cId === 'number' || typeof this.request.correlationId === 'number') { + // Re-emit this change with the correlation id of the request + // so that the client can correlate the event with the request + // properly. Without correlation, we do not have to do that + // because the event will appear on the global listener already. + this.onDidFilesChange([{ resource, type: change.type, cId: this.request.correlationId }]); + } + } + }); + + if (subscription) { + disposables.add(subscription); + + return true; + } + + return false; + } + + private async doWatchWithNodeJS(realPath: string, isDirectory: boolean, disposables: DisposableStore): Promise { // macOS: watching samba shares can crash VSCode so we do // a simple check for the file path pointing to /Volumes // (https://github.com/microsoft/vscode/issues/106879) // TODO@electron this needs a revisit when the crash is // fixed or mitigated upstream. - if (isMacintosh && isEqualOrParent(path, '/Volumes/', true)) { - this.error(`Refusing to watch ${path} for changes using fs.watch() for possibly being a network share where watching is unreliable and unstable.`); + if (isMacintosh && isEqualOrParent(realPath, '/Volumes/', true)) { + this.error(`Refusing to watch ${realPath} for changes using fs.watch() for possibly being a network share where watching is unreliable and unstable.`); - return Disposable.None; + return; } const cts = new CancellationTokenSource(this.cts.token); + disposables.add(toDisposable(() => cts.dispose(true))); - const disposables = new DisposableStore(); + const watcherDisposables = new DisposableStore(); // we need a separate disposable store because we re-create the watcher from within in some cases + disposables.add(watcherDisposables); try { const requestResource = URI.file(this.request.path); - const pathBasename = basename(path); + const pathBasename = basename(realPath); // Creating watcher can fail with an exception - const watcher = watch(path); - disposables.add(toDisposable(() => { + const watcher = watch(realPath); + watcherDisposables.add(toDisposable(() => { watcher.removeAllListeners(); watcher.close(); })); - this.trace(`Started watching: '${path}'`); + this.trace(`Started watching: '${realPath}'`); // Folder: resolve children to emit proper events const folderChildren = new Set(); if (isDirectory) { try { - for (const child of await Promises.readdir(path)) { + for (const child of await Promises.readdir(realPath)) { folderChildren.add(child); } } catch (error) { @@ -159,8 +226,12 @@ export class NodeJSFileWatcherLibrary extends Disposable { } } + if (cts.token.isCancellationRequested) { + return; + } + const mapPathToStatDisposable = new Map(); - disposables.add(toDisposable(() => { + watcherDisposables.add(toDisposable(() => { for (const [, disposable] of mapPathToStatDisposable) { disposable.dispose(); } @@ -168,9 +239,13 @@ export class NodeJSFileWatcherLibrary extends Disposable { })); watcher.on('error', (code: number, signal: string) => { - this.error(`Failed to watch ${path} for changes using fs.watch() (${code}, ${signal})`); + if (cts.token.isCancellationRequested) { + return; + } - this.onDidWatchFail?.(); + this.error(`Failed to watch ${realPath} for changes using fs.watch() (${code}, ${signal})`); + + this.notifyWatchFailed(); }); watcher.on('change', (type, raw) => { @@ -228,17 +303,21 @@ export class NodeJSFileWatcherLibrary extends Disposable { // file watching specifically we want to handle // the atomic-write cases where the file is being // deleted and recreated with different contents. - if (changedFileName === pathBasename && !await Promises.exists(path)) { + if (changedFileName === pathBasename && !await Promises.exists(realPath)) { this.onWatchedPathDeleted(requestResource); return; } + if (cts.token.isCancellationRequested) { + return; + } + // In order to properly detect renames on a case-insensitive // file system, we need to use `existsChildStrictCase` helper // because otherwise we would wrongly assume a file exists // when it was renamed to same name but different case. - const fileExists = await this.existsChildStrictCase(join(path, changedFileName)); + const fileExists = await this.existsChildStrictCase(join(realPath, changedFileName)); if (cts.token.isCancellationRequested) { return; // ignore if disposed by now @@ -310,7 +389,7 @@ export class NodeJSFileWatcherLibrary extends Disposable { // because the watcher is disposed then. const timeoutHandle = setTimeout(async () => { - const fileExists = await Promises.exists(path); + const fileExists = await Promises.exists(realPath); if (cts.token.isCancellationRequested) { return; // ignore if disposed by now @@ -320,7 +399,7 @@ export class NodeJSFileWatcherLibrary extends Disposable { if (fileExists) { this.onFileChange({ resource: requestResource, type: FileChangeType.UPDATED, cId: this.request.correlationId }, true /* skip excludes/includes (file is explicitly watched) */); - disposables.add(await this.doWatch(path, false)); + watcherDisposables.add(await this.doWatch(realPath, false)); } // File seems to be really gone, so emit a deleted and failed event @@ -331,8 +410,8 @@ export class NodeJSFileWatcherLibrary extends Disposable { // Very important to dispose the watcher which now points to a stale inode // and wire in a new disposable that tracks our timeout that is installed - disposables.clear(); - disposables.add(toDisposable(() => clearTimeout(timeoutHandle))); + watcherDisposables.clear(); + watcherDisposables.add(toDisposable(() => clearTimeout(timeoutHandle))); } // File changed @@ -343,16 +422,11 @@ export class NodeJSFileWatcherLibrary extends Disposable { }); } catch (error) { if (!cts.token.isCancellationRequested) { - this.error(`Failed to watch ${path} for changes using fs.watch() (${error.toString()})`); + this.error(`Failed to watch ${realPath} for changes using fs.watch() (${error.toString()})`); } - this.onDidWatchFail?.(); + this.notifyWatchFailed(); } - - return toDisposable(() => { - cts.dispose(true); - disposables.dispose(); - }); } private onWatchedPathDeleted(resource: URI): void { @@ -362,7 +436,7 @@ export class NodeJSFileWatcherLibrary extends Disposable { this.onFileChange({ resource, type: FileChangeType.DELETED, cId: this.request.correlationId }, true /* skip excludes/includes (file is explicitly watched) */); this.fileChangesAggregator.flush(); - this.onDidWatchFail?.(); + this.notifyWatchFailed(); } private onFileChange(event: IFileChange, skipIncludeExcludeChecks = false): void { @@ -476,7 +550,7 @@ export async function watchFileContents(path: string, onData: (chunk: Uint8Array let isReading = false; const request: INonRecursiveWatchRequest = { path, excludes: [], recursive: false }; - const watcher = new NodeJSFileWatcherLibrary(request, changes => { + const watcher = new NodeJSFileWatcherLibrary(request, undefined, changes => { (async () => { for (const { type } of changes) { if (type === FileChangeType.UPDATED) { diff --git a/src/vs/platform/files/node/watcher/parcel/parcelWatcher.ts b/src/vs/platform/files/node/watcher/parcel/parcelWatcher.ts index 3ba2370fbe2..f8e743f1163 100644 --- a/src/vs/platform/files/node/watcher/parcel/parcelWatcher.ts +++ b/src/vs/platform/files/node/watcher/parcel/parcelWatcher.ts @@ -10,9 +10,9 @@ import { URI } from 'vs/base/common/uri'; import { DeferredPromise, RunOnceScheduler, RunOnceWorker, ThrottledWorker } from 'vs/base/common/async'; import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation'; import { toErrorMessage } from 'vs/base/common/errorMessage'; -import { Emitter } from 'vs/base/common/event'; -import { randomPath, isEqual } from 'vs/base/common/extpath'; -import { GLOBSTAR, ParsedPattern, patternsEquals } from 'vs/base/common/glob'; +import { Emitter, Event } from 'vs/base/common/event'; +import { randomPath, isEqual, isEqualOrParent } from 'vs/base/common/extpath'; +import { GLOBSTAR, patternsEquals } from 'vs/base/common/glob'; import { BaseWatcher } from 'vs/platform/files/node/watcher/baseWatcher'; import { TernarySearchTree } from 'vs/base/common/ternarySearchTree'; import { normalizeNFC } from 'vs/base/common/normalization'; @@ -21,44 +21,121 @@ import { isLinux, isMacintosh, isWindows } from 'vs/base/common/platform'; import { realcaseSync, realpathSync } from 'vs/base/node/extpath'; import { NodeJSFileWatcherLibrary } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcherLib'; import { FileChangeType, IFileChange } from 'vs/platform/files/common/files'; -import { coalesceEvents, IRecursiveWatchRequest, IRecursiveWatcher, parseWatcherPatterns } from 'vs/platform/files/common/watcher'; +import { coalesceEvents, IRecursiveWatchRequest, parseWatcherPatterns, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher'; +import { Disposable, DisposableStore, IDisposable, toDisposable } from 'vs/base/common/lifecycle'; -export interface IParcelWatcherInstance { +export class ParcelWatcherInstance extends Disposable { - /** - * Signals when the watcher is ready to watch. - */ - readonly ready: Promise; + private readonly _onDidStop = this._register(new Emitter<{ joinRestart?: Promise }>()); + readonly onDidStop = this._onDidStop.event; - /** - * The watch request associated to the watcher. - */ - readonly request: IRecursiveWatchRequest; + private readonly _onDidFail = this._register(new Emitter()); + readonly onDidFail = this._onDidFail.event; - /** - * How often this watcher has been restarted in case of an unexpected - * shutdown. - */ - readonly restarts: number; + private didFail = false; + get failed(): boolean { return this.didFail; } - /** - * The cancellation token associated with the lifecycle of the watcher. - */ - readonly token: CancellationToken; + private didStop = false; + get stopped(): boolean { return this.didStop; } - /** - * An event aggregator to coalesce events and reduce duplicates. - */ - readonly worker: RunOnceWorker; + private readonly includes = this.request.includes ? parseWatcherPatterns(this.request.path, this.request.includes) : undefined; + private readonly excludes = this.request.excludes ? parseWatcherPatterns(this.request.path, this.request.excludes) : undefined; - /** - * Stops and disposes the watcher. This operation is async to await - * unsubscribe call in Parcel. - */ - stop(): Promise; + private readonly subscriptions = new Map void>>(); + + constructor( + /** + * Signals when the watcher is ready to watch. + */ + readonly ready: Promise, + readonly request: IRecursiveWatchRequest, + /** + * How often this watcher has been restarted in case of an unexpected + * shutdown. + */ + readonly restarts: number, + /** + * The cancellation token associated with the lifecycle of the watcher. + */ + readonly token: CancellationToken, + /** + * An event aggregator to coalesce events and reduce duplicates. + */ + readonly worker: RunOnceWorker, + private readonly stopFn: () => Promise + ) { + super(); + + this._register(toDisposable(() => this.subscriptions.clear())); + } + + subscribe(path: string, callback: (change: IFileChange) => void): IDisposable { + path = URI.file(path).fsPath; // make sure to store the path in `fsPath` form to match it with events later + + let subscriptions = this.subscriptions.get(path); + if (!subscriptions) { + subscriptions = new Set(); + this.subscriptions.set(path, subscriptions); + } + + subscriptions.add(callback); + + return toDisposable(() => { + const subscriptions = this.subscriptions.get(path); + if (subscriptions) { + subscriptions.delete(callback); + + if (subscriptions.size === 0) { + this.subscriptions.delete(path); + } + } + }); + } + + get subscriptionsCount(): number { + return this.subscriptions.size; + } + + notifyFileChange(path: string, change: IFileChange): void { + const subscriptions = this.subscriptions.get(path); + if (subscriptions) { + for (const subscription of subscriptions) { + subscription(change); + } + } + } + + notifyWatchFailed(): void { + this.didFail = true; + + this._onDidFail.fire(); + } + + include(path: string): boolean { + if (!this.includes || this.includes.length === 0) { + return true; // no specific includes defined, include all + } + + return this.includes.some(include => include(path)); + } + + exclude(path: string): boolean { + return Boolean(this.excludes?.some(exclude => exclude(path))); + } + + async stop(joinRestart: Promise | undefined): Promise { + this.didStop = true; + + try { + await this.stopFn(); + } finally { + this._onDidStop.fire({ joinRestart }); + this.dispose(); + } + } } -export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { +export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcherWithSubscribe { private static readonly MAP_PARCEL_WATCHER_ACTION_TO_FILE_CHANGE = new Map( [ @@ -73,7 +150,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { private readonly _onDidError = this._register(new Emitter()); readonly onDidError = this._onDidError.event; - protected readonly watchers = new Set(); + readonly watchers = new Set(); // A delay for collecting file changes from Parcel // before collecting them for coalescing and emitting. @@ -155,7 +232,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } } - private findWatcher(request: IRecursiveWatchRequest): IParcelWatcherInstance | undefined { + private findWatcher(request: IRecursiveWatchRequest): ParcelWatcherInstance | undefined { for (const watcher of this.watchers) { // Requests or watchers with correlation always match on that @@ -184,13 +261,13 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { const snapshotFile = randomPath(tmpdir(), 'vscode-watcher-snapshot'); // Remember as watcher instance - const watcher: IParcelWatcherInstance = { + const watcher: ParcelWatcherInstance = new ParcelWatcherInstance( + instance.p, request, - ready: instance.p, restarts, - token: cts.token, - worker: new RunOnceWorker(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY), - stop: async () => { + cts.token, + new RunOnceWorker(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY), + async () => { cts.dispose(true); watcher.worker.flush(); @@ -199,15 +276,12 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { pollingWatcher.dispose(); unlinkSync(snapshotFile); } - }; + ); this.watchers.add(watcher); // Path checks for symbolic links / wrong casing const { realPath, realPathDiffers, realPathLength } = this.normalizePath(request); - // Warm up include patterns for usage - const includePatterns = request.includes ? parseWatcherPatterns(request.path, request.includes) : undefined; - this.trace(`Started watching: '${realPath}' with polling interval '${pollingInterval}'`); let counter = 0; @@ -228,7 +302,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } // Handle & emit events - this.onParcelEvents(parcelEvents, watcher, includePatterns, realPathDiffers, realPathLength); + this.onParcelEvents(parcelEvents, watcher, realPathDiffers, realPathLength); } // Store a snapshot of files to the snapshot file @@ -255,13 +329,13 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { const instance = new DeferredPromise(); // Remember as watcher instance - const watcher: IParcelWatcherInstance = { + const watcher: ParcelWatcherInstance = new ParcelWatcherInstance( + instance.p, request, - ready: instance.p, restarts, - token: cts.token, - worker: new RunOnceWorker(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY), - stop: async () => { + cts.token, + new RunOnceWorker(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY), + async () => { cts.dispose(true); watcher.worker.flush(); @@ -270,15 +344,12 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { const watcherInstance = await instance.p; await watcherInstance?.unsubscribe(); } - }; + ); this.watchers.add(watcher); // Path checks for symbolic links / wrong casing const { realPath, realPathDiffers, realPathLength } = this.normalizePath(request); - // Warm up include patterns for usage - const includePatterns = request.includes ? parseWatcherPatterns(request.path, request.includes) : undefined; - parcelWatcher.subscribe(realPath, (error, parcelEvents) => { if (watcher.token.isCancellationRequested) { return; // return early when disposed @@ -293,7 +364,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } // Handle & emit events - this.onParcelEvents(parcelEvents, watcher, includePatterns, realPathDiffers, realPathLength); + this.onParcelEvents(parcelEvents, watcher, realPathDiffers, realPathLength); }, { backend: ParcelWatcher.PARCEL_WATCHER_BACKEND, ignore: watcher.request.excludes @@ -306,11 +377,12 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { instance.complete(undefined); + watcher.notifyWatchFailed(); this._onDidWatchFail.fire(request); }); } - private onParcelEvents(parcelEvents: parcelWatcher.Event[], watcher: IParcelWatcherInstance, includes: ParsedPattern[] | undefined, realPathDiffers: boolean, realPathLength: number): void { + private onParcelEvents(parcelEvents: parcelWatcher.Event[], watcher: ParcelWatcherInstance, realPathDiffers: boolean, realPathLength: number): void { if (parcelEvents.length === 0) { return; } @@ -321,7 +393,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { this.normalizeEvents(parcelEvents, watcher.request, realPathDiffers, realPathLength); // Check for includes - const includedEvents = this.handleIncludes(watcher, parcelEvents, includes); + const includedEvents = this.handleIncludes(watcher, parcelEvents); // Add to event aggregator for later processing for (const includedEvent of includedEvents) { @@ -329,7 +401,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } } - private handleIncludes(watcher: IParcelWatcherInstance, parcelEvents: parcelWatcher.Event[], includes: ParsedPattern[] | undefined): IFileChange[] { + private handleIncludes(watcher: ParcelWatcherInstance, parcelEvents: parcelWatcher.Event[]): IFileChange[] { const events: IFileChange[] = []; for (const { path, type: parcelEventType } of parcelEvents) { @@ -339,7 +411,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } // Apply include filter if any - if (includes && includes.length > 0 && !includes.some(include => include(path))) { + if (!watcher.include(path)) { if (this.verboseLogging) { this.trace(` >> ignored (not included) ${path}`); } @@ -351,7 +423,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { return events; } - private handleParcelEvents(parcelEvents: IFileChange[], watcher: IParcelWatcherInstance): void { + private handleParcelEvents(parcelEvents: IFileChange[], watcher: ParcelWatcherInstance): void { // Coalesce events: merge events of same kind const coalescedEvents = coalesceEvents(parcelEvents); @@ -368,14 +440,20 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } } - private emitEvents(events: IFileChange[], watcher: IParcelWatcherInstance): void { + private emitEvents(events: IFileChange[], watcher: ParcelWatcherInstance): void { if (events.length === 0) { return; } - // Logging - if (this.verboseLogging) { - for (const event of events) { + for (const event of events) { + + // Emit to instance subscriptions if any + if (watcher.subscriptionsCount > 0) { + watcher.notifyFileChange(event.resource.fsPath, event); + } + + // Logging + if (this.verboseLogging) { this.traceEvent(event, watcher.request); } } @@ -446,7 +524,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } } - private filterEvents(events: IFileChange[], watcher: IParcelWatcherInstance): { events: IFileChange[]; rootDeleted?: boolean } { + private filterEvents(events: IFileChange[], watcher: ParcelWatcherInstance): { events: IFileChange[]; rootDeleted?: boolean } { const filteredEvents: IFileChange[] = []; let rootDeleted = false; @@ -475,29 +553,28 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { return { events: filteredEvents, rootDeleted }; } - private onWatchedPathDeleted(watcher: IParcelWatcherInstance): void { + private onWatchedPathDeleted(watcher: ParcelWatcherInstance): void { this.warn('Watcher shutdown because watched path got deleted', watcher); - this._onDidWatchFail.fire(watcher.request); - - // Do monitoring of the request path parent unless this request - // can be handled via suspend/resume in the super class - // - // TODO@bpasero we should remove this logic in favor of the - // support in the super class so that we have 1 consistent - // solution for handling this. - + let legacyMonitored = false; if (!this.isCorrelated(watcher.request)) { - this.legacyMonitorRequest(watcher); + // Do monitoring of the request path parent unless this request + // can be handled via suspend/resume in the super class + legacyMonitored = this.legacyMonitorRequest(watcher); + } + + if (!legacyMonitored) { + watcher.notifyWatchFailed(); + this._onDidWatchFail.fire(watcher.request); } } - private legacyMonitorRequest(watcher: IParcelWatcherInstance): void { + private legacyMonitorRequest(watcher: ParcelWatcherInstance): boolean { const parentPath = dirname(watcher.request.path); if (existsSync(parentPath)) { this.trace('Trying to watch on the parent path to restart the watcher...', watcher); - const nodeWatcher = new NodeJSFileWatcherLibrary({ path: parentPath, excludes: [], recursive: false, correlationId: watcher.request.correlationId }, changes => { + const nodeWatcher = new NodeJSFileWatcherLibrary({ path: parentPath, excludes: [], recursive: false, correlationId: watcher.request.correlationId }, undefined, changes => { if (watcher.token.isCancellationRequested) { return; // return early when disposed } @@ -522,10 +599,14 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { // Make sure to stop watching when the watcher is disposed watcher.token.onCancellationRequested(() => nodeWatcher.dispose()); + + return true; } + + return false; } - private onUnexpectedError(error: unknown, watcher?: IParcelWatcherInstance): void { + private onUnexpectedError(error: unknown, watcher?: ParcelWatcherInstance): void { const msg = toErrorMessage(error); // Specially handle ENOSPC errors that can happen when @@ -559,7 +640,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { } } - protected restartWatching(watcher: IParcelWatcherInstance, delay = 800): void { + protected restartWatching(watcher: ParcelWatcherInstance, delay = 800): void { // Restart watcher delayed to accomodate for // changes on disk that have triggered the @@ -569,15 +650,21 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { return; // return early when disposed } - // Await the watcher having stopped, as this is - // needed to properly re-watch the same path - await this.stopWatching(watcher); + const restartPromise = new DeferredPromise(); + try { - // Start watcher again counting the restarts - if (watcher.request.pollingInterval) { - this.startPolling(watcher.request, watcher.request.pollingInterval, watcher.restarts + 1); - } else { - this.startWatching(watcher.request, watcher.restarts + 1); + // Await the watcher having stopped, as this is + // needed to properly re-watch the same path + await this.stopWatching(watcher, restartPromise.p); + + // Start watcher again counting the restarts + if (watcher.request.pollingInterval) { + this.startPolling(watcher.request, watcher.request.pollingInterval, watcher.restarts + 1); + } else { + this.startWatching(watcher.request, watcher.restarts + 1); + } + } finally { + restartPromise.complete(); } }, delay); @@ -585,13 +672,13 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { watcher.token.onCancellationRequested(() => scheduler.dispose()); } - private async stopWatching(watcher: IParcelWatcherInstance): Promise { + private async stopWatching(watcher: ParcelWatcherInstance, joinRestart?: Promise): Promise { this.trace(`stopping file watcher`, watcher); this.watchers.delete(watcher); try { - await watcher.stop(); + await watcher.stop(joinRestart); } catch (error) { this.error(`Unexpected error stopping watcher: ${toErrorMessage(error)}`, watcher); } @@ -694,25 +781,63 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher { return true; } + subscribe(path: string, callback: (error: boolean, change?: IFileChange) => void): IDisposable | undefined { + for (const watcher of this.watchers) { + if (watcher.failed) { + continue; // watcher has already failed + } + + if (!isEqualOrParent(path, watcher.request.path, !isLinux)) { + continue; // watcher does not consider this path + } + + if ( + watcher.exclude(path) || + !watcher.include(path) + ) { + continue; // parcel instance does not consider this path + } + + const disposables = new DisposableStore(); + + disposables.add(Event.once(watcher.onDidStop)(async e => { + await e.joinRestart; // if we are restarting, await that so that we can possibly reuse this watcher again + if (disposables.isDisposed) { + return; + } + + callback(true /* error */); + })); + disposables.add(Event.once(watcher.onDidFail)(() => callback(true /* error */))); + disposables.add(watcher.subscribe(path, change => callback(false, change))); + + return disposables; + } + + return undefined; + } + async setVerboseLogging(enabled: boolean): Promise { this.verboseLogging = enabled; } - protected trace(message: string, watcher?: IParcelWatcherInstance): void { + protected trace(message: string, watcher?: ParcelWatcherInstance): void { if (this.verboseLogging) { this._onDidLogMessage.fire({ type: 'trace', message: this.toMessage(message, watcher) }); } } - protected warn(message: string, watcher?: IParcelWatcherInstance) { + protected warn(message: string, watcher?: ParcelWatcherInstance) { this._onDidLogMessage.fire({ type: 'warn', message: this.toMessage(message, watcher) }); } - private error(message: string, watcher: IParcelWatcherInstance | undefined) { + private error(message: string, watcher: ParcelWatcherInstance | undefined) { this._onDidLogMessage.fire({ type: 'error', message: this.toMessage(message, watcher) }); } - private toMessage(message: string, watcher?: IParcelWatcherInstance): string { + private toMessage(message: string, watcher?: ParcelWatcherInstance): string { return watcher ? `[File Watcher (parcel)] ${message} (path: ${watcher.request.path})` : `[File Watcher (parcel)] ${message}`; } + + protected get recursiveWatcher() { return this; } } diff --git a/src/vs/platform/files/node/watcher/watcher.ts b/src/vs/platform/files/node/watcher/watcher.ts index d0b563e540c..3ea9fc61213 100644 --- a/src/vs/platform/files/node/watcher/watcher.ts +++ b/src/vs/platform/files/node/watcher/watcher.ts @@ -4,29 +4,61 @@ *--------------------------------------------------------------------------------------------*/ import { Disposable } from 'vs/base/common/lifecycle'; -import { IUniversalWatcher, IUniversalWatchRequest } from 'vs/platform/files/common/watcher'; -import { Event } from 'vs/base/common/event'; +import { ILogMessage, IUniversalWatcher, IUniversalWatchRequest } from 'vs/platform/files/common/watcher'; +import { Emitter, Event } from 'vs/base/common/event'; import { ParcelWatcher } from 'vs/platform/files/node/watcher/parcel/parcelWatcher'; import { NodeJSWatcher } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcher'; import { Promises } from 'vs/base/common/async'; +import { computeStats } from 'vs/platform/files/node/watcher/watcherStats'; export class UniversalWatcher extends Disposable implements IUniversalWatcher { private readonly recursiveWatcher = this._register(new ParcelWatcher()); - private readonly nonRecursiveWatcher = this._register(new NodeJSWatcher()); + private readonly nonRecursiveWatcher = this._register(new NodeJSWatcher(this.recursiveWatcher)); readonly onDidChangeFile = Event.any(this.recursiveWatcher.onDidChangeFile, this.nonRecursiveWatcher.onDidChangeFile); - readonly onDidLogMessage = Event.any(this.recursiveWatcher.onDidLogMessage, this.nonRecursiveWatcher.onDidLogMessage); readonly onDidError = Event.any(this.recursiveWatcher.onDidError, this.nonRecursiveWatcher.onDidError); + private readonly _onDidLogMessage = this._register(new Emitter()); + readonly onDidLogMessage = Event.any(this._onDidLogMessage.event, this.recursiveWatcher.onDidLogMessage, this.nonRecursiveWatcher.onDidLogMessage); + + private requests: IUniversalWatchRequest[] = []; + async watch(requests: IUniversalWatchRequest[]): Promise { - await Promises.settled([ - this.recursiveWatcher.watch(requests.filter(request => request.recursive)), - this.nonRecursiveWatcher.watch(requests.filter(request => !request.recursive)) - ]); + this.requests = requests; + + // Watch recursively first to give recursive watchers a chance + // to step in for non-recursive watch requests, thus reducing + // watcher duplication. + + let error: Error | undefined; + try { + await this.recursiveWatcher.watch(requests.filter(request => request.recursive)); + } catch (e) { + error = e; + } + + try { + await this.nonRecursiveWatcher.watch(requests.filter(request => !request.recursive)); + } catch (e) { + if (!error) { + error = e; + } + } + + if (error) { + throw error; + } } async setVerboseLogging(enabled: boolean): Promise { + + // Log stats + if (enabled && this.requests.length > 0) { + this._onDidLogMessage.fire({ type: 'trace', message: computeStats(this.requests, this.recursiveWatcher, this.nonRecursiveWatcher) }); + } + + // Forward to watchers await Promises.settled([ this.recursiveWatcher.setVerboseLogging(enabled), this.nonRecursiveWatcher.setVerboseLogging(enabled) diff --git a/src/vs/platform/files/node/watcher/watcherStats.ts b/src/vs/platform/files/node/watcher/watcherStats.ts new file mode 100644 index 00000000000..31bcddf9cd0 --- /dev/null +++ b/src/vs/platform/files/node/watcher/watcherStats.ts @@ -0,0 +1,207 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +import { IUniversalWatchRequest } from 'vs/platform/files/common/watcher'; +import { INodeJSWatcherInstance, NodeJSWatcher } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcher'; +import { ParcelWatcher, ParcelWatcherInstance } from 'vs/platform/files/node/watcher/parcel/parcelWatcher'; + +export function computeStats( + requests: IUniversalWatchRequest[], + recursiveWatcher: ParcelWatcher, + nonRecursiveWatcher: NodeJSWatcher +): string { + const lines: string[] = []; + + const recursiveRequests = sortByPathPrefix(requests.filter(request => request.recursive)); + const recursiveRequestsStatus = computeRequestStatus(recursiveRequests, recursiveWatcher); + const recursiveWatcherStatus = computeRecursiveWatchStatus(recursiveWatcher); + + const nonRecursiveRequests = sortByPathPrefix(requests.filter(request => !request.recursive)); + const nonRecursiveRequestsStatus = computeRequestStatus(nonRecursiveRequests, nonRecursiveWatcher); + const nonRecursiveWatcherStatus = computeNonRecursiveWatchStatus(nonRecursiveWatcher); + + lines.push('[Summary]'); + lines.push(`- Recursive Requests: total: ${recursiveRequests.length}, suspended: ${recursiveRequestsStatus.suspended}, polling: ${recursiveRequestsStatus.polling}`); + lines.push(`- Non-Recursive Requests: total: ${nonRecursiveRequests.length}, suspended: ${nonRecursiveRequestsStatus.suspended}, polling: ${nonRecursiveRequestsStatus.polling}`); + lines.push(`- Recursive Watchers: total: ${recursiveWatcher.watchers.size}, active: ${recursiveWatcherStatus.active}, failed: ${recursiveWatcherStatus.failed}, stopped: ${recursiveWatcherStatus.stopped}`); + lines.push(`- Non-Recursive Watchers: total: ${nonRecursiveWatcher.watchers.size}, active: ${nonRecursiveWatcherStatus.active}, failed: ${nonRecursiveWatcherStatus.failed}, reusing: ${nonRecursiveWatcherStatus.reusing}`); + lines.push(`- I/O Handles Impact: total: ${recursiveRequestsStatus.polling + nonRecursiveRequestsStatus.polling + recursiveWatcherStatus.active + nonRecursiveWatcherStatus.active}`); + + lines.push(`\n[Recursive Requests (${recursiveRequests.length}, suspended: ${recursiveRequestsStatus.suspended}, polling: ${recursiveRequestsStatus.polling})]:`); + for (const request of recursiveRequests) { + fillRequestStats(lines, request, recursiveWatcher); + } + + lines.push(`\n[Non-Recursive Requests (${nonRecursiveRequests.length}, suspended: ${nonRecursiveRequestsStatus.suspended}, polling: ${nonRecursiveRequestsStatus.polling})]:`); + for (const request of nonRecursiveRequests) { + fillRequestStats(lines, request, nonRecursiveWatcher); + } + + fillRecursiveWatcherStats(lines, recursiveWatcher); + fillNonRecursiveWatcherStats(lines, nonRecursiveWatcher); + + let maxLength = 0; + for (const line of lines) { + maxLength = Math.max(maxLength, line.split('\t')[0].length); + } + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + const parts = line.split('\t'); + if (parts.length === 2) { + const padding = ' '.repeat(maxLength - parts[0].length); + lines[i] = `${parts[0]}${padding}\t${parts[1]}`; + } + } + + return `\n\n[File Watcher] request stats:\n\n${lines.join('\n')}\n\n`; +} + +function computeRequestStatus(requests: IUniversalWatchRequest[], watcher: ParcelWatcher | NodeJSWatcher): { suspended: number; polling: number } { + let polling = 0; + let suspended = 0; + + for (const request of requests) { + const isSuspended = watcher.isSuspended(request); + if (isSuspended === false) { + continue; + } + + suspended++; + + if (isSuspended === 'polling') { + polling++; + } + } + + return { suspended, polling }; +} + +function computeRecursiveWatchStatus(recursiveWatcher: ParcelWatcher): { active: number; failed: number; stopped: number } { + let active = 0; + let failed = 0; + let stopped = 0; + + for (const watcher of recursiveWatcher.watchers.values()) { + if (!watcher.failed && !watcher.stopped) { + active++; + } + if (watcher.failed) { + failed++; + } + if (watcher.stopped) { + stopped++; + } + } + + return { active, failed, stopped }; +} + +function computeNonRecursiveWatchStatus(nonRecursiveWatcher: NodeJSWatcher): { active: number; failed: number; reusing: number } { + let active = 0; + let failed = 0; + let reusing = 0; + + for (const watcher of nonRecursiveWatcher.watchers) { + if (!watcher.instance.failed && !watcher.instance.isReusingRecursiveWatcher) { + active++; + } + if (watcher.instance.failed) { + failed++; + } + if (watcher.instance.isReusingRecursiveWatcher) { + reusing++; + } + } + + return { active, failed, reusing }; +} + +function sortByPathPrefix(requests: IUniversalWatchRequest[]): IUniversalWatchRequest[]; +function sortByPathPrefix(requests: INodeJSWatcherInstance[]): INodeJSWatcherInstance[]; +function sortByPathPrefix(requests: ParcelWatcherInstance[]): ParcelWatcherInstance[]; +function sortByPathPrefix(requests: IUniversalWatchRequest[] | INodeJSWatcherInstance[] | ParcelWatcherInstance[]): IUniversalWatchRequest[] | INodeJSWatcherInstance[] | ParcelWatcherInstance[] { + requests.sort((r1, r2) => { + const p1 = isUniversalWatchRequest(r1) ? r1.path : r1.request.path; + const p2 = isUniversalWatchRequest(r2) ? r2.path : r2.request.path; + + const minLength = Math.min(p1.length, p2.length); + for (let i = 0; i < minLength; i++) { + if (p1[i] !== p2[i]) { + return (p1[i] < p2[i]) ? -1 : 1; + } + } + + return p1.length - p2.length; + }); + + return requests; +} + +function isUniversalWatchRequest(obj: unknown): obj is IUniversalWatchRequest { + const candidate = obj as IUniversalWatchRequest | undefined; + + return typeof candidate?.path === 'string'; +} + +function fillRequestStats(lines: string[], request: IUniversalWatchRequest, watcher: ParcelWatcher | NodeJSWatcher): void { + const decorations = []; + const suspended = watcher.isSuspended(request); + if (suspended !== false) { + if (suspended === 'polling') { + decorations.push('[SUSPENDED ]'); + } else { + decorations.push('[SUSPENDED ]'); + } + } + + lines.push(`${request.path}\t${decorations.length > 0 ? decorations.join(' ') + ' ' : ''}(${requestDetailsToString(request)})`); +} + +function requestDetailsToString(request: IUniversalWatchRequest): string { + return `excludes: ${request.excludes.length > 0 ? request.excludes : ''}, includes: ${request.includes && request.includes.length > 0 ? JSON.stringify(request.includes) : ''}, correlationId: ${typeof request.correlationId === 'number' ? request.correlationId : ''}`; +} + +function fillRecursiveWatcherStats(lines: string[], recursiveWatcher: ParcelWatcher): void { + const watchers = sortByPathPrefix(Array.from(recursiveWatcher.watchers.values())); + + const { active, failed, stopped } = computeRecursiveWatchStatus(recursiveWatcher); + lines.push(`\n[Recursive Watchers (${watchers.length}, active: ${active}, failed: ${failed}, stopped: ${stopped})]:`); + + for (const watcher of watchers) { + const decorations = []; + if (watcher.failed) { + decorations.push('[FAILED]'); + } + if (watcher.stopped) { + decorations.push('[STOPPED]'); + } + if (watcher.subscriptionsCount > 0) { + decorations.push(`[SUBSCRIBED:${watcher.subscriptionsCount}]`); + } + if (watcher.restarts > 0) { + decorations.push(`[RESTARTED:${watcher.restarts}]`); + } + lines.push(`${watcher.request.path}\t${decorations.length > 0 ? decorations.join(' ') + ' ' : ''}(${requestDetailsToString(watcher.request)})`); + } +} + +function fillNonRecursiveWatcherStats(lines: string[], nonRecursiveWatcher: NodeJSWatcher): void { + const watchers = sortByPathPrefix(Array.from(nonRecursiveWatcher.watchers.values())); + + const { active, failed, reusing } = computeNonRecursiveWatchStatus(nonRecursiveWatcher); + lines.push(`\n[Non-Recursive Watchers (${watchers.length}, active: ${active}, failed: ${failed}, reusing: ${reusing})]:`); + + for (const watcher of watchers) { + const decorations = []; + if (watcher.instance.failed) { + decorations.push('[FAILED]'); + } + if (watcher.instance.isReusingRecursiveWatcher) { + decorations.push('[REUSING]'); + } + lines.push(`${watcher.request.path}\t${decorations.length > 0 ? decorations.join(' ') + ' ' : ''}(${requestDetailsToString(watcher.request)})`); + } +} diff --git a/src/vs/platform/files/test/node/nodejsWatcher.integrationTest.ts b/src/vs/platform/files/test/node/nodejsWatcher.integrationTest.ts index 177756f62eb..678faf11941 100644 --- a/src/vs/platform/files/test/node/nodejsWatcher.integrationTest.ts +++ b/src/vs/platform/files/test/node/nodejsWatcher.integrationTest.ts @@ -3,12 +3,13 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ +import * as assert from 'assert'; import { tmpdir } from 'os'; import { basename, dirname, join } from 'vs/base/common/path'; import { Promises, RimRafMode } from 'vs/base/node/pfs'; import { flakySuite, getRandomTestPath } from 'vs/base/test/node/testUtils'; import { FileChangeType } from 'vs/platform/files/common/files'; -import { INonRecursiveWatchRequest } from 'vs/platform/files/common/watcher'; +import { INonRecursiveWatchRequest, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher'; import { watchFileContents } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcherLib'; import { isLinux, isMacintosh, isWindows } from 'vs/base/common/platform'; import { getDriveLetter } from 'vs/base/common/extpath'; @@ -21,6 +22,7 @@ import { extUriBiasedIgnorePathCase } from 'vs/base/common/resources'; import { URI } from 'vs/base/common/uri'; import { addUNCHostToAllowlist } from 'vs/base/node/unc'; import { Emitter, Event } from 'vs/base/common/event'; +import { TestParcelWatcher } from 'vs/platform/files/test/node/parcelWatcher.integrationTest'; // this suite has shown flaky runs in Azure pipelines where // tasks would just hang and timeout after a while (not in @@ -61,7 +63,20 @@ import { Emitter, Event } from 'vs/base/common/event'; enableLogging(false); setup(async () => { - watcher = new TestNodeJSWatcher(); + await createWatcher(undefined); + + testDir = URI.file(getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher')).fsPath; + + const sourceDir = FileAccess.asFileUri('vs/platform/files/test/node/fixtures/service').fsPath; + + await Promises.copy(sourceDir, testDir, { preserveSymlinks: false }); + }); + + async function createWatcher(accessor: IRecursiveWatcherWithSubscribe | undefined) { + await watcher?.stop(); + watcher?.dispose(); + + watcher = new TestNodeJSWatcher(accessor); watcher?.setVerboseLogging(loggingEnabled); watcher.onDidLogMessage(e => { @@ -75,13 +90,7 @@ import { Emitter, Event } from 'vs/base/common/event'; console.log(`[non-recursive watcher test error] ${e}`); } }); - - testDir = getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher'); - - const sourceDir = FileAccess.asFileUri('vs/platform/files/test/node/fixtures/service').fsPath; - - await Promises.copy(sourceDir, testDir, { preserveSymlinks: false }); - }); + } teardown(async () => { await watcher.stop(); @@ -128,7 +137,13 @@ import { Emitter, Event } from 'vs/base/common/event'; } test('basics (folder watch)', async function () { - await watcher.watch([{ path: testDir, excludes: [], recursive: false }]); + const request = { path: testDir, excludes: [], recursive: false }; + await watcher.watch([request]); + assert.strictEqual(watcher.isSuspended(request), false); + + const instance = Array.from(watcher.watchers)[0].instance; + assert.strictEqual(instance.isReusingRecursiveWatcher, false); + assert.strictEqual(instance.failed, false); // New file const newFilePath = join(testDir, 'newFile.txt'); @@ -236,7 +251,13 @@ import { Emitter, Event } from 'vs/base/common/event'; test('basics (file watch)', async function () { const filePath = join(testDir, 'lorem.txt'); - await watcher.watch([{ path: filePath, excludes: [], recursive: false }]); + const request = { path: filePath, excludes: [], recursive: false }; + await watcher.watch([request]); + assert.strictEqual(watcher.isSuspended(request), false); + + const instance = Array.from(watcher.watchers)[0].instance; + assert.strictEqual(instance.isReusingRecursiveWatcher, false); + assert.strictEqual(instance.failed, false); // Change file let changeFuture = awaitEvent(watcher, filePath, FileChangeType.UPDATED); @@ -554,17 +575,20 @@ import { Emitter, Event } from 'vs/base/common/event'; await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId: 1 }]); + const instance = Array.from(watcher.watchers)[0].instance; + const onDidWatchFail = Event.toPromise(watcher.onWatchFail); const changeFuture = awaitEvent(watcher, filePath, FileChangeType.DELETED, 1); Promises.unlink(filePath); await onDidWatchFail; await changeFuture; + assert.strictEqual(instance.failed, true); }); - (isMacintosh /* macOS: does not seem to report deletes on folders */ ? test.skip : test)('deleting watched path emits watcher fail and delete event when correlated (folder watch)', async function () { + (isMacintosh || isWindows /* macOS: does not seem to report deletes on folders | Windows: reports on('error') event only */ ? test.skip : test)('deleting watched path emits watcher fail and delete event when correlated (folder watch)', async function () { const folderPath = join(testDir, 'deep'); - await watcher.watch([{ path: folderPath, excludes: [], recursive: false }]); + await watcher.watch([{ path: folderPath, excludes: [], recursive: false, correlationId: 1 }]); const onDidWatchFail = Event.toPromise(watcher.onWatchFail); const changeFuture = awaitEvent(watcher, folderPath, FileChangeType.DELETED, 1); @@ -577,8 +601,10 @@ import { Emitter, Event } from 'vs/base/common/event'; const filePath = join(testDir, 'not-found.txt'); const onDidWatchFail = Event.toPromise(watcher.onWatchFail); - await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId: 1 }]); + const request = { path: filePath, excludes: [], recursive: false, correlationId: 1 }; + await watcher.watch([request]); await onDidWatchFail; + assert.strictEqual(watcher.isSuspended(request), 'polling'); await basicCrudTest(filePath, undefined, 1, undefined, true); await basicCrudTest(filePath, undefined, 1, undefined, true); @@ -586,11 +612,13 @@ import { Emitter, Event } from 'vs/base/common/event'; test('correlated watch requests support suspend/resume (file, exists in beginning)', async function () { const filePath = join(testDir, 'lorem.txt'); - await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId: 1 }]); + const request = { path: filePath, excludes: [], recursive: false, correlationId: 1 }; + await watcher.watch([request]); const onDidWatchFail = Event.toPromise(watcher.onWatchFail); await basicCrudTest(filePath, true, 1); await onDidWatchFail; + assert.strictEqual(watcher.isSuspended(request), 'polling'); await basicCrudTest(filePath, undefined, 1, undefined, true); }); @@ -599,8 +627,10 @@ import { Emitter, Event } from 'vs/base/common/event'; let onDidWatchFail = Event.toPromise(watcher.onWatchFail); const folderPath = join(testDir, 'not-found'); - await watcher.watch([{ path: folderPath, excludes: [], recursive: false, correlationId: 1 }]); + const request = { path: folderPath, excludes: [], recursive: false, correlationId: 1 }; + await watcher.watch([request]); await onDidWatchFail; + assert.strictEqual(watcher.isSuspended(request), 'polling'); let changeFuture = awaitEvent(watcher, folderPath, FileChangeType.ADDED, 1); let onDidWatch = Event.toPromise(watcher.onDidWatch); @@ -608,6 +638,8 @@ import { Emitter, Event } from 'vs/base/common/event'; await changeFuture; await onDidWatch; + assert.strictEqual(watcher.isSuspended(request), false); + const filePath = join(folderPath, 'newFile.txt'); await basicCrudTest(filePath, undefined, 1); @@ -649,4 +681,83 @@ import { Emitter, Event } from 'vs/base/common/event'; await basicCrudTest(filePath, undefined, 1); }); + + test('parcel watcher reused when present for non-recursive file watching (uncorrelated)', function () { + return testParcelWatcherReused(undefined); + }); + + test('parcel watcher reused when present for non-recursive file watching (correlated)', function () { + return testParcelWatcherReused(2); + }); + + function createParcelWatcher() { + const recursiveWatcher = new TestParcelWatcher(); + recursiveWatcher.setVerboseLogging(loggingEnabled); + recursiveWatcher.onDidLogMessage(e => { + if (loggingEnabled) { + console.log(`[recursive watcher test message] ${e.message}`); + } + }); + + recursiveWatcher.onDidError(e => { + if (loggingEnabled) { + console.log(`[recursive watcher test error] ${e}`); + } + }); + + return recursiveWatcher; + } + + async function testParcelWatcherReused(correlationId: number | undefined) { + const recursiveWatcher = createParcelWatcher(); + await recursiveWatcher.watch([{ path: testDir, excludes: [], recursive: true, correlationId: 1 }]); + + const recursiveInstance = Array.from(recursiveWatcher.watchers)[0]; + assert.strictEqual(recursiveInstance.subscriptionsCount, 0); + + await createWatcher(recursiveWatcher); + + const filePath = join(testDir, 'deep', 'conway.js'); + await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId }]); + + const { instance } = Array.from(watcher.watchers)[0]; + assert.strictEqual(instance.isReusingRecursiveWatcher, true); + assert.strictEqual(recursiveInstance.subscriptionsCount, 1); + + let changeFuture = awaitEvent(watcher, filePath, isMacintosh /* somehow fsevents seems to report still on the initial create from test setup */ ? FileChangeType.ADDED : FileChangeType.UPDATED, correlationId); + await Promises.writeFile(filePath, 'Hello World'); + await changeFuture; + + await recursiveWatcher.stop(); + recursiveWatcher.dispose(); + + await timeout(500); // give the watcher some time to restart + + changeFuture = awaitEvent(watcher, filePath, FileChangeType.UPDATED, correlationId); + await Promises.writeFile(filePath, 'Hello World'); + await changeFuture; + + assert.strictEqual(instance.isReusingRecursiveWatcher, false); + } + + test('correlated watch requests support suspend/resume (file, does not exist in beginning, parcel watcher reused)', async function () { + const recursiveWatcher = createParcelWatcher(); + await recursiveWatcher.watch([{ path: testDir, excludes: [], recursive: true }]); + + await createWatcher(recursiveWatcher); + + const filePath = join(testDir, 'not-found-2.txt'); + + const onDidWatchFail = Event.toPromise(watcher.onWatchFail); + const request = { path: filePath, excludes: [], recursive: false, correlationId: 1 }; + await watcher.watch([request]); + await onDidWatchFail; + assert.strictEqual(watcher.isSuspended(request), true); + + const changeFuture = awaitEvent(watcher, filePath, FileChangeType.ADDED, 1); + await Promises.writeFile(filePath, 'Hello World'); + await changeFuture; + + assert.strictEqual(watcher.isSuspended(request), false); + }); }); diff --git a/src/vs/platform/files/test/node/parcelWatcher.integrationTest.ts b/src/vs/platform/files/test/node/parcelWatcher.integrationTest.ts index 18a1a538433..749b6392c64 100644 --- a/src/vs/platform/files/test/node/parcelWatcher.integrationTest.ts +++ b/src/vs/platform/files/test/node/parcelWatcher.integrationTest.ts @@ -21,6 +21,40 @@ import { extUriBiasedIgnorePathCase } from 'vs/base/common/resources'; import { URI } from 'vs/base/common/uri'; import { addUNCHostToAllowlist } from 'vs/base/node/unc'; import { Emitter, Event } from 'vs/base/common/event'; +import { DisposableStore } from 'vs/base/common/lifecycle'; + +export class TestParcelWatcher extends ParcelWatcher { + + protected override readonly suspendedWatchRequestPollingInterval = 100; + + private readonly _onDidWatch = this._register(new Emitter()); + readonly onDidWatch = this._onDidWatch.event; + + readonly onWatchFail = this._onDidWatchFail.event; + + testRemoveDuplicateRequests(paths: string[], excludes: string[] = []): string[] { + + // Work with strings as paths to simplify testing + const requests: IRecursiveWatchRequest[] = paths.map(path => { + return { path, excludes, recursive: true }; + }); + + return this.removeDuplicateRequests(requests, false /* validate paths skipped for tests */).map(request => request.path); + } + + protected override async doWatch(requests: IRecursiveWatchRequest[]): Promise { + await super.doWatch(requests); + await this.whenReady(); + + this._onDidWatch.fire(); + } + + async whenReady(): Promise { + for (const watcher of this.watchers) { + await watcher.ready; + } + } +} // this suite has shown flaky runs in Azure pipelines where // tasks would just hang and timeout after a while (not in @@ -29,39 +63,6 @@ import { Emitter, Event } from 'vs/base/common/event'; ((process.env['BUILD_SOURCEVERSION'] || process.env['CI']) ? suite.skip : flakySuite)('File Watcher (parcel)', () => { - class TestParcelWatcher extends ParcelWatcher { - - protected override readonly suspendedWatchRequestPollingInterval = 100; - - private readonly _onDidWatch = this._register(new Emitter()); - readonly onDidWatch = this._onDidWatch.event; - - readonly onWatchFail = this._onDidWatchFail.event; - - testRemoveDuplicateRequests(paths: string[], excludes: string[] = []): string[] { - - // Work with strings as paths to simplify testing - const requests: IRecursiveWatchRequest[] = paths.map(path => { - return { path, excludes, recursive: true }; - }); - - return this.removeDuplicateRequests(requests, false /* validate paths skipped for tests */).map(request => request.path); - } - - protected override async doWatch(requests: IRecursiveWatchRequest[]): Promise { - await super.doWatch(requests); - await this.whenReady(); - - this._onDidWatch.fire(); - } - - async whenReady(): Promise { - for (const watcher of this.watchers) { - await watcher.ready; - } - } - } - let testDir: string; let watcher: TestParcelWatcher; @@ -90,7 +91,7 @@ import { Emitter, Event } from 'vs/base/common/event'; } }); - testDir = getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher'); + testDir = URI.file(getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher')).fsPath; const sourceDir = FileAccess.asFileUri('vs/platform/files/test/node/fixtures/service').fsPath; @@ -98,7 +99,18 @@ import { Emitter, Event } from 'vs/base/common/event'; }); teardown(async () => { + const watchers = watcher.watchers.size; + let stoppedInstances = 0; + for (const instance of watcher.watchers) { + Event.once(instance.onDidStop)(() => { + if (instance.stopped) { + stoppedInstances++; + } + }); + } + await watcher.stop(); + assert.strictEqual(stoppedInstances, watchers, 'All watchers must be stopped before the test ends'); watcher.dispose(); // Possible that the file watcher is still holding @@ -170,37 +182,68 @@ import { Emitter, Event } from 'vs/base/common/event'; } test('basics', async function () { - await watcher.watch([{ path: testDir, excludes: [], recursive: true }]); + const request = { path: testDir, excludes: [], recursive: true }; + await watcher.watch([request]); + assert.strictEqual(watcher.watchers.size, watcher.watchers.size); + + const instance = Array.from(watcher.watchers)[0]; + assert.strictEqual(request, instance.request); + assert.strictEqual(instance.failed, false); + assert.strictEqual(instance.stopped, false); + + const disposables = new DisposableStore(); + + const subscriptions1 = new Map(); + const subscriptions2 = new Map(); // New file const newFilePath = join(testDir, 'deep', 'newFile.txt'); + disposables.add(instance.subscribe(newFilePath, change => subscriptions1.set(change.resource.fsPath, change.type))); + disposables.add(instance.subscribe(newFilePath, change => subscriptions2.set(change.resource.fsPath, change.type))); // can subscribe multiple times + assert.strictEqual(instance.include(newFilePath), true); + assert.strictEqual(instance.exclude(newFilePath), false); let changeFuture: Promise = awaitEvent(watcher, newFilePath, FileChangeType.ADDED); await Promises.writeFile(newFilePath, 'Hello World'); await changeFuture; + assert.strictEqual(subscriptions1.get(newFilePath), FileChangeType.ADDED); + assert.strictEqual(subscriptions2.get(newFilePath), FileChangeType.ADDED); // New folder const newFolderPath = join(testDir, 'deep', 'New Folder'); + disposables.add(instance.subscribe(newFolderPath, change => subscriptions1.set(change.resource.fsPath, change.type))); + const disposable = instance.subscribe(newFolderPath, change => subscriptions2.set(change.resource.fsPath, change.type)); + disposable.dispose(); + assert.strictEqual(instance.include(newFolderPath), true); + assert.strictEqual(instance.exclude(newFolderPath), false); changeFuture = awaitEvent(watcher, newFolderPath, FileChangeType.ADDED); await Promises.mkdir(newFolderPath); await changeFuture; + assert.strictEqual(subscriptions1.get(newFolderPath), FileChangeType.ADDED); + assert.strictEqual(subscriptions2.has(newFolderPath), false /* subscription was disposed before the event */); // Rename file let renamedFilePath = join(testDir, 'deep', 'renamedFile.txt'); + disposables.add(instance.subscribe(renamedFilePath, change => subscriptions1.set(change.resource.fsPath, change.type))); changeFuture = Promise.all([ awaitEvent(watcher, newFilePath, FileChangeType.DELETED), awaitEvent(watcher, renamedFilePath, FileChangeType.ADDED) ]); await Promises.rename(newFilePath, renamedFilePath); await changeFuture; + assert.strictEqual(subscriptions1.get(newFilePath), FileChangeType.DELETED); + assert.strictEqual(subscriptions1.get(renamedFilePath), FileChangeType.ADDED); // Rename folder let renamedFolderPath = join(testDir, 'deep', 'Renamed Folder'); + disposables.add(instance.subscribe(renamedFolderPath, change => subscriptions1.set(change.resource.fsPath, change.type))); changeFuture = Promise.all([ awaitEvent(watcher, newFolderPath, FileChangeType.DELETED), awaitEvent(watcher, renamedFolderPath, FileChangeType.ADDED) ]); await Promises.rename(newFolderPath, renamedFolderPath); await changeFuture; + assert.strictEqual(subscriptions1.get(newFolderPath), FileChangeType.DELETED); + assert.strictEqual(subscriptions1.get(renamedFolderPath), FileChangeType.ADDED); // Rename file (same name, different case) const caseRenamedFilePath = join(testDir, 'deep', 'RenamedFile.txt'); @@ -280,13 +323,19 @@ import { Emitter, Event } from 'vs/base/common/event'; // Delete file changeFuture = awaitEvent(watcher, copiedFilepath, FileChangeType.DELETED); + disposables.add(instance.subscribe(copiedFilepath, change => subscriptions1.set(change.resource.fsPath, change.type))); await Promises.unlink(copiedFilepath); await changeFuture; + assert.strictEqual(subscriptions1.get(copiedFilepath), FileChangeType.DELETED); // Delete folder changeFuture = awaitEvent(watcher, copiedFolderpath, FileChangeType.DELETED); + disposables.add(instance.subscribe(copiedFolderpath, change => subscriptions1.set(change.resource.fsPath, change.type))); await Promises.rmdir(copiedFolderpath); await changeFuture; + assert.strictEqual(subscriptions1.get(copiedFolderpath), FileChangeType.DELETED); + + disposables.dispose(); }); (isMacintosh /* this test seems not possible with fsevents backend */ ? test.skip : test)('basics (atomic writes)', async function () { @@ -676,26 +725,59 @@ import { Emitter, Event } from 'vs/base/common/event'; await watcher.watch([{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }]); + let failed = false; + const instance = Array.from(watcher.watchers)[0]; + assert.strictEqual(instance.include(folderPath), true); + instance.onDidFail(() => failed = true); + const onDidWatchFail = Event.toPromise(watcher.onWatchFail); const changeFuture = awaitEvent(watcher, folderPath, FileChangeType.DELETED, undefined, 1); Promises.rm(folderPath, RimRafMode.UNLINK); await onDidWatchFail; await changeFuture; + assert.strictEqual(failed, true); + assert.strictEqual(instance.failed, true); }); - test('correlated watch requests support suspend/resume (folder, does not exist in beginning)', async () => { + test('correlated watch requests support suspend/resume (folder, does not exist in beginning, not reusing watcher)', async () => { + await testCorrelatedWatchFolderDoesNotExist(false); + }); + + test('correlated watch requests support suspend/resume (folder, does not exist in beginning, reusing watcher)', async () => { + await testCorrelatedWatchFolderDoesNotExist(true); + }); + + async function testCorrelatedWatchFolderDoesNotExist(reuseExistingWatcher: boolean) { let onDidWatchFail = Event.toPromise(watcher.onWatchFail); const folderPath = join(testDir, 'not-found'); - await watcher.watch([{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }]); + + const requests: IRecursiveWatchRequest[] = []; + if (reuseExistingWatcher) { + requests.push({ path: testDir, excludes: [], recursive: true }); + await watcher.watch(requests); + } + + const request: IRecursiveWatchRequest = { path: folderPath, excludes: [], recursive: true, correlationId: 1 }; + requests.push(request); + + await watcher.watch(requests); await onDidWatchFail; + if (reuseExistingWatcher) { + assert.strictEqual(watcher.isSuspended(request), true); + } else { + assert.strictEqual(watcher.isSuspended(request), 'polling'); + } + let changeFuture = awaitEvent(watcher, folderPath, FileChangeType.ADDED, undefined, 1); let onDidWatch = Event.toPromise(watcher.onDidWatch); await Promises.mkdir(folderPath); await changeFuture; await onDidWatch; + assert.strictEqual(watcher.isSuspended(request), false); + const filePath = join(folderPath, 'newFile.txt'); await basicCrudTest(filePath, 1); @@ -710,11 +792,25 @@ import { Emitter, Event } from 'vs/base/common/event'; await onDidWatch; await basicCrudTest(filePath, 1); + } + + test('correlated watch requests support suspend/resume (folder, exist in beginning, not reusing watcher)', async () => { + await testCorrelatedWatchFolderExists(false); }); - test('correlated watch requests support suspend/resume (folder, exist in beginning)', async () => { + test('correlated watch requests support suspend/resume (folder, exist in beginning, reusing watcher)', async () => { + await testCorrelatedWatchFolderExists(true); + }); + + async function testCorrelatedWatchFolderExists(reuseExistingWatcher: boolean) { const folderPath = join(testDir, 'deep'); - await watcher.watch([{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }]); + + const requests: IRecursiveWatchRequest[] = [{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }]; + if (reuseExistingWatcher) { + requests.push({ path: testDir, excludes: [], recursive: true }); + } + + await watcher.watch(requests); const filePath = join(folderPath, 'newFile.txt'); await basicCrudTest(filePath, 1); @@ -730,5 +826,25 @@ import { Emitter, Event } from 'vs/base/common/event'; await onDidWatch; await basicCrudTest(filePath, 1); + } + + test('watch request reuses another recursive watcher even when requests are coming in at the same time', async function () { + const folderPath1 = join(testDir, 'deep', 'not-existing1'); + const folderPath2 = join(testDir, 'deep', 'not-existing2'); + const folderPath3 = join(testDir, 'not-existing3'); + + const requests: IRecursiveWatchRequest[] = [ + { path: folderPath1, excludes: [], recursive: true, correlationId: 1 }, + { path: folderPath2, excludes: [], recursive: true, correlationId: 2 }, + { path: folderPath3, excludes: [], recursive: true, correlationId: 3 }, + { path: join(testDir, 'deep'), excludes: [], recursive: true } + ]; + + await watcher.watch(requests); + + assert.strictEqual(watcher.isSuspended(requests[0]), true); + assert.strictEqual(watcher.isSuspended(requests[1]), true); + assert.strictEqual(watcher.isSuspended(requests[2]), 'polling'); + assert.strictEqual(watcher.isSuspended(requests[3]), false); }); }); diff --git a/src/vs/workbench/electron-sandbox/desktop.main.ts b/src/vs/workbench/electron-sandbox/desktop.main.ts index 1118dd5d38f..90b3fb50616 100644 --- a/src/vs/workbench/electron-sandbox/desktop.main.ts +++ b/src/vs/workbench/electron-sandbox/desktop.main.ts @@ -240,7 +240,7 @@ export class DesktopMain extends Disposable { serviceCollection.set(IRemoteAuthorityResolverService, remoteAuthorityResolverService); // Local Files - const diskFileSystemProvider = this._register(new DiskFileSystemProvider(mainProcessService, utilityProcessWorkerWorkbenchService, logService)); + const diskFileSystemProvider = this._register(new DiskFileSystemProvider(mainProcessService, utilityProcessWorkerWorkbenchService, logService, loggerService)); fileService.registerProvider(Schemas.file, diskFileSystemProvider); // URI Identity diff --git a/src/vs/workbench/services/files/electron-sandbox/diskFileSystemProvider.ts b/src/vs/workbench/services/files/electron-sandbox/diskFileSystemProvider.ts index 69924c80b7b..b399f7db908 100644 --- a/src/vs/workbench/services/files/electron-sandbox/diskFileSystemProvider.ts +++ b/src/vs/workbench/services/files/electron-sandbox/diskFileSystemProvider.ts @@ -3,6 +3,7 @@ * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ +import { localize } from 'vs/nls'; import { Event } from 'vs/base/common/event'; import { isLinux } from 'vs/base/common/platform'; import { FileSystemProviderCapabilities, IFileDeleteOptions, IStat, FileType, IFileReadStreamOptions, IFileWriteOptions, IFileOpenOptions, IFileOverwriteOptions, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileAtomicReadCapability, IFileAtomicReadOptions, IFileSystemProviderWithFileCloneCapability, IFileChange } from 'vs/platform/files/common/files'; @@ -14,8 +15,9 @@ import { URI } from 'vs/base/common/uri'; import { DiskFileSystemProviderClient, LOCAL_FILE_SYSTEM_CHANNEL_NAME } from 'vs/platform/files/common/diskFileSystemProviderClient'; import { ILogMessage, AbstractUniversalWatcherClient } from 'vs/platform/files/common/watcher'; import { UniversalWatcherClient } from 'vs/workbench/services/files/electron-sandbox/watcherClient'; -import { ILogService } from 'vs/platform/log/common/log'; +import { ILoggerService, ILogService } from 'vs/platform/log/common/log'; import { IUtilityProcessWorkerWorkbenchService } from 'vs/workbench/services/utilityProcess/electron-sandbox/utilityProcessWorkerWorkbenchService'; +import { LogService } from 'vs/platform/log/common/logService'; /** * A sandbox ready disk file system provider that delegates almost all calls @@ -35,7 +37,8 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple constructor( private readonly mainProcessService: IMainProcessService, private readonly utilityProcessWorkerWorkbenchService: IUtilityProcessWorkerWorkbenchService, - logService: ILogService + logService: ILogService, + private readonly loggerService: ILoggerService ) { super(logService, { watcher: { forceUniversal: true /* send all requests to universal watcher process */ } }); @@ -143,5 +146,22 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple throw new Error('Method not implemented in sandbox.'); // we never expect this to be called given we set `forceUniversal: true` } + private _watcherLogService: ILogService | undefined = undefined; + private get watcherLogService(): ILogService { + if (!this._watcherLogService) { + this._watcherLogService = new LogService(this.loggerService.createLogger('fileWatcher', { name: localize('fileWatcher', "File Watcher") })); + } + + return this._watcherLogService; + } + + protected override logWatcherMessage(msg: ILogMessage): void { + this.watcherLogService[msg.type](msg.message); + + if (msg.type !== 'trace' && msg.type !== 'debug') { + super.logWatcherMessage(msg); // allow non-verbose log messages in window log + } + } + //#endregion }