IPC: getBroadcastChannel

fixes #89614
This commit is contained in:
Joao Moreno 2020-01-31 16:14:16 +01:00
parent be0aca7188
commit c11a5feafe
2 changed files with 131 additions and 9 deletions

View file

@ -3,7 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { Event, Emitter, Relay } from 'vs/base/common/event';
import { Event, Emitter, Relay, EventMultiplexer } from 'vs/base/common/event';
import { IDisposable, toDisposable, combinedDisposable } from 'vs/base/common/lifecycle';
import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/common/async';
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
@ -95,7 +95,8 @@ export interface Client<TContext> {
export interface IConnectionHub<TContext> {
readonly connections: Connection<TContext>[];
readonly onDidChangeConnections: Event<Connection<TContext>>;
readonly onDidAddConnection: Event<Connection<TContext>>;
readonly onDidRemoveConnection: Event<Connection<TContext>>;
}
/**
@ -117,6 +118,7 @@ export interface IClientRouter<TContext = string> {
*/
export interface IRoutingChannelClient<TContext = string> {
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
getBroadcastChannel<T extends IChannel>(channelName: string): T;
}
interface IReader {
@ -659,8 +661,11 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
private channels = new Map<string, IServerChannel<TContext>>();
private _connections = new Set<Connection<TContext>>();
private readonly _onDidChangeConnections = new Emitter<Connection<TContext>>();
readonly onDidChangeConnections: Event<Connection<TContext>> = this._onDidChangeConnections.event;
private readonly _onDidAddConnection = new Emitter<Connection<TContext>>();
readonly onDidAddConnection: Event<Connection<TContext>> = this._onDidAddConnection.event;
private readonly _onDidRemoveConnection = new Emitter<Connection<TContext>>();
readonly onDidRemoveConnection: Event<Connection<TContext>> = this._onDidRemoveConnection.event;
get connections(): Connection<TContext>[] {
const result: Connection<TContext>[] = [];
@ -683,12 +688,13 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
const connection: Connection<TContext> = { channelServer, channelClient, ctx };
this._connections.add(connection);
this._onDidChangeConnections.fire(connection);
this._onDidAddConnection.fire(connection);
onDidClientDisconnect(() => {
channelServer.dispose();
channelClient.dispose();
this._connections.delete(connection);
this._onDidRemoveConnection.fire(connection);
});
});
});
@ -698,14 +704,14 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
const that = this;
return {
call(command: string, arg?: any, cancellationToken?: CancellationToken) {
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
const channelPromise = router.routeCall(that, command, arg)
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
return getDelayedChannel(channelPromise)
.call(command, arg, cancellationToken);
},
listen(event: string, arg: any) {
listen(event: string, arg: any): Event<T> {
const channelPromise = router.routeEvent(that, event, arg)
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
@ -715,6 +721,45 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
} as T;
}
getBroadcastChannel<T extends IChannel>(channelName: string): T {
const that = this;
return {
call(_): Promise<T> {
throw new Error('IPC broadcast channels are not supported for calls');
},
listen(eventName: string, arg: any): Event<T> {
const eventMultiplexer = new EventMultiplexer<T>();
const map = new Map<Connection<TContext>, IDisposable>();
const onDidAddConnection = (connection: Connection<TContext>) => {
const channel = connection.channelClient.getChannel(channelName);
const event = channel.listen<T>(eventName, arg);
const disposable = eventMultiplexer.add(event);
map.set(connection, disposable);
};
const onDidRemoveConnection = (connection: Connection<TContext>) => {
const disposable = map.get(connection);
if (!disposable) {
return;
}
disposable.dispose();
map.delete(connection);
};
that.connections.forEach(onDidAddConnection);
that.onDidAddConnection(onDidAddConnection);
that.onDidRemoveConnection(onDidRemoveConnection);
return eventMultiplexer.event;
}
} as T;
}
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
this.channels.set(channelName, channel);
@ -726,7 +771,8 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
dispose(): void {
this.channels.clear();
this._connections.clear();
this._onDidChangeConnections.dispose();
this._onDidAddConnection.dispose();
this._onDidRemoveConnection.dispose();
}
}
@ -827,7 +873,7 @@ export class StaticRouter<TContext = string> implements IClientRouter<TContext>
}
}
await Event.toPromise(hub.onDidChangeConnections);
await Event.toPromise(hub.onDidAddConnection);
return await this.route(hub);
}
}

View file

@ -415,4 +415,80 @@ suite('Base IPC', function () {
return assert.equal(r, 'Super Context');
});
});
suite('one to many', function () {
test('all clients get pinged', async function () {
const service = new TestService();
const channel = new TestChannel(service);
const server = new TestIPCServer();
server.registerChannel('channel', channel);
let client1GotPinged = false;
const client1 = server.createConnection('client1');
const ipcService1 = new TestChannelClient(client1.getChannel('channel'));
ipcService1.onPong(() => client1GotPinged = true);
let client2GotPinged = false;
const client2 = server.createConnection('client2');
const ipcService2 = new TestChannelClient(client2.getChannel('channel'));
ipcService2.onPong(() => client2GotPinged = true);
await timeout(1);
service.ping('hello');
await timeout(1);
assert(client1GotPinged, 'client 1 got pinged');
assert(client2GotPinged, 'client 2 got pinged');
client1.dispose();
client2.dispose();
server.dispose();
});
test('server gets pings from all clients (broadcast channel)', async function () {
const server = new TestIPCServer();
const client1 = server.createConnection('client1');
const clientService1 = new TestService();
const clientChannel1 = new TestChannel(clientService1);
client1.registerChannel('channel', clientChannel1);
const pings: string[] = [];
const broadcastChannel = server.getBroadcastChannel('channel');
const broadcastService = new TestChannelClient(broadcastChannel);
broadcastService.onPong(msg => pings.push(msg));
await timeout(1);
clientService1.ping('hello 1');
await timeout(1);
assert.deepEqual(pings, ['hello 1']);
const client2 = server.createConnection('client2');
const clientService2 = new TestService();
const clientChannel2 = new TestChannel(clientService2);
client2.registerChannel('channel', clientChannel2);
await timeout(1);
clientService2.ping('hello 2');
await timeout(1);
assert.deepEqual(pings, ['hello 1', 'hello 2']);
client1.dispose();
clientService1.ping('hello 1');
await timeout(1);
assert.deepEqual(pings, ['hello 1', 'hello 2']);
await timeout(1);
clientService2.ping('hello again 2');
await timeout(1);
assert.deepEqual(pings, ['hello 1', 'hello 2', 'hello again 2']);
client2.dispose();
server.dispose();
});
});
});