cli: add tunnel kill/restart subcommands (#177286)

This commit is contained in:
Connor Peet 2023-03-15 15:09:12 -07:00 committed by GitHub
parent 54921d7173
commit abab52dd94
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 177 additions and 70 deletions

View file

@ -91,6 +91,8 @@ async fn main() -> Result<(), std::convert::Infallible> {
Some(args::Commands::Tunnel(tunnel_args)) => match tunnel_args.subcommand {
Some(args::TunnelSubcommand::Prune) => tunnels::prune(context).await,
Some(args::TunnelSubcommand::Unregister) => tunnels::unregister(context).await,
Some(args::TunnelSubcommand::Kill) => tunnels::kill(context).await,
Some(args::TunnelSubcommand::Restart) => tunnels::restart(context).await,
Some(args::TunnelSubcommand::Rename(rename_args)) => {
tunnels::rename(context, rename_args).await
}

View file

@ -627,6 +627,12 @@ pub enum TunnelSubcommand {
/// Delete all servers which are currently not running.
Prune,
/// Stops any running tunnel on the system.
Kill,
/// Restarts any running tunnel on the system.
Restart,
/// Rename the name of this machine associated with port forwarding service.
Rename(TunnelRenameArgs),

View file

@ -7,6 +7,7 @@ use async_trait::async_trait;
use sha2::{Digest, Sha256};
use std::{str::FromStr, time::Duration};
use sysinfo::Pid;
use tokio::sync::mpsc;
use super::{
args::{
@ -17,13 +18,17 @@ use super::{
};
use crate::{
async_pipe::socket_stream_split,
auth::Auth,
json_rpc::{new_json_rpc, start_json_rpc},
log::{self, Logger},
singleton::connect_as_client,
state::LauncherPaths,
tunnels::{
code_server::CodeServerArgs,
create_service_manager, dev_tunnels, legal,
paths::get_all_servers,
protocol,
shutdown_signal::ShutdownRequest,
singleton_server::{
make_singleton_server, start_singleton_server, BroadcastLogSink, SingletonServerArgs,
@ -31,7 +36,7 @@ use crate::{
Next, ServiceContainer, ServiceManager,
},
util::{
errors::{wrap, AnyError},
errors::{wrap, AnyError, CodeError},
prereqs::PreReqChecker,
},
};
@ -198,6 +203,68 @@ pub async fn unregister(ctx: CommandContext) -> Result<i32, AnyError> {
Ok(0)
}
async fn do_single_rpc_call(
ctx: CommandContext,
method: &'static str,
params: impl serde::Serialize,
) -> Result<i32, AnyError> {
let client = match connect_as_client(&ctx.paths.tunnel_lockfile()).await {
Ok(p) => p,
Err(CodeError::SingletonLockfileOpenFailed(_))
| Err(CodeError::SingletonLockedProcessExited(_)) => {
error!(ctx.log, "No tunnel is running");
return Ok(1);
}
Err(e) => return Err(e.into()),
};
let (msg_tx, msg_rx) = mpsc::unbounded_channel();
let mut rpc = new_json_rpc();
let caller = rpc.get_caller(msg_tx);
let (read, write) = socket_stream_split(client);
let log = ctx.log.clone();
let rpc = tokio::spawn(async move {
start_json_rpc(
rpc.methods(()).build(log),
read,
write,
msg_rx,
ShutdownRequest::create_rx([ShutdownRequest::CtrlC]),
)
.await
.unwrap();
});
let r = caller.call::<_, _, ()>(method, params).await.unwrap();
rpc.abort();
if let Err(r) = r {
error!(ctx.log, "RPC call failed: {:?}", r);
return Ok(1);
}
Ok(0)
}
pub async fn restart(ctx: CommandContext) -> Result<i32, AnyError> {
do_single_rpc_call(
ctx,
protocol::singleton::METHOD_RESTART,
protocol::EmptyObject {},
)
.await
}
pub async fn kill(ctx: CommandContext) -> Result<i32, AnyError> {
do_single_rpc_call(
ctx,
protocol::singleton::METHOD_SHUTDOWN,
protocol::EmptyObject {},
)
.await
}
/// Removes unused servers.
pub async fn prune(ctx: CommandContext) -> Result<i32, AnyError> {
get_all_servers(&ctx.paths)
@ -275,7 +342,7 @@ async fn serve_with_csa(
return Ok(0);
}
match acquire_singleton(paths.root().join("tunnel.lock")).await {
match acquire_singleton(paths.tunnel_lockfile()).await {
Ok(SingletonConnection::Client(stream)) => {
debug!(log, "starting as client to singleton");
let should_exit = start_singleton_client(SingletonClientArgs {

View file

@ -77,7 +77,7 @@ pub async fn start_json_rpc<C: Send + Sync + 'static, S: Clone>(
match r {
MaybeSync::Sync(Some(v)) => {
write_tx.send(v).ok();
write.write_all(&v).await?;
},
MaybeSync::Sync(None) => continue,
MaybeSync::Future(fut) => {

View file

@ -303,11 +303,11 @@ pub fn format(level: Level, prefix: &str, message: &str) -> String {
if let Some(c) = level.color_code() {
format!(
"\x1b[2m[{}]\x1b[0m {}{}\x1b[0m {}{}\r\n",
"\x1b[2m[{}]\x1b[0m {}{}\x1b[0m {}{}\n",
timestamp, c, name, prefix, message
)
} else {
format!("[{}] {} {}{}\r\n", timestamp, name, prefix, message)
format!("[{}] {} {}{}\n", timestamp, name, prefix, message)
}
}

View file

@ -229,12 +229,7 @@ impl<S: Serialization> RpcCaller<S> {
}
/// Enqueues an outbound call, returning its result.
#[allow(dead_code)]
pub async fn call<M, A, R>(
&self,
method: M,
params: A,
) -> oneshot::Receiver<Result<R, ResponseError>>
pub fn call<M, A, R>(&self, method: M, params: A) -> oneshot::Receiver<Result<R, ResponseError>>
where
M: AsRef<str> + serde::Serialize,
A: Serialize,
@ -330,13 +325,11 @@ impl<S: Serialization, C: Send + Sync> RpcDispatcher<S, C> {
cb(Outcome::Error(err));
}
MaybeSync::Sync(None)
} else if partial.result.is_some() {
} else {
if let Some(cb) = self.calls.lock().unwrap().remove(&id.unwrap()) {
cb(Outcome::Success(body.to_vec()));
}
MaybeSync::Sync(None)
} else {
MaybeSync::Sync(None)
}
}
@ -354,7 +347,6 @@ struct PartialIncoming {
pub id: Option<u32>,
pub method: Option<String>,
pub error: Option<ResponseError>,
pub result: Option<()>,
}
#[derive(Serialize)]
@ -381,7 +373,7 @@ struct ErrorResponse {
pub error: ResponseError,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct ResponseError {
pub code: i32,
pub message: String,

View file

@ -106,7 +106,7 @@ fn validate_cli_is_good(exe_path: &Path) -> Result<(), AnyError> {
if !o.status.success() {
let msg = format!(
"could not execute new binary, aborting. Stdout:\r\n\r\n{}\r\n\r\nStderr:\r\n\r\n{}",
"could not execute new binary, aborting. Stdout:\n\n{}\n\nStderr:\n\n{}",
String::from_utf8_lossy(&o.stdout),
String::from_utf8_lossy(&o.stderr),
);

View file

@ -51,6 +51,8 @@ struct LockFileMatter {
pid: u32,
}
/// Tries to acquire the singleton homed at the given lock file, either starting
/// a new singleton if it doesn't exist, or connecting otherwise.
pub async fn acquire_singleton(lock_file: PathBuf) -> Result<SingletonConnection, CodeError> {
let file = OpenOptions::new()
.read(true)
@ -60,13 +62,27 @@ pub async fn acquire_singleton(lock_file: PathBuf) -> Result<SingletonConnection
.map_err(CodeError::SingletonLockfileOpenFailed)?;
match FileLock::acquire(file) {
Ok(Lock::AlreadyLocked(mut file)) => connect_as_client(&mut file).await,
Ok(Lock::Acquired(lock)) => start_singleton_server(lock).await,
Ok(Lock::AlreadyLocked(mut file)) => connect_as_client_with_file(&mut file)
.await
.map(SingletonConnection::Client),
Ok(Lock::Acquired(lock)) => start_singleton_server(lock)
.await
.map(SingletonConnection::Singleton),
Err(e) => Err(e),
}
}
async fn start_singleton_server(mut lock: FileLock) -> Result<SingletonConnection, CodeError> {
/// Tries to connect to the singleton homed at the given file as a client.
pub async fn connect_as_client(lock_file: &Path) -> Result<AsyncPipe, CodeError> {
let mut file = OpenOptions::new()
.read(true)
.open(lock_file)
.map_err(CodeError::SingletonLockfileOpenFailed)?;
connect_as_client_with_file(&mut file).await
}
async fn start_singleton_server(mut lock: FileLock) -> Result<SingletonServer, CodeError> {
let socket_path = get_socket_name();
let mut vec = Vec::with_capacity(128);
@ -84,15 +100,15 @@ async fn start_singleton_server(mut lock: FileLock) -> Result<SingletonConnectio
.map_err(CodeError::SingletonLockfileOpenFailed)?;
let server = listen_socket_rw_stream(&socket_path).await?;
Ok(SingletonConnection::Singleton(SingletonServer {
Ok(SingletonServer {
server,
_lock: lock,
}))
})
}
const MAX_CLIENT_ATTEMPTS: i32 = 10;
async fn connect_as_client(mut file: &mut File) -> Result<SingletonConnection, CodeError> {
async fn connect_as_client_with_file(mut file: &mut File) -> Result<AsyncPipe, CodeError> {
// retry, since someone else could get a lock and we could read it before
// the JSON info was finished writing out
let mut attempt = 0;
@ -104,14 +120,14 @@ async fn connect_as_client(mut file: &mut File) -> Result<SingletonConnection, C
tokio::select! {
p = retry_get_socket_rw_stream(&socket_path, 5, Duration::from_millis(500)) => p,
_ = wait_until_process_exits(Pid::from_u32(prev.pid), 500) => Err(CodeError::SingletonLockedProcessExited(prev.pid)),
_ = wait_until_process_exits(Pid::from_u32(prev.pid), 500) => return Err(CodeError::SingletonLockedProcessExited(prev.pid)),
}
}
Err(e) => Err(CodeError::SingletonLockfileReadFailed(e)),
};
if r.is_ok() || attempt == MAX_CLIENT_ATTEMPTS {
return r.map(SingletonConnection::Client);
return r;
}
attempt += 1;

View file

@ -137,6 +137,11 @@ impl LauncherPaths {
&self.root
}
/// Lockfile for the running tunnel
pub fn tunnel_lockfile(&self) -> PathBuf {
self.root.join("tunnel.lock")
}
/// Suggested path for tunnel service logs, when using file logs
pub fn service_log_file(&self) -> PathBuf {
self.root.join("tunnel-service.log")

View file

@ -10,6 +10,7 @@ pub mod paths;
pub mod shutdown_signal;
pub mod singleton_client;
pub mod singleton_server;
pub mod protocol;
mod control_server;
mod nosleep;
@ -20,7 +21,6 @@ mod nosleep_macos;
#[cfg(target_os = "windows")]
mod nosleep_windows;
mod port_forwarder;
mod protocol;
mod server_bridge;
mod server_multiplexer;
mod service;

View file

@ -150,7 +150,7 @@ fn print_listening(log: &log::Logger, tunnel_name: &str) {
}
}
let message = &format!("\r\nOpen this link in your browser {}\r\n", addr);
let message = &format!("\nOpen this link in your browser {}\n", addr);
log.result(message);
}

View file

@ -162,6 +162,10 @@ pub mod singleton {
use crate::log;
use serde::{Deserialize, Serialize};
pub const METHOD_RESTART: &str = "restart";
pub const METHOD_SHUTDOWN: &str = "shutdown";
pub const METHOD_LOG: &str = "log";
#[derive(Serialize)]
pub struct LogMessage<'a> {
pub level: Option<log::Level>,

View file

@ -48,17 +48,22 @@ pub async fn start_singleton_client(args: SingletonClientArgs) -> bool {
let stdin_handle = rpc.get_caller(msg_tx);
thread::spawn(move || {
let term = console::Term::stderr();
let mut input = String::new();
loop {
match term.read_key() {
Ok(console::Key::Char('x')) => {
stdin_handle.notify("shutdown", EmptyObject {});
}
Ok(console::Key::Char('r')) => {
stdin_handle.notify("restart", EmptyObject {});
}
Err(_) => return, // EOF or not a tty
match std::io::stdin().read_line(&mut input) {
Err(_) | Ok(0) => return, // EOF or not a tty
_ => {}
};
match input.chars().next().map(|c| c.to_ascii_lowercase()) {
Some('x') => {
stdin_handle.notify(protocol::singleton::METHOD_SHUTDOWN, EmptyObject {});
return;
}
Some('r') => {
stdin_handle.notify(protocol::singleton::METHOD_RESTART, EmptyObject {});
}
Some(_) | None => {}
}
}
});
@ -68,18 +73,21 @@ pub async fn start_singleton_client(args: SingletonClientArgs) -> bool {
exit_entirely: exit_entirely.clone(),
});
rpc.register_sync("shutdown", |_: EmptyObject, c| {
rpc.register_sync(protocol::singleton::METHOD_SHUTDOWN, |_: EmptyObject, c| {
c.exit_entirely.store(true, Ordering::SeqCst);
Ok(())
});
rpc.register_sync("log", |log: protocol::singleton::LogMessageOwned, c| {
match log.level {
Some(level) => c.log.emit(level, &format!("{}{}", log.prefix, log.message)),
None => c.log.result(format!("{}{}", log.prefix, log.message)),
}
Ok(())
});
rpc.register_sync(
protocol::singleton::METHOD_LOG,
|log: protocol::singleton::LogMessageOwned, c| {
match log.level {
Some(level) => c.log.emit(level, &format!("{}{}", log.prefix, log.message)),
None => c.log.result(format!("{}{}", log.prefix, log.message)),
}
Ok(())
},
);
let (read, write) = socket_stream_split(args.stream);
let _ = start_json_rpc(rpc.build(args.log), read, write, msg_rx, args.shutdown).await;

View file

@ -74,25 +74,31 @@ pub fn make_singleton_server(
broadcast_tx: log_broadcast.get_brocaster(),
});
rpc.register_sync("restart", |_: protocol::EmptyObject, ctx| {
info!(ctx.log, "restarting tunnel after client request");
let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcRestartRequested);
Ok(())
});
rpc.register_sync(
protocol::singleton::METHOD_RESTART,
|_: protocol::EmptyObject, ctx| {
info!(ctx.log, "restarting tunnel after client request");
let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcRestartRequested);
Ok(())
},
);
rpc.register_sync("shutdown", |_: protocol::EmptyObject, ctx| {
info!(
ctx.log,
"closing tunnel and all clients after a shutdown request"
);
let _ = ctx.broadcast_tx.send(RpcCaller::serialize_notify(
&JsonRpcSerializer {},
"shutdown",
protocol::EmptyObject {},
));
let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcShutdownRequested);
Ok(())
});
rpc.register_sync(
protocol::singleton::METHOD_SHUTDOWN,
|_: protocol::EmptyObject, ctx| {
info!(
ctx.log,
"closing tunnel and all clients after a shutdown request"
);
let _ = ctx.broadcast_tx.send(RpcCaller::serialize_notify(
&JsonRpcSerializer {},
protocol::singleton::METHOD_SHUTDOWN,
protocol::EmptyObject {},
));
let _ = ctx.shutdown_tx.send(ShutdownSignal::RpcShutdownRequested);
Ok(())
},
);
// we tokio spawn instead of keeping a future, since we want it to progress
// even outside of the start_singleton_server loop (i.e. while the tunnel restarts)
@ -163,11 +169,12 @@ async fn serve_singleton_rpc<C: Clone + Send + Sync + 'static>(
}
const CONTROL_INSTRUCTIONS: &str =
"Connected to an existing tunnel process running on this machine. You can press:\r
\r
- Ctrl+C to detach\r
- \"x\" to stop the tunnel and exit\r
- \"r\" to restart the tunnel\r";
"Connected to an existing tunnel process running on this machine. You can press:
- Ctrl+C to detach
- \"x\" + Enter to stop the tunnel and exit
- \"r\" + Enter to restart the tunnel
";
/// Log sink that can broadcast and replay log events. Used for transmitting
/// logs from the singleton to all clients. This should be created and injected
@ -209,7 +216,7 @@ impl BroadcastLogSink {
let _ = log_replay_tx.send(RpcCaller::serialize_notify(
&JsonRpcSerializer {},
"log",
protocol::singleton::METHOD_LOG,
protocol::singleton::LogMessage {
level: None,
prefix: "",
@ -226,7 +233,7 @@ impl log::LogSink for BroadcastLogSink {
let s = JsonRpcSerializer {};
let serialized = RpcCaller::serialize_notify(
&s,
"log",
protocol::singleton::METHOD_LOG,
protocol::singleton::LogMessage {
level: Some(level),
prefix,

View file

@ -171,7 +171,7 @@ impl std::fmt::Display for SetupError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"{}\r\n\r\nMore info at {}/remote/linux",
"{}\n\nMore info at {}/remote/linux",
DOCUMENTATION_URL.unwrap_or("<docs>"),
self.0
)