Give each task a separate process group

This makes it vastly easier to handle task singnalling; signals can be sent to all
processes in the group, including SIGTERM.

The Apple and Linux variants have largely been combined into a single codebase.
This commit is contained in:
Martijn Pieters 2022-10-17 15:25:46 +01:00
parent 9b69c422a4
commit 6635d1e1b8
No known key found for this signature in database
22 changed files with 659 additions and 784 deletions

View file

@ -8,7 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- pueue log output now includes the task label, if any. [#355](https://github.com/Nukesor/pueue/issues/355)
- `pueue log` output now includes the task label, if any. [#355](https://github.com/Nukesor/pueue/issues/355)
- Enable `pueue edit` to edit multiple properties in one go.
- Tasks are now started in a process group, and pueue kill --children will kill all processes in the group. [#372](https://github.com/Nukesor/pueue/issues/372)
### Added
@ -29,10 +31,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added the `created_at` and `enqueued_at` metadata fields on `Task` [#356](https://github.com/Nukesor/pueue/issues/356).
They'll only be exposed when running `status --json` for now.
### Changed
- Enable `pueue edit` to edit multiple properties in one go.
### Fixed
- Interpret the `$EDITOR` command, when editing a task's command/path, as a shell expression instead of an executable ([#336](https://github.com/Nukesor/pueue/issues/336)).
@ -40,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Don't show the version warning message between daemon and client, when using any `--json` flag.
- Fix some test failures in non-standard environments for NixOS test suite ([#346](https://github.com/Nukesor/pueue/issues/346)).
- The time in pueue's logs will now be in localtime instead of UTC [#385](https://github.com/Nukesor/pueue/issues/385).
- macos support has been brought on par with Linux.
### Misc

175
Cargo.lock generated
View file

@ -123,6 +123,25 @@ dependencies = [
"console",
]
[[package]]
name = "bindgen"
version = "0.59.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8"
dependencies = [
"bitflags",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@ -173,6 +192,15 @@ version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -205,6 +233,17 @@ dependencies = [
"scanlex",
]
[[package]]
name = "clang-sys"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa2e27ae6ab525c3d369ded447057bca5438d86dc3a68f6faafb8269ba82ebf3"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "clap"
version = "3.2.17"
@ -265,6 +304,16 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "command-group"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7a8a86f409b4a59df3a3e4bee2de0b83f1755fdd2a25e3a9684c396fc4bed2c"
dependencies = [
"nix 0.22.3",
"winapi",
]
[[package]]
name = "console"
version = "0.15.1"
@ -293,6 +342,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@ -368,6 +426,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "darwin-libproc"
version = "0.2.0"
source = "git+https://github.com/jamwaffles/darwin-libproc.git?rev=cbd571e4be87a6603de328d1f1f0ad54d7bdfedf#cbd571e4be87a6603de328d1f1f0ad54d7bdfedf"
dependencies = [
"darwin-libproc-sys",
"libc",
"memchr",
]
[[package]]
name = "darwin-libproc-sys"
version = "0.2.0"
source = "git+https://github.com/jamwaffles/darwin-libproc.git?rev=cbd571e4be87a6603de328d1f1f0ad54d7bdfedf#cbd571e4be87a6603de328d1f1f0ad54d7bdfedf"
[[package]]
name = "diff"
version = "0.1.13"
@ -471,6 +544,16 @@ dependencies = [
"instant",
]
[[package]]
name = "flate2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@ -599,6 +682,12 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d"
[[package]]
name = "glob"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "half"
version = "1.8.2"
@ -721,10 +810,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.132"
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.135"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c"
[[package]]
name = "libloading"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f"
dependencies = [
"cfg-if",
"winapi",
]
[[package]]
name = "libproc"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b799ad155d75ce914c467ee5627b62247c20d4aedbd446f821484cebf3cded7"
dependencies = [
"bindgen",
"errno",
"libc",
]
[[package]]
name = "linux-raw-sys"
@ -766,6 +882,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.5.3"
@ -789,11 +911,12 @@ dependencies = [
[[package]]
name = "nix"
version = "0.24.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "195cdbc1741b8134346d515b3a56a1c94b0912758009cfd53f99ea0f57b065fc"
checksum = "e4916f159ed8e5de0082076562152a76b7a1f64a01fd9d1e0fea002c37624faf"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset",
@ -811,6 +934,16 @@ dependencies = [
"libc",
]
[[package]]
name = "nom"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@ -902,6 +1035,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "1.1.0"
@ -1062,6 +1201,8 @@ checksum = "2dfb6451c91904606a1abe93e83a8ec851f45827fa84273f256ade45dc095818"
dependencies = [
"bitflags",
"byteorder",
"chrono",
"flate2",
"hex",
"lazy_static",
"rustix",
@ -1079,17 +1220,16 @@ dependencies = [
"clap",
"clap_complete",
"comfy-table",
"command-group",
"crossbeam-channel",
"crossterm",
"ctrlc",
"env_logger",
"handlebars",
"log",
"nix 0.24.2",
"pest",
"pest_derive",
"pretty_assertions",
"procfs",
"pueue-lib",
"rstest",
"serde",
@ -1103,7 +1243,6 @@ dependencies = [
"tempfile",
"test-log",
"tokio",
"whoami",
]
[[package]]
@ -1115,9 +1254,11 @@ dependencies = [
"better-panic",
"byteorder",
"chrono",
"command-group",
"darwin-libproc",
"dirs",
"libproc",
"log",
"nix 0.24.2",
"portpicker",
"pretty_assertions",
"procfs",
@ -1338,6 +1479,12 @@ version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
@ -1349,9 +1496,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.35.9"
version = "0.35.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c825b8aa8010eb9ee99b75f05e10180b9278d161583034d7574c9d617aeada"
checksum = "fbb2fda4666def1433b1b05431ab402e42a1084285477222b72d6c564c417cef"
dependencies = [
"bitflags",
"errno",
@ -1499,6 +1646,12 @@ dependencies = [
"dirs",
]
[[package]]
name = "shlex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "signal-hook"
version = "0.3.14"

View file

@ -14,6 +14,7 @@ rust-version = "1.63"
[workspace.dependencies]
chrono = { version = "0.4", features = ["serde"] }
command-group = "1.0.8"
log = "0.4"
serde = "1"
serde_json = "1"
@ -73,6 +74,7 @@ shell-escape = "0.1"
simplelog = "0.12"
tempfile = "3"
command-group = { workspace = true }
chrono = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
@ -95,9 +97,3 @@ similar-asserts = "1"
# nextest run --no-capture`)
env_logger = "0.9.1"
test-log = "0.2.11"
# Test specific dev-dependencies
[target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies]
nix = "0.24"
whoami = "1"
procfs = { version = "0.14", default-features = false }

View file

@ -53,9 +53,8 @@ The queue will be continuously processed, even if you no longer have any active
- A lot more. Check the -h options for each subcommand for detailed options.
- Cross Platform
* Linux is fully supported and battle-tested.
* MacOS is fully supported and on par with Linux.
* Windows is fully supported and working fine for quite a while.
* MacOS only has **rudimentary** process handling, but it's still usable.
Check this [issue](https://github.com/Nukesor/pueue/issues/115) to find out what's missing.
- [Why should I use it](https://github.com/Nukesor/pueue/wiki/FAQ#why-should-i-use-it)
- [Advantages over Using a Terminal Multiplexer](https://github.com/Nukesor/pueue/wiki/FAQ#advantages-over-using-a-terminal-multiplexer)

View file

@ -134,7 +134,7 @@ pub enum SubCommand {
#[clap(short, long)]
all: bool,
/// Also resume direct child processes of your paused tasks.
/// Resume all processes in the process groups of your paused tasks.
/// By default only the main process will get a SIGSTART.
#[clap(short, long)]
children: bool,
@ -212,10 +212,9 @@ pub enum SubCommand {
#[clap(short, long)]
wait: bool,
/// Also pause direct child processes of a task's main process.
/// Pause all processes in the task's process group.
/// By default only the main process will get a SIGSTOP.
/// This is useful when calling bash scripts, which start other processes themselves.
/// This operation is not recursive!
#[clap(short, long)]
children: bool,
},
@ -234,7 +233,7 @@ pub enum SubCommand {
#[clap(short, long)]
all: bool,
/// Send the SIGTERM signal to all children as well.
/// Send the (SIGTERM) signal to all processes in the task process group.
/// Useful when working with shell scripts.
#[clap(short, long)]
children: bool,
@ -384,7 +383,7 @@ pub enum SubCommand {
/// Kill all tasks, clean up afterwards and reset EVERYTHING!
Reset {
/// Send the SIGTERM signal to all children as well.
/// Send the SIGTERM signal to all processes in the task process group.
/// Useful when working with shell scripts.
#[clap(short, long)]
children: bool,

View file

@ -93,7 +93,7 @@ macro_rules! ok_or_return_failure_message {
mod fixtures {
pub use crossbeam_channel::Sender;
use std::collections::HashMap;
use std::path::PathBuf;
use std::env::temp_dir;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;
@ -135,7 +135,7 @@ mod fixtures {
pub fn get_stub_task_in_group(id: &str, group: &str, status: TaskStatus) -> Task {
Task::new(
id.to_string(),
PathBuf::from("/tmp"),
temp_dir(),
HashMap::new(),
group.to_string(),
status,

View file

@ -1,12 +1,12 @@
use command_group::GroupChild;
use std::collections::BTreeMap;
use std::process::Child;
/// This structure is needed to manage worker pools for groups.
/// It's a newtype pattern around a nested BTreeMap, which implements some convenience functions.
///
/// The datastructure contains these types of data:
/// BTreeMap<group, BTreeMap<group_worker_id, (task_id, Subprocess handle)>
pub struct Children(pub BTreeMap<String, BTreeMap<usize, (usize, Child)>>);
pub struct Children(pub BTreeMap<String, BTreeMap<usize, (usize, GroupChild)>>);
impl Children {
/// Returns whether there are any active tasks across all groups.
@ -28,25 +28,10 @@ impl Children {
false
}
/// A convenience function to get a child by its respective task_id.
/// We have to do a nested linear search over all children of all pools,
/// beceause these datastructure aren't indexed via task_ids.
pub fn get_child(&self, task_id: usize) -> Option<&Child> {
for pool in self.0.values() {
for (child_task_id, child) in pool.values() {
if child_task_id == &task_id {
return Some(child);
}
}
}
None
}
/// A convenience function to get a mutable child by its respective task_id.
/// We have to do a nested linear search over all children of all pools,
/// beceause these datastructure aren't indexed via task_ids.
pub fn get_child_mut(&mut self, task_id: usize) -> Option<&mut Child> {
pub fn get_child_mut(&mut self, task_id: usize) -> Option<&mut GroupChild> {
for pool in self.0.values_mut() {
for (child_task_id, child) in pool.values_mut() {
if child_task_id == &task_id {
@ -107,7 +92,7 @@ impl Children {
/// This function should only be called when spawning a new process.
/// At this point, we're sure that the worker pool for the given group already exists, hence
/// the expect call.
pub fn add_child(&mut self, group: &str, worker_id: usize, task_id: usize, child: Child) {
pub fn add_child(&mut self, group: &str, worker_id: usize, task_id: usize, child: GroupChild) {
let pool = self
.0
.get_mut(group)

View file

@ -93,7 +93,9 @@ impl TaskHandler {
/// Triggered on `reset` and `kill`.
pub fn kill_task(&mut self, task_id: usize, kill_children: bool) {
if let Some(child) = self.children.get_child_mut(task_id) {
kill_child(task_id, child, kill_children);
kill_child(task_id, child, kill_children).unwrap_or_else(|err| {
warn!("Failed to send kill to task {task_id} child process {child:?} with error {err:?}");
})
} else {
warn!("Tried to kill non-existing child: {task_id}");
}

View file

@ -15,7 +15,7 @@ impl TaskHandler {
}
};
{
let child_stdin = child.stdin.as_mut().unwrap();
let child_stdin = child.inner().stdin.as_mut().unwrap();
if let Err(err) = child_stdin.write_all(&input.into_bytes()) {
error!("Failed to send input to task {task_id} with err {err:?}");
};

View file

@ -5,6 +5,7 @@ use std::process::Stdio;
use anyhow::Result;
use chrono::prelude::*;
use command_group::CommandGroup;
use crossbeam_channel::{Receiver, SendError, Sender};
use handlebars::Handlebars;
use log::{debug, error, info};
@ -261,7 +262,7 @@ impl TaskHandler {
/// This is a small wrapper around the real platform dependant process handling logic
/// It only ensures, that the process we want to manipulate really does exists.
fn perform_action(&mut self, id: usize, action: ProcessAction, children: bool) -> Result<bool> {
match self.children.get_child(id) {
match self.children.get_child_mut(id) {
Some(child) => {
debug!("Executing action {action:?} to {id}");
run_action_on_child(child, &action, children)?;

View file

@ -128,7 +128,7 @@ impl TaskHandler {
.envs(envs.clone())
.stdout(Stdio::from(stdout_log))
.stderr(Stdio::from(stderr_log))
.spawn();
.group_spawn();
// Check if the task managed to spawn
let child = match spawned_command {

View file

@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
This project adheres **somewhat** to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
The concept of SemVer is applied to the daemon/client API, but not the library API itself.
## [2.2.0] - unreleased
### Changed
- Tasks are now started in a process group, with signals (including SIGTERM) optionally sent to the whole group. [#372](https://github.com/Nukesor/pueue/issues/372)
## [0.21.0] - unreleased
### Added

View file

@ -33,6 +33,7 @@ strum_macros = "0.24"
thiserror = "1"
tokio-rustls = "0.23"
command-group = { workspace = true }
log = { workspace = true }
snap = { workspace = true }
serde = { workspace = true }
@ -60,10 +61,18 @@ winapi = { version = "0.3", features = [
] }
# Unix
[target.'cfg(not(windows))'.dependencies]
nix = "0.24"
[target.'cfg(unix)'.dependencies]
whoami = "1"
# Linux / Freebsd
[target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies]
procfs = { version = "0.14", default-features = false }
# Linux only
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.14.1"
# Apple only
[target.'cfg(target_vendor = "apple")'.dependencies]
libproc = "0.12.0"
[target.'cfg(target_vendor = "apple")'.dev-dependencies]
# fork that fixes a version conflict, see heim-rs/darwin-libproc#3
# only used as a test dependency.
darwin-libproc = { version="0.2.0", git = "https://github.com/jamwaffles/darwin-libproc.git", rev = "cbd571e4be87a6603de328d1f1f0ad54d7bdfedf" }

View file

@ -1,166 +1,24 @@
use std::convert::TryInto;
use std::path::Path;
use std::process::{Child, Command};
use libproc::libproc::{proc_pid, task_info};
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::Result;
use log::debug;
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
use super::ProcessAction;
use crate::network::message::Signal as InternalSignal;
pub fn compile_shell_command(command_string: &str) -> Command {
let mut command = Command::new("sh");
command.arg("-c").arg(command_string);
command
}
fn map_action_to_signal(action: &ProcessAction) -> Signal {
match action {
ProcessAction::Pause => Signal::SIGSTOP,
ProcessAction::Resume => Signal::SIGCONT,
}
}
fn map_internal_signal_to_nix_signal(signal: InternalSignal) -> Signal {
match signal {
InternalSignal::SigKill => Signal::SIGKILL,
InternalSignal::SigInt => Signal::SIGINT,
InternalSignal::SigTerm => Signal::SIGTERM,
InternalSignal::SigCont => Signal::SIGCONT,
InternalSignal::SigStop => Signal::SIGSTOP,
}
}
/// Convenience wrapper around `send_signal_to_child` for raw unix signals.
/// Its purpose is to hide platform specific logic.
pub fn send_internal_signal_to_child(
child: &Child,
signal: InternalSignal,
send_to_children: bool,
) -> Result<bool> {
let signal = map_internal_signal_to_nix_signal(signal);
send_signal_to_child(child, signal, send_to_children)
}
/// Convenience wrapper around `send_signal_to_child` for internal actions on processes.
/// Its purpose is to hide platform specific logic.
pub fn run_action_on_child(
child: &Child,
action: &ProcessAction,
send_to_children: bool,
) -> Result<bool> {
let signal = map_action_to_signal(action);
send_signal_to_child(child, signal, send_to_children)
}
/// Send a signal to one of Pueue's child process handles.
/// We need a special since we assume that there's also a `sh -c` around the actuall process.
pub fn send_signal_to_child(
child: &Child,
signal: Signal,
_send_to_children: bool,
) -> Result<bool> {
let pid = child.id();
// Send the signal to the shell, don't propagate to its children yet.
send_signal_to_process(pid, signal, false)?;
signal::kill(Pid::from_raw(pid.try_into().unwrap()), signal)?;
Ok(true)
}
/// This is a helper function to safely kill a child process.
/// Its purpose is to properly kill all processes and prevent any dangling processes.
pub fn kill_child(task_id: usize, child: &mut Child, _kill_children: bool) -> bool {
match child.kill() {
Err(_) => {
debug!("Task {task_id} has already finished by itself");
false
}
_ => true,
}
}
/// Send a signal to a unix process.
fn send_signal_to_process(pid: u32, signal: Signal, _children: bool) -> Result<bool, nix::Error> {
debug!("Sending signal {signal} to {pid}");
signal::kill(Pid::from_raw(pid.try_into().unwrap()), signal)?;
Ok(true)
}
/// Check, whether a specific process is exists or not
/// Check, whether a specific process exists or not
pub fn process_exists(pid: u32) -> bool {
Path::new(&format!("/proc/{pid}")).exists()
proc_pid::pidinfo::<task_info::TaskInfo>(pid.try_into().unwrap(), 0).is_ok()
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread::sleep;
use std::time::Duration;
pub mod tests {
// TODO: swap darwin_libproc out for libproc when that project supports listing pids by
// group id.
use darwin_libproc;
use log::warn;
/// Assert that certain process id no longer exists
fn process_is_gone(pid: u32) -> bool {
!process_exists(pid)
}
#[test]
/// Simply check, whether spawning of a shell command works
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
assert!(ecode.success());
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes.
fn test_shell_command_is_killed() {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.spawn()
.expect("Failed to spawn echo");
let pid = child.id();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
}
#[test]
/// Ensure a normal command without `sh -c` will be killed.
fn test_normal_command_is_killed() {
let mut child = Command::new("sleep")
.arg("60")
.spawn()
.expect("Failed to spawn echo");
let pid = child.id();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
assert!(process_is_gone(pid));
pub fn get_process_group_pids(pgrp: i32) -> Vec<i32> {
match darwin_libproc::pgrp_only_pids(pgrp) {
Err(error) => {
warn!("Failed to get list of processes in process group {pgrp}: {error}");
Vec::new()
}
Ok(processes) => processes,
}
}
}

View file

@ -1,501 +1,41 @@
use std::convert::TryInto;
use std::process::{Child, Command};
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::{bail, Result};
use log::{debug, info, warn};
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
use procfs::process::{all_processes, Process};
use super::ProcessAction;
use crate::network::message::Signal as InternalSignal;
pub fn compile_shell_command(command_string: &str) -> Command {
let mut command = Command::new("sh");
command.arg("-c").arg(command_string);
command
}
fn map_action_to_signal(action: &ProcessAction) -> Signal {
match action {
ProcessAction::Pause => Signal::SIGSTOP,
ProcessAction::Resume => Signal::SIGCONT,
}
}
fn map_internal_signal_to_nix_signal(signal: InternalSignal) -> Signal {
match signal {
InternalSignal::SigKill => Signal::SIGKILL,
InternalSignal::SigInt => Signal::SIGINT,
InternalSignal::SigTerm => Signal::SIGTERM,
InternalSignal::SigCont => Signal::SIGCONT,
InternalSignal::SigStop => Signal::SIGSTOP,
}
}
/// Convenience wrapper around `send_signal_to_child` for raw unix signals.
/// Its purpose is to hide platform specific logic.
pub fn send_internal_signal_to_child(
child: &Child,
signal: InternalSignal,
send_to_children: bool,
) -> Result<bool> {
let signal = map_internal_signal_to_nix_signal(signal);
send_signal_to_child(child, signal, send_to_children)
}
/// Convenience wrapper around `send_signal_to_child` for internal actions on processes.
/// Its purpose is to hide platform specific logic.
pub fn run_action_on_child(
child: &Child,
action: &ProcessAction,
send_to_children: bool,
) -> Result<bool> {
let signal = map_action_to_signal(action);
send_signal_to_child(child, signal, send_to_children)
}
/// Send a signal to one of Pueue's child process handles.
///
/// There are two scenarios:
///
/// **Normal case**
///
/// A task, such as `sleep 60` get's spawned by the posix shell `sh`.
/// This results in the process `sh -c 'sleep 60'`.
/// Since the posix shell doesn't propagate any process signals to its children, we have to:
/// 1. Send the signal to the shell.
/// 2. Send the signal directly to the children.
/// In our case this would be the `sleep 60` child process.
///
/// If the user also want's to send the signal to all child processes of the task,
/// we have to get all child-processes of the child process.
///
/// **Special case**
///
/// The posix shell `sh` has some some inconsistent behavior.
/// In some circumstances and environments, the `sh -c $command` doesn't spawn a `sh` process with a
/// `$command` child-process, but rather spawns the `$command` as a top-level process directly.
///
/// This makes things a bit more complicated, since we have to find out whether a shell is spawned
/// or not. If a shell is spawned, we do the **Normal case** handling.
///
/// If **no** shell is spawned, we have to send the signal to the top-level process only.
///
/// If the user also want's to send the signal to all child processes of the task,
/// we have to get all child-processes of that `$command` process. and send them the signal.
///
/// Returns `Ok(true)`, if everything went alright
/// Returns `Ok(false)`, if the process went away while we tried to send the signal.
pub fn send_signal_to_child(child: &Child, signal: Signal, send_to_children: bool) -> Result<bool> {
let pid: i32 = child.id().try_into().unwrap();
// Check whether this process actually spawned a shell.
let is_shell = if let Ok(is_shell) = did_process_spawn_shell(pid) {
is_shell
} else {
return Ok(false);
};
if is_shell {
// If it's a shell, we have to send the signal to the actual shell and to all it's children.
// There might be multiple children, for instance, when users use the `&` operator.
// If the `send_to_children` flag is given, the
// Get all children before sending the signal to the parent process.
// Otherwise the parent might go away and we'll no longer be able to access the children.
let shell_children = get_child_processes(pid);
// Send the signal to the shell, don't propagate to its children yet.
send_signal_to_process(pid, signal, false)?;
// Now send the signal to the shells child processes and their respective
// children if the user wants to do so.
for shell_child in shell_children {
send_signal_to_process(shell_child.pid(), signal, send_to_children)?;
}
} else {
// If it isn't a shell, send the signal directly to the process.
// Handle children normally.
send_signal_to_process(pid, signal, send_to_children)?;
}
signal::kill(Pid::from_raw(pid), signal)?;
Ok(true)
}
/// This is a helper function to safely kill a child process.
/// Its purpose is to properly kill all processes and prevent any dangling processes.
///
/// Sadly, this needs some extra handling. Check the docstring of `send_signal_to_child` for
/// additional information on why this needs to be done.
///
/// Returns `true`, if everything went alright
/// Returns `false`, if the process went away while we tried to send the signal.
pub fn kill_child(task_id: usize, child: &mut Child, kill_children: bool) -> bool {
let pid: i32 = child.id().try_into().unwrap();
// Check whether this process actually spawned a shell.
let is_shell = if let Ok(is_shell) = did_process_spawn_shell(pid) {
is_shell
} else {
return false;
};
// We have to kill the root process first, to prevent it from spawning new processes.
// However, this prevents us from getting its child processes afterwards.
// That's why we have to get the list of child processes already now.
let mut child_processes = None;
if kill_children || is_shell {
child_processes = Some(get_child_processes(pid));
}
// Kill the parent first
let kill_result = child.kill();
if kill_result.is_err() {
info!("Task {task_id} has already finished by itself.");
return false;
}
// Do an early return, if we don't need to kill any children.
let child_processes = if let Some(child_processes) = child_processes {
child_processes
} else {
return true;
};
// Now kill all remaining children. The parent has been already been killed at this point.
// If a shell is spawned, we have to manually send the kill signal to all children.
// Otherwise only send a signal to all children if the `kill_children` flag is set.
if is_shell {
for child_process in child_processes {
// Send the signal to each child process, show warning if this fails.
let process_pid = child_process.pid();
if let Err(error) = send_signal_to_process(process_pid, Signal::SIGKILL, kill_children)
{
warn!("Failed to send kill to pid {process_pid} with error {error:?}");
}
}
} else if kill_children {
send_signal_to_processes(child_processes, Signal::SIGKILL);
}
true
}
/// Check whether a process's commandline string is actually a shell or not
fn did_process_spawn_shell(pid: i32) -> Result<bool> {
// Get the /proc representation of the child, so we can do some checks
let process = if let Ok(process) = Process::new(pid) {
process
} else {
info!("Process to kill has probably just gone away. Process {pid}");
bail!("Process has just gone away");
};
// Get the root command and check whether it's actually a shell with `sh -c`.
let mut cmdline = if let Ok(cmdline) = process.cmdline() {
cmdline
} else {
info!("Process to kill has probably just gone away. Process {pid}");
bail!("Process has just gone away");
};
if cmdline.len() < 3 {
return Ok(false);
}
if cmdline.remove(0) != "sh" {
return Ok(false);
}
if cmdline.remove(0) != "-c" {
return Ok(false);
}
Ok(true)
}
/// Send a signal to a unix process.
/// In case the user wants to send the signal to all children as well:
/// 1. Get the children before sending the signal to the parent (as it might go away)
/// 2. Send the signal to the parent first, as it might spawn new children otherwise.
/// 3. Send the signal to all children.
fn send_signal_to_process(
pid: i32,
signal: Signal,
send_to_children: bool,
) -> Result<bool, nix::Error> {
debug!("Sending signal {signal} to {pid}");
if send_to_children {
let children = get_child_processes(pid);
signal::kill(Pid::from_raw(pid), signal)?;
send_signal_to_processes(children, signal);
} else {
signal::kill(Pid::from_raw(pid), signal)?;
}
Ok(true)
}
/// Send a signal to a list of processes.
/// This is a convenience wrapper around `send_signal_to_process`.
fn send_signal_to_processes(processes: Vec<Process>, signal: Signal) {
for process in processes {
// Process is no longer alive, skip this one.
if !process.is_alive() {
continue;
}
let pid = Pid::from_raw(process.pid);
if let Err(error) = signal::kill(pid, signal) {
warn!("Failed to send signal {signal:?} to Pid {pid}: {error:?}");
}
}
}
/// Get all children of a specific process
fn get_child_processes(ppid: i32) -> Vec<Process> {
let all_processes = match all_processes() {
Err(error) => {
warn!("Failed to get full process list: {error}");
return Vec::new();
}
Ok(processes) => processes,
};
// Get all processes whose `stat` can be access without any errors.
// We then search for processes with the correct parent process id.
all_processes
.into_iter()
.filter_map(|process| process.ok())
.filter(|process| {
if let Ok(stat) = process.stat() {
stat.ppid == ppid
} else {
false
}
})
.collect()
}
use procfs::process;
/// Check, whether a specific process is exists or not
pub fn process_exists(pid: u32) -> bool {
match Process::new(pid as i32) {
Ok(process) => process.is_alive(),
match pid.try_into() {
Err(_) => false,
Ok(pid) => match process::Process::new(pid) {
Ok(process) => process.is_alive(),
Err(_) => false,
},
}
}
#[cfg(test)]
mod tests {
use std::thread::sleep;
use std::time::Duration;
pub mod tests {
use log::warn;
use anyhow::Result;
use pretty_assertions::assert_eq;
use super::process;
use super::*;
/// Get all processes in a process group
pub fn get_process_group_pids(pgrp: i32) -> Vec<i32> {
let all_processes = match process::all_processes() {
Err(error) => {
warn!("Failed to get full process list: {error}");
return Vec::new();
}
Ok(processes) => processes,
};
/// Assert that certain process id no longer exists
fn process_is_gone(pid: i32) -> bool {
!process_exists(pid as u32)
}
#[test]
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
assert!(ecode.success());
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes.
fn test_shell_command_is_killed() -> Result<()> {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(did_process_spawn_shell(pid).unwrap());
// Get all child processes, so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 2);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
Ok(())
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes when using unix
/// signals directly.
fn test_shell_command_is_killed_with_signal() -> Result<()> {
let child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(did_process_spawn_shell(pid).unwrap());
// Get all child processes, so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 2);
// Kill the process and make sure it'll be killed.
send_signal_to_child(&child, Signal::SIGKILL, false).unwrap();
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
Ok(())
}
#[test]
/// Ensure that a `sh -c` process with a child process that has children of its own
/// will properly kill all processes and their children's children without detached processes.
fn test_shell_command_children_are_killed() -> Result<()> {
let mut child = compile_shell_command("bash -c 'sleep 60 && sleep 60' && sleep 60")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(did_process_spawn_shell(pid).unwrap());
// Get all child processes and all childrens children,
// so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 1);
let mut childrens_children = Vec::new();
for child_process in &child_processes {
childrens_children.extend(get_child_processes(child_process.stat()?.pid));
}
assert_eq!(childrens_children.len(), 1);
// Kill the process and make sure its childen will be killed.
assert!(kill_child(0, &mut child, true));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
// Assert that all children's child processes have been killed.
for child_process in childrens_children {
assert!(!child_process.is_alive());
}
Ok(())
}
#[test]
/// Ensure a normal command without `sh -c` will be killed.
fn test_normal_command_is_killed() -> Result<()> {
let mut child = Command::new("sleep")
.arg("60")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process did not spawn a shell.
assert!(!did_process_spawn_shell(pid).unwrap());
// No child processes exist
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 0);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
assert!(process_is_gone(pid));
Ok(())
}
#[test]
/// Ensure a normal command and all its children will be
/// properly killed without any detached processes.
fn test_normal_command_children_are_killed() -> Result<()> {
let mut child = Command::new("bash")
.arg("-c")
.arg("sleep 60 & sleep 60 && sleep 60")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(!did_process_spawn_shell(pid).unwrap());
// Get all child processes, so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 2);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, true));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
Ok(())
// Get all processes whose `stat` can be accessed without any errors.
// If the stat() result matches the process group, use the process PID.
all_processes
.into_iter()
.filter_map(|result| result.ok())
.filter_map(|process| match process.stat() {
Ok(stat) if stat.pgrp == pgrp => Some(process.pid),
_ => None,
})
.collect()
}
}

View file

@ -5,16 +5,27 @@
//! Depending on the target, the respective platform is read and loaded into this scope.
// Unix specific process handling
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
mod linux;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
pub use self::linux::*;
// Shared between Linux and Apple
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use self::unix::*;
// Apple specific process handling
#[cfg(any(target_vendor = "apple"))]
// Linux specific process support
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "linux")]
pub use self::linux::process_exists;
#[cfg(all(test, target_os = "linux"))]
use self::linux::tests;
// Apple specific process support
#[cfg(target_vendor = "apple")]
mod apple;
#[cfg(target_vendor = "apple")]
pub use self::apple::*;
pub use self::apple::process_exists;
#[cfg(all(test, target_vendor = "apple"))]
use self::apple::tests;
// Windows specific process handling
#[cfg(any(target_os = "windows"))]

View file

@ -0,0 +1,328 @@
use std::process::Command;
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::Result;
use command_group::{GroupChild, Signal, UnixChildExt};
use log::info;
use super::ProcessAction;
use crate::network::message::Signal as InternalSignal;
pub fn compile_shell_command(command_string: &str) -> Command {
let mut command = Command::new("sh");
command.arg("-c").arg(command_string);
command
}
fn map_action_to_signal(action: &ProcessAction) -> Signal {
match action {
ProcessAction::Pause => Signal::SIGSTOP,
ProcessAction::Resume => Signal::SIGCONT,
}
}
fn map_internal_signal_to_nix_signal(signal: InternalSignal) -> Signal {
match signal {
InternalSignal::SigKill => Signal::SIGKILL,
InternalSignal::SigInt => Signal::SIGINT,
InternalSignal::SigTerm => Signal::SIGTERM,
InternalSignal::SigCont => Signal::SIGCONT,
InternalSignal::SigStop => Signal::SIGSTOP,
}
}
/// Convenience wrapper around `send_signal_to_child` for raw unix signals.
/// Its purpose is to hide platform specific logic.
pub fn send_internal_signal_to_child(
child: &mut GroupChild,
signal: InternalSignal,
send_to_children: bool,
) -> Result<()> {
let signal = map_internal_signal_to_nix_signal(signal);
send_signal_to_child(child, signal, send_to_children)
}
/// Convenience wrapper around `send_signal_to_child` for internal actions on processes.
/// Its purpose is to hide platform specific logic.
pub fn run_action_on_child(
child: &mut GroupChild,
action: &ProcessAction,
send_to_children: bool,
) -> Result<()> {
let signal = map_action_to_signal(action);
send_signal_to_child(child, signal, send_to_children)
}
/// Send a signal to one of Pueue's child process or process group handles.
pub fn send_signal_to_child(
child: &mut GroupChild,
signal: Signal,
send_to_children: bool,
) -> Result<()> {
if send_to_children {
// Send the signal to the process group
child.signal(signal)?;
} else {
// Send the signal to the process itself
child.inner().signal(signal)?;
}
Ok(())
}
/// This is a helper function to safely kill a child process or process group.
/// Its purpose is to properly kill all processes and prevent any dangling processes.
pub fn kill_child(
task_id: usize,
child: &mut GroupChild,
kill_children: bool,
) -> std::io::Result<()> {
let result = if kill_children {
child.kill()
} else {
child.inner().kill()
};
match result {
Ok(_) => Ok(()),
Err(ref e) if e.kind() == std::io::ErrorKind::InvalidData => {
// Process already exited
info!("Task {task_id} has already finished by itself.");
Ok(())
}
Err(err) => Err(err),
}
}
#[cfg(test)]
mod tests {
use std::thread::sleep;
use std::time::Duration;
use anyhow::Result;
use command_group::CommandGroup;
use pretty_assertions::assert_eq;
use super::*;
use crate::process_helper::process_exists;
use crate::process_helper::tests::get_process_group_pids;
/// Assert that certain process id no longer exists
fn process_is_gone(pid: u32) -> bool {
!process_exists(pid)
}
#[test]
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.group_spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
assert!(ecode.success());
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes.
fn test_shell_command_is_killed() -> Result<()> {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, true).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes when using unix
/// signals directly.
fn test_shell_command_is_killed_with_signal() -> Result<()> {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure it'll be killed.
send_signal_to_child(&mut child, Signal::SIGKILL, true).unwrap();
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
#[test]
/// Ensure that a `sh -c` process with a child process that has children of its own
/// will properly kill all processes and their children's children without detached processes.
fn test_shell_command_children_are_killed() -> Result<()> {
let mut child = compile_shell_command("bash -c 'sleep 60 && sleep 60' && sleep 60")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure its childen will be killed.
assert!(kill_child(0, &mut child, true).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
#[test]
/// Ensure a normal command without `sh -c` will be killed.
fn test_normal_command_is_killed() -> Result<()> {
let mut child = Command::new("sleep")
.arg("60")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// No further processes exist in the group
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 1);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
assert!(process_is_gone(pid as u32));
Ok(())
}
#[test]
/// Ensure a normal command and all its children will be
/// properly killed without any detached processes.
fn test_normal_command_children_are_killed() -> Result<()> {
let mut child = Command::new("bash")
.arg("-c")
.arg("sleep 60 & sleep 60 && sleep 60")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, true).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
#[test]
/// Ensure a command with children can be killed separately from
/// the child process.
fn test_kill_command_only() -> Result<()> {
let mut child = Command::new("bash")
.arg("-c")
.arg("sleep 60 & sleep 60 && sleep 60")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false).is_ok());
// Sleep a little to give the process time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
assert!(kill_child(0, &mut child, false).is_err());
// Assert that the remaining processes are still there
assert_eq!(get_process_group_pids(pid).len(), 2);
// Kill the group
assert!(kill_child(0, &mut child, true).is_ok());
sleep(Duration::from_millis(500));
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
}

View file

@ -1,22 +1,21 @@
use std::process::{Child, Command};
use std::process::Command;
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::{bail, Result};
use command_group::GroupChild;
use log::{error, info, warn};
use winapi::shared::minwindef::FALSE;
use winapi::shared::ntdef::NULL;
use winapi::um::errhandlingapi::GetLastError;
use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE};
use winapi::um::processthreadsapi::{
OpenProcess, OpenThread, ResumeThread, SuspendThread, TerminateProcess,
};
use winapi::um::processthreadsapi::{OpenThread, ResumeThread, SuspendThread};
use winapi::um::tlhelp32::{
CreateToolhelp32Snapshot, Process32First, Process32Next, Thread32First, Thread32Next,
PROCESSENTRY32, TH32CS_SNAPPROCESS, TH32CS_SNAPTHREAD, THREADENTRY32,
};
use winapi::um::winnt::{PROCESS_TERMINATE, THREAD_SUSPEND_RESUME};
use winapi::um::winnt::THREAD_SUSPEND_RESUME;
use super::ProcessAction;
use crate::network::message::Signal as InternalSignal;
@ -33,15 +32,19 @@ pub fn compile_shell_command(command_string: &str) -> Command {
}
pub fn send_internal_signal_to_child(
_child: &Child,
_child: &mut GroupChild,
_signal: InternalSignal,
_send_to_children: bool,
) -> Result<bool> {
) -> Result<()> {
bail!("Trying to send unix signal on a windows machine. This isn't supported.");
}
/// Send a signal to a windows process.
pub fn run_action_on_child(child: &Child, action: &ProcessAction, _children: bool) -> Result<bool> {
pub fn run_action_on_child(
child: &mut GroupChild,
action: &ProcessAction,
_children: bool,
) -> Result<()> {
let pids = get_cur_task_processes(child.id());
if pids.is_empty() {
bail!("Process has just gone away");
@ -64,28 +67,34 @@ pub fn run_action_on_child(child: &Child, action: &ProcessAction, _children: boo
}
}
Ok(true)
Ok(())
}
/// Kill a child process
pub fn kill_child(task_id: usize, child: &mut Child, _kill_children: bool) -> bool {
match child.kill() {
Err(_) => {
info!("Task {task_id} has already finished by itself");
false
}
Ok(_) => {
let pids = get_cur_task_processes(child.id());
for pid in pids {
terminate_process(pid);
}
true
pub fn kill_child(
task_id: usize,
child: &mut GroupChild,
kill_children: bool,
) -> std::io::Result<()> {
let result = if kill_children {
child.kill()
} else {
child.inner().kill()
};
match result {
Ok(_) => Ok(()),
Err(ref e) if e.kind() == std::io::ErrorKind::InvalidData => {
// Process already exited
info!("Task {task_id} has already finished by itself.");
Ok(())
}
Err(err) => Err(err),
}
}
/// Get current task pid, all child pid and all children's children
/// TODO: see if this can be simplified using QueryInformationJobObject
/// on the job object created by command_group.
fn get_cur_task_processes(task_pid: u32) -> Vec<u32> {
let mut all_pids = Vec::new();
@ -221,22 +230,6 @@ fn resume_thread(tid: u32) {
}
}
/// Terminate a process
/// [TerminateProcess](https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-terminateprocess)
fn terminate_process(pid: u32) {
unsafe {
// Get a handle for the target process
let process_handle = OpenProcess(PROCESS_TERMINATE, FALSE, pid);
// If TerminateProcess fails, the return value is zero.
if 0 == TerminateProcess(process_handle, 1) {
let err_code = GetLastError();
warn!("Failed to terminate process {pid} with error code {err_code}");
}
CloseHandle(process_handle);
}
}
/// Assert that certain process id no longer exists
pub fn process_exists(pid: u32) -> bool {
unsafe {
@ -269,6 +262,8 @@ mod test {
use std::thread::sleep;
use std::time::Duration;
use command_group::CommandGroup;
use super::*;
/// Assert that certain process id no longer exists
@ -306,7 +301,7 @@ mod test {
#[test]
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
@ -322,7 +317,7 @@ mod test {
/// See https://github.com/Nukesor/pueue/issues/315
fn test_shell_command_is_killed() -> Result<()> {
let mut child = compile_shell_command("sleep 60; sleep 60; echo 'this is a test'")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let pid = child.id();
@ -330,7 +325,7 @@ mod test {
let process_ids = assert_process_ids(pid, 1, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
assert!(kill_child(0, &mut child, false).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
@ -352,14 +347,14 @@ mod test {
/// will properly kill all processes and their children's children without detached processes.
fn test_shell_command_children_are_killed() -> Result<()> {
let mut child = compile_shell_command("powershell -c 'sleep 60; sleep 60'; sleep 60")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let pid = child.id();
// Get all processes, so we can make sure they no longer exist afterwards.
let process_ids = assert_process_ids(pid, 2, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
assert!(kill_child(0, &mut child, false).is_ok());
// Assert that the direct child (powershell -c) has been killed.
sleep(Duration::from_millis(500));
@ -380,7 +375,7 @@ mod test {
let mut child = Command::new("ping")
.arg("localhost")
.arg("-t")
.spawn()
.group_spawn()
.expect("Failed to spawn ping");
let pid = child.id();
@ -388,7 +383,7 @@ mod test {
let _ = assert_process_ids(pid, 1, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
assert!(kill_child(0, &mut child, false).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
@ -406,7 +401,7 @@ mod test {
let mut child = Command::new("powershell")
.arg("-c")
.arg("sleep 60; sleep 60; sleep 60")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let pid = child.id();
@ -414,7 +409,7 @@ mod test {
let process_ids = assert_process_ids(pid, 1, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, true));
assert!(kill_child(0, &mut child, true).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));

View file

@ -16,7 +16,7 @@ async fn multiple_tasks() -> Result<()> {
// Run a command that'll run for a short time after a delay.
// The `pueue wait` command will be spawne directly afterwards, resulting in the spawned
// process to wait for this command to finish.
run_client_command(shared, &["add", "--delay", "1 second", "sleep 1"])?;
run_client_command(shared, &["add", "--delay", "2 seconds", "sleep 1"])?;
// Spawn the `pueue wait` command in a separate thread.
// We expect it to finish later on its own.

View file

@ -1,5 +1,3 @@
use std::convert::TryInto;
use anyhow::Result;
use pretty_assertions::assert_eq;
@ -14,12 +12,12 @@ use crate::helper::*;
#[tokio::test]
async fn test_start_running() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
let child = standalone_daemon(&settings.shared).await?;
let mut child = standalone_daemon(&settings.shared).await?;
let shared = &settings.shared;
// Kill the daemon and wait for it to shut down.
assert_success(shutdown_daemon(shared).await?);
wait_for_shutdown(child.id().try_into()?).await?;
wait_for_shutdown(&mut child).await?;
// Boot it up again
let mut child = standalone_daemon(&settings.shared).await?;
@ -40,7 +38,7 @@ async fn test_start_running() -> Result<()> {
#[tokio::test]
async fn test_start_paused() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
let child = standalone_daemon(&settings.shared).await?;
let mut child = standalone_daemon(&settings.shared).await?;
let shared = &settings.shared;
// This pauses the daemon
@ -48,7 +46,7 @@ async fn test_start_paused() -> Result<()> {
// Kill the daemon and wait for it to shut down.
assert_success(shutdown_daemon(shared).await?);
wait_for_shutdown(child.id().try_into()?).await?;
wait_for_shutdown(&mut child).await?;
// Boot it up again
let mut child = standalone_daemon(&settings.shared).await?;

View file

@ -1,5 +1,3 @@
use std::convert::TryInto;
use anyhow::{Context, Result};
use crate::fixtures::*;
@ -12,10 +10,11 @@ async fn test_ctrlc() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
let mut child = standalone_daemon(&settings.shared).await?;
use nix::sys::signal::{kill, Signal};
use command_group::{Signal, UnixChildExt};
// Send SIGTERM signal to process via nix
let nix_pid = nix::unistd::Pid::from_raw(child.id() as i32);
kill(nix_pid, Signal::SIGTERM).context("Failed to send SIGTERM to pid")?;
child
.signal(Signal::SIGTERM)
.context("Failed to send SIGTERM to daemon")?;
// Sleep for 500ms and give the daemon time to shut down
sleep_ms(500).await;
@ -37,7 +36,7 @@ async fn test_graceful_shutdown() -> Result<()> {
// Kill the daemon gracefully and wait for it to shut down.
assert_success(shutdown_daemon(&settings.shared).await?);
wait_for_shutdown(child.id().try_into()?).await?;
wait_for_shutdown(&mut child).await?;
// Sleep for 500ms and give the daemon time to shut down
sleep_ms(500).await;

View file

@ -1,9 +1,9 @@
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::process::Child;
use anyhow::{anyhow, bail, Context, Result};
use procfs::process::Process;
use pueue_lib::network::message::*;
use pueue_lib::settings::*;
@ -56,26 +56,23 @@ pub async fn get_pid(pid_path: &Path) -> Result<i32> {
}
/// Waits for a daemon to shut down.
/// This is done by waiting for the pid to disappear.
pub async fn wait_for_shutdown(pid: i32) -> Result<()> {
// Try to read the process. If this fails, the daemon already exited.
let process = match Process::new(pid) {
Ok(process) => process,
Err(_) => return Ok(()),
};
pub async fn wait_for_shutdown(child: &mut Child) -> Result<()> {
// Give the daemon about 1 sec to shutdown.
let tries = 40;
let mut current_try = 0;
while current_try < tries {
// Process is still alive, wait a little longer
if process.is_alive() {
// Try to read the process exit code. If this succeeds or
// an error is returned, the process is gone.
if let Ok(None) = child.try_wait() {
// Process is still alive, wait a little longer
sleep_ms(50).await;
current_try += 1;
continue;
}
// Process is gone; either there was a status code
// or the child is not a child of this process (highly
// unlikely).
return Ok(());
}