sqlx
Some checks failed
ci/woodpecker/push/build Pipeline failed

This commit is contained in:
JMARyA 2024-12-08 17:59:30 +01:00
parent 3e35dd669a
commit d6d44b457c
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
7 changed files with 1079 additions and 148 deletions

957
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -9,9 +9,9 @@ edition = "2021"
chrono = "0.4.35" chrono = "0.4.35"
env_logger = "0.11.3" env_logger = "0.11.3"
log = "0.4.21" log = "0.4.21"
rusqlite = "0.30.0"
serde = { version = "1.0.196", features = ["derive"] } serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113" serde_json = "1.0.113"
toml = "0.8.10" toml = "0.8.10"
jobdispatcher = { git = "https://git.hydrar.de/jmarya/jobdispatcher" }
reqwest = { version = "0.11.26", features = ["blocking", "json"] } reqwest = { version = "0.11.26", features = ["blocking", "json"] }
sqlx = { version = "0.8", features = ["postgres", "sqlite", "runtime-tokio-native-tls", "derive", "uuid", "chrono", "json"] }
tokio = { version = "1.42.0", features = ["full"] }

View file

@ -2,6 +2,12 @@
# Data Download Directory # Data Download Directory
data_dir = "./download" data_dir = "./download"
# Sqlite
database = "data/download.db"
# Postgres
#database = "postgres://user:password@localhost/dbname"
[youtube] [youtube]
# Interval in minutes for checking # Interval in minutes for checking
interval = 2 interval = 2

14
migrations/0001_init.sql Normal file
View file

@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS urls (
id INTEGER PRIMARY KEY,
url TEXT NOT NULL,
timestamp TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS item_log (
id INTEGER PRIMARY KEY,
module TEXT NOT NULL,
name TEXT NOT NULL,
url TEXT NOT NULL,
timestamp TEXT NOT NULL,
CONSTRAINT unique_module_name_url UNIQUE (module, name, url)
);

View file

@ -9,6 +9,9 @@ use crate::yt_dlp::config::YtDlpConfig;
pub struct HoardConfig { pub struct HoardConfig {
/// Top level data download directory /// Top level data download directory
pub data_dir: PathBuf, pub data_dir: PathBuf,
// Database (Sqlite or Postgres)
pub database: String,
} }
/// Top level global config /// Top level global config

205
src/db.rs
View file

@ -1,101 +1,135 @@
use jobdispatcher::{JobDispatcher, JobOrder}; #[derive(Clone)]
use rusqlite::{Connection, OptionalExtension};
use std::sync::{mpsc::Receiver, Arc};
pub struct DatabaseBackend { pub struct DatabaseBackend {
pub file: String, pub db_url: String,
pub conn: Connection, pub sqlite: Option<sqlx::Pool<sqlx::Sqlite>>,
pub dispatcher: Arc<JobDispatcher<Query, Out>>, pub postgres: Option<sqlx::Pool<sqlx::Postgres>>,
pub recv: Receiver<JobOrder<Query, Out>>, }
pub fn ensure_file_exists(path: &str) {
// Check if the file exists
if !std::path::Path::new(path).exists() {
// If the file does not exist, create an empty one
match std::fs::File::create(path) {
Ok(_) => log::info!("Created {path}"),
Err(e) => log::error!("Failed to create file: {}", e),
}
}
} }
impl DatabaseBackend { impl DatabaseBackend {
pub fn new(file: &str) -> Self { pub async fn new(db_url: &str) -> Self {
let (dispatcher, recv) = jobdispatcher::JobDispatcher::<Query, Out>::new(); let mut sqlite = None;
let conn = Connection::open(file).unwrap(); let mut postgres = None;
conn.execute( if db_url.starts_with("postgres") {
"CREATE TABLE IF NOT EXISTS urls ( postgres = Some(
id INTEGER PRIMARY KEY, sqlx::postgres::PgPoolOptions::new()
url TEXT NOT NULL, .max_connections(5)
timestamp TEXT NOT NULL .connect(&std::env::var("DATABASE_URL").unwrap())
)", .await
[], .unwrap(),
) );
sqlx::migrate!("./migrations")
.run(postgres.as_ref().unwrap())
.await
.unwrap(); .unwrap();
} else {
conn.execute( ensure_file_exists(db_url);
"CREATE TABLE IF NOT EXISTS item_log ( sqlite = Some(sqlx::sqlite::SqlitePool::connect(db_url).await.unwrap());
id INTEGER PRIMARY KEY, sqlx::migrate!("./migrations")
module TEXT NOT NULL, .run(sqlite.as_ref().unwrap())
name TEXT NOT NULL, .await
url TEXT NOT NULL,
timestamp TEXT NOT NULL
)",
[],
)
.unwrap(); .unwrap();
}
let dispatcher = Arc::new(dispatcher);
Self { Self {
file: file.to_string(), db_url: db_url.to_string(),
conn, sqlite,
dispatcher, postgres,
recv,
} }
} }
pub fn take_db(&self) -> Database { pub fn take_db(&self) -> Database {
Database::new(self.dispatcher.clone()) Database::new(self.clone())
} }
pub fn run(&self) { pub async fn query(&self, param: Query) -> Out {
while let Ok(job) = self.recv.recv() { match param {
match job.param {
Query::InsertUrl(ref url) => { Query::InsertUrl(ref url) => {
let timestamp = chrono::Local::now().to_rfc3339(); if let Some(postgres) = self.postgres.as_ref() {
self.conn sqlx::query("INSERT INTO urls (url, timestamp) VALUES ($1, CURRENT_TIMESTAMP)")
.execute( .bind(url)
"INSERT INTO urls (url, timestamp) VALUES (?, ?)", .execute(postgres)
[url, &timestamp], .await
)
.unwrap(); .unwrap();
job.done(Out::Ok); } else {
if let Some(sqlite) = self.sqlite.as_ref() {
sqlx::query(
"INSERT INTO urls (url, timestamp) VALUES ($1, CURRENT_TIMESTAMP)",
)
.bind(url)
.execute(sqlite)
.await
.unwrap();
}
}
return Out::Ok;
} }
Query::CheckForUrl(ref url) => { Query::CheckForUrl(ref url) => {
let mut stmt = self let res: (i64,) = if let Some(postgres) = self.postgres.as_ref() {
.conn sqlx::query_as("SELECT COUNT(*) FROM urls WHERE url = $1")
.prepare("SELECT COUNT(*) FROM urls WHERE url = ?") .bind(url)
.unwrap(); .fetch_one(postgres)
let count: i64 = stmt.query_row([url], |row| row.get(0)).unwrap(); .await
job.done(Out::Bool(count > 0)); .unwrap()
} else {
sqlx::query_as("SELECT COUNT(*) FROM urls WHERE url = $1")
.bind(url)
.fetch_one(self.sqlite.as_ref().unwrap())
.await
.unwrap()
};
let count: i64 = res.0;
return Out::Bool(count > 0);
} }
Query::UpdateNewDownloads(ref module, ref name, ref url) => { Query::UpdateNewDownloads(ref module, ref name, ref url) => {
let timestamp = chrono::Local::now().to_rfc3339(); if let Some(postgres) = self.postgres.as_ref() {
sqlx::query(
// Check if the entry exists r#"
let existing_timestamp: Option<String> = self.conn.query_row( INSERT INTO item_log (module, name, url, CURRENT_TIMESTAMP)
"SELECT timestamp FROM item_log WHERE module = ? AND name = ? AND url = ?", VALUES ($1, $2, $3)
[module, name, url], ON CONFLICT (module, name, url)
|row| row.get(0) DO UPDATE SET timestamp = CURRENT_TIMESTAMP
).optional().unwrap(); "#,
)
if existing_timestamp.is_some() { .bind(module)
// Entry exists, update timestamp .bind(name)
self.conn.execute( .bind(url)
"UPDATE item_log SET timestamp = ? WHERE module = ? AND name = ? AND url = ?", .execute(postgres)
[&timestamp, module, name, url] .await
).unwrap(); .unwrap();
} else { } else {
// Entry doesn't exist, insert new row if let Some(sqlite) = self.sqlite.as_ref() {
self.conn.execute( sqlx::query(
"INSERT INTO item_log (module, name, url, timestamp) VALUES (?, ?, ?, ?)", r#"
[module, name, url, &timestamp] INSERT INTO item_log (module, name, url, timestamp)
).unwrap(); VALUES ($1, $2, $3, CURRENT_TIMESTAMP)
ON CONFLICT (module, name, url)
DO UPDATE SET timestamp = CURRENT_TIMESTAMP
"#,
)
.bind(module)
.bind(name)
.bind(url)
.execute(sqlite)
.await
.unwrap();
}
} }
job.done(Out::Ok); return Out::Ok;
}
} }
} }
} }
@ -115,17 +149,20 @@ pub enum Out {
#[derive(Clone)] #[derive(Clone)]
pub struct Database { pub struct Database {
conn: Arc<JobDispatcher<Query, Out>>, conn: DatabaseBackend,
} }
impl Database { impl Database {
pub fn new(conn: Arc<JobDispatcher<Query, Out>>) -> Self { pub fn new(conn: DatabaseBackend) -> Self {
Self { conn } Self { conn }
} }
/// Insert a URL into the database as already downloaded /// Insert a URL into the database as already downloaded
pub fn insert_url(&self, url: &str) { pub fn insert_url(&self, url: &str) {
self.conn.send(Query::InsertUrl(url.to_string())); let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
self.conn.query(Query::InsertUrl(url.to_string())).await;
});
} }
/// Check if a URL is already in the database /// Check if a URL is already in the database
@ -142,19 +179,27 @@ impl Database {
/// } /// }
/// ``` /// ```
pub fn check_for_url(&self, url: &str) -> bool { pub fn check_for_url(&self, url: &str) -> bool {
match self.conn.send(Query::CheckForUrl(url.to_string())) { let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
match self.conn.query(Query::CheckForUrl(url.to_string())).await {
Out::Ok => false, Out::Ok => false,
Out::Bool(b) => b, Out::Bool(b) => b,
} }
})
} }
/// Keep a record on when download happen. /// Keep a record on when download happen.
/// This takes a `module`, `name` and `url` and saves a timestamp to the db. /// This takes a `module`, `name` and `url` and saves a timestamp to the db.
pub fn update_new_downloads(&self, module: &str, name: &str, url: &str) { pub fn update_new_downloads(&self, module: &str, name: &str, url: &str) {
self.conn.send(Query::UpdateNewDownloads( let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
self.conn
.query(Query::UpdateNewDownloads(
module.to_string(), module.to_string(),
name.to_string(), name.to_string(),
url.to_string(), url.to_string(),
)); ))
.await;
});
} }
} }

View file

@ -1,10 +1,13 @@
#![feature(async_closure)]
use hoard::config::GlobalConfig; use hoard::config::GlobalConfig;
use hoard::{ensure_dir_exists, Module}; use hoard::{ensure_dir_exists, Module};
// todo : migrate to async code? // todo : migrate to async code?
// todo : better log options // todo : better log options
fn main() { #[tokio::main]
async fn main() {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
std::env::set_var("RUST_LOG", "trace"); std::env::set_var("RUST_LOG", "trace");
@ -21,11 +24,12 @@ fn main() {
log::info!("Starting hoard"); log::info!("Starting hoard");
let db = hoard::db::DatabaseBackend::new("data/download.db");
let config: GlobalConfig = let config: GlobalConfig =
toml::from_str(&std::fs::read_to_string("config.toml").unwrap()).unwrap(); toml::from_str(&std::fs::read_to_string("config.toml").unwrap()).unwrap();
ensure_dir_exists(&config.hoard.data_dir); ensure_dir_exists(&config.hoard.data_dir);
let db = hoard::db::DatabaseBackend::new(&config.hoard.database).await;
let mut modules: Vec<Box<dyn Module>> = vec![]; let mut modules: Vec<Box<dyn Module>> = vec![];
if let Some(yt_config) = config.youtube { if let Some(yt_config) = config.youtube {
@ -56,10 +60,6 @@ fn main() {
))); )));
} }
let _db_thread = std::thread::spawn(move || {
db.run();
});
let threads: Vec<_> = modules let threads: Vec<_> = modules
.into_iter() .into_iter()
.map(|x| { .map(|x| {