net: gracefully disconnect better from remote

This adds in a step to gracefully disconnect (send a "disconnect"
message and then await the flush) before closing the workbench. In some
cases, like Remote - SSH, sending messages is more async than others. In
the exec server connection we handle a `zlib.flate` stream to compress
data to and from the VS Code server, and its API is asynchronous, so
this lets us ensure the stream is drained before giving the go-ahead
to close up shop

This lifecycle phase is a little awkward and I ended it putting it
directly in the lifecycle service: we don't want to do this in
`onWillShutdown` because some contributions want to save data to the
remote workspace, and if shutdown is vetoed it would leave a broken
state. But `onDidShutdown` is synchronous and already depended upon by
several other points, and changing that also felt a bit risky.

cc @alexdima

Refs #211462, will require some small adoption in Remote - SSH to close.
This commit is contained in:
Connor Peet 2024-06-27 12:35:29 -07:00
parent aea213b7fc
commit 4c0517e140
No known key found for this signature in database
GPG Key ID: CF8FD2EA0DBC61BD
7 changed files with 70 additions and 24 deletions

View File

@ -597,6 +597,8 @@ export class Client<TContext = string> extends IPCClient<TContext> {
override dispose(): void {
super.dispose();
const socket = this.protocol.getSocket();
// should be sent gracefully with a .flush(), but try to send it out as a
// last resort here if nothing else:
this.protocol.sendDisconnect();
this.protocol.dispose();
socket.end();
@ -808,6 +810,7 @@ export interface PersistentProtocolOptions {
export class PersistentProtocol implements IMessagePassingProtocol {
private _isReconnecting: boolean;
private _didSendDisconnect?: boolean;
private _outgoingUnackMsg: Queue<ProtocolMessage>;
private _outgoingMsgId: number;
@ -910,9 +913,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
}
sendDisconnect(): void {
const msg = new ProtocolMessage(ProtocolMessageType.Disconnect, 0, 0, getEmptyBuffer());
this._socketWriter.write(msg);
this._socketWriter.flush();
if (!this._didSendDisconnect) {
this._didSendDisconnect = true;
const msg = new ProtocolMessage(ProtocolMessageType.Disconnect, 0, 0, getEmptyBuffer());
this._socketWriter.write(msg);
this._socketWriter.flush();
}
}
sendPause(): void {

View File

@ -3,13 +3,13 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { Emitter } from 'vs/base/common/event';
import { Barrier } from 'vs/base/common/async';
import { Emitter } from 'vs/base/common/event';
import { Disposable } from 'vs/base/common/lifecycle';
import { ILifecycleService, WillShutdownEvent, StartupKind, LifecyclePhase, LifecyclePhaseToString, ShutdownReason, BeforeShutdownErrorEvent, InternalBeforeShutdownEvent } from 'vs/workbench/services/lifecycle/common/lifecycle';
import { ILogService } from 'vs/platform/log/common/log';
import { mark } from 'vs/base/common/performance';
import { ILogService } from 'vs/platform/log/common/log';
import { IStorageService, StorageScope, StorageTarget, WillSaveStateReason } from 'vs/platform/storage/common/storage';
import { BeforeShutdownErrorEvent, ILifecycleService, InternalBeforeShutdownEvent, LifecyclePhase, LifecyclePhaseToString, ShutdownReason, StartupKind, WillShutdownEvent } from 'vs/workbench/services/lifecycle/common/lifecycle';
export abstract class AbstractLifecycleService extends Disposable implements ILifecycleService {
@ -44,7 +44,7 @@ export abstract class AbstractLifecycleService extends Disposable implements ILi
constructor(
@ILogService protected readonly logService: ILogService,
@IStorageService protected readonly storageService: IStorageService
@IStorageService protected readonly storageService: IStorageService,
) {
super();

View File

@ -11,19 +11,22 @@ import { ILogService } from 'vs/platform/log/common/log';
import { AbstractLifecycleService } from 'vs/workbench/services/lifecycle/common/lifecycleService';
import { InstantiationType, registerSingleton } from 'vs/platform/instantiation/common/extensions';
import { INativeHostService } from 'vs/platform/native/common/native';
import { Promises, disposableTimeout, raceCancellation } from 'vs/base/common/async';
import { Promises, disposableTimeout, raceCancellation, timeout } from 'vs/base/common/async';
import { toErrorMessage } from 'vs/base/common/errorMessage';
import { CancellationTokenSource } from 'vs/base/common/cancellation';
import { IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteAgentService';
export class NativeLifecycleService extends AbstractLifecycleService {
private static readonly BEFORE_SHUTDOWN_WARNING_DELAY = 5000;
private static readonly WILL_SHUTDOWN_WARNING_DELAY = 800;
private static readonly MAX_GRACEFUL_REMOTE_DISCONNECT_TIME = 3000;
constructor(
@INativeHostService private readonly nativeHostService: INativeHostService,
@IStorageService storageService: IStorageService,
@ILogService logService: ILogService
@ILogService logService: ILogService,
@IRemoteAgentService private readonly remoteAgentService: IRemoteAgentService,
) {
super(logService, storageService);
@ -66,6 +69,10 @@ export class NativeLifecycleService extends AbstractLifecycleService {
// trigger onWillShutdown events and joining
await this.handleWillShutdown(reply.reason);
// now that all services have stored their data, it's safe to terminate
// the remote connection gracefully before synchronously saying we're shut down
await this.handleRemoteAgentDisconnect();
// trigger onDidShutdown event now that we know we will quit
this._onDidShutdown.fire();
@ -74,6 +81,19 @@ export class NativeLifecycleService extends AbstractLifecycleService {
});
}
private async handleRemoteAgentDisconnect(): Promise<void> {
const longRunningWarning = disposableTimeout(() => {
this.logService.warn(`[lifecycle] the remote agent is taking a long time to disconnect, waiting...`);
}, NativeLifecycleService.BEFORE_SHUTDOWN_WARNING_DELAY);
await Promise.race([
this.remoteAgentService.endConnection(),
timeout(NativeLifecycleService.MAX_GRACEFUL_REMOTE_DISCONNECT_TIME),
]);
longRunningWarning.dispose();
}
protected async handleBeforeShutdown(reason: ShutdownReason): Promise<boolean> {
const logService = this.logService;

View File

@ -27,7 +27,7 @@ export class RemoteAgentService extends AbstractRemoteAgentService implements IR
@IProductService productService: IProductService,
@IRemoteAuthorityResolverService remoteAuthorityResolverService: IRemoteAuthorityResolverService,
@ISignService signService: ISignService,
@ILogService logService: ILogService
@ILogService logService: ILogService,
) {
super(remoteSocketFactoryService, userDataProfileService, environmentService, productService, remoteAuthorityResolverService, signService, logService);
}

View File

@ -3,23 +3,23 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { Disposable } from 'vs/base/common/lifecycle';
import { IChannel, IServerChannel, getDelayedChannel, IPCLogger } from 'vs/base/parts/ipc/common/ipc';
import { Client } from 'vs/base/parts/ipc/common/ipc.net';
import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService';
import { connectRemoteAgentManagement, IConnectionOptions, ManagementPersistentConnection, PersistentConnectionEvent } from 'vs/platform/remote/common/remoteAgentConnection';
import { IExtensionHostExitInfo, IRemoteAgentConnection, IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteAgentService';
import { IRemoteAuthorityResolverService } from 'vs/platform/remote/common/remoteAuthorityResolver';
import { RemoteAgentConnectionContext, IRemoteAgentEnvironment } from 'vs/platform/remote/common/remoteAgentEnvironment';
import { RemoteExtensionEnvironmentChannelClient } from 'vs/workbench/services/remote/common/remoteAgentEnvironmentChannel';
import { IDiagnosticInfoOptions, IDiagnosticInfo } from 'vs/platform/diagnostics/common/diagnostics';
import { Emitter } from 'vs/base/common/event';
import { ISignService } from 'vs/platform/sign/common/sign';
import { Disposable } from 'vs/base/common/lifecycle';
import { getDelayedChannel, IChannel, IPCLogger, IServerChannel } from 'vs/base/parts/ipc/common/ipc';
import { Client } from 'vs/base/parts/ipc/common/ipc.net';
import { IDiagnosticInfo, IDiagnosticInfoOptions } from 'vs/platform/diagnostics/common/diagnostics';
import { ILogService } from 'vs/platform/log/common/log';
import { ITelemetryData, TelemetryLevel } from 'vs/platform/telemetry/common/telemetry';
import { IProductService } from 'vs/platform/product/common/productService';
import { IUserDataProfileService } from 'vs/workbench/services/userDataProfile/common/userDataProfile';
import { connectRemoteAgentManagement, IConnectionOptions, ManagementPersistentConnection, PersistentConnectionEvent } from 'vs/platform/remote/common/remoteAgentConnection';
import { IRemoteAgentEnvironment, RemoteAgentConnectionContext } from 'vs/platform/remote/common/remoteAgentEnvironment';
import { IRemoteAuthorityResolverService } from 'vs/platform/remote/common/remoteAuthorityResolver';
import { IRemoteSocketFactoryService } from 'vs/platform/remote/common/remoteSocketFactoryService';
import { ISignService } from 'vs/platform/sign/common/sign';
import { ITelemetryData, TelemetryLevel } from 'vs/platform/telemetry/common/telemetry';
import { IWorkbenchEnvironmentService } from 'vs/workbench/services/environment/common/environmentService';
import { RemoteExtensionEnvironmentChannelClient } from 'vs/workbench/services/remote/common/remoteAgentEnvironmentChannel';
import { IExtensionHostExitInfo, IRemoteAgentConnection, IRemoteAgentService } from 'vs/workbench/services/remote/common/remoteAgentService';
import { IUserDataProfileService } from 'vs/workbench/services/userDataProfile/common/userDataProfile';
export abstract class AbstractRemoteAgentService extends Disposable implements IRemoteAgentService {
@ -35,7 +35,7 @@ export abstract class AbstractRemoteAgentService extends Disposable implements I
@IProductService productService: IProductService,
@IRemoteAuthorityResolverService private readonly _remoteAuthorityResolverService: IRemoteAuthorityResolverService,
@ISignService signService: ISignService,
@ILogService logService: ILogService
@ILogService logService: ILogService,
) {
super();
if (this._environmentService.remoteAuthority) {
@ -114,6 +114,13 @@ export abstract class AbstractRemoteAgentService extends Disposable implements I
);
}
async endConnection(): Promise<void> {
if (this._connection) {
await this._connection.end();
this._connection.dispose();
}
}
private _withChannel<R>(callback: (channel: IChannel, connection: IRemoteAgentConnection) => Promise<R>, fallback: R): Promise<R> {
const connection = this.getConnection();
if (!connection) {
@ -159,6 +166,8 @@ class RemoteAgentConnection extends Disposable implements IRemoteAgentConnection
this._connection = null;
}
end: () => Promise<void> = () => Promise.resolve();
getChannel<T extends IChannel>(channelName: string): T {
return <T>getDelayedChannel(this._getOrCreateConnection().then(c => c.getChannel(channelName)));
}
@ -222,6 +231,10 @@ class RemoteAgentConnection extends Disposable implements IRemoteAgentConnection
connection.protocol.onDidDispose(() => {
connection.dispose();
});
this.end = () => {
connection.protocol.sendDisconnect();
return connection.protocol.drain();
};
this._register(connection.onDidStateChange(e => this._onDidStateChange.fire(e)));
return connection.client;
}

View File

@ -37,6 +37,11 @@ export interface IRemoteAgentService {
*/
getRoundTripTime(): Promise<number | undefined>;
/**
* Gracefully ends the current connection, if any.
*/
endConnection(): Promise<void>;
getDiagnosticInfo(options: IDiagnosticInfoOptions): Promise<IDiagnosticInfo | undefined>;
updateTelemetryLevel(telemetryLevel: TelemetryLevel): Promise<void>;
logTelemetry(eventName: string, data?: ITelemetryData): Promise<void>;
@ -54,6 +59,7 @@ export interface IRemoteAgentConnection {
readonly onReconnecting: Event<void>;
readonly onDidStateChange: Event<PersistentConnectionEvent>;
end(): Promise<void>;
dispose(): void;
getChannel<T extends IChannel>(channelName: string): T;
withChannel<T extends IChannel, R>(channelName: string, callback: (channel: T) => Promise<R>): Promise<R>;

View File

@ -2117,6 +2117,7 @@ export class TestRemoteAgentService implements IRemoteAgentService {
async logTelemetry(eventName: string, data?: ITelemetryData): Promise<void> { }
async flushTelemetry(): Promise<void> { }
async getRoundTripTime(): Promise<number | undefined> { return undefined; }
async endConnection(): Promise<void> { }
}
export class TestRemoteExtensionsScannerService implements IRemoteExtensionsScannerService {