multithreaded downloading

This commit is contained in:
JMARyA 2025-03-08 21:51:42 +01:00
parent c4c54f78d6
commit d463b48ec2
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
4 changed files with 328 additions and 110 deletions

220
Cargo.lock generated
View file

@ -97,6 +97,12 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "atoi"
version = "2.0.0"
@ -222,14 +228,44 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"memchr",
]
[[package]]
name = "comrade"
version = "0.1.0"
source = "git+https://git.hydrar.de/jmarya/comrade#5cb4facc4872a961a3ad3b11d4b48d01016b6600"
source = "git+https://git.hydrar.de/jmarya/comrade#00e448ac316623bb27cc3a3b799fc201cd6302d7"
dependencies = [
"comrade-macro",
"crossbeam",
"dashmap",
"env_logger",
"log",
"once_cell",
"rand 0.9.0",
"redis",
"serde",
"serde_json",
"tokio",
"uuid",
]
[[package]]
name = "comrade-macro"
version = "0.1.0"
source = "git+https://git.hydrar.de/jmarya/comrade#00e448ac316623bb27cc3a3b799fc201cd6302d7"
dependencies = [
"proc-macro2",
"quote",
"serde_json",
"syn",
]
[[package]]
@ -287,6 +323,47 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.12"
@ -312,6 +389,20 @@ dependencies = [
"typenum",
]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "der"
version = "0.7.9"
@ -614,6 +705,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.15.2"
@ -631,7 +728,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
dependencies = [
"hashbrown",
"hashbrown 0.15.2",
]
[[package]]
@ -670,6 +767,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"comrade",
"crossbeam",
"env_logger",
"log",
"reqwest",
@ -935,7 +1033,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.15.2",
]
[[package]]
@ -1085,6 +1183,16 @@ dependencies = [
"tempfile",
]
[[package]]
name = "num-bigint"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"num-integer",
"num-traits",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.4"
@ -1097,7 +1205,7 @@ dependencies = [
"num-integer",
"num-iter",
"num-traits",
"rand",
"rand 0.8.5",
"smallvec",
"zeroize",
]
@ -1280,7 +1388,7 @@ version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
dependencies = [
"zerocopy",
"zerocopy 0.7.35",
]
[[package]]
@ -1308,8 +1416,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
"zerocopy 0.8.23",
]
[[package]]
@ -1319,7 +1438,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.6.4",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.3",
]
[[package]]
@ -1331,6 +1460,32 @@ dependencies = [
"getrandom 0.2.15",
]
[[package]]
name = "rand_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.1",
]
[[package]]
name = "redis"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8034fb926579ff49d3fe58d288d5dcb580bf11e9bccd33224b45adebf0fd0c23"
dependencies = [
"arc-swap",
"combine",
"itoa",
"num-bigint",
"percent-encoding",
"ryu",
"sha1_smol",
"socket2",
"url",
]
[[package]]
name = "redox_syscall"
version = "0.5.10"
@ -1422,7 +1577,7 @@ dependencies = [
"num-traits",
"pkcs1",
"pkcs8",
"rand_core",
"rand_core 0.6.4",
"signature",
"spki",
"subtle",
@ -1437,9 +1592,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustix"
version = "1.0.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f8dcd64f141950290e45c99f7710ede1b600297c91818bb30b3667c0f45dc0"
checksum = "dade4812df5c384711475be5fcd8c162555352945401aed22a35bffeab61f657"
dependencies = [
"bitflags 2.9.0",
"errno",
@ -1571,6 +1726,12 @@ dependencies = [
"digest",
]
[[package]]
name = "sha1_smol"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "sha2"
version = "0.10.8"
@ -1604,7 +1765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
dependencies = [
"digest",
"rand_core",
"rand_core 0.6.4",
]
[[package]]
@ -1683,7 +1844,7 @@ dependencies = [
"futures-intrusive",
"futures-io",
"futures-util",
"hashbrown",
"hashbrown 0.15.2",
"hashlink",
"indexmap",
"log",
@ -1772,7 +1933,7 @@ dependencies = [
"memchr",
"once_cell",
"percent-encoding",
"rand",
"rand 0.8.5",
"rsa",
"serde",
"sha1",
@ -1812,7 +1973,7 @@ dependencies = [
"md-5",
"memchr",
"once_cell",
"rand",
"rand 0.8.5",
"serde",
"serde_json",
"sha2",
@ -1983,9 +2144,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.43.0"
version = "1.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
checksum = "9975ea0f48b5aa3972bf2d888c238182458437cc2a19374b81b25cdf1023fb3a"
dependencies = [
"backtrace",
"bytes",
@ -2189,6 +2350,9 @@ name = "uuid"
version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587"
dependencies = [
"getrandom 0.3.1",
]
[[package]]
name = "vcpkg"
@ -2557,7 +2721,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive",
"zerocopy-derive 0.7.35",
]
[[package]]
name = "zerocopy"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6"
dependencies = [
"zerocopy-derive 0.8.23",
]
[[package]]
@ -2571,6 +2744,17 @@ dependencies = [
"syn",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zerofrom"
version = "0.1.6"

View file

@ -16,3 +16,4 @@ reqwest = { version = "0.11.26", features = ["blocking", "json"] }
sqlx = { version = "0.8", features = ["postgres", "sqlite", "runtime-tokio-native-tls", "derive", "uuid", "chrono", "json"] }
tokio = { version = "1.42.0", features = ["full"] }
comrade = { git = "https://git.hydrar.de/jmarya/comrade" }
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }

View file

@ -1,4 +1,5 @@
use hoard::config::GlobalConfig;
use hoard::yt_dlp::download_yt_dlp_init;
use hoard::{ensure_dir_exists, Module};
// todo : migrate to async code?
@ -26,6 +27,9 @@ async fn main() {
toml::from_str(&std::fs::read_to_string("config.toml").unwrap()).unwrap();
ensure_dir_exists(&config.hoard.data_dir);
let mut sm = comrade::service::ServiceManager::new();
sm = download_yt_dlp_init(sm);
let db = hoard::db::DatabaseBackend::new(&config.hoard.database).await;
let mut modules: Vec<Module> = vec![];
@ -60,10 +64,8 @@ async fn main() {
)));
}
let mut sm = comrade::service::ServiceManager::new();
for module in modules {
sm.register(&module.name(), move |_| {
sm = sm.register(&module.name(), move |_| {
module.run();
});
}

View file

@ -2,13 +2,106 @@ use std::{
io::{BufRead, BufReader},
path::PathBuf,
process::Command,
str::FromStr,
};
pub mod config;
use comrade::job::{JobDispatcher, JobOrder};
use comrade::service::ServiceManager;
use comrade::worker;
use config::YtDlpConfig;
use crossbeam::channel::Receiver;
use crate::ensure_dir_exists;
#[worker(3)]
fn download_yt_dlp(
config: YtDlpConfig,
video_url: String,
video_title: String,
cwd: String,
) -> bool {
log::info!("Started downloading {video_title}");
let cwd = PathBuf::from_str(&cwd).unwrap();
ensure_dir_exists(&cwd);
let mut command = Command::new("yt-dlp");
let mut command = command.current_dir(cwd).arg("--downloader").arg("aria2c");
if config.write_thumbnail.unwrap_or(true) {
command = command.arg("--write-thumbnail");
}
if config.write_description.unwrap_or(false) {
command = command.arg("--write-description");
}
if config.write_info_json.unwrap_or(false) {
command = command.arg("--write-info-json");
}
if config.write_comments.unwrap_or(false) {
command = command.arg("--write-comments");
}
if config.write_subs.unwrap_or(false) {
command = command.arg("--write-subs");
}
if config.audio_only.unwrap_or(false) {
command = command.arg("--extract-audio");
}
if let Some(audio_format) = &config.audio_format {
command = command.arg("--audio-format").arg(audio_format);
}
if config.embed_chapters.unwrap_or(true) {
command = command.arg("--embed-chapters");
}
if config.embed_info_json.unwrap_or(true) {
command = command.arg("--embed-info-json");
}
if config.embed_metadata.unwrap_or(true) {
command = command.arg("--embed-metadata");
}
if config.embed_subs.unwrap_or(false) {
command = command.arg("--embed-subs");
}
if config.embed_thumbnail.unwrap_or(true) {
command = command.arg("--embed-thumbnail");
}
if config.split_chapters.unwrap_or(false) {
command = command.arg("--split-chapters");
}
if let Some(format) = &config.format {
command = command.arg("--format").arg(format);
}
if let Some(cookie) = &config.cookie {
command = command.arg("--cookies").arg(cookie);
}
let output = command
.arg("--convert-thumbnails")
.arg(config.thumbnail_format.as_deref().unwrap_or("jpg"))
.arg("-o")
.arg(
config
.output_format
.as_deref()
.unwrap_or("%(title)s.%(ext)s"),
)
.arg(&video_url)
.output()
.map_err(|_| "yt-dlp command failed".to_string())
.unwrap();
if !output.status.success() {
let error_message = String::from_utf8_lossy(&output.stderr).to_string();
log::error!("Download for {video_url} failed: {error_message}");
false
} else {
true
}
}
#[derive(Clone)]
pub struct YtDlpModule {
config: YtDlpConfig,
@ -29,22 +122,36 @@ impl YtDlpModule {
log::info!("Fetching \"{item}\" videos");
match Self::get_latest_entries(item_url, self.config.limit.unwrap_or(10)) {
Ok(latest_videos) => {
for (video_title, video_url) in latest_videos {
let mut downloaded_videos = Vec::new();
for (video_title, video_url) in &latest_videos {
if self.db.check_for_url(&video_url) {
log::trace!("Skipping \"{video_title}\" because it was already downloaded");
} else {
match self.download(&video_url, cwd) {
Ok(()) => {
// mark as downloaded
self.db.insert_url(&self.name(), item, &video_url);
self.db.update_new_downloads(&self.name(), item, item_url);
log::info!("Downloaded \"{video_title}\"");
self.webhook_notify(&video_url, &video_title, item, true);
}
Err(e) => {
log::error!("Error downloading \"{video_title}\"; Reason: {e}");
self.webhook_notify(&video_url, &video_title, item, false);
}
downloaded_videos.push((
download_yt_dlp_async(
self.config.clone(),
video_url.clone(),
video_title.clone(),
cwd.to_str().unwrap().to_string(),
),
video_title.clone(),
video_url.clone(),
));
}
}
for (video, video_title, video_url) in downloaded_videos {
match video.wait().as_bool().unwrap() {
true => {
// mark as downloaded
self.db.insert_url(&self.name(), item, &video_url);
self.db.update_new_downloads(&self.name(), item, item_url);
log::info!("Downloaded \"{video_title}\"");
self.webhook_notify(&video_url, &video_title, item, true);
}
false => {
self.webhook_notify(&video_url, &video_title, item, false);
}
}
}
@ -157,80 +264,4 @@ impl YtDlpModule {
Ok(videos.into_iter().take(limit as usize).collect())
}
fn download(&self, video_url: &str, cwd: &PathBuf) -> Result<(), String> {
ensure_dir_exists(cwd);
let mut command = Command::new("yt-dlp");
let mut command = command.current_dir(cwd).arg("--downloader").arg("aria2c");
if self.config.write_thumbnail.unwrap_or(true) {
command = command.arg("--write-thumbnail");
}
if self.config.write_description.unwrap_or(false) {
command = command.arg("--write-description");
}
if self.config.write_info_json.unwrap_or(false) {
command = command.arg("--write-info-json");
}
if self.config.write_comments.unwrap_or(false) {
command = command.arg("--write-comments");
}
if self.config.write_subs.unwrap_or(false) {
command = command.arg("--write-subs");
}
if self.config.audio_only.unwrap_or(false) {
command = command.arg("--extract-audio");
}
if let Some(audio_format) = &self.config.audio_format {
command = command.arg("--audio-format").arg(audio_format);
}
if self.config.embed_chapters.unwrap_or(true) {
command = command.arg("--embed-chapters");
}
if self.config.embed_info_json.unwrap_or(true) {
command = command.arg("--embed-info-json");
}
if self.config.embed_metadata.unwrap_or(true) {
command = command.arg("--embed-metadata");
}
if self.config.embed_subs.unwrap_or(false) {
command = command.arg("--embed-subs");
}
if self.config.embed_thumbnail.unwrap_or(true) {
command = command.arg("--embed-thumbnail");
}
if self.config.split_chapters.unwrap_or(false) {
command = command.arg("--split-chapters");
}
if let Some(format) = &self.config.format {
command = command.arg("--format").arg(format);
}
if let Some(cookie) = &self.config.cookie {
command = command.arg("--cookies").arg(cookie);
}
let output = command
.arg("--convert-thumbnails")
.arg(self.config.thumbnail_format.as_deref().unwrap_or("jpg"))
.arg("-o")
.arg(
self.config
.output_format
.as_deref()
.unwrap_or("%(title)s.%(ext)s"),
)
.arg(video_url)
.output()
.map_err(|_| "yt-dlp command failed".to_string())?;
if !output.status.success() {
let error_message = String::from_utf8_lossy(&output.stderr).to_string();
return Err(error_message);
}
Ok(())
}
}