watcher - deduplicate non-recursive file watch requests if possible (#208325)

This commit is contained in:
Benjamin Pasero 2024-03-28 12:09:15 +01:00 committed by GitHub
parent 16a32fa6c4
commit 4b90a20d3a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1006 additions and 225 deletions

View file

@ -211,6 +211,10 @@ export abstract class AbstractDiskFileSystemProvider extends Disposable implemen
this._onDidWatchError.fire(msg.message);
}
this.logWatcherMessage(msg);
}
protected logWatcherMessage(msg: ILogMessage): void {
this.logService[msg.type](msg.message);
}

View file

@ -5,7 +5,7 @@
import { Event } from 'vs/base/common/event';
import { GLOBSTAR, IRelativePattern, parse, ParsedPattern } from 'vs/base/common/glob';
import { Disposable, DisposableStore, MutableDisposable } from 'vs/base/common/lifecycle';
import { Disposable, DisposableStore, IDisposable, MutableDisposable } from 'vs/base/common/lifecycle';
import { isAbsolute } from 'vs/base/common/path';
import { isLinux } from 'vs/base/common/platform';
import { URI } from 'vs/base/common/uri';
@ -122,6 +122,20 @@ export interface IRecursiveWatcher extends IWatcher {
watch(requests: IRecursiveWatchRequest[]): Promise<void>;
}
export interface IRecursiveWatcherWithSubscribe extends IRecursiveWatcher {
/**
* Subscribe to file events for the given path. The callback is called
* whenever a file event occurs for the path. I fthe watcher failed,
* the error parameter is set to `true`.
*
* @returns an `IDisposable` to stop listening to events or `undefined`
* if no events can be watched for the path given the current set of
* recursive watch requests.
*/
subscribe(path: string, callback: (error: boolean, change?: IFileChange) => void): IDisposable | undefined;
}
export interface IRecursiveWatcherOptions {
/**

View file

@ -5,10 +5,11 @@
import { watchFile, unwatchFile, Stats } from 'fs';
import { Disposable, DisposableMap, DisposableStore, toDisposable } from 'vs/base/common/lifecycle';
import { ILogMessage, IUniversalWatchRequest, IWatchRequestWithCorrelation, IWatcher, isWatchRequestWithCorrelation } from 'vs/platform/files/common/watcher';
import { ILogMessage, IRecursiveWatcherWithSubscribe, IUniversalWatchRequest, IWatchRequestWithCorrelation, IWatcher, isWatchRequestWithCorrelation } from 'vs/platform/files/common/watcher';
import { Emitter, Event } from 'vs/base/common/event';
import { FileChangeType, IFileChange } from 'vs/platform/files/common/files';
import { URI } from 'vs/base/common/uri';
import { DeferredPromise } from 'vs/base/common/async';
export abstract class BaseWatcher extends Disposable implements IWatcher {
@ -25,9 +26,12 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
private readonly allCorrelatedWatchRequests = new Map<number /* correlation ID */, IWatchRequestWithCorrelation>();
private readonly suspendedWatchRequests = this._register(new DisposableMap<number /* correlation ID */>());
private readonly suspendedWatchRequestsWithPolling = new Set<number /* correlation ID */>();
protected readonly suspendedWatchRequestPollingInterval: number = 5007; // node.js default
private joinWatch = new DeferredPromise<void>();
constructor() {
super();
@ -41,6 +45,11 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
// to experiment with this feature in a controlled way. Monitoring requests
// requires us to install polling watchers (via `fs.watchFile()`) and thus
// should be used sparingly.
//
// TODO@bpasero revisit this in the future to have a more general approach
// for suspend/resume and drop the `legacyMonitorRequest` in parcel.
// One issue is that we need to be able to uniquely identify a request and
// without correlation that is actually harder...
return;
}
@ -53,26 +62,36 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
}
async watch(requests: IUniversalWatchRequest[]): Promise<void> {
this.allCorrelatedWatchRequests.clear();
this.allNonCorrelatedWatchRequests.clear();
// Figure out correlated vs. non-correlated requests
for (const request of requests) {
if (this.isCorrelated(request)) {
this.allCorrelatedWatchRequests.set(request.correlationId, request);
} else {
this.allNonCorrelatedWatchRequests.add(request);
}
if (!this.joinWatch.isSettled) {
this.joinWatch.complete();
}
this.joinWatch = new DeferredPromise<void>();
// Remove all suspended correlated watch requests that are no longer watched
for (const [correlationId] of this.suspendedWatchRequests) {
if (!this.allCorrelatedWatchRequests.has(correlationId)) {
this.suspendedWatchRequests.deleteAndDispose(correlationId);
try {
this.allCorrelatedWatchRequests.clear();
this.allNonCorrelatedWatchRequests.clear();
// Figure out correlated vs. non-correlated requests
for (const request of requests) {
if (this.isCorrelated(request)) {
this.allCorrelatedWatchRequests.set(request.correlationId, request);
} else {
this.allNonCorrelatedWatchRequests.add(request);
}
}
}
return this.updateWatchers();
// Remove all suspended correlated watch requests that are no longer watched
for (const [correlationId] of this.suspendedWatchRequests) {
if (!this.allCorrelatedWatchRequests.has(correlationId)) {
this.suspendedWatchRequests.deleteAndDispose(correlationId);
this.suspendedWatchRequestsWithPolling.delete(correlationId);
}
}
return await this.updateWatchers();
} finally {
this.joinWatch.complete();
}
}
private updateWatchers(): Promise<void> {
@ -82,7 +101,15 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
]);
}
private suspendWatchRequest(request: IWatchRequestWithCorrelation): void {
isSuspended(request: IUniversalWatchRequest): 'polling' | boolean {
if (typeof request.correlationId !== 'number') {
return false;
}
return this.suspendedWatchRequestsWithPolling.has(request.correlationId) ? 'polling' : this.suspendedWatchRequests.has(request.correlationId);
}
private async suspendWatchRequest(request: IWatchRequestWithCorrelation): Promise<void> {
if (this.suspendedWatchRequests.has(request.correlationId)) {
return; // already suspended
}
@ -90,6 +117,17 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
const disposables = new DisposableStore();
this.suspendedWatchRequests.set(request.correlationId, disposables);
// It is possible that a watch request fails right during watch()
// phase while other requests succeed. To increase the chance of
// reusing another watcher for suspend/resume tracking, we await
// all watch requests having processed.
await this.joinWatch.p;
if (disposables.isDisposed) {
return;
}
this.monitorSuspendedWatchRequest(request, disposables);
this.updateWatchers();
@ -97,14 +135,44 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
private resumeWatchRequest(request: IWatchRequestWithCorrelation): void {
this.suspendedWatchRequests.deleteAndDispose(request.correlationId);
this.suspendedWatchRequestsWithPolling.delete(request.correlationId);
this.updateWatchers();
}
private monitorSuspendedWatchRequest(request: IWatchRequestWithCorrelation, disposables: DisposableStore) {
const resource = URI.file(request.path);
const that = this;
private monitorSuspendedWatchRequest(request: IWatchRequestWithCorrelation, disposables: DisposableStore): void {
if (this.doMonitorWithExistingWatcher(request, disposables)) {
this.trace(`reusing an existing recursive watcher to monitor ${request.path}`);
this.suspendedWatchRequestsWithPolling.delete(request.correlationId);
} else {
this.doMonitorWithNodeJS(request, disposables);
this.suspendedWatchRequestsWithPolling.add(request.correlationId);
}
}
private doMonitorWithExistingWatcher(request: IWatchRequestWithCorrelation, disposables: DisposableStore): boolean {
const subscription = this.recursiveWatcher?.subscribe(request.path, (error, change) => {
if (disposables.isDisposed) {
return; // return early if already disposed
}
if (error) {
this.monitorSuspendedWatchRequest(request, disposables);
} else if (change?.type === FileChangeType.ADDED) {
this.onMonitoredPathAdded(request);
}
});
if (subscription) {
disposables.add(subscription);
return true;
}
return false;
}
private doMonitorWithNodeJS(request: IWatchRequestWithCorrelation, disposables: DisposableStore): void {
let pathNotFound = false;
const watchFileCallback: (curr: Stats, prev: Stats) => void = (curr, prev) => {
@ -119,15 +187,7 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
// Watch path created: resume watching request
if (!currentPathNotFound && (previousPathNotFound || oldPathNotFound)) {
this.trace(`fs.watchFile() detected ${request.path} exists again, resuming watcher (correlationId: ${request.correlationId})`);
// Emit as event
const event: IFileChange = { resource, type: FileChangeType.ADDED, cId: request.correlationId };
that._onDidChangeFile.fire([event]);
this.traceEvent(event, request);
// Resume watching
this.resumeWatchRequest(request);
this.onMonitoredPathAdded(request);
}
};
@ -149,12 +209,25 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
}));
}
private onMonitoredPathAdded(request: IWatchRequestWithCorrelation) {
this.trace(`detected ${request.path} exists again, resuming watcher (correlationId: ${request.correlationId})`);
// Emit as event
const event: IFileChange = { resource: URI.file(request.path), type: FileChangeType.ADDED, cId: request.correlationId };
this._onDidChangeFile.fire([event]);
this.traceEvent(event, request);
// Resume watching
this.resumeWatchRequest(request);
}
private isPathNotFound(stats: Stats): boolean {
return stats.ctimeMs === 0 && stats.ino === 0;
}
async stop(): Promise<void> {
this.suspendedWatchRequests.clearAndDisposeAll();
this.suspendedWatchRequestsWithPolling.clear();
}
protected traceEvent(event: IFileChange, request: IUniversalWatchRequest): void {
@ -168,6 +241,8 @@ export abstract class BaseWatcher extends Disposable implements IWatcher {
protected abstract doWatch(requests: IUniversalWatchRequest[]): Promise<void>;
protected abstract readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined;
protected abstract trace(message: string): void;
protected abstract warn(message: string): void;

View file

@ -21,6 +21,6 @@ export class NodeJSWatcherClient extends AbstractNonRecursiveWatcherClient {
}
protected override createWatcher(disposables: DisposableStore): INonRecursiveWatcher {
return disposables.add(new NodeJSWatcher()) satisfies INonRecursiveWatcher;
return disposables.add(new NodeJSWatcher(undefined /* no recursive watching support here */)) satisfies INonRecursiveWatcher;
}
}

View file

@ -7,7 +7,7 @@ import { Event } from 'vs/base/common/event';
import { patternsEquals } from 'vs/base/common/glob';
import { BaseWatcher } from 'vs/platform/files/node/watcher/baseWatcher';
import { isLinux } from 'vs/base/common/platform';
import { INonRecursiveWatchRequest, INonRecursiveWatcher } from 'vs/platform/files/common/watcher';
import { INonRecursiveWatchRequest, INonRecursiveWatcher, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher';
import { NodeJSFileWatcherLibrary } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcherLib';
import { isEqual } from 'vs/base/common/extpath';
@ -28,10 +28,14 @@ export class NodeJSWatcher extends BaseWatcher implements INonRecursiveWatcher {
readonly onDidError = Event.None;
protected readonly watchers = new Set<INodeJSWatcherInstance>();
readonly watchers = new Set<INodeJSWatcherInstance>();
private verboseLogging = false;
constructor(protected readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined) {
super();
}
protected override async doWatch(requests: INonRecursiveWatchRequest[]): Promise<void> {
// Figure out duplicates to remove from the requests
@ -47,7 +51,6 @@ export class NodeJSWatcher extends BaseWatcher implements INonRecursiveWatcher {
} else {
requestsToStart.push(request); // start watching
}
}
// Logging
@ -95,7 +98,7 @@ export class NodeJSWatcher extends BaseWatcher implements INonRecursiveWatcher {
private startWatching(request: INonRecursiveWatchRequest): void {
// Start via node.js lib
const instance = new NodeJSFileWatcherLibrary(request, changes => this._onDidChangeFile.fire(changes), () => this._onDidWatchFail.fire(request), msg => this._onDidLogMessage.fire(msg), this.verboseLogging);
const instance = new NodeJSFileWatcherLibrary(request, this.recursiveWatcher, changes => this._onDidChangeFile.fire(changes), () => this._onDidWatchFail.fire(request), msg => this._onDidLogMessage.fire(msg), this.verboseLogging);
// Remember as watcher instance
const watcher: INodeJSWatcherInstance = { request, instance };

View file

@ -16,7 +16,7 @@ import { URI } from 'vs/base/common/uri';
import { realcase } from 'vs/base/node/extpath';
import { Promises } from 'vs/base/node/pfs';
import { FileChangeType, IFileChange } from 'vs/platform/files/common/files';
import { ILogMessage, coalesceEvents, INonRecursiveWatchRequest, parseWatcherPatterns } from 'vs/platform/files/common/watcher';
import { ILogMessage, coalesceEvents, INonRecursiveWatchRequest, parseWatcherPatterns, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher';
export class NodeJSFileWatcherLibrary extends Disposable {
@ -56,8 +56,15 @@ export class NodeJSFileWatcherLibrary extends Disposable {
readonly ready = this.watch();
private _isReusingRecursiveWatcher = false;
get isReusingRecursiveWatcher(): boolean { return this._isReusingRecursiveWatcher; }
private didFail = false;
get failed(): boolean { return this.didFail; }
constructor(
private readonly request: INonRecursiveWatchRequest,
private readonly recursiveWatcher: IRecursiveWatcherWithSubscribe | undefined,
private readonly onDidFilesChange: (changes: IFileChange[]) => void,
private readonly onDidWatchFail?: () => void,
private readonly onLogMessage?: (msg: ILogMessage) => void,
@ -88,10 +95,16 @@ export class NodeJSFileWatcherLibrary extends Disposable {
this.trace(`ignoring a path for watching who's stat info failed to resolve: ${this.request.path} (error: ${error})`);
}
this.onDidWatchFail?.();
this.notifyWatchFailed();
}
}
private notifyWatchFailed(): void {
this.didFail = true;
this.onDidWatchFail?.();
}
private async normalizePath(request: INonRecursiveWatchRequest): Promise<string> {
let realPath = request.path;
@ -117,41 +130,95 @@ export class NodeJSFileWatcherLibrary extends Disposable {
return realPath;
}
private async doWatch(path: string, isDirectory: boolean): Promise<IDisposable> {
private async doWatch(realPath: string, isDirectory: boolean): Promise<IDisposable> {
const disposables = new DisposableStore();
if (this.doWatchWithExistingWatcher(realPath, isDirectory, disposables)) {
this.trace(`reusing an existing recursive watcher for ${this.request.path}`);
this._isReusingRecursiveWatcher = true;
} else {
this._isReusingRecursiveWatcher = false;
await this.doWatchWithNodeJS(realPath, isDirectory, disposables);
}
return disposables;
}
private doWatchWithExistingWatcher(realPath: string, isDirectory: boolean, disposables: DisposableStore): boolean {
if (isDirectory) {
return false; // only supported for files where we have the full path known upfront
}
const resource = URI.file(this.request.path);
const subscription = this.recursiveWatcher?.subscribe(this.request.path, async (error, change) => {
if (disposables.isDisposed) {
return; // return early if already disposed
}
if (error) {
const watchDisposable = await this.doWatch(realPath, isDirectory);
if (!disposables.isDisposed) {
disposables.add(watchDisposable);
} else {
watchDisposable.dispose();
}
} else if (change) {
if (typeof change.cId === 'number' || typeof this.request.correlationId === 'number') {
// Re-emit this change with the correlation id of the request
// so that the client can correlate the event with the request
// properly. Without correlation, we do not have to do that
// because the event will appear on the global listener already.
this.onDidFilesChange([{ resource, type: change.type, cId: this.request.correlationId }]);
}
}
});
if (subscription) {
disposables.add(subscription);
return true;
}
return false;
}
private async doWatchWithNodeJS(realPath: string, isDirectory: boolean, disposables: DisposableStore): Promise<void> {
// macOS: watching samba shares can crash VSCode so we do
// a simple check for the file path pointing to /Volumes
// (https://github.com/microsoft/vscode/issues/106879)
// TODO@electron this needs a revisit when the crash is
// fixed or mitigated upstream.
if (isMacintosh && isEqualOrParent(path, '/Volumes/', true)) {
this.error(`Refusing to watch ${path} for changes using fs.watch() for possibly being a network share where watching is unreliable and unstable.`);
if (isMacintosh && isEqualOrParent(realPath, '/Volumes/', true)) {
this.error(`Refusing to watch ${realPath} for changes using fs.watch() for possibly being a network share where watching is unreliable and unstable.`);
return Disposable.None;
return;
}
const cts = new CancellationTokenSource(this.cts.token);
disposables.add(toDisposable(() => cts.dispose(true)));
const disposables = new DisposableStore();
const watcherDisposables = new DisposableStore(); // we need a separate disposable store because we re-create the watcher from within in some cases
disposables.add(watcherDisposables);
try {
const requestResource = URI.file(this.request.path);
const pathBasename = basename(path);
const pathBasename = basename(realPath);
// Creating watcher can fail with an exception
const watcher = watch(path);
disposables.add(toDisposable(() => {
const watcher = watch(realPath);
watcherDisposables.add(toDisposable(() => {
watcher.removeAllListeners();
watcher.close();
}));
this.trace(`Started watching: '${path}'`);
this.trace(`Started watching: '${realPath}'`);
// Folder: resolve children to emit proper events
const folderChildren = new Set<string>();
if (isDirectory) {
try {
for (const child of await Promises.readdir(path)) {
for (const child of await Promises.readdir(realPath)) {
folderChildren.add(child);
}
} catch (error) {
@ -159,8 +226,12 @@ export class NodeJSFileWatcherLibrary extends Disposable {
}
}
if (cts.token.isCancellationRequested) {
return;
}
const mapPathToStatDisposable = new Map<string, IDisposable>();
disposables.add(toDisposable(() => {
watcherDisposables.add(toDisposable(() => {
for (const [, disposable] of mapPathToStatDisposable) {
disposable.dispose();
}
@ -168,9 +239,13 @@ export class NodeJSFileWatcherLibrary extends Disposable {
}));
watcher.on('error', (code: number, signal: string) => {
this.error(`Failed to watch ${path} for changes using fs.watch() (${code}, ${signal})`);
if (cts.token.isCancellationRequested) {
return;
}
this.onDidWatchFail?.();
this.error(`Failed to watch ${realPath} for changes using fs.watch() (${code}, ${signal})`);
this.notifyWatchFailed();
});
watcher.on('change', (type, raw) => {
@ -228,17 +303,21 @@ export class NodeJSFileWatcherLibrary extends Disposable {
// file watching specifically we want to handle
// the atomic-write cases where the file is being
// deleted and recreated with different contents.
if (changedFileName === pathBasename && !await Promises.exists(path)) {
if (changedFileName === pathBasename && !await Promises.exists(realPath)) {
this.onWatchedPathDeleted(requestResource);
return;
}
if (cts.token.isCancellationRequested) {
return;
}
// In order to properly detect renames on a case-insensitive
// file system, we need to use `existsChildStrictCase` helper
// because otherwise we would wrongly assume a file exists
// when it was renamed to same name but different case.
const fileExists = await this.existsChildStrictCase(join(path, changedFileName));
const fileExists = await this.existsChildStrictCase(join(realPath, changedFileName));
if (cts.token.isCancellationRequested) {
return; // ignore if disposed by now
@ -310,7 +389,7 @@ export class NodeJSFileWatcherLibrary extends Disposable {
// because the watcher is disposed then.
const timeoutHandle = setTimeout(async () => {
const fileExists = await Promises.exists(path);
const fileExists = await Promises.exists(realPath);
if (cts.token.isCancellationRequested) {
return; // ignore if disposed by now
@ -320,7 +399,7 @@ export class NodeJSFileWatcherLibrary extends Disposable {
if (fileExists) {
this.onFileChange({ resource: requestResource, type: FileChangeType.UPDATED, cId: this.request.correlationId }, true /* skip excludes/includes (file is explicitly watched) */);
disposables.add(await this.doWatch(path, false));
watcherDisposables.add(await this.doWatch(realPath, false));
}
// File seems to be really gone, so emit a deleted and failed event
@ -331,8 +410,8 @@ export class NodeJSFileWatcherLibrary extends Disposable {
// Very important to dispose the watcher which now points to a stale inode
// and wire in a new disposable that tracks our timeout that is installed
disposables.clear();
disposables.add(toDisposable(() => clearTimeout(timeoutHandle)));
watcherDisposables.clear();
watcherDisposables.add(toDisposable(() => clearTimeout(timeoutHandle)));
}
// File changed
@ -343,16 +422,11 @@ export class NodeJSFileWatcherLibrary extends Disposable {
});
} catch (error) {
if (!cts.token.isCancellationRequested) {
this.error(`Failed to watch ${path} for changes using fs.watch() (${error.toString()})`);
this.error(`Failed to watch ${realPath} for changes using fs.watch() (${error.toString()})`);
}
this.onDidWatchFail?.();
this.notifyWatchFailed();
}
return toDisposable(() => {
cts.dispose(true);
disposables.dispose();
});
}
private onWatchedPathDeleted(resource: URI): void {
@ -362,7 +436,7 @@ export class NodeJSFileWatcherLibrary extends Disposable {
this.onFileChange({ resource, type: FileChangeType.DELETED, cId: this.request.correlationId }, true /* skip excludes/includes (file is explicitly watched) */);
this.fileChangesAggregator.flush();
this.onDidWatchFail?.();
this.notifyWatchFailed();
}
private onFileChange(event: IFileChange, skipIncludeExcludeChecks = false): void {
@ -476,7 +550,7 @@ export async function watchFileContents(path: string, onData: (chunk: Uint8Array
let isReading = false;
const request: INonRecursiveWatchRequest = { path, excludes: [], recursive: false };
const watcher = new NodeJSFileWatcherLibrary(request, changes => {
const watcher = new NodeJSFileWatcherLibrary(request, undefined, changes => {
(async () => {
for (const { type } of changes) {
if (type === FileChangeType.UPDATED) {

View file

@ -10,9 +10,9 @@ import { URI } from 'vs/base/common/uri';
import { DeferredPromise, RunOnceScheduler, RunOnceWorker, ThrottledWorker } from 'vs/base/common/async';
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import { toErrorMessage } from 'vs/base/common/errorMessage';
import { Emitter } from 'vs/base/common/event';
import { randomPath, isEqual } from 'vs/base/common/extpath';
import { GLOBSTAR, ParsedPattern, patternsEquals } from 'vs/base/common/glob';
import { Emitter, Event } from 'vs/base/common/event';
import { randomPath, isEqual, isEqualOrParent } from 'vs/base/common/extpath';
import { GLOBSTAR, patternsEquals } from 'vs/base/common/glob';
import { BaseWatcher } from 'vs/platform/files/node/watcher/baseWatcher';
import { TernarySearchTree } from 'vs/base/common/ternarySearchTree';
import { normalizeNFC } from 'vs/base/common/normalization';
@ -21,44 +21,121 @@ import { isLinux, isMacintosh, isWindows } from 'vs/base/common/platform';
import { realcaseSync, realpathSync } from 'vs/base/node/extpath';
import { NodeJSFileWatcherLibrary } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcherLib';
import { FileChangeType, IFileChange } from 'vs/platform/files/common/files';
import { coalesceEvents, IRecursiveWatchRequest, IRecursiveWatcher, parseWatcherPatterns } from 'vs/platform/files/common/watcher';
import { coalesceEvents, IRecursiveWatchRequest, parseWatcherPatterns, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher';
import { Disposable, DisposableStore, IDisposable, toDisposable } from 'vs/base/common/lifecycle';
export interface IParcelWatcherInstance {
export class ParcelWatcherInstance extends Disposable {
/**
* Signals when the watcher is ready to watch.
*/
readonly ready: Promise<unknown>;
private readonly _onDidStop = this._register(new Emitter<{ joinRestart?: Promise<void> }>());
readonly onDidStop = this._onDidStop.event;
/**
* The watch request associated to the watcher.
*/
readonly request: IRecursiveWatchRequest;
private readonly _onDidFail = this._register(new Emitter<void>());
readonly onDidFail = this._onDidFail.event;
/**
* How often this watcher has been restarted in case of an unexpected
* shutdown.
*/
readonly restarts: number;
private didFail = false;
get failed(): boolean { return this.didFail; }
/**
* The cancellation token associated with the lifecycle of the watcher.
*/
readonly token: CancellationToken;
private didStop = false;
get stopped(): boolean { return this.didStop; }
/**
* An event aggregator to coalesce events and reduce duplicates.
*/
readonly worker: RunOnceWorker<IFileChange>;
private readonly includes = this.request.includes ? parseWatcherPatterns(this.request.path, this.request.includes) : undefined;
private readonly excludes = this.request.excludes ? parseWatcherPatterns(this.request.path, this.request.excludes) : undefined;
/**
* Stops and disposes the watcher. This operation is async to await
* unsubscribe call in Parcel.
*/
stop(): Promise<void>;
private readonly subscriptions = new Map<string, Set<(change: IFileChange) => void>>();
constructor(
/**
* Signals when the watcher is ready to watch.
*/
readonly ready: Promise<unknown>,
readonly request: IRecursiveWatchRequest,
/**
* How often this watcher has been restarted in case of an unexpected
* shutdown.
*/
readonly restarts: number,
/**
* The cancellation token associated with the lifecycle of the watcher.
*/
readonly token: CancellationToken,
/**
* An event aggregator to coalesce events and reduce duplicates.
*/
readonly worker: RunOnceWorker<IFileChange>,
private readonly stopFn: () => Promise<void>
) {
super();
this._register(toDisposable(() => this.subscriptions.clear()));
}
subscribe(path: string, callback: (change: IFileChange) => void): IDisposable {
path = URI.file(path).fsPath; // make sure to store the path in `fsPath` form to match it with events later
let subscriptions = this.subscriptions.get(path);
if (!subscriptions) {
subscriptions = new Set();
this.subscriptions.set(path, subscriptions);
}
subscriptions.add(callback);
return toDisposable(() => {
const subscriptions = this.subscriptions.get(path);
if (subscriptions) {
subscriptions.delete(callback);
if (subscriptions.size === 0) {
this.subscriptions.delete(path);
}
}
});
}
get subscriptionsCount(): number {
return this.subscriptions.size;
}
notifyFileChange(path: string, change: IFileChange): void {
const subscriptions = this.subscriptions.get(path);
if (subscriptions) {
for (const subscription of subscriptions) {
subscription(change);
}
}
}
notifyWatchFailed(): void {
this.didFail = true;
this._onDidFail.fire();
}
include(path: string): boolean {
if (!this.includes || this.includes.length === 0) {
return true; // no specific includes defined, include all
}
return this.includes.some(include => include(path));
}
exclude(path: string): boolean {
return Boolean(this.excludes?.some(exclude => exclude(path)));
}
async stop(joinRestart: Promise<void> | undefined): Promise<void> {
this.didStop = true;
try {
await this.stopFn();
} finally {
this._onDidStop.fire({ joinRestart });
this.dispose();
}
}
}
export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcherWithSubscribe {
private static readonly MAP_PARCEL_WATCHER_ACTION_TO_FILE_CHANGE = new Map<parcelWatcher.EventType, number>(
[
@ -73,7 +150,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
private readonly _onDidError = this._register(new Emitter<string>());
readonly onDidError = this._onDidError.event;
protected readonly watchers = new Set<IParcelWatcherInstance>();
readonly watchers = new Set<ParcelWatcherInstance>();
// A delay for collecting file changes from Parcel
// before collecting them for coalescing and emitting.
@ -155,7 +232,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
}
private findWatcher(request: IRecursiveWatchRequest): IParcelWatcherInstance | undefined {
private findWatcher(request: IRecursiveWatchRequest): ParcelWatcherInstance | undefined {
for (const watcher of this.watchers) {
// Requests or watchers with correlation always match on that
@ -184,13 +261,13 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
const snapshotFile = randomPath(tmpdir(), 'vscode-watcher-snapshot');
// Remember as watcher instance
const watcher: IParcelWatcherInstance = {
const watcher: ParcelWatcherInstance = new ParcelWatcherInstance(
instance.p,
request,
ready: instance.p,
restarts,
token: cts.token,
worker: new RunOnceWorker<IFileChange>(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY),
stop: async () => {
cts.token,
new RunOnceWorker<IFileChange>(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY),
async () => {
cts.dispose(true);
watcher.worker.flush();
@ -199,15 +276,12 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
pollingWatcher.dispose();
unlinkSync(snapshotFile);
}
};
);
this.watchers.add(watcher);
// Path checks for symbolic links / wrong casing
const { realPath, realPathDiffers, realPathLength } = this.normalizePath(request);
// Warm up include patterns for usage
const includePatterns = request.includes ? parseWatcherPatterns(request.path, request.includes) : undefined;
this.trace(`Started watching: '${realPath}' with polling interval '${pollingInterval}'`);
let counter = 0;
@ -228,7 +302,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
// Handle & emit events
this.onParcelEvents(parcelEvents, watcher, includePatterns, realPathDiffers, realPathLength);
this.onParcelEvents(parcelEvents, watcher, realPathDiffers, realPathLength);
}
// Store a snapshot of files to the snapshot file
@ -255,13 +329,13 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
const instance = new DeferredPromise<parcelWatcher.AsyncSubscription | undefined>();
// Remember as watcher instance
const watcher: IParcelWatcherInstance = {
const watcher: ParcelWatcherInstance = new ParcelWatcherInstance(
instance.p,
request,
ready: instance.p,
restarts,
token: cts.token,
worker: new RunOnceWorker<IFileChange>(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY),
stop: async () => {
cts.token,
new RunOnceWorker<IFileChange>(events => this.handleParcelEvents(events, watcher), ParcelWatcher.FILE_CHANGES_HANDLER_DELAY),
async () => {
cts.dispose(true);
watcher.worker.flush();
@ -270,15 +344,12 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
const watcherInstance = await instance.p;
await watcherInstance?.unsubscribe();
}
};
);
this.watchers.add(watcher);
// Path checks for symbolic links / wrong casing
const { realPath, realPathDiffers, realPathLength } = this.normalizePath(request);
// Warm up include patterns for usage
const includePatterns = request.includes ? parseWatcherPatterns(request.path, request.includes) : undefined;
parcelWatcher.subscribe(realPath, (error, parcelEvents) => {
if (watcher.token.isCancellationRequested) {
return; // return early when disposed
@ -293,7 +364,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
// Handle & emit events
this.onParcelEvents(parcelEvents, watcher, includePatterns, realPathDiffers, realPathLength);
this.onParcelEvents(parcelEvents, watcher, realPathDiffers, realPathLength);
}, {
backend: ParcelWatcher.PARCEL_WATCHER_BACKEND,
ignore: watcher.request.excludes
@ -306,11 +377,12 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
instance.complete(undefined);
watcher.notifyWatchFailed();
this._onDidWatchFail.fire(request);
});
}
private onParcelEvents(parcelEvents: parcelWatcher.Event[], watcher: IParcelWatcherInstance, includes: ParsedPattern[] | undefined, realPathDiffers: boolean, realPathLength: number): void {
private onParcelEvents(parcelEvents: parcelWatcher.Event[], watcher: ParcelWatcherInstance, realPathDiffers: boolean, realPathLength: number): void {
if (parcelEvents.length === 0) {
return;
}
@ -321,7 +393,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
this.normalizeEvents(parcelEvents, watcher.request, realPathDiffers, realPathLength);
// Check for includes
const includedEvents = this.handleIncludes(watcher, parcelEvents, includes);
const includedEvents = this.handleIncludes(watcher, parcelEvents);
// Add to event aggregator for later processing
for (const includedEvent of includedEvents) {
@ -329,7 +401,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
}
private handleIncludes(watcher: IParcelWatcherInstance, parcelEvents: parcelWatcher.Event[], includes: ParsedPattern[] | undefined): IFileChange[] {
private handleIncludes(watcher: ParcelWatcherInstance, parcelEvents: parcelWatcher.Event[]): IFileChange[] {
const events: IFileChange[] = [];
for (const { path, type: parcelEventType } of parcelEvents) {
@ -339,7 +411,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
// Apply include filter if any
if (includes && includes.length > 0 && !includes.some(include => include(path))) {
if (!watcher.include(path)) {
if (this.verboseLogging) {
this.trace(` >> ignored (not included) ${path}`);
}
@ -351,7 +423,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
return events;
}
private handleParcelEvents(parcelEvents: IFileChange[], watcher: IParcelWatcherInstance): void {
private handleParcelEvents(parcelEvents: IFileChange[], watcher: ParcelWatcherInstance): void {
// Coalesce events: merge events of same kind
const coalescedEvents = coalesceEvents(parcelEvents);
@ -368,14 +440,20 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
}
private emitEvents(events: IFileChange[], watcher: IParcelWatcherInstance): void {
private emitEvents(events: IFileChange[], watcher: ParcelWatcherInstance): void {
if (events.length === 0) {
return;
}
// Logging
if (this.verboseLogging) {
for (const event of events) {
for (const event of events) {
// Emit to instance subscriptions if any
if (watcher.subscriptionsCount > 0) {
watcher.notifyFileChange(event.resource.fsPath, event);
}
// Logging
if (this.verboseLogging) {
this.traceEvent(event, watcher.request);
}
}
@ -446,7 +524,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
}
private filterEvents(events: IFileChange[], watcher: IParcelWatcherInstance): { events: IFileChange[]; rootDeleted?: boolean } {
private filterEvents(events: IFileChange[], watcher: ParcelWatcherInstance): { events: IFileChange[]; rootDeleted?: boolean } {
const filteredEvents: IFileChange[] = [];
let rootDeleted = false;
@ -475,29 +553,28 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
return { events: filteredEvents, rootDeleted };
}
private onWatchedPathDeleted(watcher: IParcelWatcherInstance): void {
private onWatchedPathDeleted(watcher: ParcelWatcherInstance): void {
this.warn('Watcher shutdown because watched path got deleted', watcher);
this._onDidWatchFail.fire(watcher.request);
// Do monitoring of the request path parent unless this request
// can be handled via suspend/resume in the super class
//
// TODO@bpasero we should remove this logic in favor of the
// support in the super class so that we have 1 consistent
// solution for handling this.
let legacyMonitored = false;
if (!this.isCorrelated(watcher.request)) {
this.legacyMonitorRequest(watcher);
// Do monitoring of the request path parent unless this request
// can be handled via suspend/resume in the super class
legacyMonitored = this.legacyMonitorRequest(watcher);
}
if (!legacyMonitored) {
watcher.notifyWatchFailed();
this._onDidWatchFail.fire(watcher.request);
}
}
private legacyMonitorRequest(watcher: IParcelWatcherInstance): void {
private legacyMonitorRequest(watcher: ParcelWatcherInstance): boolean {
const parentPath = dirname(watcher.request.path);
if (existsSync(parentPath)) {
this.trace('Trying to watch on the parent path to restart the watcher...', watcher);
const nodeWatcher = new NodeJSFileWatcherLibrary({ path: parentPath, excludes: [], recursive: false, correlationId: watcher.request.correlationId }, changes => {
const nodeWatcher = new NodeJSFileWatcherLibrary({ path: parentPath, excludes: [], recursive: false, correlationId: watcher.request.correlationId }, undefined, changes => {
if (watcher.token.isCancellationRequested) {
return; // return early when disposed
}
@ -522,10 +599,14 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
// Make sure to stop watching when the watcher is disposed
watcher.token.onCancellationRequested(() => nodeWatcher.dispose());
return true;
}
return false;
}
private onUnexpectedError(error: unknown, watcher?: IParcelWatcherInstance): void {
private onUnexpectedError(error: unknown, watcher?: ParcelWatcherInstance): void {
const msg = toErrorMessage(error);
// Specially handle ENOSPC errors that can happen when
@ -559,7 +640,7 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
}
}
protected restartWatching(watcher: IParcelWatcherInstance, delay = 800): void {
protected restartWatching(watcher: ParcelWatcherInstance, delay = 800): void {
// Restart watcher delayed to accomodate for
// changes on disk that have triggered the
@ -569,15 +650,21 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
return; // return early when disposed
}
// Await the watcher having stopped, as this is
// needed to properly re-watch the same path
await this.stopWatching(watcher);
const restartPromise = new DeferredPromise<void>();
try {
// Start watcher again counting the restarts
if (watcher.request.pollingInterval) {
this.startPolling(watcher.request, watcher.request.pollingInterval, watcher.restarts + 1);
} else {
this.startWatching(watcher.request, watcher.restarts + 1);
// Await the watcher having stopped, as this is
// needed to properly re-watch the same path
await this.stopWatching(watcher, restartPromise.p);
// Start watcher again counting the restarts
if (watcher.request.pollingInterval) {
this.startPolling(watcher.request, watcher.request.pollingInterval, watcher.restarts + 1);
} else {
this.startWatching(watcher.request, watcher.restarts + 1);
}
} finally {
restartPromise.complete();
}
}, delay);
@ -585,13 +672,13 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
watcher.token.onCancellationRequested(() => scheduler.dispose());
}
private async stopWatching(watcher: IParcelWatcherInstance): Promise<void> {
private async stopWatching(watcher: ParcelWatcherInstance, joinRestart?: Promise<void>): Promise<void> {
this.trace(`stopping file watcher`, watcher);
this.watchers.delete(watcher);
try {
await watcher.stop();
await watcher.stop(joinRestart);
} catch (error) {
this.error(`Unexpected error stopping watcher: ${toErrorMessage(error)}`, watcher);
}
@ -694,25 +781,63 @@ export class ParcelWatcher extends BaseWatcher implements IRecursiveWatcher {
return true;
}
subscribe(path: string, callback: (error: boolean, change?: IFileChange) => void): IDisposable | undefined {
for (const watcher of this.watchers) {
if (watcher.failed) {
continue; // watcher has already failed
}
if (!isEqualOrParent(path, watcher.request.path, !isLinux)) {
continue; // watcher does not consider this path
}
if (
watcher.exclude(path) ||
!watcher.include(path)
) {
continue; // parcel instance does not consider this path
}
const disposables = new DisposableStore();
disposables.add(Event.once(watcher.onDidStop)(async e => {
await e.joinRestart; // if we are restarting, await that so that we can possibly reuse this watcher again
if (disposables.isDisposed) {
return;
}
callback(true /* error */);
}));
disposables.add(Event.once(watcher.onDidFail)(() => callback(true /* error */)));
disposables.add(watcher.subscribe(path, change => callback(false, change)));
return disposables;
}
return undefined;
}
async setVerboseLogging(enabled: boolean): Promise<void> {
this.verboseLogging = enabled;
}
protected trace(message: string, watcher?: IParcelWatcherInstance): void {
protected trace(message: string, watcher?: ParcelWatcherInstance): void {
if (this.verboseLogging) {
this._onDidLogMessage.fire({ type: 'trace', message: this.toMessage(message, watcher) });
}
}
protected warn(message: string, watcher?: IParcelWatcherInstance) {
protected warn(message: string, watcher?: ParcelWatcherInstance) {
this._onDidLogMessage.fire({ type: 'warn', message: this.toMessage(message, watcher) });
}
private error(message: string, watcher: IParcelWatcherInstance | undefined) {
private error(message: string, watcher: ParcelWatcherInstance | undefined) {
this._onDidLogMessage.fire({ type: 'error', message: this.toMessage(message, watcher) });
}
private toMessage(message: string, watcher?: IParcelWatcherInstance): string {
private toMessage(message: string, watcher?: ParcelWatcherInstance): string {
return watcher ? `[File Watcher (parcel)] ${message} (path: ${watcher.request.path})` : `[File Watcher (parcel)] ${message}`;
}
protected get recursiveWatcher() { return this; }
}

View file

@ -4,29 +4,61 @@
*--------------------------------------------------------------------------------------------*/
import { Disposable } from 'vs/base/common/lifecycle';
import { IUniversalWatcher, IUniversalWatchRequest } from 'vs/platform/files/common/watcher';
import { Event } from 'vs/base/common/event';
import { ILogMessage, IUniversalWatcher, IUniversalWatchRequest } from 'vs/platform/files/common/watcher';
import { Emitter, Event } from 'vs/base/common/event';
import { ParcelWatcher } from 'vs/platform/files/node/watcher/parcel/parcelWatcher';
import { NodeJSWatcher } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcher';
import { Promises } from 'vs/base/common/async';
import { computeStats } from 'vs/platform/files/node/watcher/watcherStats';
export class UniversalWatcher extends Disposable implements IUniversalWatcher {
private readonly recursiveWatcher = this._register(new ParcelWatcher());
private readonly nonRecursiveWatcher = this._register(new NodeJSWatcher());
private readonly nonRecursiveWatcher = this._register(new NodeJSWatcher(this.recursiveWatcher));
readonly onDidChangeFile = Event.any(this.recursiveWatcher.onDidChangeFile, this.nonRecursiveWatcher.onDidChangeFile);
readonly onDidLogMessage = Event.any(this.recursiveWatcher.onDidLogMessage, this.nonRecursiveWatcher.onDidLogMessage);
readonly onDidError = Event.any(this.recursiveWatcher.onDidError, this.nonRecursiveWatcher.onDidError);
private readonly _onDidLogMessage = this._register(new Emitter<ILogMessage>());
readonly onDidLogMessage = Event.any(this._onDidLogMessage.event, this.recursiveWatcher.onDidLogMessage, this.nonRecursiveWatcher.onDidLogMessage);
private requests: IUniversalWatchRequest[] = [];
async watch(requests: IUniversalWatchRequest[]): Promise<void> {
await Promises.settled([
this.recursiveWatcher.watch(requests.filter(request => request.recursive)),
this.nonRecursiveWatcher.watch(requests.filter(request => !request.recursive))
]);
this.requests = requests;
// Watch recursively first to give recursive watchers a chance
// to step in for non-recursive watch requests, thus reducing
// watcher duplication.
let error: Error | undefined;
try {
await this.recursiveWatcher.watch(requests.filter(request => request.recursive));
} catch (e) {
error = e;
}
try {
await this.nonRecursiveWatcher.watch(requests.filter(request => !request.recursive));
} catch (e) {
if (!error) {
error = e;
}
}
if (error) {
throw error;
}
}
async setVerboseLogging(enabled: boolean): Promise<void> {
// Log stats
if (enabled && this.requests.length > 0) {
this._onDidLogMessage.fire({ type: 'trace', message: computeStats(this.requests, this.recursiveWatcher, this.nonRecursiveWatcher) });
}
// Forward to watchers
await Promises.settled([
this.recursiveWatcher.setVerboseLogging(enabled),
this.nonRecursiveWatcher.setVerboseLogging(enabled)

View file

@ -0,0 +1,207 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { IUniversalWatchRequest } from 'vs/platform/files/common/watcher';
import { INodeJSWatcherInstance, NodeJSWatcher } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcher';
import { ParcelWatcher, ParcelWatcherInstance } from 'vs/platform/files/node/watcher/parcel/parcelWatcher';
export function computeStats(
requests: IUniversalWatchRequest[],
recursiveWatcher: ParcelWatcher,
nonRecursiveWatcher: NodeJSWatcher
): string {
const lines: string[] = [];
const recursiveRequests = sortByPathPrefix(requests.filter(request => request.recursive));
const recursiveRequestsStatus = computeRequestStatus(recursiveRequests, recursiveWatcher);
const recursiveWatcherStatus = computeRecursiveWatchStatus(recursiveWatcher);
const nonRecursiveRequests = sortByPathPrefix(requests.filter(request => !request.recursive));
const nonRecursiveRequestsStatus = computeRequestStatus(nonRecursiveRequests, nonRecursiveWatcher);
const nonRecursiveWatcherStatus = computeNonRecursiveWatchStatus(nonRecursiveWatcher);
lines.push('[Summary]');
lines.push(`- Recursive Requests: total: ${recursiveRequests.length}, suspended: ${recursiveRequestsStatus.suspended}, polling: ${recursiveRequestsStatus.polling}`);
lines.push(`- Non-Recursive Requests: total: ${nonRecursiveRequests.length}, suspended: ${nonRecursiveRequestsStatus.suspended}, polling: ${nonRecursiveRequestsStatus.polling}`);
lines.push(`- Recursive Watchers: total: ${recursiveWatcher.watchers.size}, active: ${recursiveWatcherStatus.active}, failed: ${recursiveWatcherStatus.failed}, stopped: ${recursiveWatcherStatus.stopped}`);
lines.push(`- Non-Recursive Watchers: total: ${nonRecursiveWatcher.watchers.size}, active: ${nonRecursiveWatcherStatus.active}, failed: ${nonRecursiveWatcherStatus.failed}, reusing: ${nonRecursiveWatcherStatus.reusing}`);
lines.push(`- I/O Handles Impact: total: ${recursiveRequestsStatus.polling + nonRecursiveRequestsStatus.polling + recursiveWatcherStatus.active + nonRecursiveWatcherStatus.active}`);
lines.push(`\n[Recursive Requests (${recursiveRequests.length}, suspended: ${recursiveRequestsStatus.suspended}, polling: ${recursiveRequestsStatus.polling})]:`);
for (const request of recursiveRequests) {
fillRequestStats(lines, request, recursiveWatcher);
}
lines.push(`\n[Non-Recursive Requests (${nonRecursiveRequests.length}, suspended: ${nonRecursiveRequestsStatus.suspended}, polling: ${nonRecursiveRequestsStatus.polling})]:`);
for (const request of nonRecursiveRequests) {
fillRequestStats(lines, request, nonRecursiveWatcher);
}
fillRecursiveWatcherStats(lines, recursiveWatcher);
fillNonRecursiveWatcherStats(lines, nonRecursiveWatcher);
let maxLength = 0;
for (const line of lines) {
maxLength = Math.max(maxLength, line.split('\t')[0].length);
}
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
const parts = line.split('\t');
if (parts.length === 2) {
const padding = ' '.repeat(maxLength - parts[0].length);
lines[i] = `${parts[0]}${padding}\t${parts[1]}`;
}
}
return `\n\n[File Watcher] request stats:\n\n${lines.join('\n')}\n\n`;
}
function computeRequestStatus(requests: IUniversalWatchRequest[], watcher: ParcelWatcher | NodeJSWatcher): { suspended: number; polling: number } {
let polling = 0;
let suspended = 0;
for (const request of requests) {
const isSuspended = watcher.isSuspended(request);
if (isSuspended === false) {
continue;
}
suspended++;
if (isSuspended === 'polling') {
polling++;
}
}
return { suspended, polling };
}
function computeRecursiveWatchStatus(recursiveWatcher: ParcelWatcher): { active: number; failed: number; stopped: number } {
let active = 0;
let failed = 0;
let stopped = 0;
for (const watcher of recursiveWatcher.watchers.values()) {
if (!watcher.failed && !watcher.stopped) {
active++;
}
if (watcher.failed) {
failed++;
}
if (watcher.stopped) {
stopped++;
}
}
return { active, failed, stopped };
}
function computeNonRecursiveWatchStatus(nonRecursiveWatcher: NodeJSWatcher): { active: number; failed: number; reusing: number } {
let active = 0;
let failed = 0;
let reusing = 0;
for (const watcher of nonRecursiveWatcher.watchers) {
if (!watcher.instance.failed && !watcher.instance.isReusingRecursiveWatcher) {
active++;
}
if (watcher.instance.failed) {
failed++;
}
if (watcher.instance.isReusingRecursiveWatcher) {
reusing++;
}
}
return { active, failed, reusing };
}
function sortByPathPrefix(requests: IUniversalWatchRequest[]): IUniversalWatchRequest[];
function sortByPathPrefix(requests: INodeJSWatcherInstance[]): INodeJSWatcherInstance[];
function sortByPathPrefix(requests: ParcelWatcherInstance[]): ParcelWatcherInstance[];
function sortByPathPrefix(requests: IUniversalWatchRequest[] | INodeJSWatcherInstance[] | ParcelWatcherInstance[]): IUniversalWatchRequest[] | INodeJSWatcherInstance[] | ParcelWatcherInstance[] {
requests.sort((r1, r2) => {
const p1 = isUniversalWatchRequest(r1) ? r1.path : r1.request.path;
const p2 = isUniversalWatchRequest(r2) ? r2.path : r2.request.path;
const minLength = Math.min(p1.length, p2.length);
for (let i = 0; i < minLength; i++) {
if (p1[i] !== p2[i]) {
return (p1[i] < p2[i]) ? -1 : 1;
}
}
return p1.length - p2.length;
});
return requests;
}
function isUniversalWatchRequest(obj: unknown): obj is IUniversalWatchRequest {
const candidate = obj as IUniversalWatchRequest | undefined;
return typeof candidate?.path === 'string';
}
function fillRequestStats(lines: string[], request: IUniversalWatchRequest, watcher: ParcelWatcher | NodeJSWatcher): void {
const decorations = [];
const suspended = watcher.isSuspended(request);
if (suspended !== false) {
if (suspended === 'polling') {
decorations.push('[SUSPENDED <polling>]');
} else {
decorations.push('[SUSPENDED <non-polling>]');
}
}
lines.push(`${request.path}\t${decorations.length > 0 ? decorations.join(' ') + ' ' : ''}(${requestDetailsToString(request)})`);
}
function requestDetailsToString(request: IUniversalWatchRequest): string {
return `excludes: ${request.excludes.length > 0 ? request.excludes : '<none>'}, includes: ${request.includes && request.includes.length > 0 ? JSON.stringify(request.includes) : '<all>'}, correlationId: ${typeof request.correlationId === 'number' ? request.correlationId : '<none>'}`;
}
function fillRecursiveWatcherStats(lines: string[], recursiveWatcher: ParcelWatcher): void {
const watchers = sortByPathPrefix(Array.from(recursiveWatcher.watchers.values()));
const { active, failed, stopped } = computeRecursiveWatchStatus(recursiveWatcher);
lines.push(`\n[Recursive Watchers (${watchers.length}, active: ${active}, failed: ${failed}, stopped: ${stopped})]:`);
for (const watcher of watchers) {
const decorations = [];
if (watcher.failed) {
decorations.push('[FAILED]');
}
if (watcher.stopped) {
decorations.push('[STOPPED]');
}
if (watcher.subscriptionsCount > 0) {
decorations.push(`[SUBSCRIBED:${watcher.subscriptionsCount}]`);
}
if (watcher.restarts > 0) {
decorations.push(`[RESTARTED:${watcher.restarts}]`);
}
lines.push(`${watcher.request.path}\t${decorations.length > 0 ? decorations.join(' ') + ' ' : ''}(${requestDetailsToString(watcher.request)})`);
}
}
function fillNonRecursiveWatcherStats(lines: string[], nonRecursiveWatcher: NodeJSWatcher): void {
const watchers = sortByPathPrefix(Array.from(nonRecursiveWatcher.watchers.values()));
const { active, failed, reusing } = computeNonRecursiveWatchStatus(nonRecursiveWatcher);
lines.push(`\n[Non-Recursive Watchers (${watchers.length}, active: ${active}, failed: ${failed}, reusing: ${reusing})]:`);
for (const watcher of watchers) {
const decorations = [];
if (watcher.instance.failed) {
decorations.push('[FAILED]');
}
if (watcher.instance.isReusingRecursiveWatcher) {
decorations.push('[REUSING]');
}
lines.push(`${watcher.request.path}\t${decorations.length > 0 ? decorations.join(' ') + ' ' : ''}(${requestDetailsToString(watcher.request)})`);
}
}

View file

@ -3,12 +3,13 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import * as assert from 'assert';
import { tmpdir } from 'os';
import { basename, dirname, join } from 'vs/base/common/path';
import { Promises, RimRafMode } from 'vs/base/node/pfs';
import { flakySuite, getRandomTestPath } from 'vs/base/test/node/testUtils';
import { FileChangeType } from 'vs/platform/files/common/files';
import { INonRecursiveWatchRequest } from 'vs/platform/files/common/watcher';
import { INonRecursiveWatchRequest, IRecursiveWatcherWithSubscribe } from 'vs/platform/files/common/watcher';
import { watchFileContents } from 'vs/platform/files/node/watcher/nodejs/nodejsWatcherLib';
import { isLinux, isMacintosh, isWindows } from 'vs/base/common/platform';
import { getDriveLetter } from 'vs/base/common/extpath';
@ -21,6 +22,7 @@ import { extUriBiasedIgnorePathCase } from 'vs/base/common/resources';
import { URI } from 'vs/base/common/uri';
import { addUNCHostToAllowlist } from 'vs/base/node/unc';
import { Emitter, Event } from 'vs/base/common/event';
import { TestParcelWatcher } from 'vs/platform/files/test/node/parcelWatcher.integrationTest';
// this suite has shown flaky runs in Azure pipelines where
// tasks would just hang and timeout after a while (not in
@ -61,7 +63,20 @@ import { Emitter, Event } from 'vs/base/common/event';
enableLogging(false);
setup(async () => {
watcher = new TestNodeJSWatcher();
await createWatcher(undefined);
testDir = URI.file(getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher')).fsPath;
const sourceDir = FileAccess.asFileUri('vs/platform/files/test/node/fixtures/service').fsPath;
await Promises.copy(sourceDir, testDir, { preserveSymlinks: false });
});
async function createWatcher(accessor: IRecursiveWatcherWithSubscribe | undefined) {
await watcher?.stop();
watcher?.dispose();
watcher = new TestNodeJSWatcher(accessor);
watcher?.setVerboseLogging(loggingEnabled);
watcher.onDidLogMessage(e => {
@ -75,13 +90,7 @@ import { Emitter, Event } from 'vs/base/common/event';
console.log(`[non-recursive watcher test error] ${e}`);
}
});
testDir = getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher');
const sourceDir = FileAccess.asFileUri('vs/platform/files/test/node/fixtures/service').fsPath;
await Promises.copy(sourceDir, testDir, { preserveSymlinks: false });
});
}
teardown(async () => {
await watcher.stop();
@ -128,7 +137,13 @@ import { Emitter, Event } from 'vs/base/common/event';
}
test('basics (folder watch)', async function () {
await watcher.watch([{ path: testDir, excludes: [], recursive: false }]);
const request = { path: testDir, excludes: [], recursive: false };
await watcher.watch([request]);
assert.strictEqual(watcher.isSuspended(request), false);
const instance = Array.from(watcher.watchers)[0].instance;
assert.strictEqual(instance.isReusingRecursiveWatcher, false);
assert.strictEqual(instance.failed, false);
// New file
const newFilePath = join(testDir, 'newFile.txt');
@ -236,7 +251,13 @@ import { Emitter, Event } from 'vs/base/common/event';
test('basics (file watch)', async function () {
const filePath = join(testDir, 'lorem.txt');
await watcher.watch([{ path: filePath, excludes: [], recursive: false }]);
const request = { path: filePath, excludes: [], recursive: false };
await watcher.watch([request]);
assert.strictEqual(watcher.isSuspended(request), false);
const instance = Array.from(watcher.watchers)[0].instance;
assert.strictEqual(instance.isReusingRecursiveWatcher, false);
assert.strictEqual(instance.failed, false);
// Change file
let changeFuture = awaitEvent(watcher, filePath, FileChangeType.UPDATED);
@ -554,17 +575,20 @@ import { Emitter, Event } from 'vs/base/common/event';
await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId: 1 }]);
const instance = Array.from(watcher.watchers)[0].instance;
const onDidWatchFail = Event.toPromise(watcher.onWatchFail);
const changeFuture = awaitEvent(watcher, filePath, FileChangeType.DELETED, 1);
Promises.unlink(filePath);
await onDidWatchFail;
await changeFuture;
assert.strictEqual(instance.failed, true);
});
(isMacintosh /* macOS: does not seem to report deletes on folders */ ? test.skip : test)('deleting watched path emits watcher fail and delete event when correlated (folder watch)', async function () {
(isMacintosh || isWindows /* macOS: does not seem to report deletes on folders | Windows: reports on('error') event only */ ? test.skip : test)('deleting watched path emits watcher fail and delete event when correlated (folder watch)', async function () {
const folderPath = join(testDir, 'deep');
await watcher.watch([{ path: folderPath, excludes: [], recursive: false }]);
await watcher.watch([{ path: folderPath, excludes: [], recursive: false, correlationId: 1 }]);
const onDidWatchFail = Event.toPromise(watcher.onWatchFail);
const changeFuture = awaitEvent(watcher, folderPath, FileChangeType.DELETED, 1);
@ -577,8 +601,10 @@ import { Emitter, Event } from 'vs/base/common/event';
const filePath = join(testDir, 'not-found.txt');
const onDidWatchFail = Event.toPromise(watcher.onWatchFail);
await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId: 1 }]);
const request = { path: filePath, excludes: [], recursive: false, correlationId: 1 };
await watcher.watch([request]);
await onDidWatchFail;
assert.strictEqual(watcher.isSuspended(request), 'polling');
await basicCrudTest(filePath, undefined, 1, undefined, true);
await basicCrudTest(filePath, undefined, 1, undefined, true);
@ -586,11 +612,13 @@ import { Emitter, Event } from 'vs/base/common/event';
test('correlated watch requests support suspend/resume (file, exists in beginning)', async function () {
const filePath = join(testDir, 'lorem.txt');
await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId: 1 }]);
const request = { path: filePath, excludes: [], recursive: false, correlationId: 1 };
await watcher.watch([request]);
const onDidWatchFail = Event.toPromise(watcher.onWatchFail);
await basicCrudTest(filePath, true, 1);
await onDidWatchFail;
assert.strictEqual(watcher.isSuspended(request), 'polling');
await basicCrudTest(filePath, undefined, 1, undefined, true);
});
@ -599,8 +627,10 @@ import { Emitter, Event } from 'vs/base/common/event';
let onDidWatchFail = Event.toPromise(watcher.onWatchFail);
const folderPath = join(testDir, 'not-found');
await watcher.watch([{ path: folderPath, excludes: [], recursive: false, correlationId: 1 }]);
const request = { path: folderPath, excludes: [], recursive: false, correlationId: 1 };
await watcher.watch([request]);
await onDidWatchFail;
assert.strictEqual(watcher.isSuspended(request), 'polling');
let changeFuture = awaitEvent(watcher, folderPath, FileChangeType.ADDED, 1);
let onDidWatch = Event.toPromise(watcher.onDidWatch);
@ -608,6 +638,8 @@ import { Emitter, Event } from 'vs/base/common/event';
await changeFuture;
await onDidWatch;
assert.strictEqual(watcher.isSuspended(request), false);
const filePath = join(folderPath, 'newFile.txt');
await basicCrudTest(filePath, undefined, 1);
@ -649,4 +681,83 @@ import { Emitter, Event } from 'vs/base/common/event';
await basicCrudTest(filePath, undefined, 1);
});
test('parcel watcher reused when present for non-recursive file watching (uncorrelated)', function () {
return testParcelWatcherReused(undefined);
});
test('parcel watcher reused when present for non-recursive file watching (correlated)', function () {
return testParcelWatcherReused(2);
});
function createParcelWatcher() {
const recursiveWatcher = new TestParcelWatcher();
recursiveWatcher.setVerboseLogging(loggingEnabled);
recursiveWatcher.onDidLogMessage(e => {
if (loggingEnabled) {
console.log(`[recursive watcher test message] ${e.message}`);
}
});
recursiveWatcher.onDidError(e => {
if (loggingEnabled) {
console.log(`[recursive watcher test error] ${e}`);
}
});
return recursiveWatcher;
}
async function testParcelWatcherReused(correlationId: number | undefined) {
const recursiveWatcher = createParcelWatcher();
await recursiveWatcher.watch([{ path: testDir, excludes: [], recursive: true, correlationId: 1 }]);
const recursiveInstance = Array.from(recursiveWatcher.watchers)[0];
assert.strictEqual(recursiveInstance.subscriptionsCount, 0);
await createWatcher(recursiveWatcher);
const filePath = join(testDir, 'deep', 'conway.js');
await watcher.watch([{ path: filePath, excludes: [], recursive: false, correlationId }]);
const { instance } = Array.from(watcher.watchers)[0];
assert.strictEqual(instance.isReusingRecursiveWatcher, true);
assert.strictEqual(recursiveInstance.subscriptionsCount, 1);
let changeFuture = awaitEvent(watcher, filePath, isMacintosh /* somehow fsevents seems to report still on the initial create from test setup */ ? FileChangeType.ADDED : FileChangeType.UPDATED, correlationId);
await Promises.writeFile(filePath, 'Hello World');
await changeFuture;
await recursiveWatcher.stop();
recursiveWatcher.dispose();
await timeout(500); // give the watcher some time to restart
changeFuture = awaitEvent(watcher, filePath, FileChangeType.UPDATED, correlationId);
await Promises.writeFile(filePath, 'Hello World');
await changeFuture;
assert.strictEqual(instance.isReusingRecursiveWatcher, false);
}
test('correlated watch requests support suspend/resume (file, does not exist in beginning, parcel watcher reused)', async function () {
const recursiveWatcher = createParcelWatcher();
await recursiveWatcher.watch([{ path: testDir, excludes: [], recursive: true }]);
await createWatcher(recursiveWatcher);
const filePath = join(testDir, 'not-found-2.txt');
const onDidWatchFail = Event.toPromise(watcher.onWatchFail);
const request = { path: filePath, excludes: [], recursive: false, correlationId: 1 };
await watcher.watch([request]);
await onDidWatchFail;
assert.strictEqual(watcher.isSuspended(request), true);
const changeFuture = awaitEvent(watcher, filePath, FileChangeType.ADDED, 1);
await Promises.writeFile(filePath, 'Hello World');
await changeFuture;
assert.strictEqual(watcher.isSuspended(request), false);
});
});

View file

@ -21,6 +21,40 @@ import { extUriBiasedIgnorePathCase } from 'vs/base/common/resources';
import { URI } from 'vs/base/common/uri';
import { addUNCHostToAllowlist } from 'vs/base/node/unc';
import { Emitter, Event } from 'vs/base/common/event';
import { DisposableStore } from 'vs/base/common/lifecycle';
export class TestParcelWatcher extends ParcelWatcher {
protected override readonly suspendedWatchRequestPollingInterval = 100;
private readonly _onDidWatch = this._register(new Emitter<void>());
readonly onDidWatch = this._onDidWatch.event;
readonly onWatchFail = this._onDidWatchFail.event;
testRemoveDuplicateRequests(paths: string[], excludes: string[] = []): string[] {
// Work with strings as paths to simplify testing
const requests: IRecursiveWatchRequest[] = paths.map(path => {
return { path, excludes, recursive: true };
});
return this.removeDuplicateRequests(requests, false /* validate paths skipped for tests */).map(request => request.path);
}
protected override async doWatch(requests: IRecursiveWatchRequest[]): Promise<void> {
await super.doWatch(requests);
await this.whenReady();
this._onDidWatch.fire();
}
async whenReady(): Promise<void> {
for (const watcher of this.watchers) {
await watcher.ready;
}
}
}
// this suite has shown flaky runs in Azure pipelines where
// tasks would just hang and timeout after a while (not in
@ -29,39 +63,6 @@ import { Emitter, Event } from 'vs/base/common/event';
((process.env['BUILD_SOURCEVERSION'] || process.env['CI']) ? suite.skip : flakySuite)('File Watcher (parcel)', () => {
class TestParcelWatcher extends ParcelWatcher {
protected override readonly suspendedWatchRequestPollingInterval = 100;
private readonly _onDidWatch = this._register(new Emitter<void>());
readonly onDidWatch = this._onDidWatch.event;
readonly onWatchFail = this._onDidWatchFail.event;
testRemoveDuplicateRequests(paths: string[], excludes: string[] = []): string[] {
// Work with strings as paths to simplify testing
const requests: IRecursiveWatchRequest[] = paths.map(path => {
return { path, excludes, recursive: true };
});
return this.removeDuplicateRequests(requests, false /* validate paths skipped for tests */).map(request => request.path);
}
protected override async doWatch(requests: IRecursiveWatchRequest[]): Promise<void> {
await super.doWatch(requests);
await this.whenReady();
this._onDidWatch.fire();
}
async whenReady(): Promise<void> {
for (const watcher of this.watchers) {
await watcher.ready;
}
}
}
let testDir: string;
let watcher: TestParcelWatcher;
@ -90,7 +91,7 @@ import { Emitter, Event } from 'vs/base/common/event';
}
});
testDir = getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher');
testDir = URI.file(getRandomTestPath(tmpdir(), 'vsctests', 'filewatcher')).fsPath;
const sourceDir = FileAccess.asFileUri('vs/platform/files/test/node/fixtures/service').fsPath;
@ -98,7 +99,18 @@ import { Emitter, Event } from 'vs/base/common/event';
});
teardown(async () => {
const watchers = watcher.watchers.size;
let stoppedInstances = 0;
for (const instance of watcher.watchers) {
Event.once(instance.onDidStop)(() => {
if (instance.stopped) {
stoppedInstances++;
}
});
}
await watcher.stop();
assert.strictEqual(stoppedInstances, watchers, 'All watchers must be stopped before the test ends');
watcher.dispose();
// Possible that the file watcher is still holding
@ -170,37 +182,68 @@ import { Emitter, Event } from 'vs/base/common/event';
}
test('basics', async function () {
await watcher.watch([{ path: testDir, excludes: [], recursive: true }]);
const request = { path: testDir, excludes: [], recursive: true };
await watcher.watch([request]);
assert.strictEqual(watcher.watchers.size, watcher.watchers.size);
const instance = Array.from(watcher.watchers)[0];
assert.strictEqual(request, instance.request);
assert.strictEqual(instance.failed, false);
assert.strictEqual(instance.stopped, false);
const disposables = new DisposableStore();
const subscriptions1 = new Map<string, FileChangeType>();
const subscriptions2 = new Map<string, FileChangeType>();
// New file
const newFilePath = join(testDir, 'deep', 'newFile.txt');
disposables.add(instance.subscribe(newFilePath, change => subscriptions1.set(change.resource.fsPath, change.type)));
disposables.add(instance.subscribe(newFilePath, change => subscriptions2.set(change.resource.fsPath, change.type))); // can subscribe multiple times
assert.strictEqual(instance.include(newFilePath), true);
assert.strictEqual(instance.exclude(newFilePath), false);
let changeFuture: Promise<unknown> = awaitEvent(watcher, newFilePath, FileChangeType.ADDED);
await Promises.writeFile(newFilePath, 'Hello World');
await changeFuture;
assert.strictEqual(subscriptions1.get(newFilePath), FileChangeType.ADDED);
assert.strictEqual(subscriptions2.get(newFilePath), FileChangeType.ADDED);
// New folder
const newFolderPath = join(testDir, 'deep', 'New Folder');
disposables.add(instance.subscribe(newFolderPath, change => subscriptions1.set(change.resource.fsPath, change.type)));
const disposable = instance.subscribe(newFolderPath, change => subscriptions2.set(change.resource.fsPath, change.type));
disposable.dispose();
assert.strictEqual(instance.include(newFolderPath), true);
assert.strictEqual(instance.exclude(newFolderPath), false);
changeFuture = awaitEvent(watcher, newFolderPath, FileChangeType.ADDED);
await Promises.mkdir(newFolderPath);
await changeFuture;
assert.strictEqual(subscriptions1.get(newFolderPath), FileChangeType.ADDED);
assert.strictEqual(subscriptions2.has(newFolderPath), false /* subscription was disposed before the event */);
// Rename file
let renamedFilePath = join(testDir, 'deep', 'renamedFile.txt');
disposables.add(instance.subscribe(renamedFilePath, change => subscriptions1.set(change.resource.fsPath, change.type)));
changeFuture = Promise.all([
awaitEvent(watcher, newFilePath, FileChangeType.DELETED),
awaitEvent(watcher, renamedFilePath, FileChangeType.ADDED)
]);
await Promises.rename(newFilePath, renamedFilePath);
await changeFuture;
assert.strictEqual(subscriptions1.get(newFilePath), FileChangeType.DELETED);
assert.strictEqual(subscriptions1.get(renamedFilePath), FileChangeType.ADDED);
// Rename folder
let renamedFolderPath = join(testDir, 'deep', 'Renamed Folder');
disposables.add(instance.subscribe(renamedFolderPath, change => subscriptions1.set(change.resource.fsPath, change.type)));
changeFuture = Promise.all([
awaitEvent(watcher, newFolderPath, FileChangeType.DELETED),
awaitEvent(watcher, renamedFolderPath, FileChangeType.ADDED)
]);
await Promises.rename(newFolderPath, renamedFolderPath);
await changeFuture;
assert.strictEqual(subscriptions1.get(newFolderPath), FileChangeType.DELETED);
assert.strictEqual(subscriptions1.get(renamedFolderPath), FileChangeType.ADDED);
// Rename file (same name, different case)
const caseRenamedFilePath = join(testDir, 'deep', 'RenamedFile.txt');
@ -280,13 +323,19 @@ import { Emitter, Event } from 'vs/base/common/event';
// Delete file
changeFuture = awaitEvent(watcher, copiedFilepath, FileChangeType.DELETED);
disposables.add(instance.subscribe(copiedFilepath, change => subscriptions1.set(change.resource.fsPath, change.type)));
await Promises.unlink(copiedFilepath);
await changeFuture;
assert.strictEqual(subscriptions1.get(copiedFilepath), FileChangeType.DELETED);
// Delete folder
changeFuture = awaitEvent(watcher, copiedFolderpath, FileChangeType.DELETED);
disposables.add(instance.subscribe(copiedFolderpath, change => subscriptions1.set(change.resource.fsPath, change.type)));
await Promises.rmdir(copiedFolderpath);
await changeFuture;
assert.strictEqual(subscriptions1.get(copiedFolderpath), FileChangeType.DELETED);
disposables.dispose();
});
(isMacintosh /* this test seems not possible with fsevents backend */ ? test.skip : test)('basics (atomic writes)', async function () {
@ -676,26 +725,59 @@ import { Emitter, Event } from 'vs/base/common/event';
await watcher.watch([{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }]);
let failed = false;
const instance = Array.from(watcher.watchers)[0];
assert.strictEqual(instance.include(folderPath), true);
instance.onDidFail(() => failed = true);
const onDidWatchFail = Event.toPromise(watcher.onWatchFail);
const changeFuture = awaitEvent(watcher, folderPath, FileChangeType.DELETED, undefined, 1);
Promises.rm(folderPath, RimRafMode.UNLINK);
await onDidWatchFail;
await changeFuture;
assert.strictEqual(failed, true);
assert.strictEqual(instance.failed, true);
});
test('correlated watch requests support suspend/resume (folder, does not exist in beginning)', async () => {
test('correlated watch requests support suspend/resume (folder, does not exist in beginning, not reusing watcher)', async () => {
await testCorrelatedWatchFolderDoesNotExist(false);
});
test('correlated watch requests support suspend/resume (folder, does not exist in beginning, reusing watcher)', async () => {
await testCorrelatedWatchFolderDoesNotExist(true);
});
async function testCorrelatedWatchFolderDoesNotExist(reuseExistingWatcher: boolean) {
let onDidWatchFail = Event.toPromise(watcher.onWatchFail);
const folderPath = join(testDir, 'not-found');
await watcher.watch([{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }]);
const requests: IRecursiveWatchRequest[] = [];
if (reuseExistingWatcher) {
requests.push({ path: testDir, excludes: [], recursive: true });
await watcher.watch(requests);
}
const request: IRecursiveWatchRequest = { path: folderPath, excludes: [], recursive: true, correlationId: 1 };
requests.push(request);
await watcher.watch(requests);
await onDidWatchFail;
if (reuseExistingWatcher) {
assert.strictEqual(watcher.isSuspended(request), true);
} else {
assert.strictEqual(watcher.isSuspended(request), 'polling');
}
let changeFuture = awaitEvent(watcher, folderPath, FileChangeType.ADDED, undefined, 1);
let onDidWatch = Event.toPromise(watcher.onDidWatch);
await Promises.mkdir(folderPath);
await changeFuture;
await onDidWatch;
assert.strictEqual(watcher.isSuspended(request), false);
const filePath = join(folderPath, 'newFile.txt');
await basicCrudTest(filePath, 1);
@ -710,11 +792,25 @@ import { Emitter, Event } from 'vs/base/common/event';
await onDidWatch;
await basicCrudTest(filePath, 1);
}
test('correlated watch requests support suspend/resume (folder, exist in beginning, not reusing watcher)', async () => {
await testCorrelatedWatchFolderExists(false);
});
test('correlated watch requests support suspend/resume (folder, exist in beginning)', async () => {
test('correlated watch requests support suspend/resume (folder, exist in beginning, reusing watcher)', async () => {
await testCorrelatedWatchFolderExists(true);
});
async function testCorrelatedWatchFolderExists(reuseExistingWatcher: boolean) {
const folderPath = join(testDir, 'deep');
await watcher.watch([{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }]);
const requests: IRecursiveWatchRequest[] = [{ path: folderPath, excludes: [], recursive: true, correlationId: 1 }];
if (reuseExistingWatcher) {
requests.push({ path: testDir, excludes: [], recursive: true });
}
await watcher.watch(requests);
const filePath = join(folderPath, 'newFile.txt');
await basicCrudTest(filePath, 1);
@ -730,5 +826,25 @@ import { Emitter, Event } from 'vs/base/common/event';
await onDidWatch;
await basicCrudTest(filePath, 1);
}
test('watch request reuses another recursive watcher even when requests are coming in at the same time', async function () {
const folderPath1 = join(testDir, 'deep', 'not-existing1');
const folderPath2 = join(testDir, 'deep', 'not-existing2');
const folderPath3 = join(testDir, 'not-existing3');
const requests: IRecursiveWatchRequest[] = [
{ path: folderPath1, excludes: [], recursive: true, correlationId: 1 },
{ path: folderPath2, excludes: [], recursive: true, correlationId: 2 },
{ path: folderPath3, excludes: [], recursive: true, correlationId: 3 },
{ path: join(testDir, 'deep'), excludes: [], recursive: true }
];
await watcher.watch(requests);
assert.strictEqual(watcher.isSuspended(requests[0]), true);
assert.strictEqual(watcher.isSuspended(requests[1]), true);
assert.strictEqual(watcher.isSuspended(requests[2]), 'polling');
assert.strictEqual(watcher.isSuspended(requests[3]), false);
});
});

View file

@ -240,7 +240,7 @@ export class DesktopMain extends Disposable {
serviceCollection.set(IRemoteAuthorityResolverService, remoteAuthorityResolverService);
// Local Files
const diskFileSystemProvider = this._register(new DiskFileSystemProvider(mainProcessService, utilityProcessWorkerWorkbenchService, logService));
const diskFileSystemProvider = this._register(new DiskFileSystemProvider(mainProcessService, utilityProcessWorkerWorkbenchService, logService, loggerService));
fileService.registerProvider(Schemas.file, diskFileSystemProvider);
// URI Identity

View file

@ -3,6 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { localize } from 'vs/nls';
import { Event } from 'vs/base/common/event';
import { isLinux } from 'vs/base/common/platform';
import { FileSystemProviderCapabilities, IFileDeleteOptions, IStat, FileType, IFileReadStreamOptions, IFileWriteOptions, IFileOpenOptions, IFileOverwriteOptions, IFileSystemProviderWithFileReadWriteCapability, IFileSystemProviderWithOpenReadWriteCloseCapability, IFileSystemProviderWithFileReadStreamCapability, IFileSystemProviderWithFileFolderCopyCapability, IFileSystemProviderWithFileAtomicReadCapability, IFileAtomicReadOptions, IFileSystemProviderWithFileCloneCapability, IFileChange } from 'vs/platform/files/common/files';
@ -14,8 +15,9 @@ import { URI } from 'vs/base/common/uri';
import { DiskFileSystemProviderClient, LOCAL_FILE_SYSTEM_CHANNEL_NAME } from 'vs/platform/files/common/diskFileSystemProviderClient';
import { ILogMessage, AbstractUniversalWatcherClient } from 'vs/platform/files/common/watcher';
import { UniversalWatcherClient } from 'vs/workbench/services/files/electron-sandbox/watcherClient';
import { ILogService } from 'vs/platform/log/common/log';
import { ILoggerService, ILogService } from 'vs/platform/log/common/log';
import { IUtilityProcessWorkerWorkbenchService } from 'vs/workbench/services/utilityProcess/electron-sandbox/utilityProcessWorkerWorkbenchService';
import { LogService } from 'vs/platform/log/common/logService';
/**
* A sandbox ready disk file system provider that delegates almost all calls
@ -35,7 +37,8 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
constructor(
private readonly mainProcessService: IMainProcessService,
private readonly utilityProcessWorkerWorkbenchService: IUtilityProcessWorkerWorkbenchService,
logService: ILogService
logService: ILogService,
private readonly loggerService: ILoggerService
) {
super(logService, { watcher: { forceUniversal: true /* send all requests to universal watcher process */ } });
@ -143,5 +146,22 @@ export class DiskFileSystemProvider extends AbstractDiskFileSystemProvider imple
throw new Error('Method not implemented in sandbox.'); // we never expect this to be called given we set `forceUniversal: true`
}
private _watcherLogService: ILogService | undefined = undefined;
private get watcherLogService(): ILogService {
if (!this._watcherLogService) {
this._watcherLogService = new LogService(this.loggerService.createLogger('fileWatcher', { name: localize('fileWatcher', "File Watcher") }));
}
return this._watcherLogService;
}
protected override logWatcherMessage(msg: ILogMessage): void {
this.watcherLogService[msg.type](msg.message);
if (msg.type !== 'trace' && msg.type !== 'debug') {
super.logWatcherMessage(msg); // allow non-verbose log messages in window log
}
}
//#endregion
}