diff --git a/Cargo.lock b/Cargo.lock index ec9914e..d302aac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "atoi" version = "2.0.0" @@ -222,14 +228,44 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "comrade" version = "0.1.0" -source = "git+https://git.hydrar.de/jmarya/comrade#5cb4facc4872a961a3ad3b11d4b48d01016b6600" +source = "git+https://git.hydrar.de/jmarya/comrade#00e448ac316623bb27cc3a3b799fc201cd6302d7" dependencies = [ + "comrade-macro", + "crossbeam", + "dashmap", "env_logger", "log", + "once_cell", + "rand 0.9.0", + "redis", + "serde", + "serde_json", "tokio", + "uuid", +] + +[[package]] +name = "comrade-macro" +version = "0.1.0" +source = "git+https://git.hydrar.de/jmarya/comrade#00e448ac316623bb27cc3a3b799fc201cd6302d7" +dependencies = [ + "proc-macro2", + "quote", + "serde_json", + "syn", ] [[package]] @@ -287,6 +323,47 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -312,6 +389,20 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.7.9" @@ -614,6 +705,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -631,7 +728,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" dependencies = [ - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -670,6 +767,7 @@ version = "0.1.0" dependencies = [ "chrono", "comrade", + "crossbeam", "env_logger", "log", "reqwest", @@ -935,7 +1033,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1085,6 +1183,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-bigint-dig" version = "0.8.4" @@ -1097,7 +1205,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand", + "rand 0.8.5", "smallvec", "zeroize", ] @@ -1280,7 +1388,7 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -1308,8 +1416,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", + "zerocopy 0.8.23", ] [[package]] @@ -1319,7 +1438,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -1331,6 +1460,32 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.1", +] + +[[package]] +name = "redis" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8034fb926579ff49d3fe58d288d5dcb580bf11e9bccd33224b45adebf0fd0c23" +dependencies = [ + "arc-swap", + "combine", + "itoa", + "num-bigint", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.10" @@ -1422,7 +1577,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8", - "rand_core", + "rand_core 0.6.4", "signature", "spki", "subtle", @@ -1437,9 +1592,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustix" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f8dcd64f141950290e45c99f7710ede1b600297c91818bb30b3667c0f45dc0" +checksum = "dade4812df5c384711475be5fcd8c162555352945401aed22a35bffeab61f657" dependencies = [ "bitflags 2.9.0", "errno", @@ -1571,6 +1726,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" @@ -1604,7 +1765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -1683,7 +1844,7 @@ dependencies = [ "futures-intrusive", "futures-io", "futures-util", - "hashbrown", + "hashbrown 0.15.2", "hashlink", "indexmap", "log", @@ -1772,7 +1933,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand", + "rand 0.8.5", "rsa", "serde", "sha1", @@ -1812,7 +1973,7 @@ dependencies = [ "md-5", "memchr", "once_cell", - "rand", + "rand 0.8.5", "serde", "serde_json", "sha2", @@ -1983,9 +2144,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.43.0" +version = "1.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "9975ea0f48b5aa3972bf2d888c238182458437cc2a19374b81b25cdf1023fb3a" dependencies = [ "backtrace", "bytes", @@ -2189,6 +2350,9 @@ name = "uuid" version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587" +dependencies = [ + "getrandom 0.3.1", +] [[package]] name = "vcpkg" @@ -2557,7 +2721,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" +dependencies = [ + "zerocopy-derive 0.8.23", ] [[package]] @@ -2571,6 +2744,17 @@ dependencies = [ "syn", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerofrom" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index c9bc2df..987299c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ reqwest = { version = "0.11.26", features = ["blocking", "json"] } sqlx = { version = "0.8", features = ["postgres", "sqlite", "runtime-tokio-native-tls", "derive", "uuid", "chrono", "json"] } tokio = { version = "1.42.0", features = ["full"] } comrade = { git = "https://git.hydrar.de/jmarya/comrade" } +crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } diff --git a/src/main.rs b/src/main.rs index bace8d1..a4c11bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use hoard::config::GlobalConfig; +use hoard::yt_dlp::download_yt_dlp_init; use hoard::{ensure_dir_exists, Module}; // todo : migrate to async code? @@ -26,6 +27,9 @@ async fn main() { toml::from_str(&std::fs::read_to_string("config.toml").unwrap()).unwrap(); ensure_dir_exists(&config.hoard.data_dir); + let mut sm = comrade::service::ServiceManager::new(); + sm = download_yt_dlp_init(sm); + let db = hoard::db::DatabaseBackend::new(&config.hoard.database).await; let mut modules: Vec = vec![]; @@ -60,10 +64,8 @@ async fn main() { ))); } - let mut sm = comrade::service::ServiceManager::new(); - for module in modules { - sm.register(&module.name(), move |_| { + sm = sm.register(&module.name(), move |_| { module.run(); }); } diff --git a/src/yt_dlp/mod.rs b/src/yt_dlp/mod.rs index 532d574..d760166 100644 --- a/src/yt_dlp/mod.rs +++ b/src/yt_dlp/mod.rs @@ -2,13 +2,106 @@ use std::{ io::{BufRead, BufReader}, path::PathBuf, process::Command, + str::FromStr, }; pub mod config; +use comrade::job::{JobDispatcher, JobOrder}; +use comrade::service::ServiceManager; +use comrade::worker; use config::YtDlpConfig; +use crossbeam::channel::Receiver; use crate::ensure_dir_exists; +#[worker(3)] +fn download_yt_dlp( + config: YtDlpConfig, + video_url: String, + video_title: String, + cwd: String, +) -> bool { + log::info!("Started downloading {video_title}"); + + let cwd = PathBuf::from_str(&cwd).unwrap(); + ensure_dir_exists(&cwd); + + let mut command = Command::new("yt-dlp"); + let mut command = command.current_dir(cwd).arg("--downloader").arg("aria2c"); + + if config.write_thumbnail.unwrap_or(true) { + command = command.arg("--write-thumbnail"); + } + if config.write_description.unwrap_or(false) { + command = command.arg("--write-description"); + } + if config.write_info_json.unwrap_or(false) { + command = command.arg("--write-info-json"); + } + if config.write_comments.unwrap_or(false) { + command = command.arg("--write-comments"); + } + if config.write_subs.unwrap_or(false) { + command = command.arg("--write-subs"); + } + if config.audio_only.unwrap_or(false) { + command = command.arg("--extract-audio"); + } + if let Some(audio_format) = &config.audio_format { + command = command.arg("--audio-format").arg(audio_format); + } + + if config.embed_chapters.unwrap_or(true) { + command = command.arg("--embed-chapters"); + } + if config.embed_info_json.unwrap_or(true) { + command = command.arg("--embed-info-json"); + } + if config.embed_metadata.unwrap_or(true) { + command = command.arg("--embed-metadata"); + } + if config.embed_subs.unwrap_or(false) { + command = command.arg("--embed-subs"); + } + if config.embed_thumbnail.unwrap_or(true) { + command = command.arg("--embed-thumbnail"); + } + + if config.split_chapters.unwrap_or(false) { + command = command.arg("--split-chapters"); + } + + if let Some(format) = &config.format { + command = command.arg("--format").arg(format); + } + if let Some(cookie) = &config.cookie { + command = command.arg("--cookies").arg(cookie); + } + + let output = command + .arg("--convert-thumbnails") + .arg(config.thumbnail_format.as_deref().unwrap_or("jpg")) + .arg("-o") + .arg( + config + .output_format + .as_deref() + .unwrap_or("%(title)s.%(ext)s"), + ) + .arg(&video_url) + .output() + .map_err(|_| "yt-dlp command failed".to_string()) + .unwrap(); + + if !output.status.success() { + let error_message = String::from_utf8_lossy(&output.stderr).to_string(); + log::error!("Download for {video_url} failed: {error_message}"); + false + } else { + true + } +} + #[derive(Clone)] pub struct YtDlpModule { config: YtDlpConfig, @@ -29,22 +122,36 @@ impl YtDlpModule { log::info!("Fetching \"{item}\" videos"); match Self::get_latest_entries(item_url, self.config.limit.unwrap_or(10)) { Ok(latest_videos) => { - for (video_title, video_url) in latest_videos { + let mut downloaded_videos = Vec::new(); + + for (video_title, video_url) in &latest_videos { if self.db.check_for_url(&video_url) { log::trace!("Skipping \"{video_title}\" because it was already downloaded"); } else { - match self.download(&video_url, cwd) { - Ok(()) => { - // mark as downloaded - self.db.insert_url(&self.name(), item, &video_url); - self.db.update_new_downloads(&self.name(), item, item_url); - log::info!("Downloaded \"{video_title}\""); - self.webhook_notify(&video_url, &video_title, item, true); - } - Err(e) => { - log::error!("Error downloading \"{video_title}\"; Reason: {e}"); - self.webhook_notify(&video_url, &video_title, item, false); - } + downloaded_videos.push(( + download_yt_dlp_async( + self.config.clone(), + video_url.clone(), + video_title.clone(), + cwd.to_str().unwrap().to_string(), + ), + video_title.clone(), + video_url.clone(), + )); + } + } + + for (video, video_title, video_url) in downloaded_videos { + match video.wait().as_bool().unwrap() { + true => { + // mark as downloaded + self.db.insert_url(&self.name(), item, &video_url); + self.db.update_new_downloads(&self.name(), item, item_url); + log::info!("Downloaded \"{video_title}\""); + self.webhook_notify(&video_url, &video_title, item, true); + } + false => { + self.webhook_notify(&video_url, &video_title, item, false); } } } @@ -157,80 +264,4 @@ impl YtDlpModule { Ok(videos.into_iter().take(limit as usize).collect()) } - - fn download(&self, video_url: &str, cwd: &PathBuf) -> Result<(), String> { - ensure_dir_exists(cwd); - let mut command = Command::new("yt-dlp"); - let mut command = command.current_dir(cwd).arg("--downloader").arg("aria2c"); - - if self.config.write_thumbnail.unwrap_or(true) { - command = command.arg("--write-thumbnail"); - } - if self.config.write_description.unwrap_or(false) { - command = command.arg("--write-description"); - } - if self.config.write_info_json.unwrap_or(false) { - command = command.arg("--write-info-json"); - } - if self.config.write_comments.unwrap_or(false) { - command = command.arg("--write-comments"); - } - if self.config.write_subs.unwrap_or(false) { - command = command.arg("--write-subs"); - } - if self.config.audio_only.unwrap_or(false) { - command = command.arg("--extract-audio"); - } - if let Some(audio_format) = &self.config.audio_format { - command = command.arg("--audio-format").arg(audio_format); - } - - if self.config.embed_chapters.unwrap_or(true) { - command = command.arg("--embed-chapters"); - } - if self.config.embed_info_json.unwrap_or(true) { - command = command.arg("--embed-info-json"); - } - if self.config.embed_metadata.unwrap_or(true) { - command = command.arg("--embed-metadata"); - } - if self.config.embed_subs.unwrap_or(false) { - command = command.arg("--embed-subs"); - } - if self.config.embed_thumbnail.unwrap_or(true) { - command = command.arg("--embed-thumbnail"); - } - - if self.config.split_chapters.unwrap_or(false) { - command = command.arg("--split-chapters"); - } - - if let Some(format) = &self.config.format { - command = command.arg("--format").arg(format); - } - if let Some(cookie) = &self.config.cookie { - command = command.arg("--cookies").arg(cookie); - } - - let output = command - .arg("--convert-thumbnails") - .arg(self.config.thumbnail_format.as_deref().unwrap_or("jpg")) - .arg("-o") - .arg( - self.config - .output_format - .as_deref() - .unwrap_or("%(title)s.%(ext)s"), - ) - .arg(video_url) - .output() - .map_err(|_| "yt-dlp command failed".to_string())?; - - if !output.status.success() { - let error_message = String::from_utf8_lossy(&output.stderr).to_string(); - return Err(error_message); - } - - Ok(()) - } }