Merge pull request #376 from mjpieters/process_groups

Give each task a separate process group
This commit is contained in:
Arne Beer 2022-11-21 13:20:24 +01:00 committed by GitHub
commit 28e4caedb4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
44 changed files with 805 additions and 904 deletions

View file

@ -20,10 +20,19 @@ on:
jobs:
publish:
name: Create test coverage
runs-on: ubuntu-latest
name: Create test coverage on ${{ matrix.os }} for ${{ matrix.target }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
target:
- x86_64-unknown-linux-gnu
- x86_64-apple-darwin
include:
- os: ubuntu-latest
target: x86_64-unknown-linux-gnu
- os: macos-latest
target: x86_64-apple-darwin
steps:
- name: Checkout code
@ -35,6 +44,7 @@ jobs:
toolchain: stable
override: true
components: llvm-tools-preview
target: ${{ matrix.target }}
- uses: actions/cache@v3
with:
@ -44,8 +54,9 @@ jobs:
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-coverage-${{ hashFiles('**/Cargo.lock') }}
key: ${{ runner.os }}-cargo-coverage-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
${{ runner.os }}-cargo-coverage-${{ matrix.target }}-
${{ runner.os }}-cargo-${{ matrix.target }}-
- name: Install cargo-llvm-cov and nextest

View file

@ -32,7 +32,7 @@ jobs:
- arm-unknown-linux-musleabihf
- x86_64-pc-windows-msvc
- x86_64-apple-darwin
- aarch64-apple-ios
- aarch64-apple-darwin
include:
- os: ubuntu-latest
target: x86_64-unknown-linux-gnu
@ -47,7 +47,7 @@ jobs:
- os: macos-latest
target: x86_64-apple-darwin
- os: macos-latest
target: aarch64-apple-ios
target: aarch64-apple-darwin
steps:
- name: Checkout code

View file

@ -28,7 +28,7 @@ jobs:
- arm-unknown-linux-musleabihf
- x86_64-pc-windows-msvc
- x86_64-apple-darwin
- aarch64-apple-ios
- aarch64-apple-darwin
include:
- os: ubuntu-latest
target: x86_64-unknown-linux-musl
@ -79,11 +79,11 @@ jobs:
cross: false
strip: true
- os: macos-latest
target: aarch64-apple-ios
client_artifact_name: target/aarch64-apple-ios/release/pueue
daemon_artifact_name: target/aarch64-apple-ios/release/pueued
client_release_name: pueue-ios-aarch64
daemon_release_name: pueued-ios-aarch64
target: aarch64-apple-darwin
client_artifact_name: target/aarch64-apple-darwin/release/pueue
daemon_artifact_name: target/aarch64-apple-darwin/release/pueued
client_release_name: pueue-darwin-aarch64
daemon_release_name: pueued-darwin-aarch64
cross: false
strip: true

View file

@ -19,7 +19,7 @@ jobs:
- arm-unknown-linux-musleabihf
- x86_64-pc-windows-msvc
- x86_64-apple-darwin
- aarch64-apple-ios
- aarch64-apple-darwin
include:
- os: ubuntu-latest
target: x86_64-unknown-linux-musl
@ -70,11 +70,11 @@ jobs:
cross: false
strip: true
- os: macos-latest
target: aarch64-apple-ios
client_artifact_name: target/aarch64-apple-ios/release/pueue
daemon_artifact_name: target/aarch64-apple-ios/release/pueued
client_release_name: pueue-ios-aarch64
daemon_release_name: pueued-ios-aarch64
target: aarch64-apple-darwin
client_artifact_name: target/aarch64-apple-darwin/release/pueue
daemon_artifact_name: target/aarch64-apple-darwin/release/pueued
client_release_name: pueue-darwin-aarch64
daemon_release_name: pueued-darwin-aarch64
cross: false
strip: true

View file

@ -32,7 +32,7 @@ jobs:
- arm-unknown-linux-musleabihf
- x86_64-pc-windows-msvc
- x86_64-apple-darwin
- aarch64-apple-ios
- aarch64-apple-darwin
include:
- os: ubuntu-latest
target: x86_64-unknown-linux-gnu
@ -59,7 +59,7 @@ jobs:
cross: false
strip: true
- os: macos-latest
target: aarch64-apple-ios
target: aarch64-apple-darwin
cross: true
strip: true

View file

@ -8,7 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- pueue log output now includes the task label, if any. [#355](https://github.com/Nukesor/pueue/issues/355)
- `pueue log` output now includes the task label, if any. [#355](https://github.com/Nukesor/pueue/issues/355)
- Enable `pueue edit` to edit multiple properties in one go.
- Tasks are now started in a process group, and pueue kill will kill all processes in the group. The `--children` cli flag has been deprecated (signals go to the whole group, always). This makes brings pueue's task handling in line with how interactive shells handle jobs. [#372](https://github.com/Nukesor/pueue/issues/372)
### Added
@ -29,10 +31,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added the `created_at` and `enqueued_at` metadata fields on `Task` [#356](https://github.com/Nukesor/pueue/issues/356).
They'll only be exposed when running `status --json` for now.
### Changed
- Enable `pueue edit` to edit multiple properties in one go.
### Fixed
- Interpret the `$EDITOR` command, when editing a task's command/path, as a shell expression instead of an executable ([#336](https://github.com/Nukesor/pueue/issues/336)).
@ -40,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Don't show the version warning message between daemon and client, when using any `--json` flag.
- Fix some test failures in non-standard environments for NixOS test suite ([#346](https://github.com/Nukesor/pueue/issues/346)).
- The time in pueue's logs will now be in localtime instead of UTC [#385](https://github.com/Nukesor/pueue/issues/385).
- macos support has been brought on par with Linux.
### Misc
@ -47,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- A codecov.yml syntax error was corrected, which prevented Codecov from applying the
repository-specific configuration.
- CI tests are now run using cargo nextest, for faster test execution, flaky test handling and better test output.
- The macos test suite is now the same as that for Linux, including the client and daemon test suites.
## [2.1.0] - 2022-07-21

175
Cargo.lock generated
View file

@ -123,6 +123,25 @@ dependencies = [
"console",
]
[[package]]
name = "bindgen"
version = "0.59.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8"
dependencies = [
"bitflags",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@ -173,6 +192,15 @@ version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -205,6 +233,17 @@ dependencies = [
"scanlex",
]
[[package]]
name = "clang-sys"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa2e27ae6ab525c3d369ded447057bca5438d86dc3a68f6faafb8269ba82ebf3"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
name = "clap"
version = "3.2.17"
@ -265,6 +304,16 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "command-group"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7a8a86f409b4a59df3a3e4bee2de0b83f1755fdd2a25e3a9684c396fc4bed2c"
dependencies = [
"nix 0.22.3",
"winapi",
]
[[package]]
name = "console"
version = "0.15.1"
@ -293,6 +342,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@ -368,6 +426,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "darwin-libproc"
version = "0.2.0"
source = "git+https://github.com/jamwaffles/darwin-libproc.git?rev=cbd571e4be87a6603de328d1f1f0ad54d7bdfedf#cbd571e4be87a6603de328d1f1f0ad54d7bdfedf"
dependencies = [
"darwin-libproc-sys",
"libc",
"memchr",
]
[[package]]
name = "darwin-libproc-sys"
version = "0.2.0"
source = "git+https://github.com/jamwaffles/darwin-libproc.git?rev=cbd571e4be87a6603de328d1f1f0ad54d7bdfedf#cbd571e4be87a6603de328d1f1f0ad54d7bdfedf"
[[package]]
name = "diff"
version = "0.1.13"
@ -471,6 +544,16 @@ dependencies = [
"instant",
]
[[package]]
name = "flate2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@ -599,6 +682,12 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d"
[[package]]
name = "glob"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "half"
version = "1.8.2"
@ -721,10 +810,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.132"
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.135"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c"
[[package]]
name = "libloading"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f"
dependencies = [
"cfg-if",
"winapi",
]
[[package]]
name = "libproc"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b799ad155d75ce914c467ee5627b62247c20d4aedbd446f821484cebf3cded7"
dependencies = [
"bindgen",
"errno",
"libc",
]
[[package]]
name = "linux-raw-sys"
@ -766,6 +882,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.5.3"
@ -789,11 +911,12 @@ dependencies = [
[[package]]
name = "nix"
version = "0.24.2"
version = "0.22.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "195cdbc1741b8134346d515b3a56a1c94b0912758009cfd53f99ea0f57b065fc"
checksum = "e4916f159ed8e5de0082076562152a76b7a1f64a01fd9d1e0fea002c37624faf"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset",
@ -811,6 +934,16 @@ dependencies = [
"libc",
]
[[package]]
name = "nom"
version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@ -902,6 +1035,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "1.1.0"
@ -1062,6 +1201,8 @@ checksum = "2dfb6451c91904606a1abe93e83a8ec851f45827fa84273f256ade45dc095818"
dependencies = [
"bitflags",
"byteorder",
"chrono",
"flate2",
"hex",
"lazy_static",
"rustix",
@ -1079,17 +1220,16 @@ dependencies = [
"clap",
"clap_complete",
"comfy-table",
"command-group",
"crossbeam-channel",
"crossterm",
"ctrlc",
"env_logger",
"handlebars",
"log",
"nix 0.24.2",
"pest",
"pest_derive",
"pretty_assertions",
"procfs",
"pueue-lib",
"rstest",
"serde",
@ -1103,7 +1243,6 @@ dependencies = [
"tempfile",
"test-log",
"tokio",
"whoami",
]
[[package]]
@ -1115,9 +1254,11 @@ dependencies = [
"better-panic",
"byteorder",
"chrono",
"command-group",
"darwin-libproc",
"dirs",
"libproc",
"log",
"nix 0.24.2",
"portpicker",
"pretty_assertions",
"procfs",
@ -1338,6 +1479,12 @@ version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
@ -1349,9 +1496,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.35.9"
version = "0.35.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72c825b8aa8010eb9ee99b75f05e10180b9278d161583034d7574c9d617aeada"
checksum = "fbb2fda4666def1433b1b05431ab402e42a1084285477222b72d6c564c417cef"
dependencies = [
"bitflags",
"errno",
@ -1499,6 +1646,12 @@ dependencies = [
"dirs",
]
[[package]]
name = "shlex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "signal-hook"
version = "0.3.14"

View file

@ -14,6 +14,7 @@ rust-version = "1.63"
[workspace.dependencies]
chrono = { version = "0.4", features = ["serde"] }
command-group = "1.0.8"
log = "0.4"
serde = "1"
serde_json = "1"
@ -73,6 +74,7 @@ shell-escape = "0.1"
simplelog = "0.12"
tempfile = "3"
command-group = { workspace = true }
chrono = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
@ -95,9 +97,3 @@ similar-asserts = "1"
# nextest run --no-capture`)
env_logger = "0.9.1"
test-log = "0.2.11"
# Test specific dev-dependencies
[target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies]
nix = "0.24"
whoami = "1"
procfs = { version = "0.14", default-features = false }

View file

@ -53,9 +53,8 @@ The queue will be continuously processed, even if you no longer have any active
- A lot more. Check the -h options for each subcommand for detailed options.
- Cross Platform
* Linux is fully supported and battle-tested.
* MacOS is fully supported and on par with Linux.
* Windows is fully supported and working fine for quite a while.
* MacOS only has **rudimentary** process handling, but it's still usable.
Check this [issue](https://github.com/Nukesor/pueue/issues/115) to find out what's missing.
- [Why should I use it](https://github.com/Nukesor/pueue/wiki/FAQ#why-should-i-use-it)
- [Advantages over Using a Terminal Multiplexer](https://github.com/Nukesor/pueue/wiki/FAQ#advantages-over-using-a-terminal-multiplexer)

View file

@ -134,8 +134,7 @@ pub enum SubCommand {
#[clap(short, long)]
all: bool,
/// Also resume direct child processes of your paused tasks.
/// By default only the main process will get a SIGSTART.
/// Deprecated: this switch no longer has any effect.
#[clap(short, long)]
children: bool,
},
@ -212,10 +211,7 @@ pub enum SubCommand {
#[clap(short, long)]
wait: bool,
/// Also pause direct child processes of a task's main process.
/// By default only the main process will get a SIGSTOP.
/// This is useful when calling bash scripts, which start other processes themselves.
/// This operation is not recursive!
/// Deprecated: this switch no longer has any effect.
#[clap(short, long)]
children: bool,
},
@ -234,8 +230,7 @@ pub enum SubCommand {
#[clap(short, long)]
all: bool,
/// Send the SIGTERM signal to all children as well.
/// Useful when working with shell scripts.
/// Deprecated: this switch no longer has any effect.
#[clap(short, long)]
children: bool,
@ -384,8 +379,7 @@ pub enum SubCommand {
/// Kill all tasks, clean up afterwards and reset EVERYTHING!
Reset {
/// Send the SIGTERM signal to all children as well.
/// Useful when working with shell scripts.
/// Deprecated: this switch no longer has any effect.
#[clap(short, long)]
children: bool,

View file

@ -84,6 +84,22 @@ async fn main() -> Result<()> {
bail!("Couldn't find a configuration file. Did you start the daemon yet?");
}
// Warn if the deprecated --children option was used
if let Some(subcommand) = &opt.cmd {
if matches!(
subcommand,
SubCommand::Start { children: true, .. }
| SubCommand::Pause { children: true, .. }
| SubCommand::Kill { children: true, .. }
| SubCommand::Reset { children: true, .. }
) {
println!(concat!(
"Note: The --children flag is deprecated and will be removed in a future release. ",
"It no longer has any effect, as this command now always applies to all processes in a task."
));
}
}
// Create client to talk with the daemon and connect.
let mut client = Client::new(settings, opt)
.await

View file

@ -93,7 +93,7 @@ macro_rules! ok_or_return_failure_message {
mod fixtures {
pub use crossbeam_channel::Sender;
use std::collections::HashMap;
use std::path::PathBuf;
use std::env::temp_dir;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;
@ -135,7 +135,7 @@ mod fixtures {
pub fn get_stub_task_in_group(id: &str, group: &str, status: TaskStatus) -> Task {
Task::new(
id.to_string(),
PathBuf::from("/tmp"),
temp_dir(),
HashMap::new(),
group.to_string(),
status,

View file

@ -1,12 +1,12 @@
use command_group::GroupChild;
use std::collections::BTreeMap;
use std::process::Child;
/// This structure is needed to manage worker pools for groups.
/// It's a newtype pattern around a nested BTreeMap, which implements some convenience functions.
///
/// The datastructure contains these types of data:
/// BTreeMap<group, BTreeMap<group_worker_id, (task_id, Subprocess handle)>
pub struct Children(pub BTreeMap<String, BTreeMap<usize, (usize, Child)>>);
pub struct Children(pub BTreeMap<String, BTreeMap<usize, (usize, GroupChild)>>);
impl Children {
/// Returns whether there are any active tasks across all groups.
@ -28,25 +28,10 @@ impl Children {
false
}
/// A convenience function to get a child by its respective task_id.
/// We have to do a nested linear search over all children of all pools,
/// beceause these datastructure aren't indexed via task_ids.
pub fn get_child(&self, task_id: usize) -> Option<&Child> {
for pool in self.0.values() {
for (child_task_id, child) in pool.values() {
if child_task_id == &task_id {
return Some(child);
}
}
}
None
}
/// A convenience function to get a mutable child by its respective task_id.
/// We have to do a nested linear search over all children of all pools,
/// beceause these datastructure aren't indexed via task_ids.
pub fn get_child_mut(&mut self, task_id: usize) -> Option<&mut Child> {
pub fn get_child_mut(&mut self, task_id: usize) -> Option<&mut GroupChild> {
for pool in self.0.values_mut() {
for (child_task_id, child) in pool.values_mut() {
if child_task_id == &task_id {
@ -107,7 +92,7 @@ impl Children {
/// This function should only be called when spawning a new process.
/// At this point, we're sure that the worker pool for the given group already exists, hence
/// the expect call.
pub fn add_child(&mut self, group: &str, worker_id: usize, task_id: usize, child: Child) {
pub fn add_child(&mut self, group: &str, worker_id: usize, task_id: usize, child: GroupChild) {
let pool = self
.0
.get_mut(group)

View file

@ -15,17 +15,10 @@ impl TaskHandler {
/// By default, this kills tasks with Rust's subprocess handling "kill" logic.
/// However, the user can decide to send unix signals to the processes as well.
///
/// `kill_children` Kill all direct child processes as well
/// `pause_groups` If `group` or `all` is given, the groups should be paused under some
/// circumstances. This is mostly to prevent any further task execution during an emergency
/// `signal` Don't kill the task as usual, but rather send a unix process signal.
pub fn kill(
&mut self,
tasks: TaskSelection,
kill_children: bool,
pause_groups: bool,
signal: Option<Signal>,
) {
pub fn kill(&mut self, tasks: TaskSelection, pause_groups: bool, signal: Option<Signal>) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
// Get the keys of all tasks that should be resumed
@ -63,9 +56,9 @@ impl TaskHandler {
for task_id in task_ids {
if let Some(signal) = signal.clone() {
self.send_internal_signal(task_id, signal, kill_children);
self.send_internal_signal(task_id, signal);
} else {
self.kill_task(task_id, kill_children);
self.kill_task(task_id);
}
}
@ -73,9 +66,9 @@ impl TaskHandler {
}
/// Send a signal to a specific child process.
/// This is a wrapper around [send_internal_signal_to_child], which does a little bit of
/// This is a wrapper around [send_signal_to_child], which does a little bit of
/// additional error handling.
pub fn send_internal_signal(&mut self, task_id: usize, signal: Signal, send_to_children: bool) {
pub fn send_internal_signal(&mut self, task_id: usize, signal: Signal) {
let child = match self.children.get_child_mut(task_id) {
Some(child) => child,
None => {
@ -84,16 +77,18 @@ impl TaskHandler {
}
};
if let Err(err) = send_internal_signal_to_child(child, signal, send_to_children) {
if let Err(err) = send_signal_to_child(child, signal) {
warn!("Failed to send signal to task {task_id} with error: {err}");
};
}
/// Kill a specific task and handle it accordingly.
/// Triggered on `reset` and `kill`.
pub fn kill_task(&mut self, task_id: usize, kill_children: bool) {
pub fn kill_task(&mut self, task_id: usize) {
if let Some(child) = self.children.get_child_mut(task_id) {
kill_child(task_id, child, kill_children);
kill_child(task_id, child).unwrap_or_else(|err| {
warn!("Failed to send kill to task {task_id} child process {child:?} with error {err:?}");
})
} else {
warn!("Tried to kill non-existing child: {task_id}");
}

View file

@ -25,13 +25,11 @@ impl TaskHandler {
fn handle_message(&mut self, message: Message) {
match message {
Message::Pause(message) => self.pause(message.tasks, message.children, message.wait),
Message::Start(message) => self.start(message.tasks, message.children),
Message::Kill(message) => {
self.kill(message.tasks, message.children, true, message.signal)
}
Message::Pause(message) => self.pause(message.tasks, message.wait),
Message::Start(message) => self.start(message.tasks),
Message::Kill(message) => self.kill(message.tasks, true, message.signal),
Message::Send(message) => self.send(message.task_id, message.input),
Message::Reset(message) => self.reset(message.children),
Message::Reset(_) => self.reset(),
Message::Group(message) => self.handle_group_message(message),
Message::DaemonShutdown(shutdown) => {
self.initiate_shutdown(shutdown);

View file

@ -11,9 +11,8 @@ use crate::task_handler::{ProcessAction, Shutdown, TaskHandler};
impl TaskHandler {
/// Pause specific tasks or groups.
///
/// `pause_children` decides, whether the pause signal will be send to child processes as well.
/// `wait` decides, whether running tasks will kept running until they finish on their own.
pub fn pause(&mut self, tasks: TaskSelection, pause_children: bool, wait: bool) {
pub fn pause(&mut self, tasks: TaskSelection, wait: bool) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
@ -49,7 +48,7 @@ impl TaskHandler {
// Pause all tasks that were found.
if !wait {
for id in keys {
self.pause_task(&mut state, id, pause_children);
self.pause_task(&mut state, id);
}
}
@ -57,8 +56,8 @@ impl TaskHandler {
}
/// Pause a specific task.
/// Send a signal to the process to actually pause the OS process.
fn pause_task(&mut self, state: &mut LockedState, id: usize, pause_children: bool) {
match self.perform_action(id, ProcessAction::Pause, pause_children) {
fn pause_task(&mut self, state: &mut LockedState, id: usize) {
match self.perform_action(id, ProcessAction::Pause) {
Err(err) => error!("Failed pausing task {id}: {err:?}"),
Ok(success) => {
if success {

View file

@ -15,7 +15,7 @@ impl TaskHandler {
}
};
{
let child_stdin = child.stdin.as_mut().unwrap();
let child_stdin = child.inner().stdin.as_mut().unwrap();
if let Err(err) = child_stdin.write_all(&input.into_bytes()) {
error!("Failed to send input to task {task_id} with err {err:?}");
};

View file

@ -14,10 +14,7 @@ impl TaskHandler {
/// By default, this command only resumes tasks.
/// However, if specific task_ids are provided, tasks can actually be force-started.
/// Of course, they can only be started if they're in a valid status, i.e. Queued/Stashed.
///
/// `start_children` decides, whether the resume Signal will be send to child processes as well.
/// Of course, this only applies to processes that are resumend and not force-spawned.
pub fn start(&mut self, tasks: TaskSelection, start_children: bool) {
pub fn start(&mut self, tasks: TaskSelection) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
@ -29,7 +26,7 @@ impl TaskHandler {
for task_id in task_ids {
// Continue all children that are simply paused
if self.children.has_child(task_id) {
self.continue_task(&mut state, task_id, start_children);
self.continue_task(&mut state, task_id);
} else {
// Start processes for all tasks that haven't been started yet
self.start_process(task_id, &mut state);
@ -66,14 +63,14 @@ impl TaskHandler {
// Resume all specified paused tasks
for task_id in task_ids {
self.continue_task(&mut state, task_id, start_children);
self.continue_task(&mut state, task_id);
}
ok_or_shutdown!(self, save_state(&state, &self.settings));
}
/// Send a start signal to a paused task to continue execution.
fn continue_task(&mut self, state: &mut LockedState, task_id: usize, start_children: bool) {
fn continue_task(&mut self, state: &mut LockedState, task_id: usize) {
// Task doesn't exist
if !self.children.has_child(task_id) {
return;
@ -84,7 +81,7 @@ impl TaskHandler {
return;
}
let success = match self.perform_action(task_id, ProcessAction::Resume, start_children) {
let success = match self.perform_action(task_id, ProcessAction::Resume) {
Err(err) => {
warn!("Failed to resume task {}: {:?}", task_id, err);
false

View file

@ -5,6 +5,7 @@ use std::process::Stdio;
use anyhow::Result;
use chrono::prelude::*;
use command_group::CommandGroup;
use crossbeam_channel::{Receiver, SendError, Sender};
use handlebars::Handlebars;
use log::{debug, error, info};
@ -167,7 +168,7 @@ impl TaskHandler {
fn initiate_shutdown(&mut self, shutdown: Shutdown) {
self.shutdown = Some(shutdown);
self.kill(TaskSelection::All, false, false, None);
self.kill(TaskSelection::All, false, None);
}
/// Check if all tasks are killed.
@ -226,9 +227,9 @@ impl TaskHandler {
/// Kill all children by using the `kill` function.
/// Set the respective group's statuses to `Reset`. This will prevent new tasks from being spawned.
fn reset(&mut self, kill_children: bool) {
fn reset(&mut self) {
self.full_reset = true;
self.kill(TaskSelection::All, kill_children, false, None);
self.kill(TaskSelection::All, false, None);
}
/// As time passes, some delayed tasks may need to be enqueued.
@ -260,11 +261,11 @@ impl TaskHandler {
/// This is a small wrapper around the real platform dependant process handling logic
/// It only ensures, that the process we want to manipulate really does exists.
fn perform_action(&mut self, id: usize, action: ProcessAction, children: bool) -> Result<bool> {
match self.children.get_child(id) {
fn perform_action(&mut self, id: usize, action: ProcessAction) -> Result<bool> {
match self.children.get_child_mut(id) {
Some(child) => {
debug!("Executing action {action:?} to {id}");
run_action_on_child(child, &action, children)?;
send_signal_to_child(child, &action)?;
Ok(true)
}

View file

@ -128,7 +128,7 @@ impl TaskHandler {
.envs(envs.clone())
.stdout(Stdio::from(stdout_log))
.stderr(Stdio::from(stderr_log))
.spawn();
.group_spawn();
// Check if the task managed to spawn
let child = match spawned_command {

View file

@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
This project adheres **somewhat** to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
The concept of SemVer is applied to the daemon/client API, but not the library API itself.
## [2.2.0] - unreleased
### Changed
- Tasks are now started in a process group, with signals (including SIGTERM) optionally sent to the whole group. [#372](https://github.com/Nukesor/pueue/issues/372)
## [0.21.0] - unreleased
### Added

View file

@ -33,6 +33,7 @@ strum_macros = "0.24"
thiserror = "1"
tokio-rustls = "0.23"
command-group = { workspace = true }
log = { workspace = true }
snap = { workspace = true }
serde = { workspace = true }
@ -60,10 +61,18 @@ winapi = { version = "0.3", features = [
] }
# Unix
[target.'cfg(not(windows))'.dependencies]
nix = "0.24"
[target.'cfg(unix)'.dependencies]
whoami = "1"
# Linux / Freebsd
[target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies]
procfs = { version = "0.14", default-features = false }
# Linux only
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.14.1"
# Apple only
[target.'cfg(target_vendor = "apple")'.dependencies]
libproc = "0.12.0"
[target.'cfg(target_vendor = "apple")'.dev-dependencies]
# fork that fixes a version conflict, see heim-rs/darwin-libproc#3
# only used as a test dependency.
darwin-libproc = { version="0.2.0", git = "https://github.com/jamwaffles/darwin-libproc.git", rev = "cbd571e4be87a6603de328d1f1f0ad54d7bdfedf" }

View file

@ -144,7 +144,7 @@ impl_into_message!(EnqueueMessage, Message::Enqueue);
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub struct StartMessage {
pub tasks: TaskSelection,
pub children: bool,
pub children: bool, // deprecated, no longer has any effect
}
impl_into_message!(StartMessage, Message::Start);
@ -178,7 +178,7 @@ pub struct TaskToRestart {
pub struct PauseMessage {
pub tasks: TaskSelection,
pub wait: bool,
pub children: bool,
pub children: bool, // deprecated, no longer has any effect
}
impl_into_message!(PauseMessage, Message::Pause);
@ -205,7 +205,7 @@ pub enum Signal {
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub struct KillMessage {
pub tasks: TaskSelection,
pub children: bool,
pub children: bool, // deprecated, no longer has any effect
pub signal: Option<Signal>,
}
@ -263,7 +263,7 @@ impl_into_message!(GroupResponseMessage, Message::GroupResponse);
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub struct ResetMessage {
pub children: bool,
pub children: bool, // deprecated, no longer has any effect
}
impl_into_message!(ResetMessage, Message::Reset);

View file

@ -1,166 +1,24 @@
use std::convert::TryInto;
use std::path::Path;
use std::process::{Child, Command};
use libproc::libproc::{proc_pid, task_info};
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::Result;
use log::debug;
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
use super::ProcessAction;
use crate::network::message::Signal as InternalSignal;
pub fn compile_shell_command(command_string: &str) -> Command {
let mut command = Command::new("sh");
command.arg("-c").arg(command_string);
command
}
fn map_action_to_signal(action: &ProcessAction) -> Signal {
match action {
ProcessAction::Pause => Signal::SIGSTOP,
ProcessAction::Resume => Signal::SIGCONT,
}
}
fn map_internal_signal_to_nix_signal(signal: InternalSignal) -> Signal {
match signal {
InternalSignal::SigKill => Signal::SIGKILL,
InternalSignal::SigInt => Signal::SIGINT,
InternalSignal::SigTerm => Signal::SIGTERM,
InternalSignal::SigCont => Signal::SIGCONT,
InternalSignal::SigStop => Signal::SIGSTOP,
}
}
/// Convenience wrapper around `send_signal_to_child` for raw unix signals.
/// Its purpose is to hide platform specific logic.
pub fn send_internal_signal_to_child(
child: &Child,
signal: InternalSignal,
send_to_children: bool,
) -> Result<bool> {
let signal = map_internal_signal_to_nix_signal(signal);
send_signal_to_child(child, signal, send_to_children)
}
/// Convenience wrapper around `send_signal_to_child` for internal actions on processes.
/// Its purpose is to hide platform specific logic.
pub fn run_action_on_child(
child: &Child,
action: &ProcessAction,
send_to_children: bool,
) -> Result<bool> {
let signal = map_action_to_signal(action);
send_signal_to_child(child, signal, send_to_children)
}
/// Send a signal to one of Pueue's child process handles.
/// We need a special since we assume that there's also a `sh -c` around the actuall process.
pub fn send_signal_to_child(
child: &Child,
signal: Signal,
_send_to_children: bool,
) -> Result<bool> {
let pid = child.id();
// Send the signal to the shell, don't propagate to its children yet.
send_signal_to_process(pid, signal, false)?;
signal::kill(Pid::from_raw(pid.try_into().unwrap()), signal)?;
Ok(true)
}
/// This is a helper function to safely kill a child process.
/// Its purpose is to properly kill all processes and prevent any dangling processes.
pub fn kill_child(task_id: usize, child: &mut Child, _kill_children: bool) -> bool {
match child.kill() {
Err(_) => {
debug!("Task {task_id} has already finished by itself");
false
}
_ => true,
}
}
/// Send a signal to a unix process.
fn send_signal_to_process(pid: u32, signal: Signal, _children: bool) -> Result<bool, nix::Error> {
debug!("Sending signal {signal} to {pid}");
signal::kill(Pid::from_raw(pid.try_into().unwrap()), signal)?;
Ok(true)
}
/// Check, whether a specific process is exists or not
/// Check, whether a specific process exists or not
pub fn process_exists(pid: u32) -> bool {
Path::new(&format!("/proc/{pid}")).exists()
proc_pid::pidinfo::<task_info::TaskInfo>(pid.try_into().unwrap(), 0).is_ok()
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread::sleep;
use std::time::Duration;
pub mod tests {
// TODO: swap darwin_libproc out for libproc when that project supports listing pids by
// group id.
use darwin_libproc;
use log::warn;
/// Assert that certain process id no longer exists
fn process_is_gone(pid: u32) -> bool {
!process_exists(pid)
}
#[test]
/// Simply check, whether spawning of a shell command works
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
assert!(ecode.success());
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes.
fn test_shell_command_is_killed() {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.spawn()
.expect("Failed to spawn echo");
let pid = child.id();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
}
#[test]
/// Ensure a normal command without `sh -c` will be killed.
fn test_normal_command_is_killed() {
let mut child = Command::new("sleep")
.arg("60")
.spawn()
.expect("Failed to spawn echo");
let pid = child.id();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
assert!(process_is_gone(pid));
pub fn get_process_group_pids(pgrp: i32) -> Vec<i32> {
match darwin_libproc::pgrp_only_pids(pgrp) {
Err(error) => {
warn!("Failed to get list of processes in process group {pgrp}: {error}");
Vec::new()
}
Ok(processes) => processes,
}
}
}

View file

@ -1,501 +1,41 @@
use std::convert::TryInto;
use std::process::{Child, Command};
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::{bail, Result};
use log::{debug, info, warn};
use nix::{
sys::signal::{self, Signal},
unistd::Pid,
};
use procfs::process::{all_processes, Process};
use super::ProcessAction;
use crate::network::message::Signal as InternalSignal;
pub fn compile_shell_command(command_string: &str) -> Command {
let mut command = Command::new("sh");
command.arg("-c").arg(command_string);
command
}
fn map_action_to_signal(action: &ProcessAction) -> Signal {
match action {
ProcessAction::Pause => Signal::SIGSTOP,
ProcessAction::Resume => Signal::SIGCONT,
}
}
fn map_internal_signal_to_nix_signal(signal: InternalSignal) -> Signal {
match signal {
InternalSignal::SigKill => Signal::SIGKILL,
InternalSignal::SigInt => Signal::SIGINT,
InternalSignal::SigTerm => Signal::SIGTERM,
InternalSignal::SigCont => Signal::SIGCONT,
InternalSignal::SigStop => Signal::SIGSTOP,
}
}
/// Convenience wrapper around `send_signal_to_child` for raw unix signals.
/// Its purpose is to hide platform specific logic.
pub fn send_internal_signal_to_child(
child: &Child,
signal: InternalSignal,
send_to_children: bool,
) -> Result<bool> {
let signal = map_internal_signal_to_nix_signal(signal);
send_signal_to_child(child, signal, send_to_children)
}
/// Convenience wrapper around `send_signal_to_child` for internal actions on processes.
/// Its purpose is to hide platform specific logic.
pub fn run_action_on_child(
child: &Child,
action: &ProcessAction,
send_to_children: bool,
) -> Result<bool> {
let signal = map_action_to_signal(action);
send_signal_to_child(child, signal, send_to_children)
}
/// Send a signal to one of Pueue's child process handles.
///
/// There are two scenarios:
///
/// **Normal case**
///
/// A task, such as `sleep 60` get's spawned by the posix shell `sh`.
/// This results in the process `sh -c 'sleep 60'`.
/// Since the posix shell doesn't propagate any process signals to its children, we have to:
/// 1. Send the signal to the shell.
/// 2. Send the signal directly to the children.
/// In our case this would be the `sleep 60` child process.
///
/// If the user also want's to send the signal to all child processes of the task,
/// we have to get all child-processes of the child process.
///
/// **Special case**
///
/// The posix shell `sh` has some some inconsistent behavior.
/// In some circumstances and environments, the `sh -c $command` doesn't spawn a `sh` process with a
/// `$command` child-process, but rather spawns the `$command` as a top-level process directly.
///
/// This makes things a bit more complicated, since we have to find out whether a shell is spawned
/// or not. If a shell is spawned, we do the **Normal case** handling.
///
/// If **no** shell is spawned, we have to send the signal to the top-level process only.
///
/// If the user also want's to send the signal to all child processes of the task,
/// we have to get all child-processes of that `$command` process. and send them the signal.
///
/// Returns `Ok(true)`, if everything went alright
/// Returns `Ok(false)`, if the process went away while we tried to send the signal.
pub fn send_signal_to_child(child: &Child, signal: Signal, send_to_children: bool) -> Result<bool> {
let pid: i32 = child.id().try_into().unwrap();
// Check whether this process actually spawned a shell.
let is_shell = if let Ok(is_shell) = did_process_spawn_shell(pid) {
is_shell
} else {
return Ok(false);
};
if is_shell {
// If it's a shell, we have to send the signal to the actual shell and to all it's children.
// There might be multiple children, for instance, when users use the `&` operator.
// If the `send_to_children` flag is given, the
// Get all children before sending the signal to the parent process.
// Otherwise the parent might go away and we'll no longer be able to access the children.
let shell_children = get_child_processes(pid);
// Send the signal to the shell, don't propagate to its children yet.
send_signal_to_process(pid, signal, false)?;
// Now send the signal to the shells child processes and their respective
// children if the user wants to do so.
for shell_child in shell_children {
send_signal_to_process(shell_child.pid(), signal, send_to_children)?;
}
} else {
// If it isn't a shell, send the signal directly to the process.
// Handle children normally.
send_signal_to_process(pid, signal, send_to_children)?;
}
signal::kill(Pid::from_raw(pid), signal)?;
Ok(true)
}
/// This is a helper function to safely kill a child process.
/// Its purpose is to properly kill all processes and prevent any dangling processes.
///
/// Sadly, this needs some extra handling. Check the docstring of `send_signal_to_child` for
/// additional information on why this needs to be done.
///
/// Returns `true`, if everything went alright
/// Returns `false`, if the process went away while we tried to send the signal.
pub fn kill_child(task_id: usize, child: &mut Child, kill_children: bool) -> bool {
let pid: i32 = child.id().try_into().unwrap();
// Check whether this process actually spawned a shell.
let is_shell = if let Ok(is_shell) = did_process_spawn_shell(pid) {
is_shell
} else {
return false;
};
// We have to kill the root process first, to prevent it from spawning new processes.
// However, this prevents us from getting its child processes afterwards.
// That's why we have to get the list of child processes already now.
let mut child_processes = None;
if kill_children || is_shell {
child_processes = Some(get_child_processes(pid));
}
// Kill the parent first
let kill_result = child.kill();
if kill_result.is_err() {
info!("Task {task_id} has already finished by itself.");
return false;
}
// Do an early return, if we don't need to kill any children.
let child_processes = if let Some(child_processes) = child_processes {
child_processes
} else {
return true;
};
// Now kill all remaining children. The parent has been already been killed at this point.
// If a shell is spawned, we have to manually send the kill signal to all children.
// Otherwise only send a signal to all children if the `kill_children` flag is set.
if is_shell {
for child_process in child_processes {
// Send the signal to each child process, show warning if this fails.
let process_pid = child_process.pid();
if let Err(error) = send_signal_to_process(process_pid, Signal::SIGKILL, kill_children)
{
warn!("Failed to send kill to pid {process_pid} with error {error:?}");
}
}
} else if kill_children {
send_signal_to_processes(child_processes, Signal::SIGKILL);
}
true
}
/// Check whether a process's commandline string is actually a shell or not
fn did_process_spawn_shell(pid: i32) -> Result<bool> {
// Get the /proc representation of the child, so we can do some checks
let process = if let Ok(process) = Process::new(pid) {
process
} else {
info!("Process to kill has probably just gone away. Process {pid}");
bail!("Process has just gone away");
};
// Get the root command and check whether it's actually a shell with `sh -c`.
let mut cmdline = if let Ok(cmdline) = process.cmdline() {
cmdline
} else {
info!("Process to kill has probably just gone away. Process {pid}");
bail!("Process has just gone away");
};
if cmdline.len() < 3 {
return Ok(false);
}
if cmdline.remove(0) != "sh" {
return Ok(false);
}
if cmdline.remove(0) != "-c" {
return Ok(false);
}
Ok(true)
}
/// Send a signal to a unix process.
/// In case the user wants to send the signal to all children as well:
/// 1. Get the children before sending the signal to the parent (as it might go away)
/// 2. Send the signal to the parent first, as it might spawn new children otherwise.
/// 3. Send the signal to all children.
fn send_signal_to_process(
pid: i32,
signal: Signal,
send_to_children: bool,
) -> Result<bool, nix::Error> {
debug!("Sending signal {signal} to {pid}");
if send_to_children {
let children = get_child_processes(pid);
signal::kill(Pid::from_raw(pid), signal)?;
send_signal_to_processes(children, signal);
} else {
signal::kill(Pid::from_raw(pid), signal)?;
}
Ok(true)
}
/// Send a signal to a list of processes.
/// This is a convenience wrapper around `send_signal_to_process`.
fn send_signal_to_processes(processes: Vec<Process>, signal: Signal) {
for process in processes {
// Process is no longer alive, skip this one.
if !process.is_alive() {
continue;
}
let pid = Pid::from_raw(process.pid);
if let Err(error) = signal::kill(pid, signal) {
warn!("Failed to send signal {signal:?} to Pid {pid}: {error:?}");
}
}
}
/// Get all children of a specific process
fn get_child_processes(ppid: i32) -> Vec<Process> {
let all_processes = match all_processes() {
Err(error) => {
warn!("Failed to get full process list: {error}");
return Vec::new();
}
Ok(processes) => processes,
};
// Get all processes whose `stat` can be access without any errors.
// We then search for processes with the correct parent process id.
all_processes
.into_iter()
.filter_map(|process| process.ok())
.filter(|process| {
if let Ok(stat) = process.stat() {
stat.ppid == ppid
} else {
false
}
})
.collect()
}
use procfs::process;
/// Check, whether a specific process is exists or not
pub fn process_exists(pid: u32) -> bool {
match Process::new(pid as i32) {
Ok(process) => process.is_alive(),
match pid.try_into() {
Err(_) => false,
Ok(pid) => match process::Process::new(pid) {
Ok(process) => process.is_alive(),
Err(_) => false,
},
}
}
#[cfg(test)]
mod tests {
use std::thread::sleep;
use std::time::Duration;
pub mod tests {
use log::warn;
use anyhow::Result;
use pretty_assertions::assert_eq;
use super::process;
use super::*;
/// Get all processes in a process group
pub fn get_process_group_pids(pgrp: i32) -> Vec<i32> {
let all_processes = match process::all_processes() {
Err(error) => {
warn!("Failed to get full process list: {error}");
return Vec::new();
}
Ok(processes) => processes,
};
/// Assert that certain process id no longer exists
fn process_is_gone(pid: i32) -> bool {
!process_exists(pid as u32)
}
#[test]
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
assert!(ecode.success());
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes.
fn test_shell_command_is_killed() -> Result<()> {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(did_process_spawn_shell(pid).unwrap());
// Get all child processes, so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 2);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
Ok(())
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes when using unix
/// signals directly.
fn test_shell_command_is_killed_with_signal() -> Result<()> {
let child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(did_process_spawn_shell(pid).unwrap());
// Get all child processes, so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 2);
// Kill the process and make sure it'll be killed.
send_signal_to_child(&child, Signal::SIGKILL, false).unwrap();
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
Ok(())
}
#[test]
/// Ensure that a `sh -c` process with a child process that has children of its own
/// will properly kill all processes and their children's children without detached processes.
fn test_shell_command_children_are_killed() -> Result<()> {
let mut child = compile_shell_command("bash -c 'sleep 60 && sleep 60' && sleep 60")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(did_process_spawn_shell(pid).unwrap());
// Get all child processes and all childrens children,
// so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 1);
let mut childrens_children = Vec::new();
for child_process in &child_processes {
childrens_children.extend(get_child_processes(child_process.stat()?.pid));
}
assert_eq!(childrens_children.len(), 1);
// Kill the process and make sure its childen will be killed.
assert!(kill_child(0, &mut child, true));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
// Assert that all children's child processes have been killed.
for child_process in childrens_children {
assert!(!child_process.is_alive());
}
Ok(())
}
#[test]
/// Ensure a normal command without `sh -c` will be killed.
fn test_normal_command_is_killed() -> Result<()> {
let mut child = Command::new("sleep")
.arg("60")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process did not spawn a shell.
assert!(!did_process_spawn_shell(pid).unwrap());
// No child processes exist
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 0);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
assert!(process_is_gone(pid));
Ok(())
}
#[test]
/// Ensure a normal command and all its children will be
/// properly killed without any detached processes.
fn test_normal_command_children_are_killed() -> Result<()> {
let mut child = Command::new("bash")
.arg("-c")
.arg("sleep 60 & sleep 60 && sleep 60")
.spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Make sure the process indeed spawned a shell.
assert!(!did_process_spawn_shell(pid).unwrap());
// Get all child processes, so we can make sure they no longer exist afterwards.
let child_processes = get_child_processes(pid);
assert_eq!(child_processes.len(), 2);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, true));
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid));
// Assert that all child processes have been killed.
for child_process in child_processes {
assert!(!child_process.is_alive());
}
Ok(())
// Get all processes whose `stat` can be accessed without any errors.
// If the stat() result matches the process group, use the process PID.
all_processes
.into_iter()
.filter_map(|result| result.ok())
.filter_map(|process| match process.stat() {
Ok(stat) if stat.pgrp == pgrp => Some(process.pid),
_ => None,
})
.collect()
}
}

View file

@ -4,17 +4,32 @@
//! each supported platform.
//! Depending on the target, the respective platform is read and loaded into this scope.
// Unix specific process handling
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
mod linux;
#[cfg(any(target_os = "linux", target_os = "freebsd"))]
pub use self::linux::*;
use crate::network::message::Signal as InternalSignal;
// Apple specific process handling
#[cfg(any(target_vendor = "apple"))]
// Unix specific process handling
// Shared between Linux and Apple
#[cfg(unix)]
mod unix;
#[cfg(unix)]
pub use self::unix::*;
#[cfg(unix)]
use command_group::Signal;
// Linux specific process support
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "linux")]
pub use self::linux::process_exists;
#[cfg(all(test, target_os = "linux"))]
use self::linux::tests;
// Apple specific process support
#[cfg(target_vendor = "apple")]
mod apple;
#[cfg(target_vendor = "apple")]
pub use self::apple::*;
pub use self::apple::process_exists;
#[cfg(all(test, target_vendor = "apple"))]
use self::apple::tests;
// Windows specific process handling
#[cfg(any(target_os = "windows"))]
@ -31,3 +46,24 @@ pub enum ProcessAction {
Pause,
Resume,
}
impl From<&ProcessAction> for Signal {
fn from(action: &ProcessAction) -> Self {
match action {
ProcessAction::Pause => Signal::SIGSTOP,
ProcessAction::Resume => Signal::SIGCONT,
}
}
}
impl From<InternalSignal> for Signal {
fn from(signal: InternalSignal) -> Self {
match signal {
InternalSignal::SigKill => Signal::SIGKILL,
InternalSignal::SigInt => Signal::SIGINT,
InternalSignal::SigTerm => Signal::SIGTERM,
InternalSignal::SigCont => Signal::SIGCONT,
InternalSignal::SigStop => Signal::SIGSTOP,
}
}
}

View file

@ -0,0 +1,229 @@
use std::process::Command;
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::Result;
use command_group::{GroupChild, Signal, UnixChildExt};
use log::info;
pub fn compile_shell_command(command_string: &str) -> Command {
let mut command = Command::new("sh");
command.arg("-c").arg(command_string);
command
}
/// Send a signal to one of Pueue's child process group handle.
pub fn send_signal_to_child<T>(child: &mut GroupChild, signal: T) -> Result<()>
where
T: Into<Signal>,
{
child.signal(signal.into())?;
Ok(())
}
/// This is a helper function to safely kill a child process group.
/// Its purpose is to properly kill all processes and prevent any dangling processes.
pub fn kill_child(task_id: usize, child: &mut GroupChild) -> std::io::Result<()> {
match child.kill() {
Ok(_) => Ok(()),
Err(ref e) if e.kind() == std::io::ErrorKind::InvalidData => {
// Process already exited
info!("Task {task_id} has already finished by itself.");
Ok(())
}
Err(err) => Err(err),
}
}
#[cfg(test)]
mod tests {
use std::thread::sleep;
use std::time::Duration;
use anyhow::Result;
use command_group::CommandGroup;
use pretty_assertions::assert_eq;
use super::*;
use crate::process_helper::process_exists;
use crate::process_helper::tests::get_process_group_pids;
/// Assert that certain process id no longer exists
fn process_is_gone(pid: u32) -> bool {
!process_exists(pid)
}
#[test]
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.group_spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
assert!(ecode.success());
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes.
fn test_shell_command_is_killed() -> Result<()> {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
#[test]
/// Ensure a `sh -c` command will be properly killed without detached processes when using unix
/// signals directly.
fn test_shell_command_is_killed_with_signal() -> Result<()> {
let mut child = compile_shell_command("sleep 60 & sleep 60 && echo 'this is a test'")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure it'll be killed.
send_signal_to_child(&mut child, Signal::SIGKILL).unwrap();
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
#[test]
/// Ensure that a `sh -c` process with a child process that has children of its own
/// will properly kill all processes and their children's children without detached processes.
fn test_shell_command_children_are_killed() -> Result<()> {
let mut child = compile_shell_command("bash -c 'sleep 60 && sleep 60' && sleep 60")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure its childen will be killed.
assert!(kill_child(0, &mut child).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
#[test]
/// Ensure a normal command without `sh -c` will be killed.
fn test_normal_command_is_killed() -> Result<()> {
let mut child = Command::new("sleep")
.arg("60")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// No further processes exist in the group
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 1);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
assert!(process_is_gone(pid as u32));
Ok(())
}
#[test]
/// Ensure a normal command and all its children will be
/// properly killed without any detached processes.
fn test_normal_command_children_are_killed() -> Result<()> {
let mut child = Command::new("bash")
.arg("-c")
.arg("sleep 60 & sleep 60 && sleep 60")
.group_spawn()
.expect("Failed to spawn echo");
let pid: i32 = child.id().try_into().unwrap();
// Sleep a little to give everything a chance to spawn.
sleep(Duration::from_millis(500));
// Get all child processes, so we can make sure they no longer exist afterwards.
// The process group id is the same as the parent process id.
let group_pids = get_process_group_pids(pid);
assert_eq!(group_pids.len(), 3);
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
// collect the exit status; otherwise the child process hangs around as a zombie.
child.try_wait().unwrap_or_default();
// Assert that the direct child (sh -c) has been killed.
assert!(process_is_gone(pid as u32));
// Assert that all child processes have been killed.
assert_eq!(get_process_group_pids(pid).len(), 0);
Ok(())
}
}

View file

@ -1,25 +1,30 @@
use std::process::{Child, Command};
use std::process::Command;
// We allow anyhow in here, as this is a module that'll be strictly used internally.
// As soon as it's obvious that this is code is intended to be exposed to library users, we have to
// go ahead and replace any `anyhow` usage by proper error handling via our own Error type.
use anyhow::{bail, Result};
use command_group::GroupChild;
use log::{error, info, warn};
use winapi::shared::minwindef::FALSE;
use winapi::shared::ntdef::NULL;
use winapi::um::errhandlingapi::GetLastError;
use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE};
use winapi::um::processthreadsapi::{
OpenProcess, OpenThread, ResumeThread, SuspendThread, TerminateProcess,
};
use winapi::um::processthreadsapi::{OpenThread, ResumeThread, SuspendThread};
use winapi::um::tlhelp32::{
CreateToolhelp32Snapshot, Process32First, Process32Next, Thread32First, Thread32Next,
PROCESSENTRY32, TH32CS_SNAPPROCESS, TH32CS_SNAPTHREAD, THREADENTRY32,
};
use winapi::um::winnt::{PROCESS_TERMINATE, THREAD_SUSPEND_RESUME};
use winapi::um::winnt::THREAD_SUSPEND_RESUME;
use super::ProcessAction;
use crate::network::message::Signal as InternalSignal;
/// Shim signal enum for windows.
pub enum Signal {
SIGINT,
SIGKILL,
SIGTERM,
SIGCONT,
SIGSTOP,
}
pub fn compile_shell_command(command_string: &str) -> Command {
// Chain two `powershell` commands, one that sets the output encoding to utf8 and then the user provided one.
@ -32,60 +37,57 @@ pub fn compile_shell_command(command_string: &str) -> Command {
command
}
pub fn send_internal_signal_to_child(
_child: &Child,
_signal: InternalSignal,
_send_to_children: bool,
) -> Result<bool> {
bail!("Trying to send unix signal on a windows machine. This isn't supported.");
}
/// Send a signal to a windows process.
pub fn run_action_on_child(child: &Child, action: &ProcessAction, _children: bool) -> Result<bool> {
pub fn send_signal_to_child<T>(child: &mut GroupChild, signal: T) -> Result<()>
where
T: Into<Signal>,
{
let pids = get_cur_task_processes(child.id());
if pids.is_empty() {
bail!("Process has just gone away");
}
match action {
ProcessAction::Pause => {
let signal: Signal = signal.into();
match signal {
Signal::SIGSTOP => {
for pid in pids {
for thread in get_threads(pid) {
suspend_thread(thread);
}
}
}
ProcessAction::Resume => {
Signal::SIGCONT => {
for pid in pids {
for thread in get_threads(pid) {
resume_thread(thread);
}
}
}
_ => {
bail!("Trying to send unix signal on a windows machine. This isn't supported.");
}
}
Ok(true)
Ok(())
}
/// Kill a child process
pub fn kill_child(task_id: usize, child: &mut Child, _kill_children: bool) -> bool {
pub fn kill_child(task_id: usize, child: &mut GroupChild) -> std::io::Result<()> {
match child.kill() {
Err(_) => {
info!("Task {task_id} has already finished by itself");
false
}
Ok(_) => {
let pids = get_cur_task_processes(child.id());
for pid in pids {
terminate_process(pid);
}
true
Ok(_) => Ok(()),
Err(ref e) if e.kind() == std::io::ErrorKind::InvalidData => {
// Process already exited
info!("Task {task_id} has already finished by itself.");
Ok(())
}
Err(err) => Err(err),
}
}
/// Get current task pid, all child pid and all children's children
/// TODO: see if this can be simplified using QueryInformationJobObject
/// on the job object created by command_group.
fn get_cur_task_processes(task_pid: u32) -> Vec<u32> {
let mut all_pids = Vec::new();
@ -221,22 +223,6 @@ fn resume_thread(tid: u32) {
}
}
/// Terminate a process
/// [TerminateProcess](https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-terminateprocess)
fn terminate_process(pid: u32) {
unsafe {
// Get a handle for the target process
let process_handle = OpenProcess(PROCESS_TERMINATE, FALSE, pid);
// If TerminateProcess fails, the return value is zero.
if 0 == TerminateProcess(process_handle, 1) {
let err_code = GetLastError();
warn!("Failed to terminate process {pid} with error code {err_code}");
}
CloseHandle(process_handle);
}
}
/// Assert that certain process id no longer exists
pub fn process_exists(pid: u32) -> bool {
unsafe {
@ -269,6 +255,8 @@ mod test {
use std::thread::sleep;
use std::time::Duration;
use command_group::CommandGroup;
use super::*;
/// Assert that certain process id no longer exists
@ -306,7 +294,7 @@ mod test {
#[test]
fn test_spawn_command() {
let mut child = compile_shell_command("sleep 0.1")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let ecode = child.wait().expect("failed to wait on echo");
@ -322,7 +310,7 @@ mod test {
/// See https://github.com/Nukesor/pueue/issues/315
fn test_shell_command_is_killed() -> Result<()> {
let mut child = compile_shell_command("sleep 60; sleep 60; echo 'this is a test'")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let pid = child.id();
@ -330,7 +318,7 @@ mod test {
let process_ids = assert_process_ids(pid, 1, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
assert!(kill_child(0, &mut child).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
@ -352,14 +340,14 @@ mod test {
/// will properly kill all processes and their children's children without detached processes.
fn test_shell_command_children_are_killed() -> Result<()> {
let mut child = compile_shell_command("powershell -c 'sleep 60; sleep 60'; sleep 60")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let pid = child.id();
// Get all processes, so we can make sure they no longer exist afterwards.
let process_ids = assert_process_ids(pid, 2, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
assert!(kill_child(0, &mut child).is_ok());
// Assert that the direct child (powershell -c) has been killed.
sleep(Duration::from_millis(500));
@ -380,7 +368,7 @@ mod test {
let mut child = Command::new("ping")
.arg("localhost")
.arg("-t")
.spawn()
.group_spawn()
.expect("Failed to spawn ping");
let pid = child.id();
@ -388,7 +376,7 @@ mod test {
let _ = assert_process_ids(pid, 1, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, false));
assert!(kill_child(0, &mut child).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));
@ -406,7 +394,7 @@ mod test {
let mut child = Command::new("powershell")
.arg("-c")
.arg("sleep 60; sleep 60; sleep 60")
.spawn()
.group_spawn()
.expect("Failed to spawn echo");
let pid = child.id();
@ -414,7 +402,7 @@ mod test {
let process_ids = assert_process_ids(pid, 1, 5000)?;
// Kill the process and make sure it'll be killed.
assert!(kill_child(0, &mut child, true));
assert!(kill_child(0, &mut child).is_ok());
// Sleep a little to give all processes time to shutdown.
sleep(Duration::from_millis(500));

View file

@ -2,13 +2,13 @@ Group "default" (1 parallel): running
─────────────────────────────────────────────────────────────────────────────────────────────────────────
Id Status Enqueue At Command Path Start End
═════════════════════════════════════════════════════════════════════════════════════════════════════════
0 Failed (255) sleep 60 /tmp {{ task_0_start }} {{ task_0_end }}
0 Failed (255) sleep 60 $TMP {{ task_0_start }} {{ task_0_end }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────
1 Success sleep 60 /tmp {{ task_1_start }} {{ task_1_end }}
1 Success sleep 60 $TMP {{ task_1_start }} {{ task_1_end }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────
2 Stashed sleep 60 /tmp
2 Stashed sleep 60 $TMP
─────────────────────────────────────────────────────────────────────────────────────────────────────────
3 Stashed {{ task_3_enqueue_at }} sleep 60 /tmp
3 Stashed {{ task_3_enqueue_at }} sleep 60 $TMP
─────────────────────────────────────────────────────────────────────────────────────────────────────────
4 Running sleep 60 {{ cwd }} {{ task_4_start }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────

View file

@ -2,7 +2,7 @@ Group "default" (1 parallel): running
─────────────────────────────────────────────────────────────────────────────────
Id Status Command Path Start End
═════════════════════════════════════════════════════════════════════════════════
0 Failed (255) sleep 60 /tmp {{ task_0_start }} {{ task_0_end }}
0 Failed (255) sleep 60 $TMP {{ task_0_start }} {{ task_0_end }}
─────────────────────────────────────────────────────────────────────────────────
1 Success sleep 60 /tmp {{ task_1_start }} {{ task_1_end }}
1 Success sleep 60 $TMP {{ task_1_start }} {{ task_1_end }}
─────────────────────────────────────────────────────────────────────────────────

View file

@ -2,7 +2,7 @@ Group "default" (1 parallel): running
────────────────────────────────────────────────────────────────────────────────────────────
Id Status Command Path Start End
════════════════════════════════════════════════════════════════════════════════════════════
0 Failed (255) sleep 60 /tmp {{ task_0_start }} {{ task_0_end }}
0 Failed (255) sleep 60 $TMP {{ task_0_start }} {{ task_0_end }}
────────────────────────────────────────────────────────────────────────────────────────────
4 Running sleep 60 {{ cwd }} {{ task_4_start }}
────────────────────────────────────────────────────────────────────────────────────────────

View file

@ -2,13 +2,13 @@ Group "default" (1 parallel): running
─────────────────────────────────────────────────────────────────────────────────────────────────────────
Id Status Enqueue At Command Path Start End
═════════════════════════════════════════════════════════════════════════════════════════════════════════
0 Failed (255) sleep 60 /tmp {{ task_0_start }} {{ task_0_end }}
0 Failed (255) sleep 60 $TMP {{ task_0_start }} {{ task_0_end }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────
1 Success sleep 60 /tmp {{ task_1_start }} {{ task_1_end }}
1 Success sleep 60 $TMP {{ task_1_start }} {{ task_1_end }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────
2 Stashed sleep 60 /tmp
2 Stashed sleep 60 $TMP
─────────────────────────────────────────────────────────────────────────────────────────────────────────
3 Stashed {{ task_3_enqueue_at }} sleep 60 /tmp
3 Stashed {{ task_3_enqueue_at }} sleep 60 $TMP
─────────────────────────────────────────────────────────────────────────────────────────────────────────
4 Running sleep 60 {{ cwd }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────

View file

@ -2,11 +2,11 @@ Group "default" (1 parallel): running
──────────────────────────────────────────────────────────────────────────────────────────────
Id Status Enqueue At Command Path Start End
══════════════════════════════════════════════════════════════════════════════════════════════
0 Failed (255) sleep 60 /tmp {{ task_0_start }} {{ task_0_end }}
0 Failed (255) sleep 60 $TMP {{ task_0_start }} {{ task_0_end }}
──────────────────────────────────────────────────────────────────────────────────────────────
1 Success sleep 60 /tmp {{ task_1_start }} {{ task_1_end }}
1 Success sleep 60 $TMP {{ task_1_start }} {{ task_1_end }}
──────────────────────────────────────────────────────────────────────────────────────────────
2 Stashed sleep 60 /tmp
2 Stashed sleep 60 $TMP
──────────────────────────────────────────────────────────────────────────────────────────────
3 Stashed {{ task_3_enqueue_at }} sleep 60 /tmp
3 Stashed {{ task_3_enqueue_at }} sleep 60 $TMP
──────────────────────────────────────────────────────────────────────────────────────────────

View file

@ -2,7 +2,7 @@ Group "default" (1 parallel): running
─────────────────────────────────────────────────────────────────────────
Id Status Enqueue At Command Path Start End
═════════════════════════════════════════════════════════════════════════
3 Stashed {{ task_3_enqueue_at }} sleep 60 /tmp
3 Stashed {{ task_3_enqueue_at }} sleep 60 $TMP
─────────────────────────────────────────────────────────────────────────
4 Running sleep 60 {{ cwd }} {{ task_4_start }}
─────────────────────────────────────────────────────────────────────────

View file

@ -2,9 +2,9 @@ Group "default" (1 parallel): running
─────────────────────────────────────────────────────────────────────────────────────────────────────────
Id Status Enqueue At Command Path Start End
═════════════════════════════════════════════════════════════════════════════════════════════════════════
2 Stashed sleep 60 /tmp
2 Stashed sleep 60 $TMP
─────────────────────────────────────────────────────────────────────────────────────────────────────────
3 Stashed {{ task_3_enqueue_at }} sleep 60 /tmp
3 Stashed {{ task_3_enqueue_at }} sleep 60 $TMP
─────────────────────────────────────────────────────────────────────────────────────────────────────────
5 Queued sleep 60 {{ cwd }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────
@ -12,7 +12,7 @@ Group "default" (1 parallel): running
─────────────────────────────────────────────────────────────────────────────────────────────────────────
4 Running sleep 60 {{ cwd }} {{ task_4_start }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────
0 Failed (255) sleep 60 /tmp {{ task_0_start }} {{ task_0_end }}
0 Failed (255) sleep 60 $TMP {{ task_0_start }} {{ task_0_end }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────
1 Success sleep 60 /tmp {{ task_1_start }} {{ task_1_end }}
1 Success sleep 60 $TMP {{ task_1_start }} {{ task_1_end }}
─────────────────────────────────────────────────────────────────────────────────────────────────────────

View file

@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::env::temp_dir;
use anyhow::Result;
use chrono::{Duration, Local};
@ -16,7 +16,7 @@ use crate::helper::*;
fn build_task() -> Task {
Task::new(
"sleep 60".to_owned(),
PathBuf::from("/tmp"),
temp_dir(),
HashMap::new(),
PUEUE_DEFAULT_GROUP.to_owned(),
TaskStatus::Queued,

View file

@ -16,7 +16,7 @@ async fn multiple_tasks() -> Result<()> {
// Run a command that'll run for a short time after a delay.
// The `pueue wait` command will be spawne directly afterwards, resulting in the spawned
// process to wait for this command to finish.
run_client_command(shared, &["add", "--delay", "1 second", "sleep 1"])?;
run_client_command(shared, &["add", "--delay", "2 seconds", "sleep 1"])?;
// Spawn the `pueue wait` command in a separate thread.
// We expect it to finish later on its own.

View file

@ -1,10 +1,10 @@
#[cfg(target_os = "linux")]
#[cfg(unix)]
mod helper;
#[cfg(target_os = "linux")]
#[cfg(unix)]
mod client;
// We allow some dead code in here, as some fixtures are only needed in the daemon tests.
#[cfg(target_os = "linux")]
#[cfg(unix)]
#[allow(dead_code)]
mod fixtures;

View file

@ -1,5 +1,3 @@
use std::convert::TryInto;
use anyhow::Result;
use pretty_assertions::assert_eq;
@ -14,12 +12,12 @@ use crate::helper::*;
#[tokio::test]
async fn test_start_running() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
let child = standalone_daemon(&settings.shared).await?;
let mut child = standalone_daemon(&settings.shared).await?;
let shared = &settings.shared;
// Kill the daemon and wait for it to shut down.
assert_success(shutdown_daemon(shared).await?);
wait_for_shutdown(child.id().try_into()?).await?;
wait_for_shutdown(&mut child).await?;
// Boot it up again
let mut child = standalone_daemon(&settings.shared).await?;
@ -40,7 +38,7 @@ async fn test_start_running() -> Result<()> {
#[tokio::test]
async fn test_start_paused() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
let child = standalone_daemon(&settings.shared).await?;
let mut child = standalone_daemon(&settings.shared).await?;
let shared = &settings.shared;
// This pauses the daemon
@ -48,7 +46,7 @@ async fn test_start_paused() -> Result<()> {
// Kill the daemon and wait for it to shut down.
assert_success(shutdown_daemon(shared).await?);
wait_for_shutdown(child.id().try_into()?).await?;
wait_for_shutdown(&mut child).await?;
// Boot it up again
let mut child = standalone_daemon(&settings.shared).await?;

View file

@ -1,5 +1,3 @@
use std::convert::TryInto;
use anyhow::{Context, Result};
use crate::fixtures::*;
@ -12,10 +10,11 @@ async fn test_ctrlc() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
let mut child = standalone_daemon(&settings.shared).await?;
use nix::sys::signal::{kill, Signal};
use command_group::{Signal, UnixChildExt};
// Send SIGTERM signal to process via nix
let nix_pid = nix::unistd::Pid::from_raw(child.id() as i32);
kill(nix_pid, Signal::SIGTERM).context("Failed to send SIGTERM to pid")?;
child
.signal(Signal::SIGTERM)
.context("Failed to send SIGTERM to daemon")?;
// Sleep for 500ms and give the daemon time to shut down
sleep_ms(500).await;
@ -37,7 +36,7 @@ async fn test_graceful_shutdown() -> Result<()> {
// Kill the daemon gracefully and wait for it to shut down.
assert_success(shutdown_daemon(&settings.shared).await?);
wait_for_shutdown(child.id().try_into()?).await?;
wait_for_shutdown(&mut child).await?;
// Sleep for 500ms and give the daemon time to shut down
sleep_ms(500).await;

View file

@ -1,8 +1,8 @@
#[cfg(target_os = "linux")]
#[cfg(unix)]
mod helper;
#[cfg(target_os = "linux")]
#[cfg(unix)]
mod daemon;
#[cfg(target_os = "linux")]
#[cfg(unix)]
mod fixtures;

View file

@ -1,12 +1,13 @@
use std::collections::HashMap;
use std::fs::File;
use std::env::temp_dir;
use std::fs::{canonicalize, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use anyhow::{bail, Context, Result};
use assert_cmd::prelude::*;
use tempfile::TempDir;
use tempfile::{Builder, TempDir};
use tokio::io::{self, AsyncWriteExt};
use pueue_daemon_lib::run;
@ -126,7 +127,8 @@ pub fn daemon_base_setup() -> Result<(Settings, TempDir)> {
//let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
// Create a temporary directory used for testing.
let tempdir = TempDir::new().unwrap();
// The path is canonicalized to ensure test consistency across platforms.
let tempdir = Builder::new().tempdir_in(canonicalize(temp_dir())?)?;
let tempdir_path = tempdir.path();
std::fs::create_dir(tempdir_path.join("certs")).unwrap();

View file

@ -1,4 +1,6 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::env::temp_dir;
use std::fs::read_to_string;
use std::path::PathBuf;
use std::process::{Command, Output, Stdio};
@ -133,12 +135,6 @@ pub fn assert_stdout_matches(
.join(name);
let actual = String::from_utf8(stdout).context("Got invalid utf8 as stdout!")?;
// Trim all trailing whitespaces from the actual stdout output.
let actual = actual
.lines()
.map(|line| line.trim_end())
.collect::<Vec<&str>>()
.join("\n");
let template = read_to_string(&path);
let template = match template {
@ -159,12 +155,9 @@ pub fn assert_stdout_matches(
"Failed to render template for file: {name} with context {context:?}"
))?;
// Trim all trailing whitespaces from the expected output.
let expected = expected
.lines()
.map(|line| line.trim_end())
.collect::<Vec<&str>>()
.join("\n");
let expected = canonicalize_snapshot(expected, None);
let path_column_width = find_path_column(&expected);
let actual = canonicalize_snapshot(actual, path_column_width);
if expected != actual {
println!("Expected output:\n-----\n{expected}\n-----");
@ -178,3 +171,105 @@ pub fn assert_stdout_matches(
Ok(())
}
fn is_table(line: &str) -> bool {
line.chars().all(|c| c == '\u{2500}')
}
/// Find the position and length of the Path column in the expected output
/// Path must be a space-separate word on first line following a table starting line.
fn find_path_column(output: &str) -> Option<(usize, usize)> {
let header = output.lines().skip_while(|&line| !is_table(line)).nth(1)?;
// Scan through the columns until we find the Path column, then produce its offset
// and length (including whitespace padding).
// colwidth doubles as a flag; if it is 0 we have not found the Path label (yet).
let mut colwidth = 0;
let mut offset = 0;
for chunk in header.split_inclusive(char::is_whitespace) {
match (colwidth, chunk) {
(0, "Path") | (0, "Path ") | (1.., " ") => {
// We found the Path column, accumulate column width
colwidth += chunk.len();
}
(0, _) => {
// We haven't yet found the Path label, accumulate column offset.
offset += chunk.len();
}
_ => {
// Path column has ended, we have reached the next label
break;
}
}
}
if colwidth == 0 {
None
} else {
Some((offset, colwidth))
}
}
/// Canonicalize test and template outputs to handle expected differences.
/// If path_column is given, use the width to trim the Path column in this output.
fn canonicalize_snapshot(output: String, path_column: Option<(usize, usize)>) -> String {
// Replace the temporary path with a symbolic reference, both the base
// temporary directory and its canonical path (which on some platforms can differ)
// These replacements should only apply to the Path column.
const TMPVAR: &str = "$TMP";
let tmp = temp_dir();
let tmp_canonical = std::fs::canonicalize(&tmp).unwrap();
let replacements = vec![
(tmp_canonical.to_string_lossy(), &TMPVAR),
(tmp.to_string_lossy(), &TMPVAR),
];
// Set optional path column information to configure a line scan operationn below.
let trim_path_col = match path_column {
// Expected output has no Path column, nothing to trim here.
None => None,
// Determine the output Path column to see if trimming is needed.
Some((_, target_width)) => match find_path_column(&output) {
Some((col, actual_width)) if actual_width > target_width => {
Some((col + target_width, col + actual_width))
}
// Output has no Path column or the column is not wider than the expected width
_ => None,
},
};
output
.lines()
.map(|line| {
// - Trim all trailing whitespace and apply Path column replacements
let mut tmp = Cow::from(line.trim_end());
let before = tmp.len();
for (from, to) in replacements.iter() {
tmp = tmp.replace(&**from, to).into();
}
// pass on the trimmed string as well as how much we removed when replacing;
// this is used to adjust column trimming in the scan operation, below.
(tmp.to_string(), before - tmp.len())
})
.scan(
false,
|table_started, (line, trimmed)| match trim_path_col {
// No trim configuration set, no further trimming needed
None => Some(line),
// Use trim configuration to trim Path columns
Some((from, until)) => {
if !(*table_started || is_table(&line)) {
Some(line)
} else {
*table_started = true;
// Once we are inside a table, trim the path column by cutting out the characters
// at [from, until) (taking into account how much the line has already shrunk due
// to replacements)
let until = until - trimmed;
let chars = line.chars();
Some(chars.clone().take(from).chain(chars.skip(until)).collect())
}
}
},
)
.collect::<Vec<String>>()
.join("\n")
}

View file

@ -1,9 +1,9 @@
use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::process::Child;
use anyhow::{anyhow, bail, Context, Result};
use procfs::process::Process;
use pueue_lib::network::message::*;
use pueue_lib::settings::*;
@ -56,26 +56,23 @@ pub async fn get_pid(pid_path: &Path) -> Result<i32> {
}
/// Waits for a daemon to shut down.
/// This is done by waiting for the pid to disappear.
pub async fn wait_for_shutdown(pid: i32) -> Result<()> {
// Try to read the process. If this fails, the daemon already exited.
let process = match Process::new(pid) {
Ok(process) => process,
Err(_) => return Ok(()),
};
pub async fn wait_for_shutdown(child: &mut Child) -> Result<()> {
// Give the daemon about 1 sec to shutdown.
let tries = 40;
let mut current_try = 0;
while current_try < tries {
// Process is still alive, wait a little longer
if process.is_alive() {
// Try to read the process exit code. If this succeeds or
// an error is returned, the process is gone.
if let Ok(None) = child.try_wait() {
// Process is still alive, wait a little longer
sleep_ms(50).await;
current_try += 1;
continue;
}
// Process is gone; either there was a status code
// or the child is not a child of this process (highly
// unlikely).
return Ok(());
}