cli: add service integration for macos, observability

Adds support for running the tunnel as a service on macOS via launchservices.

It also hooks up observability (`code tunnel service log`) on macOS and Linux.
On macOS--and later Windows, hence the manual implementation of `tail`--it
saves output to a log file and watches it. On Linux, it simply delegates to
journalctl.

The "tailing" is implemented via simple polling of the file size. I didn't want
to pull in a giant set of dependencies for inotify/kqueue/etc just for this use
case; performance when polling a single log file is not a huge concern.
This commit is contained in:
Connor Peet 2022-11-15 15:17:48 -08:00
parent 2ab5e3f458
commit 1bf3323015
No known key found for this signature in database
GPG key ID: CF8FD2EA0DBC61BD
12 changed files with 606 additions and 52 deletions

View file

@ -9,6 +9,7 @@ use std::process::Command;
use clap::Parser;
use cli::{
commands::{args, tunnels, update, version, CommandContext},
constants::get_default_user_agent,
desktop, log as own_log,
state::LauncherPaths,
util::{
@ -38,16 +39,12 @@ async fn main() -> Result<(), std::convert::Infallible> {
let core = parsed.core();
let context = CommandContext {
http: reqwest::Client::new(),
http: reqwest::ClientBuilder::new()
.user_agent(get_default_user_agent())
.build()
.unwrap(),
paths: LauncherPaths::new(&core.global_options.cli_data_dir).unwrap(),
log: own_log::Logger::new(
SdkTracerProvider::builder().build().tracer("codecli"),
if core.global_options.verbose {
own_log::Level::Trace
} else {
core.global_options.log.unwrap_or(own_log::Level::Info)
},
),
log: make_logger(core),
args: core.clone(),
};
@ -111,6 +108,23 @@ async fn main() -> Result<(), std::convert::Infallible> {
}
}
fn make_logger(core: &args::CliCore) -> own_log::Logger {
let log_level = if core.global_options.verbose {
own_log::Level::Trace
} else {
core.global_options.log.unwrap_or(own_log::Level::Info)
};
let tracer = SdkTracerProvider::builder().build().tracer("codecli");
let mut log = own_log::Logger::new(tracer, log_level);
if let Some(f) = &core.global_options.log_to_file {
log =
log.tee(own_log::FileLogSink::new(log_level, f).expect("expected to make file logger"))
}
log
}
fn print_and_exit<E>(err: E) -> !
where
E: std::fmt::Display,
@ -143,7 +157,12 @@ async fn start_code(context: CommandContext, args: Vec<String>) -> Result<i32, A
.args(args)
.status()
.map(|s| s.code().unwrap_or(1))
.map_err(|e| wrap(e, format!("error running VS Code from {}", binary.display())))?;
.map_err(|e| {
wrap(
e,
format!("error running VS Code from {}", binary.display()),
)
})?;
Ok(code)
}

View file

@ -3,7 +3,7 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use std::fmt;
use std::{fmt, path::PathBuf};
use crate::{constants, log, options, tunnels::code_server::CodeServerArgs};
use clap::{ArgEnum, Args, Parser, Subcommand};
@ -394,6 +394,10 @@ pub struct GlobalOptions {
#[clap(long, global = true)]
pub verbose: bool,
/// Log to a file in addition to stdout. Used when running as a service.
#[clap(long, global = true, hide = true)]
pub log_to_file: Option<PathBuf>,
/// Log level to use.
#[clap(long, arg_enum, value_name = "level", global = true)]
pub log: Option<log::Level>,
@ -596,6 +600,9 @@ pub enum TunnelServiceSubCommands {
/// Uninstalls and stops the tunnel service.
Uninstall,
/// Shows logs for the running service.
Log,
/// Internal command for running the service
#[clap(hide = true)]
InternalRun,

View file

@ -116,13 +116,11 @@ pub async fn service(
match service_args {
TunnelServiceSubCommands::Install => {
// ensure logged in, otherwise subsequent serving will fail
println!("authing");
Auth::new(&ctx.paths, ctx.log.clone())
.get_credential()
.await?;
// likewise for license consent
println!("consent");
legal::require_consent(&ctx.paths, false)?;
let current_exe =
@ -147,6 +145,9 @@ pub async fn service(
TunnelServiceSubCommands::Uninstall => {
manager.unregister().await?;
}
TunnelServiceSubCommands::Log => {
manager.show_logs().await?;
}
TunnelServiceSubCommands::InternalRun => {
manager
.run(ctx.paths.clone(), TunnelServiceContainer::new(ctx.args))

View file

@ -137,6 +137,11 @@ impl LauncherPaths {
&self.root
}
/// Suggested path for tunnel service logs, when using file logs
pub fn service_log_file(&self) -> PathBuf {
self.root.join("tunnel-service.log")
}
/// Removes the launcher data directory.
pub fn remove(&self) -> Result<(), WrappedError> {
remove_dir_all(&self.root).map_err(|e| {

View file

@ -21,6 +21,8 @@ mod service;
mod service_linux;
#[cfg(target_os = "windows")]
mod service_windows;
#[cfg(target_os = "macos")]
mod service_macos;
pub use control_server::serve;
pub use service::{

View file

@ -40,6 +40,9 @@ pub trait ServiceManager {
handle: impl 'static + ServiceContainer,
) -> Result<(), AnyError>;
/// Show logs from the running service to standard out.
async fn show_logs(&self) -> Result<(), AnyError>;
/// Unregisters the current executable as a service.
async fn unregister(&self) -> Result<(), AnyError>;
}
@ -50,12 +53,16 @@ pub type ServiceManagerImpl = super::service_windows::WindowsService;
#[cfg(target_os = "linux")]
pub type ServiceManagerImpl = super::service_linux::SystemdService;
#[cfg(not(any(target_os = "windows", target_os = "linux")))]
pub type ServiceManagerImpl = UnimplementedServiceManager;
#[cfg(target_os = "macos")]
pub type ServiceManagerImpl = super::service_macos::LaunchdService;
#[allow(unreachable_code)]
#[allow(unused_variables)]
pub fn create_service_manager(log: log::Logger, paths: &LauncherPaths) -> ServiceManagerImpl {
#[cfg(target_os = "macos")]
{
super::service_macos::LaunchdService::new(log, paths)
}
#[cfg(target_os = "windows")]
{
super::service_windows::WindowsService::new(log)
@ -64,36 +71,4 @@ pub fn create_service_manager(log: log::Logger, paths: &LauncherPaths) -> Servic
{
super::service_linux::SystemdService::new(log, paths.clone())
}
#[cfg(not(any(target_os = "windows", target_os = "linux")))]
{
UnimplementedServiceManager::new()
}
}
pub struct UnimplementedServiceManager();
#[allow(dead_code)]
impl UnimplementedServiceManager {
fn new() -> Self {
Self()
}
}
#[async_trait]
impl ServiceManager for UnimplementedServiceManager {
async fn register(&self, _exe: PathBuf, _args: &[&str]) -> Result<(), AnyError> {
unimplemented!("Service management is not supported on this platform");
}
async fn run(
self,
_launcher_paths: LauncherPaths,
_handle: impl 'static + ServiceContainer,
) -> Result<(), AnyError> {
unimplemented!("Service management is not supported on this platform");
}
async fn unregister(&self) -> Result<(), AnyError> {
unimplemented!("Service management is not supported on this platform");
}
}

View file

@ -7,6 +7,7 @@ use std::{
fs::File,
io::{self, Write},
path::PathBuf,
process::Command,
};
use async_trait::async_trait;
@ -115,6 +116,29 @@ impl ServiceManager for SystemdService {
handle.run_service(self.log, launcher_paths, rx).await
}
async fn show_logs(&self) -> Result<(), AnyError> {
// show the systemctl status header...
Command::new("systemctl")
.args([
"--user",
"status",
"-n",
"0",
&SystemdService::service_name_string(),
])
.status()
.map(|s| s.code().unwrap_or(1))
.map_err(|e| wrap(e, format!("error running journalctl")))?;
// then follow log files
Command::new("journalctl")
.args(["--user", "-f", "-u", &SystemdService::service_name_string()])
.status()
.map(|s| s.code().unwrap_or(1))
.map_err(|e| wrap(e, format!("error running journalctl")))?;
Ok(())
}
async fn unregister(&self) -> Result<(), crate::util::errors::AnyError> {
let connection = SystemdService::connect().await?;
let proxy = SystemdService::proxy(&connection).await?;

View file

@ -0,0 +1,188 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use std::{
fs::{remove_file, File},
io::{self, Write},
path::{Path, PathBuf},
};
use async_trait::async_trait;
use tokio::sync::mpsc;
use crate::{
commands::tunnels::ShutdownSignal,
constants::APPLICATION_NAME,
log,
state::LauncherPaths,
util::{
command::capture_command_and_check_status,
errors::{wrap, AnyError, MissingHomeDirectory},
io::{tailf, TailEvent},
},
};
use super::ServiceManager;
pub struct LaunchdService {
log: log::Logger,
log_file: PathBuf,
}
impl LaunchdService {
pub fn new(log: log::Logger, paths: &LauncherPaths) -> Self {
Self {
log,
log_file: paths.service_log_file(),
}
}
}
#[async_trait]
impl ServiceManager for LaunchdService {
async fn register(
&self,
exe: std::path::PathBuf,
args: &[&str],
) -> Result<(), crate::util::errors::AnyError> {
let service_file = get_service_file_path()?;
write_service_file(&service_file, &self.log_file, exe, args)
.map_err(|e| wrap(e, "error creating service file"))?;
info!(self.log, "Successfully registered service...");
capture_command_and_check_status(
"launchctl",
&["load", service_file.as_os_str().to_string_lossy().as_ref()],
)
.await?;
capture_command_and_check_status("launchctl", &["start", &get_service_label()]).await?;
info!(self.log, "Tunnel service successfully started");
Ok(())
}
async fn show_logs(&self) -> Result<(), AnyError> {
if !self.log_file.exists() {
println!("The tunnel service has not started yet.");
return Ok(());
}
let file =
std::fs::File::open(&self.log_file).map_err(|e| wrap(e, "error opening log file"))?;
let mut rx = tailf(file, 20);
while let Some(line) = rx.recv().await {
match line {
TailEvent::Line(l) => print!("{}", l),
TailEvent::Reset => println!("== Tunnel service restarted =="),
TailEvent::Err(e) => return Err(wrap(e, "error reading log file").into()),
}
}
Ok(())
}
async fn run(
self,
launcher_paths: crate::state::LauncherPaths,
mut handle: impl 'static + super::ServiceContainer,
) -> Result<(), crate::util::errors::AnyError> {
let (tx, rx) = mpsc::channel::<ShutdownSignal>(1);
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tx.send(ShutdownSignal::CtrlC).await.ok();
});
handle.run_service(self.log, launcher_paths, rx).await
}
async fn unregister(&self) -> Result<(), crate::util::errors::AnyError> {
let service_file = get_service_file_path()?;
match capture_command_and_check_status("launchctl", &["stop", &get_service_label()]).await {
Ok(_) => {}
// status 3 == "no such process"
Err(AnyError::CommandFailed(e)) if e.output.status.code() == Some(3) => {}
Err(e) => return Err(e),
};
info!(self.log, "Successfully stopped service...");
capture_command_and_check_status(
"launchctl",
&[
"unload",
service_file.as_os_str().to_string_lossy().as_ref(),
],
)
.await?;
info!(self.log, "Tunnel service uninstalled");
if let Ok(f) = get_service_file_path() {
remove_file(f).ok();
}
Ok(())
}
}
fn get_service_label() -> String {
format!("com.visualstudio.{}.tunnel", &*APPLICATION_NAME)
}
fn get_service_file_path() -> Result<PathBuf, MissingHomeDirectory> {
match dirs::home_dir() {
Some(mut d) => {
d.push(format!("{}.plist", get_service_label()));
Ok(d)
}
None => Err(MissingHomeDirectory()),
}
}
fn write_service_file(
path: &PathBuf,
log_file: &Path,
exe: std::path::PathBuf,
args: &[&str],
) -> io::Result<()> {
let mut f = File::create(path)?;
let log_file = log_file.as_os_str().to_string_lossy();
// todo: we may be able to skip file logging and use the ASL instead
// if/when we no longer need to support older macOS versions.
write!(
&mut f,
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n\
<!DOCTYPE plist PUBLIC \"-//Apple//DTD PLIST 1.0//EN\" \"http://www.apple.com/DTDs/PropertyList-1.0.dtd\">\n\
<plist version=\"1.0\">\n\
<dict>\n\
<key>Label</key>\n\
<string>{}</string>\n\
<key>LimitLoadToSessionType</key>\n\
<string>Aqua</string>\n\
<key>ProgramArguments</key>\n\
<array>\n\
<string>{}</string>\n\
<string>{}</string>\n\
</array>\n\
<key>KeepAlive</key>\n\
<true/>\n\
<key>StandardErrorPath</key>\n\
<string>{}</string>\n\
<key>StandardOutPath</key>\n\
<string>{}</string>\n\
</dict>\n\
</plist>",
get_service_label(),
exe.into_os_string().to_string_lossy(),
args.join("</string><string>"),
log_file,
log_file
)?;
Ok(())
}

View file

@ -119,6 +119,10 @@ impl CliServiceManager for WindowsService {
Ok(())
}
async fn show_logs(&self) -> Result<(), AnyError> {
todo!();
}
#[allow(unused_must_use)] // triggers incorrectly on `define_windows_service!`
async fn run(
self,

View file

@ -2,10 +2,34 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use super::errors::{wrap, WrappedError};
use std::{ffi::OsStr, process::Stdio};
use super::errors::{wrap, AnyError, CommandFailed, WrappedError};
use std::{ffi::OsStr, process::Stdio, borrow::Cow};
use tokio::process::Command;
pub async fn capture_command_and_check_status(
command_str: impl AsRef<OsStr>,
args: &[impl AsRef<OsStr>],
) -> Result<std::process::Output, AnyError> {
let output = capture_command(&command_str, args).await?;
if !output.status.success() {
return Err(CommandFailed {
command: format!(
"{} {}",
command_str.as_ref().to_string_lossy(),
args.iter()
.map(|a| a.as_ref().to_string_lossy())
.collect::<Vec<Cow<'_, str>>>()
.join(" ")
),
output,
}
.into());
}
Ok(output)
}
pub async fn capture_command<A, I, S>(
command_str: A,
args: I,

View file

@ -371,6 +371,37 @@ impl std::fmt::Display for CorruptDownload {
}
}
#[derive(Debug)]
pub struct MissingHomeDirectory();
impl std::fmt::Display for MissingHomeDirectory {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Could not find your home directory. Please ensure this command is running in the context of an normal user.")
}
}
#[derive(Debug)]
pub struct CommandFailed {
pub output: std::process::Output,
pub command: String,
}
impl std::fmt::Display for CommandFailed {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"Failed to run command \"{}\" (code {}): {}",
self.command,
self.output.status,
String::from_utf8_lossy(if self.output.stderr.is_empty() {
&self.output.stdout
} else {
&self.output.stderr
})
)
}
}
// Makes an "AnyError" enum that contains any of the given errors, in the form
// `enum AnyError { FooError(FooError) }` (when given `makeAnyError!(FooError)`).
// Useful to easily deal with application error types without making tons of "From"
@ -433,7 +464,9 @@ makeAnyError!(
ServiceAlreadyRegistered,
WindowsNeedsElevation,
UpdatesNotConfigured,
CorruptDownload
CorruptDownload,
MissingHomeDirectory,
CommandFailed
);
impl From<reqwest::Error> for AnyError {

View file

@ -2,9 +2,18 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
use std::{io, task::Poll};
use std::{
fs::File,
io::{self, BufRead, Seek},
task::Poll,
time::Duration,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::mpsc,
time::sleep,
};
pub trait ReportCopyProgress {
fn report_progress(&mut self, bytes_so_far: u64, total_bytes: u64);
@ -95,3 +104,266 @@ impl ReadBuffer {
Poll::Ready(Ok(()))
}
}
#[derive(Debug)]
pub enum TailEvent {
/// A new line was read from the file. The line includes its trailing newline character.
Line(String),
/// The file appears to have been rewritten (size shrunk)
Reset,
/// An error was encountered with the file.
Err(io::Error),
}
/// Simple, naive implementation of `tail -f -n <n> <path>`. Uses polling, so
/// it's not the fastest, but simple and working for easy cases.
pub fn tailf(file: File, n: usize) -> mpsc::UnboundedReceiver<TailEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let mut last_len = match file.metadata() {
Ok(m) => m.len(),
Err(e) => {
tx.send(TailEvent::Err(e)).ok();
return rx;
}
};
let mut reader = io::BufReader::new(file);
let mut pos = 0;
// Read the initial "n" lines back from the request. initial_lines
// is a small ring buffer.
let mut initial_lines = Vec::with_capacity(n);
let mut initial_lines_i = 0;
loop {
let mut line = String::new();
let bytes_read = match reader.read_line(&mut line) {
Ok(0) => break,
Ok(n) => n,
Err(e) => {
tx.send(TailEvent::Err(e)).ok();
return rx;
}
};
if !line.ends_with('\n') {
// EOF
break;
}
pos += bytes_read as u64;
if initial_lines.len() < initial_lines.capacity() {
initial_lines.push(line)
} else {
initial_lines[initial_lines_i] = line;
}
initial_lines_i = (initial_lines_i + 1) % n;
}
// remove tail lines...
if initial_lines_i < initial_lines.len() {
for line in initial_lines.drain((initial_lines_i)..) {
tx.send(TailEvent::Line(line)).ok();
}
}
// then the remaining lines
if !initial_lines.is_empty() {
for line in initial_lines.drain(0..) {
tx.send(TailEvent::Line(line)).ok();
}
}
// now spawn the poll process to keep reading new lines
tokio::spawn(async move {
let poll_interval = Duration::from_millis(500);
loop {
tokio::select! {
_ = sleep(poll_interval) => {},
_ = tx.closed() => return
}
match reader.get_ref().metadata() {
Err(e) => {
tx.send(TailEvent::Err(e)).ok();
return;
}
Ok(m) => {
if m.len() == last_len {
continue;
}
if m.len() < last_len {
tx.send(TailEvent::Reset).ok();
pos = 0;
}
last_len = m.len();
}
}
if let Err(e) = reader.seek(io::SeekFrom::Start(pos)) {
tx.send(TailEvent::Err(e)).ok();
return;
}
loop {
let mut line = String::new();
let n = match reader.read_line(&mut line) {
Ok(0) => break,
Ok(n) => n,
Err(e) => {
tx.send(TailEvent::Err(e)).ok();
return;
}
};
if n == 0 || !line.ends_with('\n') {
break;
}
pos += n as u64;
if tx.send(TailEvent::Line(line)).is_err() {
return;
}
}
}
});
rx
}
#[cfg(test)]
mod tests {
use rand::Rng;
use std::{fs::OpenOptions, io::Write};
use super::*;
#[tokio::test]
async fn test_tailf_empty() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("tmp");
let read_file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(&file_path)
.unwrap();
let mut rx = tailf(read_file, 32);
assert!(rx.try_recv().is_err());
let mut append_file = OpenOptions::new()
.write(true)
.append(true)
.open(&file_path)
.unwrap();
writeln!(&mut append_file, "some line").unwrap();
let recv = rx.recv().await;
if let Some(TailEvent::Line(l)) = recv {
assert_eq!("some line\n".to_string(), l);
} else {
unreachable!("expect a line event, got {:?}", recv)
}
write!(&mut append_file, "partial ").unwrap();
writeln!(&mut append_file, "line").unwrap();
let recv = rx.recv().await;
if let Some(TailEvent::Line(l)) = recv {
assert_eq!("partial line\n".to_string(), l);
} else {
unreachable!("expect a line event, got {:?}", recv)
}
}
#[tokio::test]
async fn test_tailf_resets() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("tmp");
let mut read_file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(&file_path)
.unwrap();
writeln!(&mut read_file, "some existing content").unwrap();
let mut rx = tailf(read_file, 0);
assert!(rx.try_recv().is_err());
let mut append_file = File::create(&file_path).unwrap(); // truncates
writeln!(&mut append_file, "some line").unwrap();
let recv = rx.recv().await;
if let Some(TailEvent::Reset) = recv {
// ok
} else {
unreachable!("expect a reset event, got {:?}", recv)
}
let recv = rx.recv().await;
if let Some(TailEvent::Line(l)) = recv {
assert_eq!("some line\n".to_string(), l);
} else {
unreachable!("expect a line event, got {:?}", recv)
}
}
#[tokio::test]
async fn test_tailf_with_data() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("tmp");
let mut read_file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(&file_path)
.unwrap();
let mut rng = rand::thread_rng();
let mut written = vec![];
let base_line = "Elit ipsum cillum ex cillum. Adipisicing consequat cupidatat do proident ut in sunt Lorem ipsum tempor. Eiusmod ipsum Lorem labore exercitation sunt pariatur excepteur fugiat cillum velit cillum enim. Nisi Lorem cupidatat ad enim velit officia eiusmod esse tempor aliquip. Deserunt pariatur tempor in duis culpa esse sit nulla irure ullamco ipsum voluptate non laboris. Occaecat officia nulla officia mollit do aliquip reprehenderit ad incididunt.";
for i in 0..100 {
let line = format!("{}: {}", i, &base_line[..rng.gen_range(0..base_line.len())]);
writeln!(&mut read_file, "{}", line).unwrap();
written.push(line);
}
write!(&mut read_file, "partial line").unwrap();
read_file.seek(io::SeekFrom::Start(0)).unwrap();
let last_n = 32;
let mut rx = tailf(read_file, last_n);
for i in 0..last_n {
let recv = rx.try_recv().unwrap();
if let TailEvent::Line(l) = recv {
let mut expected = written[written.len() - last_n + i].to_string();
expected.push('\n');
assert_eq!(expected, l);
} else {
unreachable!("expect a line event, got {:?}", recv)
}
}
assert!(rx.try_recv().is_err());
let mut append_file = OpenOptions::new()
.write(true)
.append(true)
.open(&file_path)
.unwrap();
writeln!(append_file, " is now complete").unwrap();
let recv = rx.recv().await;
if let Some(TailEvent::Line(l)) = recv {
assert_eq!("partial line is now complete\n".to_string(), l);
} else {
unreachable!("expect a line event, got {:?}", recv)
}
}
}