Callbacks when finishing tasks

This also refactors the data structure of the TaskHandler.
State is no longer a member, but rather an argument that's continously
passed through in the `run` function.
This commit is contained in:
Arne Beer 2020-05-09 23:33:08 +02:00
parent 82652dfc60
commit c45e00c4e1
7 changed files with 331 additions and 72 deletions

View file

@ -1,6 +1,10 @@
# v0.5.0
**Features:**
- Add callback notifications on finished tasks.
# v0.4.0
**Features:**
- Add `--after [ids]` option. Task with this option will only start once all specified dependencies finished successfully.
- Add `--after [ids]` option. Task with this option will only be started, if all specified dependencies successfully finish.
Tasks with failed dependencies will fail as well.
- New state `FailedToStart`. Used if the process cannot be started.
- New state `DependencyFailed`. Used if any dependency of a task fails.

145
Cargo.lock generated
View file

@ -170,6 +170,25 @@ dependencies = [
"constant_time_eq 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "block-buffer"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"block-padding 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "block-padding"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "broadcaster"
version = "1.0.0"
@ -183,6 +202,11 @@ dependencies = [
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "byte-tools"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bytecount"
version = "0.4.0"
@ -361,6 +385,14 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "digest"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "dirs"
version = "2.0.2"
@ -395,6 +427,11 @@ dependencies = [
"version_check 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "fnv"
version = "1.0.6"
@ -479,6 +516,14 @@ dependencies = [
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "generic-array"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"typenum 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "getrandom"
version = "0.1.14"
@ -494,6 +539,19 @@ name = "glob"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "handlebars"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"pest 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"pest_derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"quick-error 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.52 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "heck"
version = "0.3.1"
@ -592,6 +650,11 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
@ -725,6 +788,11 @@ name = "once_cell"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "opaque-debug"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "parking_lot"
version = "0.10.2"
@ -747,6 +815,45 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pest"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"ucd-trie 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pest_derive"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"pest 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"pest_generator 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pest_generator"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"pest 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"pest_meta 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pest_meta"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pest 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"sha-1 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pin-project-lite"
version = "0.1.4"
@ -838,6 +945,7 @@ dependencies = [
"config 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossterm 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"handlebars 3.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"nix 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"proptest 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1207,6 +1315,17 @@ dependencies = [
"yaml-rust 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sha-1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "signal-hook"
version = "0.1.13"
@ -1388,6 +1507,16 @@ dependencies = [
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "typenum"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ucd-trie"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "unicode-segmentation"
version = "1.6.0"
@ -1526,7 +1655,10 @@ dependencies = [
"checksum bit-vec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f59bbe95d4e52a6398ec21238d31577f2b28a9d86807f06ca59d191d8440d0bb"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum blake2b_simd 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a"
"checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
"checksum block-padding 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5"
"checksum broadcaster 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9c972e21e0d055a36cf73e4daae870941fe7a8abcd5ac3396aab9e4c126bd87"
"checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
"checksum bytecount 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b92204551573580e078dc80017f36a213eb77a0450e4ddd8cfa0f3f2d1f0178f"
"checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
@ -1546,10 +1678,12 @@ dependencies = [
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
"checksum crossterm 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b8a3223215bc00c666d6be730e88aef245ad4a4f837e87a16c347e8acf701643"
"checksum crossterm_winapi 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "057b7146d02fb50175fd7dbe5158f6097f33d02831f43b4ee8ae4ddf67b68f5c"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
"checksum dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3"
"checksum dirs-sys 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b"
"checksum dtoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3"
"checksum error-chain 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d371106cc88ffdfb1eabd7111e432da544f16f3e2d7bf1dfe8bf575f1df045cd"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
@ -1562,8 +1696,10 @@ dependencies = [
"checksum futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27"
"checksum futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6"
"checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5"
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
"checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb"
"checksum handlebars 3.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ba758d094d31274eb49d15da6f326b96bf3185239a6359bf684f3d5321148900"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
"checksum hermit-abi 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "61565ff7aaace3525556587bd2dc31d4a07071957be715e63ce7b1eccf51a8f4"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
@ -1577,6 +1713,7 @@ dependencies = [
"checksum linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83"
"checksum lock_api 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
"checksum maplit 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
"checksum memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
"checksum memoffset 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b4fc2c02a7e374099d4ee95a193111f72d2110197fe200272371758f6c3643d8"
@ -1592,8 +1729,13 @@ dependencies = [
"checksum num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096"
"checksum num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
"checksum once_cell 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
"checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
"checksum parking_lot 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
"checksum parking_lot_core 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
"checksum pest 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53"
"checksum pest_derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "833d1ae558dc601e9a60366421196a8d94bc0ac980476d0b67e1d0988d72b2d0"
"checksum pest_generator 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "99b8db626e31e5b81787b9783425769681b347011cc59471e33ea46d2ea0cf55"
"checksum pest_meta 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "54be6e404f5317079812fc8f9f5279de376d8856929e21c184ecf6bbd692a11d"
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
"checksum pin-utils 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
"checksum ppv-lite86 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
@ -1645,6 +1787,7 @@ dependencies = [
"checksum serde_json 1.0.52 (registry+https://github.com/rust-lang/crates.io-index)" = "a7894c8ed05b7a3a279aeb79025fdec1d3158080b75b98a08faf2806bb799edd"
"checksum serde_test 0.8.23 (registry+https://github.com/rust-lang/crates.io-index)" = "110b3dbdf8607ec493c22d5d947753282f3bae73c0f56d322af1e8c78e4c23d5"
"checksum serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)" = "691b17f19fc1ec9d94ec0b5864859290dff279dbd7b03f017afda54eb36c3c35"
"checksum sha-1 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df"
"checksum signal-hook 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "10b9f3a1686a29f53cfd91ee5e3db3c12313ec02d33765f02c1a9645a1811e2c"
"checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
"checksum simplelog 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "3cf9a002ccce717d066b3ccdb8a28829436249867229291e91b25d99bd723f0d"
@ -1666,6 +1809,8 @@ dependencies = [
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
"checksum time 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)" = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
"checksum toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a"
"checksum typenum 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33"
"checksum ucd-trie 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
"checksum unicode-segmentation 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0"
"checksum unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479"
"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"

View file

@ -35,6 +35,7 @@ chrono-english = "^0.1.0"
rand = "^0.7"
strum = "^0.18"
strum_macros = "^0.18"
handlebars = "3"
bytes = "^0.5"
byteorder = "^1"

View file

@ -220,6 +220,21 @@ This would help me a lot!
## Utilities
### Callbacks
You can specify a callback that should be called every time a task finishes.
The callback can be parameterized with some variables.
For now there are:
- `{{ id }}`
- `{{ command }}`
- `{{ path }}`
- `{{ result }}` (Success, Killed, etc.)
Example callback:
```
notify-send "Task {{ id }}" "Command: {{ command }} finished with {{ result }}"
```
### Shell completion files
Shell completion files can be created on the fly with `pueue completions $shell $directory`.
There's also a `build_completions.sh` script, which creates all completion files in the `utiles/completions` directory.

View file

@ -56,10 +56,11 @@ async fn main() -> Result<()> {
let state = Arc::new(Mutex::new(state));
let (sender, receiver) = channel();
let mut task_handler = TaskHandler::new(settings.clone(), state.clone(), receiver);
let mut task_handler = TaskHandler::new(settings.clone(), receiver);
let state_clone = state.clone();
thread::spawn(move || {
task_handler.run();
task_handler.run(state_clone);
});
accept_incoming(settings, sender, state.clone(), opt).await?;

View file

@ -1,4 +1,4 @@
use ::std::collections::BTreeMap;
use ::std::collections::{BTreeMap, HashMap};
use ::std::io::Write;
use ::std::process::Stdio;
use ::std::process::{Child, Command};
@ -7,6 +7,7 @@ use ::std::time::Duration;
use ::anyhow::Result;
use ::chrono::prelude::*;
use ::handlebars::Handlebars;
use ::log::{debug, error, info, warn};
#[cfg(not(windows))]
use ::nix::{
@ -18,29 +19,25 @@ use ::pueue::log::*;
use ::pueue::message::*;
use ::pueue::settings::Settings;
use ::pueue::state::SharedState;
use ::pueue::task::{TaskResult, TaskStatus};
use ::pueue::task::{TaskResult, TaskStatus, Task};
pub struct TaskHandler {
state: SharedState,
settings: Settings,
receiver: Receiver<Message>,
pub children: BTreeMap<usize, Child>,
pub callbacks: Vec<Child>,
running: bool,
reset: bool,
}
impl TaskHandler {
pub fn new(settings: Settings, state: SharedState, receiver: Receiver<Message>) -> Self {
let running = {
let state = state.lock().unwrap();
state.running
};
pub fn new(settings: Settings, receiver: Receiver<Message>) -> Self {
TaskHandler {
state,
settings,
receiver,
children: BTreeMap::new(),
running,
callbacks: Vec::new(),
running: true,
reset: false,
}
}
@ -66,31 +63,46 @@ impl TaskHandler {
/// 2. Check whether any tasks just finished
/// 3. Check if there are any stashed processes ready for being enqueued
/// 4. Check whether we can spawn new tasks
pub fn run(&mut self) {
pub fn run(&mut self, state: SharedState) {
// Check what's the current status of the state
// and set the own running state accordingly
self.running = {
let state = state.lock().unwrap();
state.running
};
loop {
self.receive_commands();
self.process_finished();
self.check_stashed();
self.check_failed_dependencies();
// Sleep for a short amount of time. We don't want to hurt the CPU
let timeout = Duration::from_millis(100);
// Don't use recv_timeout for now, until this bug get's fixed
// https://github.com/rust-lang/rust/issues/39364
//match self.receiver.recv_timeout(timeout) {
std::thread::sleep(timeout);
self.receive_commands(&state);
self.handle_finished(&state);
self.check_callbacks();
self.check_stashed(&state);
self.check_failed_dependencies(&state);
if self.running && !self.reset {
let _res = self.check_new();
let _res = self.check_new(&state);
}
}
}
/// See if the task manager has a free slot and a queued task.
/// If that's the case, start a new process.
fn check_new(&mut self) -> Result<()> {
fn check_new(&mut self, state: &SharedState) -> Result<()> {
// Check while there are still slots left
// Break the loop if no next task is found
while self.children.len() < self.settings.daemon.default_parallel_tasks {
let next_id = {
let mut state = self.state.lock().unwrap();
let mut state = state.lock().unwrap();
state.get_next_task_id()
};
match next_id {
Some(id) => {
self.start_process(id);
self.start_process(state, id);
}
None => break,
}
@ -101,8 +113,8 @@ impl TaskHandler {
/// Ensure that no `Queued` tasks have any failed dependencies.
/// Otherwise set their status to `Done` and result to `DependencyFailed`.
pub fn check_failed_dependencies(&mut self) {
let mut state = self.state.lock().unwrap();
pub fn check_failed_dependencies(&mut self, state: &SharedState) {
let mut state = state.lock().unwrap();
let has_failed_deps: Vec<_> = state
.tasks
.iter()
@ -130,10 +142,10 @@ impl TaskHandler {
/// Actually spawn a new sub process
/// The output of subprocesses is piped into a seperate file for easier access
fn start_process(&mut self, task_id: usize) {
fn start_process(&mut self, state: &SharedState, task_id: usize) {
// Already get the mutex here to ensure that the state won't be manipulated
// while we are looking for a task to start.
let mut state = self.state.lock().unwrap();
let mut state = state.lock().unwrap();
let task = state.tasks.get_mut(&task_id);
let task = match task {
@ -214,9 +226,9 @@ impl TaskHandler {
}
/// As time passes, some delayed tasks may need to be enqueued.
/// Gather all stashed tasks and enqueue them if it is after the task's enqueue_at
fn check_stashed(&mut self) {
let mut state = self.state.lock().unwrap();
/// Check all stashed tasks and enqueue them if it is past the task's enqueue_at
fn check_stashed(&mut self, state: &SharedState) {
let mut state = state.lock().unwrap();
let mut changed = false;
for (_, task) in state.tasks.iter_mut() {
@ -242,12 +254,12 @@ impl TaskHandler {
/// Check whether there are any finished processes
/// In case there are, handle them and update the shared state
fn process_finished(&mut self) {
fn handle_finished(&mut self, state: &SharedState) {
let (finished, errored) = self.get_finished();
// The daemon got a reset request and all children already finished
if self.reset && self.children.is_empty() {
let mut state = self.state.lock().unwrap();
let mut state = state.lock().unwrap();
state.reset();
self.running = true;
self.reset = false;
@ -258,7 +270,7 @@ impl TaskHandler {
return;
}
let mut state = self.state.lock().unwrap();
let mut state = state.lock().unwrap();
// We need to know if there are any failed tasks,
// in case the user wants to stop the daemon if a tasks fails
let mut failed_task_exists = false;
@ -284,6 +296,8 @@ impl TaskHandler {
if self.reset {
clean_log_handles(*task_id, &self.settings);
}
self.spawn_callback(&task);
}
// Handle errored tasks
@ -294,6 +308,8 @@ impl TaskHandler {
task.status = TaskStatus::Done;
task.result = Some(TaskResult::Killed);
failed_task_exists = true;
self.spawn_callback(&task);
}
// Pause the daemon, if the settings say so and some process failed
@ -329,27 +345,23 @@ impl TaskHandler {
}
/// Some client instructions require immediate action by the task handler
/// These commands are
fn receive_commands(&mut self) {
let timeout = Duration::from_millis(100);
// Don't use recv_timeout for now, until this bug get's fixed
// https://github.com/rust-lang/rust/issues/39364
//match self.receiver.recv_timeout(timeout) {
std::thread::sleep(timeout);
/// This functions receives those messages from the mpsc channel and
/// handles them accordingly
fn receive_commands(&mut self, state: &SharedState) {
match self.receiver.try_recv() {
Ok(message) => self.handle_message(message),
Ok(message) => self.handle_message(message, state),
Err(_) => {}
};
}
fn handle_message(&mut self, message: Message) {
fn handle_message(&mut self, message: Message, state: &SharedState) {
match message {
Message::Pause(message) => self.pause(message),
Message::Start(message) => self.start(message),
Message::Kill(message) => self.kill(message),
Message::Pause(message) => self.pause(state, message),
Message::Start(message) => self.start(state, message),
Message::Kill(message) => self.kill(state, message),
Message::Send(message) => self.send(message),
Message::Parallel(amount) => self.allow_parallel_tasks(amount),
Message::Reset => self.reset(),
Message::Reset => self.reset(state),
_ => info!("Received unhandled message {:?}", message),
}
}
@ -374,16 +386,16 @@ impl TaskHandler {
/// Handle the start message:
/// 1. Either start the daemon and all tasks.
/// 2. Or force the start of specific tasks.
fn start(&mut self, task_ids: Vec<usize>) {
fn start(&mut self, state: &SharedState, task_ids: Vec<usize>) {
// Only start specific tasks
if !task_ids.is_empty() {
for id in &task_ids {
// Continue all children that are simply paused
if self.children.contains_key(id) {
self.continue_task(*id);
self.continue_task(state, *id);
} else {
// Start processes for all tasks that haven't been started yet
self.start_process(*id);
self.start_process(state, *id);
}
}
return;
@ -392,20 +404,20 @@ impl TaskHandler {
// Start the daemon and all paused tasks
let keys: Vec<usize> = self.children.keys().cloned().collect();
for id in keys {
self.continue_task(id);
self.continue_task(state, id);
}
info!("Resuming daemon (start)");
self.change_running(true);
self.change_running(state, true);
}
/// Send a start signal to a paused task to continue execution
fn continue_task(&mut self, id: usize) {
fn continue_task(&mut self, state: &SharedState, id: usize) {
if !self.children.contains_key(&id) {
return;
}
{
// Task is already done
let state = self.state.lock().unwrap();
let state = state.lock().unwrap();
if state.tasks.get(&id).unwrap().is_done() {
return;
}
@ -420,7 +432,7 @@ impl TaskHandler {
Err(err) => warn!("Failed starting task {}: {:?}", id, err),
Ok(success) => {
if success {
let mut state = self.state.lock().unwrap();
let mut state = state.lock().unwrap();
state.change_status(id, TaskStatus::Running);
}
}
@ -431,11 +443,11 @@ impl TaskHandler {
/// Handle the pause message:
/// 1. Either pause the daemon and all tasks.
/// 2. Or only pause specific tasks.
fn pause(&mut self, message: PauseMessage) {
fn pause(&mut self, state: &SharedState, message: PauseMessage) {
// Only pause specific tasks
if !message.task_ids.is_empty() {
for id in &message.task_ids {
self.pause_task(*id);
self.pause_task(state, *id);
}
return;
}
@ -444,16 +456,16 @@ impl TaskHandler {
let keys: Vec<usize> = self.children.keys().cloned().collect();
if !message.wait {
for id in keys {
self.pause_task(id);
self.pause_task(state, id);
}
}
info!("Pausing daemon");
self.change_running(false);
self.change_running(state, false);
}
/// Pause a specific task.
/// Send a signal to the process to actually pause the OS process
fn pause_task(&mut self, id: usize) {
fn pause_task(&mut self, state: &SharedState, id: usize) {
if !self.children.contains_key(&id) {
return;
}
@ -467,7 +479,7 @@ impl TaskHandler {
Err(err) => info!("Failed pausing task {}: {:?}", id, err),
Ok(success) => {
if success {
let mut state = self.state.lock().unwrap();
let mut state = state.lock().unwrap();
state.change_status(id, TaskStatus::Paused);
}
}
@ -478,11 +490,11 @@ impl TaskHandler {
/// Handle the pause message:
/// 1. Either kill all tasks.
/// 2. Or only kill specific tasks.
fn kill(&mut self, message: KillMessage) {
fn kill(&mut self, state: &SharedState, message: KillMessage) {
// Only pause specific tasks
if !message.task_ids.is_empty() {
for id in message.task_ids {
self.kill_task(id);
self.kill_task(state, id);
}
return;
}
@ -490,24 +502,24 @@ impl TaskHandler {
// Pause the daemon and kill all tasks
if message.all {
info!("Killing all spawned children");
self.change_running(false);
self.change_running(state, false);
let keys: Vec<usize> = self.children.keys().cloned().collect();
for id in keys {
self.kill_task(id);
self.kill_task(state, id);
}
}
}
/// Kill a specific task and handle it accordingly
/// Triggered on `reset` and `kill`.
fn kill_task(&mut self, task_id: usize) {
fn kill_task(&mut self, state: &SharedState, task_id: usize) {
if let Some(child) = self.children.get_mut(&task_id) {
match child.kill() {
Err(_) => debug!("Task {} has already finished by itself", task_id),
_ => {
// Already mark the task as killed over here.
// It's hard to distinguish whether it's killed later on.
let mut state = self.state.lock().unwrap();
let mut state = state.lock().unwrap();
let mut task = state.tasks.get_mut(&task_id).unwrap();
task.status = TaskStatus::Done;
task.result = Some(TaskResult::Killed);
@ -546,12 +558,12 @@ impl TaskHandler {
/// Kill all children by reusing the `kill` function
/// Set the `reset` flag, which will prevent new tasks from being spawned.
/// If all children finished, the state will be completely reset.
fn reset(&mut self) {
fn reset(&mut self, state: &SharedState) {
let message = KillMessage {
task_ids: Vec::new(),
all: true,
};
self.kill(message);
self.kill(state, message);
self.reset = true;
}
@ -563,10 +575,89 @@ impl TaskHandler {
}
/// Change the running state consistently
fn change_running(&mut self, running: bool) {
let mut state = self.state.lock().unwrap();
fn change_running(&mut self, state: &SharedState, running: bool) {
let mut state = state.lock().unwrap();
state.running = running;
self.running = running;
state.save();
}
/// Users can specify a callback that's fired whenever a task finishes
/// Execute the callback by spawning a new subprocess.
fn spawn_callback(&mut self, task: &Task) {
// Return early, if there's no callback specified
let callback = if let Some(callback) = &self.settings.daemon.callback {
callback
} else {
return;
};
// Build the callback command from the given template
let mut handlebars = Handlebars::new();
handlebars.set_strict_mode(true);
// Build templating variables
let mut parameters = HashMap::new();
parameters.insert("id", task.id.to_string());
parameters.insert("command", task.command.clone());
parameters.insert("path", task.path.clone());
parameters.insert("result", task.result.clone().unwrap().to_string());
let callback_command = match handlebars.render_template(&callback, &parameters) {
Ok(command) => command,
Err(err) => {
error!("Failed to create callback command from template with error: {}", err);
return;
}
};
let mut spawn_command = Command::new(if cfg!(windows) { "powershell" } else { "sh" });
if cfg!(windows) {
// Chain two `powershell` commands, one that sets the output encoding to utf8 and then the user provided one.
spawn_command.arg("-c").arg(format!(
"[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8; {}",
callback_command
));
} else {
spawn_command.arg("-c").arg(&callback_command);
}
// Spawn the callback subprocess and log if it fails.
let spawn_result = spawn_command.spawn();
let child = match spawn_result {
Err(error) => {
error!("Failed to spawn callback with error: {}", error);
return;
},
Ok(child) => child,
};
debug!("Spawned callback for task {}", task.id);
self.callbacks.push(child);
}
/// Look at all running callbacks and log any errors
/// If everything went smoothly, simply remove them from the list
fn check_callbacks(&mut self) {
let mut finished = Vec::new();
for (id, child) in self.callbacks.iter_mut().enumerate() {
match child.try_wait() {
// Handle a child error.
Err(error) => {
info!("Callback failed with error {:?}", error);
finished.push(id);
}
// Child process did not exit yet
Ok(None) => continue,
Ok(exit_status) => {
info!("Callback finished with exit code {:?}", exit_status);
finished.push(id);
}
}
}
finished.reverse();
for id in finished.iter() {
self.callbacks.remove(*id);
}
}
}

View file

@ -19,12 +19,13 @@ pub struct Client {
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Daemon {
pub pueue_directory: String,
pub default_parallel_tasks: usize,
#[serde(default = "pause_on_failure_default")]
pub pause_on_failure: bool,
// pub address: String,
pub port: String,
pub secret: String,
pub default_parallel_tasks: usize,
#[serde(default = "pause_on_failure_default")]
pub pause_on_failure: bool,
pub callback: Option<String>,
}
fn pause_on_failure_default() -> bool {
@ -55,6 +56,7 @@ impl Settings {
config.set_default("daemon.default_parallel_tasks", 1)?;
config.set_default("daemon.pause_on_failure", false)?;
config.set_default("daemon.secret", random_secret.clone())?;
config.set_default("daemon.callback", None::<String>)?;
// config.set_default("client.daemon_address", "127.0.0.1")?;
config.set_default("client.daemon_port", "6924")?;