Rewrite state

This commit is contained in:
Arne Beer 2019-11-11 15:59:01 +01:00
parent d886d98495
commit 5862eb1a90
8 changed files with 117 additions and 103 deletions

View file

@ -42,7 +42,7 @@ impl Client {
/// The JSON payload is highly dependent on the commandline input parameters
/// Some payloads are serialized `Add` or `Remove` messages.
/// Before we send the actual payload, a header is sent with two u64.
/// One signals the type of the message, whilst the other signals the length of the payload.
/// The first represents the type of the message, the second is length of the payload.
async fn send_message(&mut self, stream: &mut TcpStream) -> Result<()> {
// Prepare command for transfer and determine message byte size
let payload = serde_json::to_string(&self.message)

View file

@ -1,4 +1,4 @@
pub mod queue;
pub mod state;
pub mod socket_handler;
pub mod task;
pub mod task_handler;

View file

@ -1,77 +0,0 @@
use ::std::mem;
use ::anyhow::{Error, Result};
use ::std::process::Child;
use crate::communication::message::*;
use crate::daemon::task::{Task, TaskStatus};
use crate::daemon::task_handler::*;
pub type Queue = Vec<Option<Box<Task>>>;
pub fn add_task(queue: &mut Queue, message: AddMessage) -> Result<Message> {
let mut command = message.command.clone();
let arguments = command.split_off(1);
let task = Task {
command: command.pop().expect("Expected command"),
arguments: arguments,
path: message.path.clone(),
status: TaskStatus::Queued,
returncode: None,
stdout: None,
stderr: None,
start: None,
end: None,
};
queue.push(Some(Box::new(task)));
create_success_message(String::from("New task added."))
}
pub fn remove_task(
queue: &mut Queue,
task_handler: &mut TaskHandler,
message: RemoveMessage,
) -> Result<Message> {
create_success_message(String::from("Task removed"))
}
pub fn get_next_task(queue: &mut Queue) -> Option<(usize, Task)> {
for (i, task) in queue.iter().enumerate() {
match task {
None => continue,
Some(task) => match task.status {
TaskStatus::Queued => {
return Some((i, *task.clone()));
}
_ => continue,
},
}
}
None
}
pub fn update_task(queue: &mut Queue, index: usize, task: Task) {
mem::replace(&mut queue[index], Some(Box::new(task)));
}
pub fn change_status(queue: &mut Queue, index: usize, status: TaskStatus) {
let ref mut task = if let Some(ref mut task) = queue[index] {
task
} else {
return;
};
task.status = status;
}
pub fn get_task_status(queue: &Queue, index: usize) -> Option<TaskStatus> {
if let Some(ref task) = queue[index] {
Some(task.status.clone())
} else {
None
}
}
pub fn handle_finished_child(_queue: &mut Queue, _index: usize, _child: Child) {}

View file

@ -16,7 +16,7 @@ pub async fn accept_incoming(settings: &Settings) -> Result<()> {
// Poll if we have a new incoming connection.
let (socket, _) = listener.accept().await?;
tokio::spawn(async move {
handle_incoming(socket).await;
let result = handle_incoming(socket).await;
});
}
}

77
src/daemon/state.rs Normal file
View file

@ -0,0 +1,77 @@
use ::std::sync::{Arc, Mutex};
use ::std::collections::BTreeMap;
use ::anyhow::{Error, Result};
use ::std::process::Child;
use crate::communication::message::*;
use crate::daemon::task::{Task, TaskStatus};
use crate::daemon::task_handler::*;
pub type SharedState = Arc<Mutex<State>>;
pub struct State {
max_id: i32,
pub queued: BTreeMap<i32, Task>,
pub running: BTreeMap<i32, Task>,
pub finished: BTreeMap<i32, Task>,
}
impl State {
pub fn add_task(&mut self, message: AddMessage) -> Result<Message> {
let mut command = message.command.clone();
let arguments = command.split_off(1);
let task = Task {
id: self.max_id,
command: command.pop().expect("Expected command"),
arguments: arguments,
path: message.path.clone(),
status: TaskStatus::Queued,
returncode: None,
stdout: None,
stderr: None,
start: None,
end: None,
};
self.queued.insert(self.max_id, task);
self.max_id += 1;
create_success_message(String::from("New task added."))
}
pub fn remove_task(
&mut self,
task_handler: &mut TaskHandler,
message: RemoveMessage,
) -> Result<Message> {
create_success_message(String::from("Task removed"))
}
pub fn get_next_task(&mut self) -> Option<(i32)> {
for (id, task) in self.queued.iter() {
match task.status {
TaskStatus::Queued => {
return Some(*id);
}
_ => continue,
}
}
None
}
pub fn change_status(&mut self, index: i32, status: TaskStatus) {
let ref mut task = if let Some(ref mut task) = self.queued.get_mut(&index) {
task.status = status;
};
}
pub fn get_task_status(&mut self, index: i32) -> Option<TaskStatus> {
if let Some(ref task) = self.queued.get(&index) {
return Some(task.status.clone());
};
None
}
pub fn handle_finished_child(&mut self, _index: i32, _child: Child) {}
}

View file

@ -4,6 +4,7 @@ use ::serde_derive::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone)]
pub struct Task {
pub id: i32,
pub command: String,
pub arguments: Vec<String>,
pub path: String,

View file

@ -1,51 +1,52 @@
use ::std::collections::HashMap;
use ::std::collections::BTreeMap;
use ::std::process::{ExitStatus, Stdio};
use ::std::task::Poll;
use ::std::future::Future;
use ::std::process::{Command, Child};
use ::anyhow::{Error, Result};
use ::anyhow::{Error, Result, anyhow};
use crate::daemon::queue::*;
use crate::daemon::state::SharedState;
use crate::daemon::task::{Task, TaskStatus};
use crate::file::log::{create_log_file_handles, open_log_file_handles};
pub struct TaskHandler {
children: HashMap<usize, Child>,
state: SharedState,
children: BTreeMap<i32, Child>,
is_running: bool,
}
impl TaskHandler {
pub fn new() -> Self {
pub fn new(state: SharedState) -> Self {
TaskHandler {
children: HashMap::new(),
state: state,
children: BTreeMap::new(),
is_running: true,
}
}
}
impl TaskHandler {
pub fn check(&mut self, queue: &mut Queue) {
self.process_finished(queue);
self.check_new(queue);
pub fn check(&mut self) {
self.process_finished();
self.check_new();
}
/// Check whether there are any finished processes
fn process_finished(&mut self, queue: &mut Queue) {
fn process_finished(&mut self) {
let (finished, errored) = self.get_finished();
let mut state = self.state.lock().unwrap();
// Iterate over everything.
for index in finished.iter() {
let child = self.children.remove(index).expect("Child went missing");
handle_finished_child(queue, *index, child);
state.handle_finished_child(*index, child);
}
for index in errored.iter() {
let child = self.children.remove(index).expect("Child went missing");
change_status(queue, *index, TaskStatus::Failed);
state.change_status(*index, TaskStatus::Failed);
}
}
fn get_finished(&mut self) -> (Vec<usize>, Vec<usize>) {
fn get_finished(&mut self) -> (Vec<i32>, Vec<i32>) {
let mut finished = Vec::new();
let mut errored = Vec::new();
for (index, child) in self.children.iter_mut() {
@ -66,9 +67,8 @@ impl TaskHandler {
}
/// See if the task manager has a free slot and can start a new process.
fn check_new(&mut self, queue: &mut Queue) -> Result<(), Error> {
let next_task = get_next_task(queue);
let (index, task) = if let Some((index, task)) = next_task {
fn check_new(&mut self) -> Result<()> {
let (index, task) = if let Some((index, task)) = self.get_next()? {
(index, task)
} else {
return Ok(());
@ -76,12 +76,10 @@ impl TaskHandler {
self.start_process(index, &task)?;
change_status(queue, index, TaskStatus::Running);
Ok(())
}
fn start_process(&mut self, index: usize, task: &Task) -> Result<()> {
fn start_process(&mut self, index: i32, task: &Task) -> Result<()> {
let (stdout_log, stderr_log) = create_log_file_handles(index)?;
let child = Command::new(task.command.clone())
.args(task.arguments.clone())
@ -92,6 +90,21 @@ impl TaskHandler {
.spawn()?;
self.children.insert(index, child);
let mut state = self.state.lock().unwrap();
state.change_status(index, TaskStatus::Running);
Ok(())
}
fn get_next(&mut self) -> Result<Option<(i32, Task)>> {
let mut state = self.state.lock().unwrap();
let next_task = state.get_next_task();
match next_task {
Some(index) => {
let task = state.queued.remove(&index).ok_or(anyhow!("Expected queued item"))?;
Ok(Some((index, task)))
}
None => Ok(None)
}
}
}

View file

@ -1,14 +1,14 @@
use ::anyhow::Error;
use ::std::fs::File;
pub fn open_log_file_handles(index: usize) -> Result<(File, File), Error> {
pub fn open_log_file_handles(index: i32) -> Result<(File, File), Error> {
let stdout_log = File::open(format!("/tmp/{}_stdout.log", index))?;
let stderr_log = File::open(format!("/tmp/{}_stderr.log", index))?;
Ok((stdout_log, stderr_log))
}
pub fn create_log_file_handles(index: usize) -> Result<(File, File), Error> {
pub fn create_log_file_handles(index: i32) -> Result<(File, File), Error> {
let stdout_log = File::create(format!("/tmp/{}_stdout.log", index))?;
let stderr_log = File::create(format!("/tmp/{}_stderr.log", index))?;