Migrate client to new std async

This commit is contained in:
Arne Beer 2019-10-30 01:04:12 +01:00
parent d4e6608920
commit a750d507ef
8 changed files with 67 additions and 137 deletions

View file

@ -16,8 +16,7 @@ maintenance = { status = "actively-developed" }
daemonize = "^0.4"
chrono = { version = "^0.4", features = ["serde"] }
users = "^0.9"
failure = "0.1.3"
failure_derive = "0.1.3"
anyhow = "1"
byteorder = "^1"
serde = "^1.0"
@ -31,9 +30,7 @@ shellexpand = "^1.0"
config = "^0.9"
toml = "^0.5"
futures = "^0.1"
tokio = "^0.1"
tokio-process= "^0.2"
tokio = "0.2.0-alpha.6"
bytes = "^0.4"
uuid = { version = "^0.8", features = ["serde", "v4"] }

View file

@ -1,9 +1,10 @@
use tokio;
use ::anyhow::Result;
use pueue::client::client::Client;
use pueue::settings::Settings;
use ::pueue::client::client::Client;
use ::pueue::settings::Settings;
fn main() {
#[tokio::main]
async fn main() -> Result<()> {
let settings = Settings::new().unwrap();
let save_result = settings.save();
@ -12,7 +13,9 @@ fn main() {
println!("{:?}", save_result.err());
}
let client = Client::new(settings);
let mut client = Client::new(settings)?;
tokio::run(client);
client.run().await?;
Ok(())
}

View file

@ -1,7 +1,5 @@
use tokio;
use pueue::daemon::daemon::Daemon;
use pueue::settings::Settings;
//use pueue::daemon::daemon::Daemon;
use ::pueue::settings::Settings;
fn main() {
let settings = Settings::new().unwrap();
@ -12,7 +10,7 @@ fn main() {
println!("{:?}", save_result.err());
}
let daemon = Daemon::new(&settings);
tokio::run(daemon);
// let daemon = Daemon::new(&settings);
//
// tokio::run(daemon);
}

View file

@ -1,8 +1,9 @@
use clap::{App, Arg, SubCommand};
use ::anyhow::{anyhow, Result};
use clap::{App, Arg};
use crate::communication::message::*;
pub fn handle_cli() -> Message {
pub fn handle_cli() -> Result<Message> {
let matches = App::new("Pueue client")
.version("0.1")
.author("Arne Beer <contact@arne.beer>")
@ -15,10 +16,17 @@ pub fn handle_cli() -> Message {
)
.get_matches();
let command: Vec<String> = matches.value_of("command").unwrap().to_string().split(" ").map(|x| x.to_string()).collect();
let command = matches
.value_of("command")
.ok_or(anyhow!("You need to specify a command"))?;
let command: Vec<String> = command
.to_string()
.split(" ")
.map(|x| x.to_string())
.collect();
Message::Add(AddMessage {
Ok(Message::Add(AddMessage {
command: command,
path: String::from("/"),
})
}))
}

View file

@ -1,9 +1,7 @@
use ::anyhow::Result;
use ::byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use ::failure::Error;
use ::futures::Future;
use ::std::io::Cursor;
use ::tokio::io as tokio_io;
use ::tokio::net::UnixStream;
use ::tokio::net::TcpStream;
use ::tokio::prelude::*;
use crate::client::cli::handle_cli;
@ -15,21 +13,30 @@ use crate::settings::Settings;
pub struct Client {
settings: Settings,
message: Message,
response: Option<String>,
communication_future:
Option<Box<dyn Future<Item = (UnixStream, Vec<u8>), Error = Error> + Send>>,
}
impl Client {
pub fn new(settings: Settings) -> Self {
let message = handle_cli();
pub fn new(settings: Settings) -> Result<Self> {
let message = handle_cli()?;
Client {
Ok(Client {
settings: settings,
message: message,
response: None,
communication_future: None,
}
})
}
pub async fn run(&mut self) -> Result<()> {
// Connect to stream
let mut stream = TcpStream::connect(get_socket_path(&self.settings)).await?;
// Create the message payload and send it to the daemon.
self.send_message(&mut stream).await?;
// Check if we can receive the response from the daemon
let response = self.receive_answer(&mut stream).await?;
println!("{}", &response);
Ok(())
}
/// Send a message to the daemon.
@ -37,12 +44,7 @@ impl Client {
/// Some payloads are serialized `Add` or `Remove` messages.
/// Before we send the actual payload, a header is sent with two u64.
/// One signals the type of the message, whilst the other signals the length of the payload.
pub fn send_message(&mut self) {
// Early return if we are already waiting for a future.
if self.communication_future.is_some() {
return;
}
async fn send_message(&mut self, stream: &mut TcpStream) -> Result<()> {
// Prepare command for transfer and determine message byte size
let payload = serde_json::to_string(&self.message)
.expect("Failed to serialize message.")
@ -54,102 +56,24 @@ impl Client {
// Send the request size header first.
// Afterwards send the request.
let communication_future = UnixStream::connect(get_socket_path(&self.settings))
.and_then(|stream| tokio_io::write_all(stream, header))
.and_then(|(stream, _written)| tokio_io::write_all(stream, payload))
.and_then(|(stream, _written)| tokio_io::read_exact(stream, vec![0; 8]))
.and_then(|(stream, header)| {
// Extract the instruction size from the header bytes
let mut header = Cursor::new(header);
let instruction_size = header.read_u64::<BigEndian>().unwrap() as usize;
stream.write_all(&header).await?;
stream.write_all(&payload).await?;
tokio_io::read_exact(stream, vec![0; instruction_size])
})
.map_err(|error| Error::from(error));
self.communication_future = Some(Box::new(communication_future));
Ok(())
}
/// Receive the response of the daemon and handle it.
pub fn receive_answer(&mut self) -> bool {
// Now receive the response until the connection closes.
let result = self.communication_future.poll();
async fn receive_answer(&mut self, stream: &mut TcpStream) -> Result<String> {
// Extract the instruction size from the header bytes
let mut header_buffer = vec![0; 8];
stream.read_exact(&mut header_buffer).await?;
let mut header = Cursor::new(header_buffer);
let instruction_size = header.read_u64::<BigEndian>().unwrap() as usize;
// Handle socket error
if result.is_err() {
println!("Socket errored during read");
println!("{:?}", result.err());
// Receive the instruction
let mut buffer = vec![0; instruction_size];
stream.read(&mut buffer).await?;
panic!("Communication failed.");
}
// We received a response from the daemon. Handle it
match result.unwrap() {
Async::Ready(received_bytes_result) => {
// Check whether we received something from the daemon.
let (_, received_bytes) =
if let Some((stream, received_bytes)) = received_bytes_result {
(stream, received_bytes)
} else {
// Handle socket error
println!("Received an empty message from the daemon.");
panic!("Communication failed.");
};
// Extract response and handle invalid utf8
let response_result = String::from_utf8(received_bytes);
let response = if let Ok(response) = response_result {
response
} else {
println!("Didn't receive valid utf8.");
println!("{:?}", response_result.err());
panic!("Communication failed.");
};
self.response = Some(response);
true
}
Async::NotReady => false,
}
}
/// Handle the response of the daemon.
pub fn handle_response(&self) -> bool {
let response = if let Some(ref response) = self.response {
response
} else {
return false;
};
println!("{}", &response);
return true;
}
}
impl Future for Client {
type Item = ();
type Error = ();
/// The poll function of the client.
/// Send a message, receive the response and handle it accordingly to the current task.
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
// Create the message payload and send it to the daemon.
self.send_message();
// Check if we can receive the response from the daemon
let answer_received = self.receive_answer();
// Return NotReady until the response has been received and handled.
if answer_received {
// Handle the response from the daemon
self.handle_response();
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
Ok(String::from_utf8(buffer)?)
}
}

View file

@ -1,6 +1,6 @@
use ::serde_derive::{Deserialize, Serialize};
use crate::daemon::error::DaemonError;
use ::anyhow::Error;
/// The Message used to add a new command to the daemon.
#[derive(Serialize, Deserialize, Debug)]
@ -66,7 +66,7 @@ pub struct FailureMessage {
pub text: String,
}
pub fn create_success_message(text: String) -> Result<Message, DaemonError> {
pub fn create_success_message(text: String) -> Result<Message, Error> {
Ok(Message::Success(SuccessMessage { text: text }))
}

View file

@ -1,4 +1,4 @@
use ::failure::Error;
use ::anyhow::Error;
use ::std::fs::File;
pub fn open_log_file_handles(index: usize) -> Result<(File, File), Error> {

View file

@ -1,5 +1,5 @@
pub mod client;
pub mod communication;
pub mod daemon;
//pub mod daemon;
pub mod file;
pub mod settings;