mirror of
https://github.com/Microsoft/vscode
synced 2024-10-12 06:17:18 +00:00
Merge branch 'joao/ipc-broadcast'
This commit is contained in:
commit
920559ace1
|
@ -3,8 +3,8 @@
|
|||
* Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
*--------------------------------------------------------------------------------------------*/
|
||||
|
||||
import { Event, Emitter, Relay } from 'vs/base/common/event';
|
||||
import { IDisposable, toDisposable, combinedDisposable } from 'vs/base/common/lifecycle';
|
||||
import { Event, Emitter, Relay, EventMultiplexer } from 'vs/base/common/event';
|
||||
import { IDisposable, toDisposable, combinedDisposable, DisposableStore } from 'vs/base/common/lifecycle';
|
||||
import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/common/async';
|
||||
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
|
||||
import * as errors from 'vs/base/common/errors';
|
||||
|
@ -95,7 +95,8 @@ export interface Client<TContext> {
|
|||
|
||||
export interface IConnectionHub<TContext> {
|
||||
readonly connections: Connection<TContext>[];
|
||||
readonly onDidChangeConnections: Event<Connection<TContext>>;
|
||||
readonly onDidAddConnection: Event<Connection<TContext>>;
|
||||
readonly onDidRemoveConnection: Event<Connection<TContext>>;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -117,6 +118,7 @@ export interface IClientRouter<TContext = string> {
|
|||
*/
|
||||
export interface IRoutingChannelClient<TContext = string> {
|
||||
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
|
||||
getBroadcastChannel<T extends IChannel>(channelName: string): T;
|
||||
}
|
||||
|
||||
interface IReader {
|
||||
|
@ -659,8 +661,11 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
|
|||
private channels = new Map<string, IServerChannel<TContext>>();
|
||||
private _connections = new Set<Connection<TContext>>();
|
||||
|
||||
private readonly _onDidChangeConnections = new Emitter<Connection<TContext>>();
|
||||
readonly onDidChangeConnections: Event<Connection<TContext>> = this._onDidChangeConnections.event;
|
||||
private readonly _onDidAddConnection = new Emitter<Connection<TContext>>();
|
||||
readonly onDidAddConnection: Event<Connection<TContext>> = this._onDidAddConnection.event;
|
||||
|
||||
private readonly _onDidRemoveConnection = new Emitter<Connection<TContext>>();
|
||||
readonly onDidRemoveConnection: Event<Connection<TContext>> = this._onDidRemoveConnection.event;
|
||||
|
||||
get connections(): Connection<TContext>[] {
|
||||
const result: Connection<TContext>[] = [];
|
||||
|
@ -683,12 +688,13 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
|
|||
|
||||
const connection: Connection<TContext> = { channelServer, channelClient, ctx };
|
||||
this._connections.add(connection);
|
||||
this._onDidChangeConnections.fire(connection);
|
||||
this._onDidAddConnection.fire(connection);
|
||||
|
||||
onDidClientDisconnect(() => {
|
||||
channelServer.dispose();
|
||||
channelClient.dispose();
|
||||
this._connections.delete(connection);
|
||||
this._onDidRemoveConnection.fire(connection);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -698,14 +704,14 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
|
|||
const that = this;
|
||||
|
||||
return {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken) {
|
||||
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
|
||||
const channelPromise = router.routeCall(that, command, arg)
|
||||
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
|
||||
|
||||
return getDelayedChannel(channelPromise)
|
||||
.call(command, arg, cancellationToken);
|
||||
},
|
||||
listen(event: string, arg: any) {
|
||||
listen(event: string, arg: any): Event<T> {
|
||||
const channelPromise = router.routeEvent(that, event, arg)
|
||||
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
|
||||
|
||||
|
@ -715,6 +721,66 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
|
|||
} as T;
|
||||
}
|
||||
|
||||
getBroadcastChannel<T extends IChannel>(channelName: string): T {
|
||||
const that = this;
|
||||
|
||||
return {
|
||||
call(_): Promise<T> {
|
||||
throw new Error('IPC broadcast channels are not supported for calls');
|
||||
},
|
||||
listen(eventName: string, arg: any): Event<T> {
|
||||
let disposables = new DisposableStore();
|
||||
|
||||
// Create an emitter which hooks up to all clients
|
||||
// as soon as first listener is added. It also
|
||||
// disconnects from all clients as soon as the last listener
|
||||
// is removed.
|
||||
const emitter = new Emitter<T>({
|
||||
onFirstListenerAdd: () => {
|
||||
disposables = new DisposableStore();
|
||||
|
||||
// The event multiplexer is useful since the active
|
||||
// client list is dynamic. We need to hook up and disconnection
|
||||
// to/from clients as they come and go.
|
||||
const eventMultiplexer = new EventMultiplexer<T>();
|
||||
const map = new Map<Connection<TContext>, IDisposable>();
|
||||
|
||||
const onDidAddConnection = (connection: Connection<TContext>) => {
|
||||
const channel = connection.channelClient.getChannel(channelName);
|
||||
const event = channel.listen<T>(eventName, arg);
|
||||
const disposable = eventMultiplexer.add(event);
|
||||
|
||||
map.set(connection, disposable);
|
||||
};
|
||||
|
||||
const onDidRemoveConnection = (connection: Connection<TContext>) => {
|
||||
const disposable = map.get(connection);
|
||||
|
||||
if (!disposable) {
|
||||
return;
|
||||
}
|
||||
|
||||
disposable.dispose();
|
||||
map.delete(connection);
|
||||
};
|
||||
|
||||
that.connections.forEach(onDidAddConnection);
|
||||
that.onDidAddConnection(onDidAddConnection, undefined, disposables);
|
||||
that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables);
|
||||
eventMultiplexer.event(emitter.fire, emitter, disposables);
|
||||
|
||||
disposables.add(eventMultiplexer);
|
||||
},
|
||||
onLastListenerRemove: () => {
|
||||
disposables.dispose();
|
||||
}
|
||||
});
|
||||
|
||||
return emitter.event;
|
||||
}
|
||||
} as T;
|
||||
}
|
||||
|
||||
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
|
||||
this.channels.set(channelName, channel);
|
||||
|
||||
|
@ -726,7 +792,8 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
|
|||
dispose(): void {
|
||||
this.channels.clear();
|
||||
this._connections.clear();
|
||||
this._onDidChangeConnections.dispose();
|
||||
this._onDidAddConnection.dispose();
|
||||
this._onDidRemoveConnection.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -827,7 +894,7 @@ export class StaticRouter<TContext = string> implements IClientRouter<TContext>
|
|||
}
|
||||
}
|
||||
|
||||
await Event.toPromise(hub.onDidChangeConnections);
|
||||
await Event.toPromise(hub.onDidAddConnection);
|
||||
return await this.route(hub);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -415,4 +415,80 @@ suite('Base IPC', function () {
|
|||
return assert.equal(r, 'Super Context');
|
||||
});
|
||||
});
|
||||
|
||||
suite('one to many', function () {
|
||||
test('all clients get pinged', async function () {
|
||||
const service = new TestService();
|
||||
const channel = new TestChannel(service);
|
||||
const server = new TestIPCServer();
|
||||
server.registerChannel('channel', channel);
|
||||
|
||||
let client1GotPinged = false;
|
||||
const client1 = server.createConnection('client1');
|
||||
const ipcService1 = new TestChannelClient(client1.getChannel('channel'));
|
||||
ipcService1.onPong(() => client1GotPinged = true);
|
||||
|
||||
let client2GotPinged = false;
|
||||
const client2 = server.createConnection('client2');
|
||||
const ipcService2 = new TestChannelClient(client2.getChannel('channel'));
|
||||
ipcService2.onPong(() => client2GotPinged = true);
|
||||
|
||||
await timeout(1);
|
||||
service.ping('hello');
|
||||
|
||||
await timeout(1);
|
||||
assert(client1GotPinged, 'client 1 got pinged');
|
||||
assert(client2GotPinged, 'client 2 got pinged');
|
||||
|
||||
client1.dispose();
|
||||
client2.dispose();
|
||||
server.dispose();
|
||||
});
|
||||
|
||||
test('server gets pings from all clients (broadcast channel)', async function () {
|
||||
const server = new TestIPCServer();
|
||||
|
||||
const client1 = server.createConnection('client1');
|
||||
const clientService1 = new TestService();
|
||||
const clientChannel1 = new TestChannel(clientService1);
|
||||
client1.registerChannel('channel', clientChannel1);
|
||||
|
||||
const pings: string[] = [];
|
||||
const broadcastChannel = server.getBroadcastChannel('channel');
|
||||
const broadcastService = new TestChannelClient(broadcastChannel);
|
||||
broadcastService.onPong(msg => pings.push(msg));
|
||||
|
||||
await timeout(1);
|
||||
clientService1.ping('hello 1');
|
||||
|
||||
await timeout(1);
|
||||
assert.deepEqual(pings, ['hello 1']);
|
||||
|
||||
const client2 = server.createConnection('client2');
|
||||
const clientService2 = new TestService();
|
||||
const clientChannel2 = new TestChannel(clientService2);
|
||||
client2.registerChannel('channel', clientChannel2);
|
||||
|
||||
await timeout(1);
|
||||
clientService2.ping('hello 2');
|
||||
|
||||
await timeout(1);
|
||||
assert.deepEqual(pings, ['hello 1', 'hello 2']);
|
||||
|
||||
client1.dispose();
|
||||
clientService1.ping('hello 1');
|
||||
|
||||
await timeout(1);
|
||||
assert.deepEqual(pings, ['hello 1', 'hello 2']);
|
||||
|
||||
await timeout(1);
|
||||
clientService2.ping('hello again 2');
|
||||
|
||||
await timeout(1);
|
||||
assert.deepEqual(pings, ['hello 1', 'hello 2', 'hello again 2']);
|
||||
|
||||
client2.dispose();
|
||||
server.dispose();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue