add observability for windows

This commit is contained in:
Connor Peet 2022-11-15 20:35:38 -08:00
parent 1bf3323015
commit e996746b14
No known key found for this signature in database
GPG key ID: CF8FD2EA0DBC61BD
7 changed files with 79 additions and 51 deletions

View file

@ -74,7 +74,7 @@ impl ServiceContainer for TunnelServiceContainer {
&mut self,
log: log::Logger,
launcher_paths: LauncherPaths,
shutdown_rx: mpsc::Receiver<ShutdownSignal>,
shutdown_rx: mpsc::UnboundedReceiver<ShutdownSignal>,
) -> Result<(), AnyError> {
let csa = (&self.args).into();
serve_with_csa(
@ -126,7 +126,6 @@ pub async fn service(
let current_exe =
std::env::current_exe().map_err(|e| wrap(e, "could not get current exe"))?;
println!("calling register");
manager
.register(
current_exe,
@ -240,7 +239,7 @@ async fn serve_with_csa(
log: Logger,
gateway_args: TunnelServeArgs,
csa: CodeServerArgs,
shutdown_rx: Option<mpsc::Receiver<ShutdownSignal>>,
shutdown_rx: Option<mpsc::UnboundedReceiver<ShutdownSignal>>,
) -> Result<i32, AnyError> {
// Intentionally read before starting the server. If the server updated and
// respawn is requested, the old binary will get renamed, and then
@ -260,7 +259,7 @@ async fn serve_with_csa(
let shutdown_tx = if let Some(tx) = shutdown_rx {
tx
} else {
let (tx, rx) = mpsc::channel::<ShutdownSignal>(2);
let (tx, rx) = mpsc::unbounded_channel::<ShutdownSignal>();
if let Some(process_id) = gateway_args.parent_process_id {
match Pid::from_str(&process_id) {
Ok(pid) => {
@ -271,7 +270,7 @@ async fn serve_with_csa(
while s.refresh_process(pid) {
sleep(Duration::from_millis(2000)).await;
}
tx.send(ShutdownSignal::ParentProcessKilled).await.ok();
tx.send(ShutdownSignal::ParentProcessKilled).ok();
});
}
Err(_) => {
@ -281,7 +280,7 @@ async fn serve_with_csa(
}
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tx.send(ShutdownSignal::CtrlC).await.ok();
tx.send(ShutdownSignal::CtrlC).ok();
});
rx
};

View file

@ -167,7 +167,7 @@ pub async fn serve(
launcher_paths: &LauncherPaths,
code_server_args: &CodeServerArgs,
platform: Platform,
shutdown_rx: mpsc::Receiver<ShutdownSignal>,
shutdown_rx: mpsc::UnboundedReceiver<ShutdownSignal>,
) -> Result<ServerTermination, AnyError> {
let mut port = tunnel.add_port_direct(CONTROL_PORT).await?;
print_listening(log, &tunnel.name);

View file

@ -3,7 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use tokio::sync::mpsc;
@ -11,7 +11,8 @@ use tokio::sync::mpsc;
use crate::commands::tunnels::ShutdownSignal;
use crate::log;
use crate::state::LauncherPaths;
use crate::util::errors::AnyError;
use crate::util::errors::{wrap, AnyError};
use crate::util::io::{tailf, TailEvent};
pub const SERVICE_LOG_FILE_NAME: &str = "tunnel-service.log";
@ -21,7 +22,7 @@ pub trait ServiceContainer: Send {
&mut self,
log: log::Logger,
launcher_paths: LauncherPaths,
shutdown_rx: mpsc::Receiver<ShutdownSignal>,
shutdown_rx: mpsc::UnboundedReceiver<ShutdownSignal>,
) -> Result<(), AnyError>;
}
@ -65,10 +66,29 @@ pub fn create_service_manager(log: log::Logger, paths: &LauncherPaths) -> Servic
}
#[cfg(target_os = "windows")]
{
super::service_windows::WindowsService::new(log)
super::service_windows::WindowsService::new(log, paths)
}
#[cfg(target_os = "linux")]
{
super::service_linux::SystemdService::new(log, paths.clone())
}
}
pub(crate) async fn tail_log_file(log_file: &Path) -> Result<(), AnyError> {
if !log_file.exists() {
println!("The tunnel service has not started yet.");
return Ok(());
}
let file = std::fs::File::open(&log_file).map_err(|e| wrap(e, "error opening log file"))?;
let mut rx = tailf(file, 20);
while let Some(line) = rx.recv().await {
match line {
TailEvent::Line(l) => print!("{}", l),
TailEvent::Reset => println!("== Tunnel service restarted =="),
TailEvent::Err(e) => return Err(wrap(e, "error reading log file").into()),
}
}
Ok(())
}

View file

@ -107,10 +107,10 @@ impl ServiceManager for SystemdService {
launcher_paths: crate::state::LauncherPaths,
mut handle: impl 'static + super::ServiceContainer,
) -> Result<(), crate::util::errors::AnyError> {
let (tx, rx) = mpsc::channel::<ShutdownSignal>(1);
let (tx, rx) = mpsc::unbounded_channel::<ShutdownSignal>();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tx.send(ShutdownSignal::CtrlC).await.ok();
tx.send(ShutdownSignal::CtrlC).ok();
});
handle.run_service(self.log, launcher_paths, rx).await
@ -128,14 +128,14 @@ impl ServiceManager for SystemdService {
])
.status()
.map(|s| s.code().unwrap_or(1))
.map_err(|e| wrap(e, format!("error running journalctl")))?;
.map_err(|e| wrap(e, "error running systemctl"))?;
// then follow log files
Command::new("journalctl")
.args(["--user", "-f", "-u", &SystemdService::service_name_string()])
.status()
.map(|s| s.code().unwrap_or(1))
.map_err(|e| wrap(e, format!("error running journalctl")))?;
.map_err(|e| wrap(e, "error running journalctl"))?;
Ok(())
}

View file

@ -20,11 +20,10 @@ use crate::{
util::{
command::capture_command_and_check_status,
errors::{wrap, AnyError, MissingHomeDirectory},
io::{tailf, TailEvent},
},
};
use super::ServiceManager;
use super::{ServiceManager, service::tail_log_file};
pub struct LaunchdService {
log: log::Logger,
@ -67,23 +66,7 @@ impl ServiceManager for LaunchdService {
}
async fn show_logs(&self) -> Result<(), AnyError> {
if !self.log_file.exists() {
println!("The tunnel service has not started yet.");
return Ok(());
}
let file =
std::fs::File::open(&self.log_file).map_err(|e| wrap(e, "error opening log file"))?;
let mut rx = tailf(file, 20);
while let Some(line) = rx.recv().await {
match line {
TailEvent::Line(l) => print!("{}", l),
TailEvent::Reset => println!("== Tunnel service restarted =="),
TailEvent::Err(e) => return Err(wrap(e, "error reading log file").into()),
}
}
Ok(())
tail_log_file(&self.log_file).await
}
async fn run(
@ -91,10 +74,10 @@ impl ServiceManager for LaunchdService {
launcher_paths: crate::state::LauncherPaths,
mut handle: impl 'static + super::ServiceContainer,
) -> Result<(), crate::util::errors::AnyError> {
let (tx, rx) = mpsc::channel::<ShutdownSignal>(1);
let (tx, rx) = mpsc::unbounded_channel::<ShutdownSignal>();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tx.send(ShutdownSignal::CtrlC).await.ok();
tx.send(ShutdownSignal::CtrlC).ok();
});
handle.run_service(self.log, launcher_paths, rx).await

View file

@ -6,7 +6,7 @@
use async_trait::async_trait;
use dialoguer::{theme::ColorfulTheme, Input, Password};
use lazy_static::lazy_static;
use std::{ffi::OsString, sync::Mutex, thread, time::Duration};
use std::{ffi::OsString, path::PathBuf, sync::Mutex, thread, time::Duration};
use tokio::sync::mpsc;
use windows_service::{
define_windows_service,
@ -21,7 +21,7 @@ use windows_service::{
use crate::{
commands::tunnels::ShutdownSignal,
util::errors::{wrap, AnyError, WindowsNeedsElevation},
util::errors::{wrap, wrapdbg, AnyError, WindowsNeedsElevation},
};
use crate::{
log::{self, FileLogSink},
@ -29,19 +29,23 @@ use crate::{
};
use super::service::{
ServiceContainer, ServiceManager as CliServiceManager, SERVICE_LOG_FILE_NAME,
tail_log_file, ServiceContainer, ServiceManager as CliServiceManager, SERVICE_LOG_FILE_NAME,
};
pub struct WindowsService {
log: log::Logger,
log_file: PathBuf,
}
const SERVICE_NAME: &str = "code_tunnel";
const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS;
impl WindowsService {
pub fn new(log: log::Logger) -> Self {
Self { log }
pub fn new(log: log::Logger, paths: &LauncherPaths) -> Self {
Self {
log,
log_file: paths.service_log_file(),
}
}
}
@ -54,6 +58,10 @@ impl CliServiceManager for WindowsService {
)
.map_err(|e| WindowsNeedsElevation(format!("error getting service manager: {}", e)))?;
let mut args = args.iter().map(OsString::from).collect::<Vec<OsString>>();
args.push(OsString::from("--log-to-file"));
args.push(self.log_file.as_os_str().to_os_string());
let mut service_info = ServiceInfo {
name: OsString::from(SERVICE_NAME),
display_name: OsString::from("VS Code Tunnel"),
@ -61,7 +69,7 @@ impl CliServiceManager for WindowsService {
start_type: ServiceStartType::AutoStart,
error_control: ServiceErrorControl::Normal,
executable_path: exe,
launch_arguments: args.iter().map(OsString::from).collect(),
launch_arguments: args,
dependencies: vec![],
account_name: None,
account_password: None,
@ -74,7 +82,7 @@ impl CliServiceManager for WindowsService {
let service = if let Ok(service) = existing_service {
service
.change_config(&service_info)
.map_err(|e| wrap(e, "error updating existing service"))?;
.map_err(|e| wrapdbg(e, "error updating existing service"))?;
service
} else {
loop {
@ -112,7 +120,7 @@ impl CliServiceManager for WindowsService {
if status == ServiceState::Stopped {
service
.start::<&str>(&[])
.map_err(|e| wrap(e, "error starting service"))?;
.map_err(|e| wrapdbg(e, "error starting service"))?;
}
info!(self.log, "Tunnel service successfully started");
@ -120,7 +128,7 @@ impl CliServiceManager for WindowsService {
}
async fn show_logs(&self) -> Result<(), AnyError> {
todo!();
tail_log_file(&self.log_file).await
}
#[allow(unused_must_use)] // triggers incorrectly on `define_windows_service!`
@ -136,7 +144,7 @@ impl CliServiceManager for WindowsService {
Ok(sink) => self.log.tee(sink),
Err(e) => {
warning!(self.log, "Failed to create service log file: {}", e);
self.log.clone()
self.log
}
};
@ -176,12 +184,12 @@ impl CliServiceManager for WindowsService {
let service_status = service
.query_status()
.map_err(|e| wrap(e, "error getting service status"))?;
.map_err(|e| wrapdbg(e, "error getting service status"))?;
if service_status.current_state != ServiceState::Stopped {
service
.stop()
.map_err(|e| wrap(e, "error getting stopping service"))?;
.map_err(|e| wrapdbg(e, "error getting stopping service"))?;
while let Ok(ServiceState::Stopped) = service.query_status().map(|s| s.current_state) {
info!(self.log, "Polling for service to stop...");
@ -191,7 +199,7 @@ impl CliServiceManager for WindowsService {
service
.delete()
.map_err(|e| wrap(e, "error deleting service"))?;
.map_err(|e| wrapdbg(e, "error deleting service"))?;
Ok(())
}
@ -212,7 +220,7 @@ fn service_main(_arguments: Vec<OsString>) -> Result<(), AnyError> {
let mut service = SERVICE_IMPL.lock().unwrap().take().unwrap();
// Create a channel to be able to poll a stop event from the service worker loop.
let (shutdown_tx, shutdown_rx) = mpsc::channel::<ShutdownSignal>(5);
let (shutdown_tx, shutdown_rx) = mpsc::unbounded_channel::<ShutdownSignal>();
let mut shutdown_tx = Some(shutdown_tx);
// Define system service event handler that will be receiving service events.
@ -222,7 +230,7 @@ fn service_main(_arguments: Vec<OsString>) -> Result<(), AnyError> {
ServiceControl::Stop => {
shutdown_tx
.take()
.and_then(|tx| tx.blocking_send(ShutdownSignal::ServiceStopped).ok());
.and_then(|tx| tx.send(ShutdownSignal::ServiceStopped).ok());
ServiceControlHandlerResult::NoError
}
_ => ServiceControlHandlerResult::NotImplemented,
@ -245,6 +253,13 @@ fn service_main(_arguments: Vec<OsString>) -> Result<(), AnyError> {
})
.map_err(|e| wrap(e, "error marking service as running"))?;
info!(service.log, "Starting service loop...");
let panic_log = service.log.clone();
std::panic::set_hook(Box::new(move |p| {
error!(panic_log, "Service panic: {:?}", p);
}));
let result = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()

View file

@ -43,6 +43,17 @@ impl From<reqwest::Error> for WrappedError {
}
}
pub fn wrapdbg<T, S>(original: T, message: S) -> WrappedError
where
T: std::fmt::Debug,
S: Into<String>,
{
WrappedError {
message: message.into(),
original: format!("{:?}", original),
}
}
pub fn wrap<T, S>(original: T, message: S) -> WrappedError
where
T: Display,