Move communication logic into own module, add secret

This commit is contained in:
Arne Beer 2019-12-17 00:46:59 +01:00
parent 9aaa59b86b
commit 9f38fcaa11
7 changed files with 126 additions and 93 deletions

View file

@ -36,6 +36,7 @@ async-std = { version = "1", features = ["attributes", "unstable"] }
dirs = "2"
users = "^0.9"
chrono = { version = "^0.4", features = ["serde"] }
rand = "^0.7"
nix = "0.16"
strum = "0.16"
strum_macros = "0.16"

View file

@ -1,20 +1,8 @@
# Attention!!!
**This is not stable yet.**
You are looking at the Rust rewrite of the program Pueue.
The original repository is over [here](https://github.com/nukesor/pueue).
Once everything is finished, this master will be pushed to the old repository.
Until then, be aware, that everything is experimental and that the documentation might not be up-to-date.
# Pueue
[![GitHub release](https://img.shields.io/github/tag/nukesor/pueue.svg)](https://github.com/nukesor/pueue/releases/latest)
[![Crates.io](https://img.shields.io/crates/v/pueue)](https://crates.io/crates/pueue)
[![MIT Licence](https://img.shields.io/pypi/l/pueue.svg)](https://github.com/Nukesor/pueue/blob/master/LICENSE)
[![Paypal](https://github.com/Nukesor/images/blob/master/paypal-donate-blue.svg)](https://www.paypal.me/arnebeer/)
[![Patreon](https://github.com/Nukesor/images/blob/master/patreon-donate-blue.svg)](https://www.patreon.com/nukesor)
@ -33,6 +21,7 @@ It provides functionality for:
- Manipulation of the scheduled task order
- Running multiple tasks at once (You can decide how many concurrent tasks you want to run)
**Pueue has been rewritten in Rust!!** If you want the old version that's build with python, please install via pip.
## Why should I use it?
@ -59,9 +48,9 @@ PRs are of course always welcome!
There are three different ways to install Pueue.
1. Use an Arch Linux AUR package manager e.g. Yay: `yaourt -S pueue` . This will deploy the service file automatically.
2. Install by using cargo: `pip install pueue`.
3. Clone the repository and execute `python setup.py install`.
1. Use an Arch Linux AUR package manager e.g. yay: `yay -S pueue` . This will deploy the service file and completions automatically.
2. Install by using cargo: `cargo install pueue`.
3. Clone the repository and execute `cargo install --path .`.
## How to use it:
@ -117,24 +106,23 @@ The configuration file of Pueue is located in `~/.config/pueue.yml`.
```
---
client:
daemon_address: 127.0.0.1
daemon_port: "6924"
secret: "your_secret"
daemon:
pueue_directory: /home/$USER/.local/share/pueue
default_parallel_tasks: 1
address: 127.0.0.1
port: "6924"
secret: "your_secret"
```
**Client**:
- `daemon_port` The port the client tries to connect to.
- `secret` The secret, that's used for authentication
**Daemon**:
- `pueue_directory` The location Pueue uses for it's intermediate files and logs.
- `default_parallel_tasks` Determines how many tasks should be processed concurrently.
- `address` The address the daemon should listen to.
- `port` The port the daemon should listen to.
**Client**:
- `daemon_address` The address the client tries to connect to.
- `daemon_port` The port the client tries to connect to.
- `secret` The secret, that's used for authentication
## Logs

View file

@ -1,21 +1,21 @@
use ::anyhow::Result;
use ::async_std::net::TcpStream;
use ::async_std::prelude::*;
use ::byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use ::log::error;
use ::std::io::{self, Cursor, Write};
use ::std::io::{self, Write};
use crate::cli::{Opt, SubCommand};
use crate::instructions::*;
use crate::output::*;
use ::pueue::message::*;
use ::pueue::settings::Settings;
use ::pueue::protocol::*;
/// The client
pub struct Client {
opt: Opt,
daemon_address: String,
message: Message,
secret: String,
}
impl Client {
@ -41,28 +41,32 @@ impl Client {
opt: opt,
daemon_address: address,
message: message,
secret: settings.client.secret,
})
}
pub async fn run(&mut self) -> Result<()> {
// Connect to stream
let mut stream = TcpStream::connect(&self.daemon_address).await?;
// Connect to socket
let mut socket = TcpStream::connect(&self.daemon_address).await?;
let secret = self.secret.clone().into_bytes();
send_bytes(secret, &mut socket).await?;
// Create the message payload and send it to the daemon.
send_message(&self.message, &mut stream).await?;
send_message(&self.message, &mut socket).await?;
// Check if we can receive the response from the daemon
let mut message = receive_answer(&mut stream).await?;
let mut message = receive_message(&mut socket).await?;
while self.handle_message(message, &mut stream).await? {
while self.handle_message(message, &mut socket).await? {
// Check if we can receive the response from the daemon
message = receive_answer(&mut stream).await?;
message = receive_message(&mut socket).await?;
}
Ok(())
}
async fn handle_message(&mut self, message: Message, stream: &mut TcpStream) -> Result<bool> {
async fn handle_message(&mut self, message: Message, socket: &mut TcpStream) -> Result<bool> {
// Handle some messages directly
match message {
Message::Success(text) => print_success(text),
@ -82,7 +86,7 @@ impl Client {
SubCommand::Edit { task_id: _ } => {
// Create a new message with the edited command
let message = edit(message);
send_message(&message, stream).await?;
send_message(&message, socket).await?;
return Ok(true);
}
_ => error!("Received unhandled response message"),
@ -93,45 +97,3 @@ impl Client {
Ok(false)
}
}
/// Send a message to the daemon.
/// The JSON payload is highly dependent on the commandline input parameters
/// Some payloads are serialized `Add` or `Remove` messages.
/// Before we send the actual payload, a header is sent with two u64.
/// The first represents the type of the message, the second is length of the payload.
async fn send_message(message: &Message, stream: &mut TcpStream) -> Result<()> {
// Prepare command for transfer and determine message byte size
let payload = serde_json::to_string(message)
.expect("Failed to serialize message.")
.into_bytes();
let byte_size = payload.len() as u64;
let mut header = vec![];
header.write_u64::<BigEndian>(byte_size).unwrap();
// Send the request size header first.
// Afterwards send the request.
stream.write_all(&header).await?;
stream.write_all(&payload).await?;
Ok(())
}
/// Receive the response of the daemon and handle it.
async fn receive_answer(stream: &mut TcpStream) -> Result<Message> {
// Extract the instruction size from the header bytes
let mut header_buffer = vec![0; 8];
stream.read(&mut header_buffer).await?;
let mut header = Cursor::new(header_buffer);
let instruction_size = header.read_u64::<BigEndian>().unwrap() as usize;
// Receive the instruction
let mut buffer = vec![0; instruction_size];
stream.read(&mut buffer).await?;
let payload = String::from_utf8(buffer)?;
// Interpret the response
let message: Message = serde_json::from_str(&payload)?;
Ok(message)
}

View file

@ -4,11 +4,8 @@ mod stream;
use ::anyhow::Result;
use ::async_std::net::{TcpListener, TcpStream};
use ::async_std::prelude::*;
use ::async_std::task;
use ::byteorder::{BigEndian, ReadBytesExt};
use ::log::info;
use ::std::io::Cursor;
use ::log::{info, warn};
use ::std::sync::mpsc::Sender;
use crate::socket::instructions::handle_message;
@ -16,6 +13,7 @@ use crate::socket::send::send_message;
use crate::socket::stream::handle_show;
use crate::cli::Opt;
use ::pueue::message::*;
use ::pueue::protocol::*;
use ::pueue::settings::Settings;
use ::pueue::state::SharedState;
@ -59,27 +57,24 @@ pub async fn accept_incoming(
/// Continuously poll the existing incoming futures.
/// In case we received an instruction, handle it and create a response future.
/// The response future is added to unix_responses and handled in a separate function.
pub async fn handle_incoming(
async fn handle_incoming(
mut socket: TcpStream,
sender: Sender<Message>,
state: SharedState,
settings: Settings,
) -> Result<()> {
loop {
// Receive the header with the size and type of the message
let mut header = vec![0; 8];
socket.read(&mut header).await?;
// Receive the secret and check, whether the client is allowed to connect
let payload_bytes = receive_bytes(&mut socket).await?;
let secret = String::from_utf8(payload_bytes)?;
if secret != settings.daemon.secret {
warn!("Received invalid secret: {}", secret);
return Ok(());
}
// Extract the instruction size from the header bytes
let mut header = Cursor::new(header);
let instruction_size = header.read_u64::<BigEndian>()? as usize;
let mut instruction_bytes = vec![0; instruction_size];
socket.read(&mut instruction_bytes).await?;
// Receive the message and deserialize it
let instruction = String::from_utf8(instruction_bytes)?;
let message: Message = serde_json::from_str(&instruction)?;
info!("Received instruction: {}", instruction);
// Receive the actual instruction from the client
let message = receive_message(&mut socket).await?;
info!("Received instruction: {:?}", message);
let response = if let Message::StreamRequest(message) = message {
// The client requested the output of a task

View file

@ -3,3 +3,4 @@ pub mod message;
pub mod settings;
pub mod state;
pub mod task;
pub mod protocol;

63
shared/protocol.rs Normal file
View file

@ -0,0 +1,63 @@
use ::anyhow::Result;
use ::async_std::net::TcpStream;
use ::async_std::prelude::*;
use ::std::io::Cursor;
use ::byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use crate::message::*;
/// Convenience wrapper around send_bytes
/// Deserialize a message and feed the bytes into send_bytes
pub async fn send_message(message: &Message, socket: &mut TcpStream) -> Result<()> {
// Prepare command for transfer and determine message byte size
let payload = serde_json::to_string(message)
.expect("Failed to serialize message.")
.into_bytes();
send_bytes(payload, socket).await
}
/// Send a Vec of bytes. Before the actual bytes are send, the size of the message
/// is transmitted in an header of fixed size (u64).
pub async fn send_bytes(payload: Vec<u8>, socket: &mut TcpStream) -> Result<()> {
let byte_size = payload.len() as u64;
let mut header = vec![];
header.write_u64::<BigEndian>(byte_size).unwrap();
// Send the request size header first.
// Afterwards send the request.
socket.write_all(&header).await?;
socket.write_all(&payload).await?;
Ok(())
}
/// Receive a byte stream depending on a given header
/// This is the basic protocol beneath all pueue communication
pub async fn receive_bytes(socket: &mut TcpStream) -> Result<Vec<u8>> {
// Receive the header with the size
let mut header = vec![0; 8];
socket.read(&mut header).await?;
let mut header = Cursor::new(header);
let message_size = header.read_u64::<BigEndian>()? as usize;
// Receive the payload
let mut payload_bytes = vec![0; message_size];
socket.read(&mut payload_bytes).await?;
Ok(payload_bytes)
}
/// Convenience wrapper that receives a message and converts it into a Message
pub async fn receive_message(socket: &mut TcpStream) -> Result<Message> {
let payload_bytes = receive_bytes(socket).await?;
// Deserialize the message
let message = String::from_utf8(payload_bytes)?;
let message: Message = serde_json::from_str(&message)?;
Ok(message)
}

View file

@ -4,6 +4,7 @@ use ::serde_derive::{Deserialize, Serialize};
use ::std::fs::File;
use ::std::io::prelude::*;
use ::std::path::{Path, PathBuf};
use ::rand::Rng;
use ::config::Config;
@ -11,6 +12,7 @@ use ::config::Config;
pub struct Client {
// pub daemon_address: String,
pub daemon_port: String,
pub secret: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
@ -19,6 +21,7 @@ pub struct Daemon {
pub default_parallel_tasks: usize,
// pub address: String,
pub port: String,
pub secret: String,
}
@ -38,14 +41,17 @@ impl Settings {
pub fn new() -> Result<Settings> {
let mut config = Config::new();
let random_secret = gen_random_secret();
// Set pueue config defaults
config.set_default("daemon.pueue_directory", default_pueue_path()?)?;
// config.set_default("daemon.address", "127.0.0.1")?;
config.set_default("daemon.port", "6924")?;
config.set_default("daemon.default_parallel_tasks", 1)?;
config.set_default("daemon.secret", random_secret.clone())?;
// config.set_default("client.daemon_address", "127.0.0.1")?;
config.set_default("client.daemon_port", "6924")?;
config.set_default("client.secret", random_secret)?;
// Add in the home config file
parse_config(&mut config)?;
@ -85,6 +91,23 @@ fn get_home_dir() -> Result<PathBuf> {
dirs::home_dir().ok_or(anyhow!("Couldn't resolve home dir"))
}
fn gen_random_secret() -> String {
const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ\
abcdefghijklmnopqrstuvwxyz\
0123456789)(*&^%$#@!~";
const PASSWORD_LEN: usize = 20;
let mut rng = rand::thread_rng();
let secret: String = (0..PASSWORD_LEN)
.map(|_| {
let idx = rng.gen_range(0, CHARSET.len());
CHARSET[idx] as char
})
.collect();
secret
}
#[cfg(target_os = "linux")]
fn default_config_path() -> Result<PathBuf> {
Ok(get_home_dir()?.join(".config/pueue.yml"))