debt - adopt IPC utilities for chokidar watcher service

This commit is contained in:
Benjamin Pasero 2020-09-16 10:40:14 +02:00
parent 795b0dba8d
commit 70953ac5de
8 changed files with 65 additions and 358 deletions

View file

@ -5,7 +5,6 @@
import * as assert from 'assert';
import * as platform from 'vs/base/common/platform';
import { NsfwWatcherService } from 'vs/platform/files/node/watcher/nsfw/nsfwWatcherService';
import { IWatcherRequest } from 'vs/platform/files/node/watcher/nsfw/watcher';

View file

@ -13,8 +13,8 @@ export interface IWatcherRequest {
export interface IWatcherService {
readonly onDidLogMessage: Event<ILogMessage>;
readonly onDidChangeFile: Event<IDiskFileChange[]>;
readonly onDidLogMessage: Event<ILogMessage>;
setRoots(roots: IWatcherRequest[]): Promise<void>;
setVerboseLogging(enabled: boolean): Promise<void>;

View file

@ -18,6 +18,7 @@ import { IDiskFileChange, normalizeFileChanges, ILogMessage } from 'vs/platform/
import { IWatcherRequest, IWatcherService, IWatcherOptions } from 'vs/platform/files/node/watcher/unix/watcher';
import { Emitter, Event } from 'vs/base/common/event';
import { equals } from 'vs/base/common/arrays';
import { Disposable } from 'vs/base/common/lifecycle';
process.noAsar = true; // disable ASAR support in watcher process
@ -30,81 +31,76 @@ interface ExtendedWatcherRequest extends IWatcherRequest {
parsedPattern?: glob.ParsedPattern;
}
export class ChokidarWatcherService implements IWatcherService {
export class ChokidarWatcherService extends Disposable implements IWatcherService {
private static readonly FS_EVENT_DELAY = 50; // aggregate and only emit events when changes have stopped for this duration (in ms)
private static readonly EVENT_SPAM_WARNING_THRESHOLD = 60 * 1000; // warn after certain time span of event spam
private _watchers: { [watchPath: string]: IWatcher } = Object.create(null);
private _watcherCount = 0;
private readonly _onDidChangeFile = this._register(new Emitter<IDiskFileChange[]>());
readonly onDidChangeFile = this._onDidChangeFile.event;
private _pollingInterval?: number;
private _usePolling?: boolean;
private _verboseLogging: boolean | undefined;
private readonly _onDidLogMessage = this._register(new Emitter<ILogMessage>());
readonly onDidLogMessage: Event<ILogMessage> = this._onDidLogMessage.event;
private watchers = new Map<string, IWatcher>();
private _watcherCount = 0;
get wacherCount() { return this._watcherCount; }
private pollingInterval?: number;
private usePolling?: boolean;
private verboseLogging: boolean | undefined;
private spamCheckStartTime: number | undefined;
private spamWarningLogged: boolean | undefined;
private enospcErrorLogged: boolean | undefined;
private readonly _onWatchEvent = new Emitter<IDiskFileChange[]>();
readonly onWatchEvent = this._onWatchEvent.event;
private readonly _onLogMessage = new Emitter<ILogMessage>();
readonly onLogMessage: Event<ILogMessage> = this._onLogMessage.event;
watch(options: IWatcherOptions): Event<IDiskFileChange[]> {
this._pollingInterval = options.pollingInterval;
this._usePolling = options.usePolling;
this._watchers = Object.create(null);
async init(options: IWatcherOptions): Promise<void> {
this.pollingInterval = options.pollingInterval;
this.usePolling = options.usePolling;
this.watchers.clear();
this._watcherCount = 0;
return this.onWatchEvent;
this.verboseLogging = options.verboseLogging;
}
async setVerboseLogging(enabled: boolean): Promise<void> {
this._verboseLogging = enabled;
this.verboseLogging = enabled;
}
async setRoots(requests: IWatcherRequest[]): Promise<void> {
const watchers = Object.create(null);
const watchers = new Map<string, IWatcher>();
const newRequests: string[] = [];
const requestsByBasePath = normalizeRoots(requests);
// evaluate new & remaining watchers
for (const basePath in requestsByBasePath) {
const watcher = this._watchers[basePath];
const watcher = this.watchers.get(basePath);
if (watcher && isEqualRequests(watcher.requests, requestsByBasePath[basePath])) {
watchers[basePath] = watcher;
delete this._watchers[basePath];
watchers.set(basePath, watcher);
this.watchers.delete(basePath);
} else {
newRequests.push(basePath);
}
}
// stop all old watchers
for (const path in this._watchers) {
await this._watchers[path].stop();
for (const [, watcher] of this.watchers) {
await watcher.stop();
}
// start all new watchers
for (const basePath of newRequests) {
const requests = requestsByBasePath[basePath];
watchers[basePath] = this._watch(basePath, requests);
watchers.set(basePath, this.watch(basePath, requests));
}
this._watchers = watchers;
this.watchers = watchers;
}
// for test purposes
get wacherCount() {
return this._watcherCount;
}
private _watch(basePath: string, requests: IWatcherRequest[]): IWatcher {
const pollingInterval = this._pollingInterval || 5000;
const usePolling = this._usePolling;
private watch(basePath: string, requests: IWatcherRequest[]): IWatcher {
const pollingInterval = this.pollingInterval || 5000;
const usePolling = this.usePolling;
const watcherOpts: chokidar.WatchOptions = {
ignoreInitial: true,
@ -120,8 +116,7 @@ export class ChokidarWatcherService implements IWatcherService {
const isSingleFolder = requests.length === 1;
if (isSingleFolder) {
// if there's only one request, use the built-in ignore-filterering
excludes.push(...requests[0].excludes);
excludes.push(...requests[0].excludes); // if there's only one request, use the built-in ignore-filterering
}
if ((isMacintosh || isLinux) && (basePath.length === 0 || basePath === '/')) {
@ -146,7 +141,7 @@ export class ChokidarWatcherService implements IWatcherService {
this.warn(`Watcher basePath does not match version on disk and was corrected (original: ${basePath}, real: ${realBasePath})`);
}
if (this._verboseLogging) {
if (this.verboseLogging) {
this.log(`Start watching with chockidar: ${realBasePath}, excludes: ${excludes.join(',')}, usePolling: ${usePolling ? 'true, interval ' + pollingInterval : 'false'}`);
}
@ -165,7 +160,7 @@ export class ChokidarWatcherService implements IWatcherService {
requests,
stop: async () => {
try {
if (this._verboseLogging) {
if (this.verboseLogging) {
this.log(`Stop watching: ${basePath}]`);
}
if (chokidarWatcher) {
@ -227,7 +222,7 @@ export class ChokidarWatcherService implements IWatcherService {
const event = { type: eventType, path };
// Logging
if (this._verboseLogging) {
if (this.verboseLogging) {
this.log(`${eventType === FileChangeType.ADDED ? '[ADDED]' : eventType === FileChangeType.DELETED ? '[DELETED]' : '[CHANGED]'} ${path}`);
}
@ -253,10 +248,10 @@ export class ChokidarWatcherService implements IWatcherService {
// Broadcast to clients normalized
const res = normalizeFileChanges(events);
this._onWatchEvent.fire(res);
this._onDidChangeFile.fire(res);
// Logging
if (this._verboseLogging) {
if (this.verboseLogging) {
res.forEach(r => {
this.log(` >> normalized ${r.type === FileChangeType.ADDED ? '[ADDED]' : r.type === FileChangeType.DELETED ? '[DELETED]' : '[CHANGED]'} ${r.path}`);
});
@ -290,24 +285,23 @@ export class ChokidarWatcherService implements IWatcherService {
}
async stop(): Promise<void> {
for (const path in this._watchers) {
const watcher = this._watchers[path];
for (const [, watcher] of this.watchers) {
await watcher.stop();
}
this._watchers = Object.create(null);
this.watchers.clear();
}
private log(message: string) {
this._onLogMessage.fire({ type: 'trace', message: `[File Watcher (chokidar)] ` + message });
this._onDidLogMessage.fire({ type: 'trace', message: `[File Watcher (chokidar)] ` + message });
}
private warn(message: string) {
this._onLogMessage.fire({ type: 'warn', message: `[File Watcher (chokidar)] ` + message });
this._onDidLogMessage.fire({ type: 'warn', message: `[File Watcher (chokidar)] ` + message });
}
private error(message: string) {
this._onLogMessage.fire({ type: 'error', message: `[File Watcher (chokidar)] ` + message });
this._onDidLogMessage.fire({ type: 'error', message: `[File Watcher (chokidar)] ` + message });
}
}

View file

@ -4,15 +4,9 @@
*--------------------------------------------------------------------------------------------*/
import * as assert from 'assert';
import * as os from 'os';
import * as path from 'vs/base/common/path';
import * as pfs from 'vs/base/node/pfs';
import { normalizeRoots, ChokidarWatcherService } from '../chokidarWatcherService';
import { IWatcherRequest } from '../watcher';
import * as platform from 'vs/base/common/platform';
import { Delayer } from 'vs/base/common/async';
import { IDiskFileChange } from 'vs/platform/files/node/watcher/watcher';
import { FileChangeType } from 'vs/platform/files/common/files';
import { normalizeRoots } from 'vs/platform/files/node/watcher/unix/chokidarWatcherService';
import { IWatcherRequest } from 'vs/platform/files/node/watcher/unix/watcher';
function newRequest(basePath: string, ignored: string[] = []): IWatcherRequest {
return { path: basePath, excludes: ignored };
@ -36,25 +30,6 @@ function assertNormalizedRequests(inputRequests: IWatcherRequest[], expectedRequ
}
}
function sort(changes: IDiskFileChange[]) {
return changes.sort((c1, c2) => {
return c1.path.localeCompare(c2.path);
});
}
function wait(time: number) {
return new Delayer<void>(time).trigger(() => { });
}
async function assertFileEvents(actuals: IDiskFileChange[], expected: IDiskFileChange[]) {
let repeats = 40;
while ((actuals.length < expected.length) && repeats-- > 0) {
await wait(50);
}
assert.deepEqual(sort(actuals), sort(expected));
actuals.length = 0;
}
suite('Chokidar normalizeRoots', () => {
test('should not impacts roots that don\'t overlap', () => {
if (platform.isWindows) {
@ -115,208 +90,3 @@ suite('Chokidar normalizeRoots', () => {
assertNormalizedRequests([r2, r3], { [p2]: [r2, r3] });
});
});
suite.skip('Chokidar watching', () => {
const tmpdir = os.tmpdir();
const testDir = path.join(tmpdir, 'chokidartest-' + Date.now());
const aFolder = path.join(testDir, 'a');
const bFolder = path.join(testDir, 'b');
const b2Folder = path.join(bFolder, 'b2');
const service = new ChokidarWatcherService();
const result: IDiskFileChange[] = [];
let error: string | null = null;
suiteSetup(async () => {
await pfs.mkdirp(testDir);
await pfs.mkdirp(aFolder);
await pfs.mkdirp(bFolder);
await pfs.mkdirp(b2Folder);
const opts = { verboseLogging: false, pollingInterval: 200 };
service.watch(opts)(e => {
if (Array.isArray(e)) {
result.push(...e);
}
});
service.onLogMessage(msg => {
if (msg.type === 'error') {
console.log('set error', msg.message);
error = msg.message;
}
});
});
suiteTeardown(async () => {
await pfs.rimraf(testDir, pfs.RimRafMode.MOVE);
await service.stop();
});
setup(() => {
result.length = 0;
assert.equal(error, null);
});
teardown(() => {
assert.equal(error, null);
});
test('simple file operations, single root, no ignore', async () => {
let request: IWatcherRequest = { path: testDir, excludes: [] };
service.setRoots([request]);
await wait(300);
assert.equal(service.wacherCount, 1);
// create a file
let testFilePath = path.join(testDir, 'file.txt');
await pfs.writeFile(testFilePath, '');
await assertFileEvents(result, [{ path: testFilePath, type: FileChangeType.ADDED }]);
// modify a file
await pfs.writeFile(testFilePath, 'Hello');
await assertFileEvents(result, [{ path: testFilePath, type: FileChangeType.UPDATED }]);
// create a folder
let testFolderPath = path.join(testDir, 'newFolder');
await pfs.mkdirp(testFolderPath);
// copy a file
let copiedFilePath = path.join(testFolderPath, 'file2.txt');
await pfs.copy(testFilePath, copiedFilePath);
await assertFileEvents(result, [{ path: copiedFilePath, type: FileChangeType.ADDED }, { path: testFolderPath, type: FileChangeType.ADDED }]);
// delete a file
await pfs.rimraf(copiedFilePath, pfs.RimRafMode.MOVE);
let renamedFilePath = path.join(testFolderPath, 'file3.txt');
// move a file
await pfs.rename(testFilePath, renamedFilePath);
await assertFileEvents(result, [{ path: copiedFilePath, type: FileChangeType.DELETED }, { path: testFilePath, type: FileChangeType.DELETED }, { path: renamedFilePath, type: FileChangeType.ADDED }]);
// delete a folder
await pfs.rimraf(testFolderPath, pfs.RimRafMode.MOVE);
await assertFileEvents(result, [{ path: testFolderPath, type: FileChangeType.DELETED }, { path: renamedFilePath, type: FileChangeType.DELETED }]);
});
test('simple file operations, ignore', async () => {
let request: IWatcherRequest = { path: testDir, excludes: ['**/b/**', '**/*.js', '.git/**'] };
service.setRoots([request]);
await wait(300);
assert.equal(service.wacherCount, 1);
// create various ignored files
let file1 = path.join(bFolder, 'file1.txt'); // hidden
await pfs.writeFile(file1, 'Hello');
let file2 = path.join(b2Folder, 'file2.txt'); // hidden
await pfs.writeFile(file2, 'Hello');
let folder1 = path.join(bFolder, 'folder1'); // hidden
await pfs.mkdirp(folder1);
let folder2 = path.join(aFolder, 'b'); // hidden
await pfs.mkdirp(folder2);
let folder3 = path.join(testDir, '.git'); // hidden
await pfs.mkdirp(folder3);
let folder4 = path.join(testDir, '.git1');
await pfs.mkdirp(folder4);
let folder5 = path.join(aFolder, '.git');
await pfs.mkdirp(folder5);
let file3 = path.join(aFolder, 'file3.js'); // hidden
await pfs.writeFile(file3, 'var x;');
let file4 = path.join(aFolder, 'file4.txt');
await pfs.writeFile(file4, 'Hello');
await assertFileEvents(result, [{ path: file4, type: FileChangeType.ADDED }, { path: folder4, type: FileChangeType.ADDED }, { path: folder5, type: FileChangeType.ADDED }]);
// move some files
let movedFile1 = path.join(folder2, 'file1.txt'); // from ignored to ignored
await pfs.rename(file1, movedFile1);
let movedFile2 = path.join(aFolder, 'file2.txt'); // from ignored to visible
await pfs.rename(file2, movedFile2);
let movedFile3 = path.join(aFolder, 'file3.txt'); // from ignored file ext to visible
await pfs.rename(file3, movedFile3);
await assertFileEvents(result, [{ path: movedFile2, type: FileChangeType.ADDED }, { path: movedFile3, type: FileChangeType.ADDED }]);
// delete all files
await pfs.rimraf(movedFile1); // hidden
await pfs.rimraf(movedFile2, pfs.RimRafMode.MOVE);
await pfs.rimraf(movedFile3, pfs.RimRafMode.MOVE);
await pfs.rimraf(folder1); // hidden
await pfs.rimraf(folder2); // hidden
await pfs.rimraf(folder3); // hidden
await pfs.rimraf(folder4, pfs.RimRafMode.MOVE);
await pfs.rimraf(folder5, pfs.RimRafMode.MOVE);
await pfs.rimraf(file4, pfs.RimRafMode.MOVE);
await assertFileEvents(result, [{ path: movedFile2, type: FileChangeType.DELETED }, { path: movedFile3, type: FileChangeType.DELETED }, { path: file4, type: FileChangeType.DELETED }, { path: folder4, type: FileChangeType.DELETED }, { path: folder5, type: FileChangeType.DELETED }]);
});
test('simple file operations, multiple roots', async () => {
let request1: IWatcherRequest = { path: aFolder, excludes: ['**/*.js'] };
let request2: IWatcherRequest = { path: b2Folder, excludes: ['**/*.ts'] };
service.setRoots([request1, request2]);
await wait(300);
assert.equal(service.wacherCount, 2);
// create some files
let folderPath1 = path.join(aFolder, 'folder1');
await pfs.mkdirp(folderPath1);
let filePath1 = path.join(folderPath1, 'file1.json');
await pfs.writeFile(filePath1, '');
let filePath2 = path.join(folderPath1, 'file2.js'); // filtered
await pfs.writeFile(filePath2, '');
let folderPath2 = path.join(b2Folder, 'folder2');
await pfs.mkdirp(folderPath2);
let filePath3 = path.join(folderPath2, 'file3.ts'); // filtered
await pfs.writeFile(filePath3, '');
let filePath4 = path.join(testDir, 'file4.json'); // outside roots
await pfs.writeFile(filePath4, '');
await assertFileEvents(result, [{ path: folderPath1, type: FileChangeType.ADDED }, { path: filePath1, type: FileChangeType.ADDED }, { path: folderPath2, type: FileChangeType.ADDED }]);
// change roots
let request3: IWatcherRequest = { path: aFolder, excludes: ['**/*.json'] };
service.setRoots([request3]);
await wait(300);
assert.equal(service.wacherCount, 1);
// delete all
await pfs.rimraf(folderPath1, pfs.RimRafMode.MOVE);
await pfs.rimraf(folderPath2, pfs.RimRafMode.MOVE);
await pfs.rimraf(filePath4, pfs.RimRafMode.MOVE);
await assertFileEvents(result, [{ path: folderPath1, type: FileChangeType.DELETED }, { path: filePath2, type: FileChangeType.DELETED }]);
});
test('simple file operations, nested roots', async () => {
let request1: IWatcherRequest = { path: testDir, excludes: ['**/b2/**'] };
let request2: IWatcherRequest = { path: bFolder, excludes: ['**/b3/**'] };
service.setRoots([request1, request2]);
await wait(300);
assert.equal(service.wacherCount, 1);
// create files
let filePath1 = path.join(bFolder, 'file1.xml'); // visible by both
await pfs.writeFile(filePath1, '');
let filePath2 = path.join(b2Folder, 'file2.xml'); // filtered by root1, but visible by root2
await pfs.writeFile(filePath2, '');
let folderPath1 = path.join(b2Folder, 'b3'); // filtered
await pfs.mkdirp(folderPath1);
let filePath3 = path.join(folderPath1, 'file3.xml'); // filtered
await pfs.writeFile(filePath3, '');
await assertFileEvents(result, [{ path: filePath1, type: FileChangeType.ADDED }, { path: filePath2, type: FileChangeType.ADDED }]);
let renamedFilePath2 = path.join(folderPath1, 'file2.xml');
// move a file
await pfs.rename(filePath2, renamedFilePath2);
await assertFileEvents(result, [{ path: filePath2, type: FileChangeType.DELETED }]);
// delete all
await pfs.rimraf(folderPath1, pfs.RimRafMode.MOVE);
await pfs.rimraf(filePath1, pfs.RimRafMode.MOVE);
await assertFileEvents(result, [{ path: filePath1, type: FileChangeType.DELETED }]);
});
});

View file

@ -14,12 +14,18 @@ export interface IWatcherRequest {
export interface IWatcherOptions {
pollingInterval?: number;
usePolling?: boolean;
verboseLogging?: boolean;
}
export interface IWatcherService {
watch(options: IWatcherOptions): Event<IDiskFileChange[]>;
readonly onDidChangeFile: Event<IDiskFileChange[]>;
readonly onDidLogMessage: Event<ILogMessage>;
init(options: IWatcherOptions): Promise<void>;
setRoots(roots: IWatcherRequest[]): Promise<void>;
setVerboseLogging(enabled: boolean): Promise<void>;
onLogMessage: Event<ILogMessage>;
stop(): Promise<void>;
}
}

View file

@ -4,10 +4,9 @@
*--------------------------------------------------------------------------------------------*/
import { Server } from 'vs/base/parts/ipc/node/ipc.cp';
import { WatcherChannel } from 'vs/platform/files/node/watcher/unix/watcherIpc';
import { ChokidarWatcherService } from 'vs/platform/files/node/watcher/unix/chokidarWatcherService';
import { createChannelReceiver } from 'vs/base/parts/ipc/common/ipc';
const server = new Server('watcher');
const service = new ChokidarWatcherService();
const channel = new WatcherChannel(service);
server.registerChannel('watcher', channel);
server.registerChannel('watcher', createChannelReceiver(service));

View file

@ -1,58 +0,0 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { IChannel, IServerChannel } from 'vs/base/parts/ipc/common/ipc';
import { IWatcherRequest, IWatcherService, IWatcherOptions } from './watcher';
import { Event } from 'vs/base/common/event';
import { IDiskFileChange, ILogMessage } from 'vs/platform/files/node/watcher/watcher';
export class WatcherChannel implements IServerChannel {
constructor(private service: IWatcherService) { }
listen(_: unknown, event: string, arg?: any): Event<any> {
switch (event) {
case 'watch': return this.service.watch(arg);
case 'onLogMessage': return this.service.onLogMessage;
}
throw new Error(`Event not found: ${event}`);
}
call(_: unknown, command: string, arg?: any): Promise<any> {
switch (command) {
case 'setRoots': return this.service.setRoots(arg);
case 'setVerboseLogging': return this.service.setVerboseLogging(arg);
case 'stop': return this.service.stop();
}
throw new Error(`Call not found: ${command}`);
}
}
export class WatcherChannelClient implements IWatcherService {
constructor(private channel: IChannel) { }
watch(options: IWatcherOptions): Event<IDiskFileChange[]> {
return this.channel.listen('watch', options);
}
setVerboseLogging(enable: boolean): Promise<void> {
return this.channel.call('setVerboseLogging', enable);
}
get onLogMessage(): Event<ILogMessage> {
return this.channel.listen('onLogMessage');
}
setRoots(roots: IWatcherRequest[]): Promise<void> {
return this.channel.call('setRoots', roots);
}
stop(): Promise<void> {
return this.channel.call('stop');
}
}

View file

@ -3,20 +3,20 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { getNextTickChannel } from 'vs/base/parts/ipc/common/ipc';
import { createChannelSender, getNextTickChannel } from 'vs/base/parts/ipc/common/ipc';
import { Client } from 'vs/base/parts/ipc/node/ipc.cp';
import { IDiskFileChange, ILogMessage } from 'vs/platform/files/node/watcher/watcher';
import { WatcherChannelClient } from 'vs/platform/files/node/watcher/unix/watcherIpc';
import { Disposable } from 'vs/base/common/lifecycle';
import { IWatcherRequest, IWatcherOptions } from 'vs/platform/files/node/watcher/unix/watcher';
import { IWatcherRequest, IWatcherOptions, IWatcherService } from 'vs/platform/files/node/watcher/unix/watcher';
import { getPathFromAmdModule } from 'vs/base/common/amd';
export class FileWatcher extends Disposable {
private static readonly MAX_RESTARTS = 5;
private isDisposed: boolean;
private restartCounter: number;
private service: WatcherChannelClient | undefined;
private service: IWatcherService | undefined;
constructor(
private folders: IWatcherRequest[],
@ -62,14 +62,11 @@ export class FileWatcher extends Disposable {
}));
// Initialize watcher
const channel = getNextTickChannel(client.getChannel('watcher'));
this.service = new WatcherChannelClient(channel);
this.service = createChannelSender<IWatcherService>(getNextTickChannel(client.getChannel('watcher')));
this.service.init({ ...this.watcherOptions, verboseLogging: this.verboseLogging });
this.service.setVerboseLogging(this.verboseLogging);
this._register(this.service.watch(this.watcherOptions)(e => !this.isDisposed && this.onDidFilesChange(e)));
this._register(this.service.onLogMessage(m => this.onLogMessage(m)));
this._register(this.service.onDidChangeFile(e => !this.isDisposed && this.onDidFilesChange(e)));
this._register(this.service.onDidLogMessage(m => this.onLogMessage(m)));
// Start watching
this.service.setRoots(this.folders);