diff --git a/Cargo.lock b/Cargo.lock index 4dd1509..5bf9466 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -388,9 +388,11 @@ dependencies = [ "bytes", "common", "communicator", + "futures-util", "h2", "lazy_static", "log", + "reqwest", "serde", "serde_json", "simplelog", diff --git a/client/Cargo.toml b/client/Cargo.toml index 9b3f120..57abeb7 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,4 +18,6 @@ simplelog = { workspace = true } anyhow = { workspace = true } h2 = { workspace = true } serde_json = { workspace = true } -tokio-util = {workspace = true} +tokio-util = { workspace = true } +reqwest = { workspace = true } +futures-util = { workspace = true } diff --git a/client/src/config.rs b/client/src/config.rs index 025e9a1..5d42bae 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -11,6 +11,12 @@ pub struct ClientConfig { pub profiles: HashMap, } +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct DynamicLoad { + pub url: String, + pub map: HashMap, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Profile { #[serde(flatten)] @@ -18,4 +24,51 @@ pub struct Profile { pub path: String, pub interval: Option, pub concurrent: Option, + dynamic_load: Option, +} + +impl Profile { + pub fn need_update(&self) -> bool { + self.dynamic_load.is_some() + } + + pub async fn update(&mut self) -> anyhow::Result<()> { + let Some(load) = &self.dynamic_load else { + return Ok(()); + }; + let response = reqwest::get(&load.url).await?; + let body = response.text().await?; + let data: serde_json::Value = serde_json::from_str(&body)?; + if !data.is_object() { + return Err(anyhow::Error::msg("response is not an object")); + }; + let obj = data.as_object().unwrap(); + macro_rules! load_field { + (from $obj:expr; $($tail:tt)*) => { + load_field!(@munch $obj, $($tail)*); + }; + (@munch $obj:expr, $field:ident$(.$f_more:ident)* = $key:ident as $t:ident $($rest:tt)*) => { + load_field!(@suffix $obj, ($field$(.$f_more)*), $key, $t, [], $($rest)*); + }; + (@suffix $obj:expr, ($($f_full:tt)*), $key:ident, $t:ident, [$($acc:tt)*], ; $($rest:tt)*) => { + if let Some(k) = load.map.get(stringify!($key)) { + if let Some(serde_json::Value::$t(v)) = $obj.get(k) { + $($f_full)* = Some(v.clone() $($acc)*); + } + } + load_field!(@munch $obj, $($rest)*); + }; + (@suffix $obj:expr, ($($f_full:tt)*), $key:ident, $t:ident, [$($acc:tt)*], $token:tt $($rest:tt)*) => { + load_field!(@suffix $obj, ($($f_full)*), $key, $t, [$($acc)* $token], $($rest)*); + }; + (@munch $obj:expr, ) => {}; + } + load_field! { + from obj; + self.sync_context.asset_version = asset_version as String; + self.sync_context.asset_hash = asset_hash as String; + self.concurrent = concurrent as Number.as_u64().ok_or(anyhow::Error::msg("concurrent is not usize"))? as usize; + } + Ok(()) + } } diff --git a/client/src/main.rs b/client/src/main.rs index 2b564da..376422a 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -2,16 +2,17 @@ use crate::config::{ClientConfig, Profile}; use crate::task::run; use common::strings::REGION_NOT_FOUND; use communicator::{ClientManager, Identity, TunnelEndpoint, TunnelListener, connect_tunnel}; +use futures_util::future::join_all; use lazy_static::lazy_static; use log::{LevelFilter, error, info}; use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use std::collections::{HashMap, VecDeque}; use std::fs; use std::str::FromStr; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; -use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc}; +use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore, mpsc}; use tokio::task::JoinSet; use tokio::time::sleep; use tokio_util::sync::CancellationToken; @@ -51,17 +52,23 @@ async fn main() -> anyhow::Result<()> { std::process::exit(1); } - let profiles = command_opts - .profile - .iter() - .map(|p| { - let profile = CONFIG.profiles.get(p.as_str()); - match profile { - Some(profile) => Ok((p.to_string(), profile.clone())), - None => Err(anyhow::anyhow!("Profile `{}` not found in config", p)), + let profiles = join_all(command_opts.profile.iter().map(async |p| { + let profile = CONFIG.profiles.get(p.as_str()); + match profile { + Some(profile) => { + let mut profile = profile.clone(); + if profile.need_update() { + info!("Loading async config of {}", p); + profile.update().await?; + } + Ok((p.to_string(), Arc::new(RwLock::new(profile)))) } - }) - .collect::, _>>()?; + None => Err(anyhow::anyhow!("Profile `{}` not found in config", p)), + } + })) + .await + .into_iter() + .collect::, _>>()?; let mut join_set = JoinSet::new(); @@ -121,15 +128,25 @@ async fn main() -> anyhow::Result<()> { let semaphore = Arc::new(Semaphore::new(1)); let cancel_token = CancellationToken::new(); let post_task = { - async |profile: Arc<(String, Profile)>, + async |profile: Arc<(String, Arc>)>, permit: OwnedSemaphorePermit, cancel_token: CancellationToken| { - match profile.1.interval { + let (interval, need_update) = { + let p = profile.1.read().await; + (p.interval, p.need_update()) + }; + match interval { None => { cancel_token.cancel(); } Some(interval) => { sleep(Duration::from_secs(interval)).await; + if need_update { + info!("Trying to update profile for {}", profile.0); + if let Err(e) = profile.1.write().await.update().await { + error!("Failed to update profile for {}: {}", profile.0, e); + } + } } } drop(permit); @@ -263,13 +280,13 @@ async fn main() -> anyhow::Result<()> { } struct Sender { - inner: RwLock>, + inner: std::sync::RwLock>, } impl Sender { pub fn new() -> Self { Self { - inner: RwLock::new(VecDeque::new()), + inner: std::sync::RwLock::new(VecDeque::new()), } } diff --git a/client/src/task.rs b/client/src/task.rs index ffb62cc..8c0586b 100644 --- a/client/src/task.rs +++ b/client/src/task.rs @@ -6,27 +6,22 @@ use log::{error, info}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::{Path, PathBuf}; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc}; use tokio::sync::{RwLock, Semaphore}; use tokio::task::JoinSet; pub async fn run( client: Arc, - profile: Arc<(String, Profile)>, + profile: Arc<(String, Arc>)>, ) -> anyhow::Result { info!("[{}]: Starting sync", profile.0); - tokio::fs::create_dir_all(&profile.1.path).await?; - let sync_resp = sync(&mut client.get_client().await?, &profile.1).await?; + let p1 = Arc::new(profile.1.read().await.clone()); + tokio::fs::create_dir_all(&p1.path).await?; + let sync_resp = sync(&mut client.get_client().await?, &p1).await?; let id = sync_resp.id; let local_manifest = Arc::new( - AutoSaveManifest::new( - 5, - Path::new(&profile.1.path) - .join("manifest.json") - .to_path_buf(), - ) - .await?, + AutoSaveManifest::new(5, Path::new(&p1.path).join("manifest.json").to_path_buf()).await?, ); let manifest_snapshot = { local_manifest.manifest.read().await.clone() }; let all_cnt = sync_resp.tasks.len(); @@ -53,7 +48,7 @@ pub async fn run( close(&mut client.get_client().await?, &req).await?; return Ok(true); } - let n = profile.1.concurrent.unwrap_or(5); + let n = p1.concurrent.unwrap_or(5); info!("[{}]: Start sync with {} thread", profile.0, n); let semaphore = Arc::new(Semaphore::new(n)); let mut join_set = JoinSet::new(); @@ -62,18 +57,18 @@ pub async fn run( let client = client.clone(); let id = id.clone(); let local_manifest = local_manifest.clone(); - let profile = profile.clone(); + let p1 = p1.clone(); join_set.spawn(async move { let req = DownloadRequest { id: id.clone(), task: task.clone(), }; - let result = download(&mut client.get_client().await.unwrap(), &req, &profile.1).await; + let result = download(&mut client.get_client().await.unwrap(), &req, &p1).await; if let Err(e) = result && let Some(_) = e.downcast_ref::() { - download(&mut client.get_client().await.unwrap(), &req, &profile.1) + download(&mut client.get_client().await.unwrap(), &req, &p1) .await .unwrap(); } diff --git a/sekai-unpacker-client.yaml b/sekai-unpacker-client.yaml index bc8af83..ab18448 100644 --- a/sekai-unpacker-client.yaml +++ b/sekai-unpacker-client.yaml @@ -58,8 +58,11 @@ profiles: - "stamp" skip: [ ] file_ext: [ ] - asset_version: 6.4.0.30 - asset_hash: cce60d07-d60e-48dd-be22-12eac2e67950 + dynamic_load: + url: "https://github.com/Team-Haruki/haruki-sekai-master/raw/refs/heads/main/versions/current_version.json" + map: + asset_version: assetVersion + asset_hash: assetHash export: by_category: false usm: @@ -81,5 +84,5 @@ profiles: convert_to_mp3: false convert_to_flac: false remove_wav: false -# exact_single_file_bundle: true + exact_single_file_bundle: true path: "./data/jp" \ No newline at end of file