🐛 cross channel parallel downloads

This commit is contained in:
JMARyA 2025-03-09 06:13:33 +01:00
parent b6b4e6253c
commit 010c412824
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
2 changed files with 49 additions and 28 deletions

4
Cargo.lock generated
View file

@ -241,7 +241,7 @@ dependencies = [
[[package]] [[package]]
name = "comrade" name = "comrade"
version = "0.1.0" version = "0.1.0"
source = "git+https://git.hydrar.de/jmarya/comrade#00e448ac316623bb27cc3a3b799fc201cd6302d7" source = "git+https://git.hydrar.de/jmarya/comrade#b043db3f4ebff63aebeaf6eef3a201761fc8cd3f"
dependencies = [ dependencies = [
"comrade-macro", "comrade-macro",
"crossbeam", "crossbeam",
@ -260,7 +260,7 @@ dependencies = [
[[package]] [[package]]
name = "comrade-macro" name = "comrade-macro"
version = "0.1.0" version = "0.1.0"
source = "git+https://git.hydrar.de/jmarya/comrade#00e448ac316623bb27cc3a3b799fc201cd6302d7" source = "git+https://git.hydrar.de/jmarya/comrade#b043db3f4ebff63aebeaf6eef3a201761fc8cd3f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View file

@ -6,11 +6,9 @@ use std::{
}; };
pub mod config; pub mod config;
use comrade::job::{JobDispatcher, JobOrder}; use comrade::job::{JobResult, LabelPendingTaskIterator};
use comrade::service::ServiceManager;
use comrade::worker; use comrade::worker;
use config::YtDlpConfig; use config::YtDlpConfig;
use crossbeam::channel::Receiver;
use crate::ensure_dir_exists; use crate::ensure_dir_exists;
@ -118,48 +116,46 @@ impl YtDlpModule {
} }
} }
fn check_item(&self, item: &str, item_url: &str, cwd: &PathBuf) { fn check_item(
&self,
item: &str,
item_url: &str,
cwd: &PathBuf,
) -> Vec<(
(String, String, String, String),
JobResult<serde_json::Value>,
)> {
let mut downloads_tasks = Vec::new();
log::info!("Fetching \"{item}\" videos"); log::info!("Fetching \"{item}\" videos");
match Self::get_latest_entries(item_url, self.config.limit.unwrap_or(10)) { match Self::get_latest_entries(item_url, self.config.limit.unwrap_or(10)) {
Ok(latest_videos) => { Ok(latest_videos) => {
let mut downloaded_videos = Vec::new();
for (video_title, video_url) in &latest_videos { for (video_title, video_url) in &latest_videos {
if self.db.check_for_url(&video_url) { if self.db.check_for_url(&video_url) {
log::trace!("Skipping \"{video_title}\" because it was already downloaded"); log::trace!("Skipping \"{video_title}\" because it was already downloaded");
} else { } else {
downloaded_videos.push(( downloads_tasks.push((
(
video_title.clone(),
video_url.clone(),
item.to_string(),
item_url.to_string(),
),
download_yt_dlp_async( download_yt_dlp_async(
self.config.clone(), self.config.clone(),
video_url.clone(), video_url.clone(),
video_title.clone(), video_title.clone(),
cwd.to_str().unwrap().to_string(), cwd.to_str().unwrap().to_string(),
), ),
video_title.clone(),
video_url.clone(),
)); ));
} }
} }
for (video, video_title, video_url) in downloaded_videos {
match video.wait().as_bool().unwrap() {
true => {
// mark as downloaded
self.db.insert_url(&self.name(), item, &video_url);
self.db.update_new_downloads(&self.name(), item, item_url);
log::info!("Downloaded \"{video_title}\"");
self.webhook_notify(&video_url, &video_title, item, true);
}
false => {
self.webhook_notify(&video_url, &video_title, item, false);
}
}
}
} }
Err(e) => { Err(e) => {
log::error!("Could not get videos from \"{item}\". Reason: {e}"); log::error!("Could not get videos from \"{item}\". Reason: {e}");
} }
} }
downloads_tasks
} }
pub fn name(&self) -> String { pub fn name(&self) -> String {
@ -173,26 +169,51 @@ impl YtDlpModule {
loop { loop {
log::info!("Running {} Module", self.name()); log::info!("Running {} Module", self.name());
log::info!("Checking {} items", self.config.items.len()); log::info!("Checking {} items", self.config.items.len());
let mut download_tasks = Vec::new();
for (item, item_url) in &self.config.items { for (item, item_url) in &self.config.items {
match item_url { match item_url {
toml::Value::String(item_url) => { toml::Value::String(item_url) => {
self.check_item(item, item_url, &self.root_dir.join(item)); let downloads = self.check_item(item, item_url, &self.root_dir.join(item));
download_tasks.extend(downloads);
} }
toml::Value::Array(_) => todo!(), toml::Value::Array(_) => todo!(),
toml::Value::Table(cat) => { toml::Value::Table(cat) => {
let category = item; let category = item;
for (item, item_url) in cat { for (item, item_url) in cat {
let item_url = item_url.as_str().unwrap(); let item_url = item_url.as_str().unwrap();
self.check_item( let downloads = self.check_item(
item, item,
item_url, item_url,
&self.root_dir.join(category).join(item), &self.root_dir.join(category).join(item),
); );
download_tasks.extend(downloads);
} }
} }
_ => {} _ => {}
} }
} }
log::info!("{} finished scan. Still downloading...", self.name());
for ((video_title, video_url, item, item_url), video) in
LabelPendingTaskIterator(download_tasks)
{
match video.as_bool().unwrap() {
true => {
// mark as downloaded
self.db.insert_url(&self.name(), &item, &video_url);
self.db.update_new_downloads(&self.name(), &item, &item_url);
log::info!("Downloaded \"{video_title}\"");
self.webhook_notify(&video_url, &video_title, &item, true);
}
false => {
self.webhook_notify(&video_url, &video_title, &item, false);
}
}
}
log::info!( log::info!(
"{} complete. Sleeping for {} minutes...", "{} complete. Sleeping for {} minutes...",
self.name(), self.name(),