address pr comments

This commit is contained in:
Connor Peet 2023-05-11 12:29:42 -07:00
parent 364b84e3ec
commit f76fd4f664
No known key found for this signature in database
GPG key ID: CF8FD2EA0DBC61BD
8 changed files with 163 additions and 24 deletions

View file

@ -196,8 +196,8 @@ export function activate(context: vscode.ExtensionContext) {
onDidReceiveMessage: dataEmitter.event,
onDidClose: closeEmitter.event,
onDidEnd: endEmitter.event,
dataHandler: d => remoteSocket.write(d),
endHandler: () => remoteSocket.end(),
send: d => remoteSocket.write(d),
end: () => remoteSocket.end(),
};
}, connectionToken));
}

View file

@ -1171,6 +1171,10 @@ export class PauseableEmitter<T> extends Emitter<T> {
protected _eventQueue = new LinkedList<T>();
private _mergeFn?: (input: T[]) => T;
public get isPaused(): boolean {
return this._isPaused !== 0;
}
constructor(options?: EmitterOptions & { merge?: (input: T[]) => T }) {
super(options);
this._mergeFn = options?.merge;

View file

@ -10,7 +10,7 @@ import { ManagedRemoteConnection, RemoteConnectionType } from 'vs/platform/remot
import { VSBuffer } from 'vs/base/common/buffer';
import { IRemoteSocketFactoryService, ISocketFactory } from 'vs/platform/remote/common/remoteSocketFactoryService';
import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { Emitter, Event } from 'vs/base/common/event';
import { Emitter, Event, PauseableEmitter } from 'vs/base/common/event';
import { makeRawSocketHeaders, socketRawEndHeaderSequence } from 'vs/platform/remote/common/managedSocket';
@extHostNamedCustomer(MainContext.MainThreadManagedSockets)
@ -30,7 +30,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
async $registerSocketFactory(socketFactoryId: number): Promise<void> {
const that = this;
const scoketFactory = new class implements ISocketFactory<RemoteConnectionType.Managed> {
const socketFactory = new class implements ISocketFactory<RemoteConnectionType.Managed> {
supports(connectTo: ManagedRemoteConnection): boolean {
return (connectTo.id === socketFactoryId);
@ -65,7 +65,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
});
}
};
this._registrations.set(socketFactoryId, this._remoteSocketFactoryService.register(RemoteConnectionType.Managed, scoketFactory));
this._registrations.set(socketFactoryId, this._remoteSocketFactoryService.register(RemoteConnectionType.Managed, socketFactory));
}
@ -91,7 +91,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
}
}
interface RemoteSocketHalf {
export interface RemoteSocketHalf {
onData: Emitter<VSBuffer>;
onClose: Emitter<SocketCloseEvent>;
onEnd: Emitter<void>;
@ -103,11 +103,7 @@ export class ManagedSocket extends Disposable implements ISocket {
proxy: ExtHostManagedSocketsShape,
path: string, query: string, debugLabel: string,
half: {
onClose: Emitter<SocketCloseEvent>;
onData: Emitter<VSBuffer>;
onEnd: Emitter<void>;
}
half: RemoteSocketHalf
): Promise<ManagedSocket> {
const socket = new ManagedSocket(socketId, proxy, debugLabel, half.onClose, half.onData, half.onEnd);
@ -115,9 +111,28 @@ export class ManagedSocket extends Disposable implements ISocket {
const d = new DisposableStore();
return new Promise<ManagedSocket>((resolve, reject) => {
let dataSoFar: VSBuffer | undefined;
d.add(socket.onData(d => {
if (d.indexOf(socketRawEndHeaderSequence) !== -1) {
resolve(socket);
if (!dataSoFar) {
dataSoFar = d;
} else {
dataSoFar = VSBuffer.concat([dataSoFar, d], dataSoFar.byteLength + d.byteLength);
}
const index = dataSoFar.indexOf(socketRawEndHeaderSequence);
if (index === -1) {
return;
}
resolve(socket);
// pause data events until the socket consumer is hooked up. We may
// immediately emit remaining data, but if not there may still be
// microtasks queued which would fire data into the abyss.
socket.pauseData();
const rest = dataSoFar.slice(index + socketRawEndHeaderSequence.byteLength);
if (rest.byteLength) {
half.onData.fire(rest);
}
}));
@ -126,7 +141,14 @@ export class ManagedSocket extends Disposable implements ISocket {
}).finally(() => d.dispose());
}
public onData: Event<VSBuffer>;
private readonly pausableDataEmitter = this._register(new PauseableEmitter<VSBuffer>());
public onData: Event<VSBuffer> = (...args) => {
if (this.pausableDataEmitter.isPaused) {
queueMicrotask(() => this.pausableDataEmitter.resume());
}
return this.pausableDataEmitter.event(...args);
};
public onClose: Event<SocketCloseEvent>;
public onEnd: Event<void>;
@ -144,11 +166,19 @@ export class ManagedSocket extends Disposable implements ISocket {
onEndEmitter: Emitter<void>,
) {
super();
this._register(onDataEmitter);
this._register(onDataEmitter.event(data => this.pausableDataEmitter.fire(data)));
this.onClose = this._register(onCloseEmitter).event;
this.onData = this._register(onDataEmitter).event;
this.onEnd = this._register(onEndEmitter).event;
}
/** Pauses data events until a new listener comes in onData() */
pauseData() {
this.pausableDataEmitter.pause();
}
write(buffer: VSBuffer): void {
this.proxy.$remoteSocketWrite(this.socketId, buffer);
}

View file

@ -71,19 +71,19 @@ export class ExtHostManagedSockets implements IExtHostManagedSockets {
}
$remoteSocketWrite(socketId: number, buffer: VSBuffer): void {
this._managedRemoteSockets.get(socketId)?.actual.dataHandler(buffer.buffer);
this._managedRemoteSockets.get(socketId)?.actual.send(buffer.buffer);
}
$remoteSocketEnd(socketId: number): void {
const socket = this._managedRemoteSockets.get(socketId);
if (socket) {
socket.actual.endHandler();
socket.actual.end();
socket.dispose();
}
}
$remoteSocketDrain(socketId: number): Promise<void> {
return this._managedRemoteSockets.get(socketId)?.actual.drainHandler?.() ?? Promise.resolve();
async $remoteSocketDrain(socketId: number): Promise<void> {
await this._managedRemoteSockets.get(socketId)?.actual.drain?.();
}
}

View file

@ -0,0 +1,104 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import * as assert from 'assert';
import { disposableTimeout, timeout } from 'vs/base/common/async';
import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter } from 'vs/base/common/event';
import { DisposableStore } from 'vs/base/common/lifecycle';
import { SocketCloseEvent } from 'vs/base/parts/ipc/common/ipc.net';
import { mock } from 'vs/base/test/common/mock';
import { ManagedSocket, RemoteSocketHalf } from 'vs/workbench/api/browser/mainThreadManagedSockets';
import { ExtHostManagedSocketsShape } from 'vs/workbench/api/common/extHost.protocol';
suite('MainThreadManagedSockets', () => {
suite('ManagedSocket', () => {
let extHost: ExtHostMock;
let half: RemoteSocketHalf;
class ExtHostMock extends mock<ExtHostManagedSocketsShape>() {
private onDidFire = new Emitter<void>();
public readonly events: any[] = [];
override $remoteSocketWrite(socketId: number, buffer: VSBuffer): void {
this.events.push({ socketId, data: buffer.toString() });
this.onDidFire.fire();
}
override $remoteSocketDrain(socketId: number) {
this.events.push({ socketId, event: 'drain' });
this.onDidFire.fire();
return Promise.resolve();
}
override $remoteSocketEnd(socketId: number) {
this.events.push({ socketId, event: 'end' });
this.onDidFire.fire();
}
expectEvent(test: (evt: any) => void, message: string) {
if (this.events.some(test)) {
return;
}
const d = new DisposableStore();
return new Promise<void>(resolve => {
d.add(this.onDidFire.event(() => {
if (this.events.some(test)) {
return;
}
}));
d.add(disposableTimeout(() => {
throw new Error(`Expected ${message} but only had ${JSON.stringify(this.events, null, 2)}`);
}, 1000));
}).finally(() => d.dispose());
}
}
setup(() => {
extHost = new ExtHostMock();
half = {
onClose: new Emitter<SocketCloseEvent>(),
onData: new Emitter<VSBuffer>(),
onEnd: new Emitter<void>(),
};
});
async function doConnect() {
const socket = ManagedSocket.connect(1, extHost, '/hello', 'world=true', '', half);
await extHost.expectEvent(evt => evt.data && evt.data.startsWith('GET ws://localhost/hello?world=true&skipWebSocketFrames=true HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Key:'), 'websocket open event');
half.onData.fire(VSBuffer.fromString('Opened successfully ;)\r\n\r\n'));
return await socket;
}
test('connects', async () => {
await doConnect();
});
test('includes trailing connection data', async () => {
const socketProm = ManagedSocket.connect(1, extHost, '/hello', 'world=true', '', half);
await extHost.expectEvent(evt => evt.data && evt.data.includes('GET ws://localhost'), 'websocket open event');
half.onData.fire(VSBuffer.fromString('Opened successfully ;)\r\n\r\nSome trailing data'));
const socket = await socketProm;
const data: string[] = [];
socket.onData(d => data.push(d.toString()));
await timeout(1); // allow microtasks to flush
assert.deepStrictEqual(data, ['Some trailing data']);
});
test('round trips data', async () => {
const socket = await doConnect();
const data: string[] = [];
socket.onData(d => data.push(d.toString()));
socket.write(VSBuffer.fromString('ping'));
await extHost.expectEvent(evt => evt.data === 'ping', 'expected ping');
half.onData.fire(VSBuffer.fromString("pong"));
assert.deepStrictEqual(data, ['pong']);
});
});
});

View file

@ -63,7 +63,7 @@ export class NativeWorkbenchEnvironmentService extends AbstractNativeEnvironment
get remoteAuthority() { return this.configuration.remoteAuthority; }
@memoize
get expectsResolverExtension() { return !!this.configuration.remoteAuthority; }
get expectsResolverExtension() { return !!this.configuration.remoteAuthority?.includes('+'); }
@memoize
get execPath() { return this.configuration.execPath; }

View file

@ -7,7 +7,7 @@ import { IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteA
import { IRemoteExtensionsScannerService, RemoteExtensionsScannerChannelName } from 'vs/platform/remote/common/remoteExtensionsScanner';
import * as platform from 'vs/base/common/platform';
import { IChannel } from 'vs/base/parts/ipc/common/ipc';
import { IExtensionDescription, IRelaxedExtensionDescription } from 'vs/platform/extensions/common/extensions';
import { ExtensionIdentifier, IExtensionDescription, IRelaxedExtensionDescription } from 'vs/platform/extensions/common/extensions';
import { URI } from 'vs/base/common/uri';
import { IUserDataProfileService } from 'vs/workbench/services/userDataProfile/common/userDataProfile';
import { IRemoteUserDataProfilesService } from 'vs/workbench/services/userDataProfile/common/remoteUserDataProfiles';
@ -45,6 +45,7 @@ class RemoteExtensionsScannerService implements IRemoteExtensionsScannerService
const scannedExtensions = await channel.call<IRelaxedExtensionDescription[]>('scanExtensions', [platform.language, profileLocation, this.environmentService.extensionDevelopmentLocationURI, languagePack]);
scannedExtensions.forEach((extension) => {
extension.extensionLocation = URI.revive(extension.extensionLocation);
extension.identifier = new ExtensionIdentifier(extension.identifier.value);
});
return scannedExtensions;
},

View file

@ -31,9 +31,9 @@ declare module 'vscode' {
onDidClose: Event<Error | undefined>;
onDidEnd: Event<void>;
dataHandler: (data: Uint8Array) => void;
endHandler: () => void;
drainHandler?: () => void;
send: (data: Uint8Array) => void;
end: () => void;
drain?: () => Thenable<void>;
}
export class ManagedResolvedAuthority {