adopt to new multiplex channel

This commit is contained in:
Sandeep Somavarapu 2020-02-05 11:39:06 +01:00
parent 395f34d3d6
commit 154c56def8
3 changed files with 18 additions and 19 deletions

View file

@ -10,6 +10,7 @@ import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cance
import * as errors from 'vs/base/common/errors'; import * as errors from 'vs/base/common/errors';
import { VSBuffer } from 'vs/base/common/buffer'; import { VSBuffer } from 'vs/base/common/buffer';
import { getRandomElement } from 'vs/base/common/arrays'; import { getRandomElement } from 'vs/base/common/arrays';
import { isFunction } from 'vs/base/common/types';
/** /**
* An `IChannel` is an abstraction over a collection of commands. * An `IChannel` is an abstraction over a collection of commands.
@ -707,24 +708,26 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
* be selected and when listening without a router, every client * be selected and when listening without a router, every client
* will be listened to. * will be listened to.
*/ */
getChannel<T extends IChannel>(channelName: string, router?: IClientRouter<TContext>): T { getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
getChannel<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean): T;
getChannel<T extends IChannel>(channelName: string, routerOrClientFilter: IClientRouter<TContext> | ((client: Client<TContext>) => boolean)): T {
const that = this; const that = this;
return { return {
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> { call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
let connectionPromise: Promise<Client<TContext>>; let connectionPromise: Promise<Client<TContext>>;
if (router) { if (isFunction(routerOrClientFilter)) {
connectionPromise = router.routeCall(that, command, arg);
} else {
// when no router is provided, we go random client picking // when no router is provided, we go random client picking
let connection = getRandomElement(that.connections); let connection = getRandomElement(that.connections.filter(routerOrClientFilter));
connectionPromise = connection connectionPromise = connection
// if we found a client, let's call on it // if we found a client, let's call on it
? Promise.resolve(connection) ? Promise.resolve(connection)
// else, let's wait for a client to come along // else, let's wait for a client to come along
: Event.toPromise(that.onDidAddConnection); : Event.toPromise(Event.filter(that.onDidAddConnection, routerOrClientFilter));
} else {
connectionPromise = routerOrClientFilter.routeCall(that, command, arg);
} }
const channelPromise = connectionPromise const channelPromise = connectionPromise
@ -734,11 +737,11 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
.call(command, arg, cancellationToken); .call(command, arg, cancellationToken);
}, },
listen(event: string, arg: any): Event<T> { listen(event: string, arg: any): Event<T> {
if (!router) { if (isFunction(routerOrClientFilter)) {
return that.getMulticastEvent(channelName, event, arg); return that.getMulticastEvent(channelName, routerOrClientFilter, event, arg);
} }
const channelPromise = router.routeEvent(that, event, arg) const channelPromise = routerOrClientFilter.routeEvent(that, event, arg)
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName)); .then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
return getDelayedChannel(channelPromise) return getDelayedChannel(channelPromise)
@ -747,7 +750,7 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
} as T; } as T;
} }
private getMulticastEvent<T extends IChannel>(channelName: string, eventName: string, arg: any): Event<T> { private getMulticastEvent<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean, eventName: string, arg: any): Event<T> {
const that = this; const that = this;
let disposables = new DisposableStore(); let disposables = new DisposableStore();
@ -784,8 +787,8 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
map.delete(connection); map.delete(connection);
}; };
that.connections.forEach(onDidAddConnection); that.connections.filter(clientFilter).forEach(onDidAddConnection);
that.onDidAddConnection(onDidAddConnection, undefined, disposables); Event.filter(that.onDidAddConnection, clientFilter)(onDidAddConnection, undefined, disposables);
that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables); that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables);
eventMultiplexer.event(emitter.fire, emitter, disposables); eventMultiplexer.event(emitter.fire, emitter, disposables);

View file

@ -454,7 +454,7 @@ suite('Base IPC', function () {
client1.registerChannel('channel', clientChannel1); client1.registerChannel('channel', clientChannel1);
const pings: string[] = []; const pings: string[] = [];
const channel = server.getChannel('channel'); const channel = server.getChannel('channel', () => true);
const service = new TestChannelClient(channel); const service = new TestChannelClient(channel);
service.onPong(msg => pings.push(msg)); service.onPong(msg => pings.push(msg));

View file

@ -26,7 +26,6 @@ import { resolveCommonProperties } from 'vs/platform/telemetry/node/commonProper
import { TelemetryAppenderChannel } from 'vs/platform/telemetry/node/telemetryIpc'; import { TelemetryAppenderChannel } from 'vs/platform/telemetry/node/telemetryIpc';
import { TelemetryService, ITelemetryServiceConfig } from 'vs/platform/telemetry/common/telemetryService'; import { TelemetryService, ITelemetryServiceConfig } from 'vs/platform/telemetry/common/telemetryService';
import { AppInsightsAppender } from 'vs/platform/telemetry/node/appInsightsAppender'; import { AppInsightsAppender } from 'vs/platform/telemetry/node/appInsightsAppender';
import { ActiveWindowManager } from 'vs/code/node/activeWindowTracker';
import { ipcRenderer } from 'electron'; import { ipcRenderer } from 'electron';
import { ILogService, LogLevel, ILoggerService } from 'vs/platform/log/common/log'; import { ILogService, LogLevel, ILoggerService } from 'vs/platform/log/common/log';
import { LoggerChannelClient, FollowerLogService } from 'vs/platform/log/common/logIpc'; import { LoggerChannelClient, FollowerLogService } from 'vs/platform/log/common/logIpc';
@ -131,9 +130,6 @@ async function main(server: Server, initData: ISharedProcessInitData, configurat
const electronService = createChannelSender<IElectronService>(mainProcessService.getChannel('electron'), { context: configuration.windowId }); const electronService = createChannelSender<IElectronService>(mainProcessService.getChannel('electron'), { context: configuration.windowId });
services.set(IElectronService, electronService); services.set(IElectronService, electronService);
const activeWindowManager = new ActiveWindowManager(electronService);
const activeWindowRouter = new StaticRouter(ctx => activeWindowManager.getActiveClientId().then(id => ctx === id));
// Files // Files
const fileService = new FileService(logService); const fileService = new FileService(logService);
services.set(IFileService, fileService); services.set(IFileService, fileService);
@ -184,8 +180,8 @@ async function main(server: Server, initData: ISharedProcessInitData, configurat
services.set(ICredentialsService, new SyncDescriptor(KeytarCredentialsService)); services.set(ICredentialsService, new SyncDescriptor(KeytarCredentialsService));
services.set(IUserDataAuthTokenService, new SyncDescriptor(UserDataAuthTokenService)); services.set(IUserDataAuthTokenService, new SyncDescriptor(UserDataAuthTokenService));
services.set(IUserDataSyncLogService, new SyncDescriptor(UserDataSyncLogService)); services.set(IUserDataSyncLogService, new SyncDescriptor(UserDataSyncLogService));
services.set(IUserDataSyncUtilService, new UserDataSyncUtilServiceClient(server.getChannel('userDataSyncUtil', activeWindowRouter))); services.set(IUserDataSyncUtilService, new UserDataSyncUtilServiceClient(server.getChannel('userDataSyncUtil', client => client.ctx !== 'main')));
services.set(IGlobalExtensionEnablementService, new GlobalExtensionEnablementServiceClient(server.getChannel('globalExtensionEnablement', activeWindowRouter))); services.set(IGlobalExtensionEnablementService, new GlobalExtensionEnablementServiceClient(server.getChannel('globalExtensionEnablement', client => client.ctx !== 'main')));
services.set(IUserDataSyncStoreService, new SyncDescriptor(UserDataSyncStoreService)); services.set(IUserDataSyncStoreService, new SyncDescriptor(UserDataSyncStoreService));
services.set(ISettingsSyncService, new SyncDescriptor(SettingsSynchroniser)); services.set(ISettingsSyncService, new SyncDescriptor(SettingsSynchroniser));
services.set(IUserDataSyncService, new SyncDescriptor(UserDataSyncService)); services.set(IUserDataSyncService, new SyncDescriptor(UserDataSyncService));