From d9461d1ef099d971ba00b5d071754fde1fe13238 Mon Sep 17 00:00:00 2001 From: Bluemangoo Date: Fri, 10 Apr 2026 20:10:54 +0800 Subject: [PATCH] fix multi client --- client/Cargo.toml | 2 +- client/src/main.rs | 159 +++++++++++++++++++++++++++---------- client/src/task.rs | 11 +-- sekai-unpacker-client.yaml | 36 ++++++++- sekai-unpacker-server.yaml | 6 +- server/src/main.rs | 27 ++++--- 6 files changed, 179 insertions(+), 62 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 4e13e6f..9b3f120 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,4 +18,4 @@ simplelog = { workspace = true } anyhow = { workspace = true } h2 = { workspace = true } serde_json = { workspace = true } -tokio-util = "0.7.18" +tokio-util = {workspace = true} diff --git a/client/src/main.rs b/client/src/main.rs index 97a1277..e963b69 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -1,15 +1,17 @@ use crate::config::{ClientConfig, Profile}; use crate::task::run; -use communicator::{Identity, TunnelEndpoint, TunnelListener, connect_tunnel}; +use communicator::{ClientManager, Identity, TunnelEndpoint, TunnelListener, connect_tunnel}; use lazy_static::lazy_static; use log::{LevelFilter, error, info}; use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; +use std::collections::{HashMap, VecDeque}; use std::fs; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use structopt::StructOpt; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc}; +use tokio::task::JoinSet; use tokio::time::sleep; use tokio_util::sync::CancellationToken; @@ -60,7 +62,59 @@ async fn main() -> anyhow::Result<()> { }) .collect::, _>>()?; - let mut tasks = vec![]; + let mut join_set = JoinSet::new(); + + let mut senders: HashMap>>> = HashMap::new(); + for profile in &profiles { + senders.insert(profile.0.clone(), Arc::new(Sender::new())); + } + let (liveness_tx, mut liveness_rx) = mpsc::channel::<()>(1); + let senders = Arc::new(senders); + let cancel_token = CancellationToken::new(); + { + let cancel_token = cancel_token.clone(); + join_set.spawn(async move { + if liveness_rx.recv().await.is_none() { + cancel_token.cancel(); + } + }); + } + + for server_conf in &CONFIG.connect.server { + let server_conf = server_conf.clone().into_tunnel_config(Identity::Client)?; + let url = server_conf.url.clone(); + let server = TunnelListener::bind(server_conf).await?; + let senders = senders.clone(); + let cancel_token = cancel_token.clone(); + info!("tcp server started on {}", url); + join_set.spawn(async move { + loop { + tokio::select! { + _ = cancel_token.cancelled() => { + info!("tcp server {} stopped", url); + break; + } + + endpoint_res = server.accept() => { + let endpoint = match endpoint_res { + Ok(e) => e, + Err(e) => { + error!("Failed to accept connection: {}", e); + continue; + } + }; + + if let TunnelEndpoint::Client(client) = endpoint { + for (_, sender) in senders.iter() { + sender.send(client.clone()); + } + } + } + } + } + }); + } + for profile in profiles { let profile = Arc::new(profile.clone()); let semaphore = Arc::new(Semaphore::new(1)); @@ -80,48 +134,45 @@ async fn main() -> anyhow::Result<()> { drop(permit); } }; - for server_conf in &CONFIG.connect.server { - let server_conf = server_conf.clone().into_tunnel_config(Identity::Client)?; - let url = server_conf.url.clone(); - let server = TunnelListener::bind(server_conf).await?; + + { + let sender = senders.get(&profile.0).unwrap().clone(); let semaphore = semaphore.clone(); let cancel_token = cancel_token.clone(); let profile = profile.clone(); - info!("tcp server started on {}", url); - tasks.push(tokio::task::spawn(async move { + let liveness_tx = liveness_tx.clone(); + join_set.spawn(async move { + let _guard = liveness_tx; loop { - let endpoint = server - .accept() - .await - .map_err(|e| error!("Failed to accept connection: {}", e)); - let endpoint = if let Ok(endpoint) = endpoint { - endpoint - } else { + let client = sender.recv(); + if client.is_none() { continue; - }; - if let TunnelEndpoint::Client(client) = endpoint { - loop { - if client.get_client().await.is_err() { - break; + } + let client = client.unwrap(); + if cancel_token.is_cancelled() { + return; + } + loop { + if client.get_client().await.is_err() { + break; + } + if cancel_token.is_cancelled() { + return; + } + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let result = run(client.clone(), profile.clone()).await; + match result { + Ok(true) => { + post_task(profile.clone(), permit, cancel_token.clone()).await; } - if cancel_token.is_cancelled() { - return; - } - let permit = semaphore.clone().acquire_owned().await.unwrap(); - let result = run(client.clone(), profile.clone()).await; - match result { - Ok(true) => { - post_task(profile.clone(), permit, cancel_token.clone()).await; - } - Err(error) => { - error!("{}", error); - } - _ => {} + Err(error) => { + error!("{}", error); } + _ => {} } } } - })); + }); } for client_conf in &CONFIG.connect.client { @@ -130,7 +181,7 @@ async fn main() -> anyhow::Result<()> { let cancel_token = cancel_token.clone(); let profile = profile.clone(); info!("tcp client started for {}", client_conf.url); - tasks.push(tokio::task::spawn(async move { + join_set.spawn(async move { loop { if cancel_token.is_cancelled() { return; @@ -164,18 +215,46 @@ async fn main() -> anyhow::Result<()> { } } } + if cancel_token.is_cancelled() { + return; + } sleep(Duration::from_secs(10)).await; } - })); + }); } } - for task in tasks { - let _ = task.await.map_err(|e| error!("{}", e)); + drop(liveness_tx); + + while let Some(res) = join_set.join_next().await { + if let Err(e) = res { + error!("{}", e); + } } Ok(()) } + +struct Sender { + inner: RwLock>, +} + +impl Sender { + pub fn new() -> Self { + Self { + inner: RwLock::new(VecDeque::new()), + } + } + + pub fn send(&self, item: T) { + self.inner.write().unwrap().push_back(item); + } + + pub fn recv(&self) -> Option { + self.inner.write().unwrap().pop_front() + } +} + lazy_static! { pub static ref CONFIG: ClientConfig = { let raw = fs::read_to_string("sekai-unpacker-client.yaml").unwrap(); diff --git a/client/src/task.rs b/client/src/task.rs index 2b04d67..c56c547 100644 --- a/client/src/task.rs +++ b/client/src/task.rs @@ -9,12 +9,14 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::sync::{RwLock, Semaphore}; +use tokio::task::JoinSet; pub async fn run( client: Arc, profile: Arc<(String, Profile)>, ) -> anyhow::Result { info!("[{}]: Starting sync", profile.0); + tokio::fs::create_dir_all(&profile.1.path).await?; let sync_resp = sync(&mut client.get_client().await?, &profile.1).await?; let id = sync_resp.id; let local_manifest = Arc::new( @@ -41,7 +43,7 @@ pub async fn run( info!("[{}]: Collected {} tasks", profile.0, tasks.len()); let n = 5; let semaphore = Arc::new(Semaphore::new(n)); - let mut handles = Vec::new(); + let mut join_set = JoinSet::new(); for task in tasks { let permit = semaphore.clone().acquire_owned().await?; let client = client.clone(); @@ -49,7 +51,7 @@ pub async fn run( let local_manifest = local_manifest.clone(); let profile = profile.clone(); - handles.push(tokio::task::spawn(async move { + join_set.spawn(async move { let req = DownloadRequest { id: id.clone(), task: task.clone(), @@ -68,12 +70,11 @@ pub async fn run( .await .unwrap(); drop(permit); - })); + }); } let mut succeed = 0; let mut failed = 0; - for handle in handles { - let r = handle.await; + while let Some(r) = join_set.join_next().await { if let Err(e) = r { error!("{}", e); failed += 1; diff --git a/sekai-unpacker-client.yaml b/sekai-unpacker-client.yaml index dd2ec9c..22c450b 100644 --- a/sekai-unpacker-client.yaml +++ b/sekai-unpacker-client.yaml @@ -12,7 +12,7 @@ server: profiles: cn: region: cn - interval: 3 + # interval: 3 # seconds filters: start_app: - "thumbnail" @@ -40,4 +40,36 @@ profiles: convert_to_mp3: false convert_to_flac: false remove_wav: false - path: "./data/cn" \ No newline at end of file + path: "./data/cn" + jp: + region: jp + filters: + start_app: + - "thumbnail" + on_demand: [ ] + skip: [ ] + file_ext: [ ] + asset_version: 6.4.0.30 + asset_hash: cce60d07-d60e-48dd-be22-12eac2e67950 + export: + by_category: false + usm: + export: true + decode: true + acb: + export: true + decode: true + hca: + decode: true + images: + convert_to_webp: false + remove_png: false + video: + convert_to_mp4: false + direct_usm_to_mp4_with_ffmpeg: false + remove_m2v: false + audio: + convert_to_mp3: false + convert_to_flac: false + remove_wav: false + path: "./data/jp" \ No newline at end of file diff --git a/sekai-unpacker-server.yaml b/sekai-unpacker-server.yaml index acba243..f73a3d2 100644 --- a/sekai-unpacker-server.yaml +++ b/sekai-unpacker-server.yaml @@ -34,10 +34,10 @@ regions: enabled: true provider: kind: colorful_palette - asset_info_url_template: "" - asset_bundle_url_template: "" + asset_info_url_template: "https://{env}-{hash}-assetbundle-info.sekai.colorfulpalette.org/api/version/{asset_version}/{asset_hash}/os/ios" + asset_bundle_url_template: "https://{env}-{hash}-assetbundle.sekai.colorfulpalette.org/{asset_version}/{asset_hash}/ios/{bundle_path}" profile: "production" - profile_hashes: { assetbundleHostHash: cf2d2388 } + profile_hashes: { production: cf2d2388 } required_cookies: true crypto: aes_key_hex: "6732666343305a637a4e394d544a3631" diff --git a/server/src/main.rs b/server/src/main.rs index 6fa8d78..769e044 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -16,6 +16,7 @@ use std::fs; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use tokio::task::JoinSet; use tokio::time::sleep; #[tokio::main] @@ -39,7 +40,7 @@ async fn main() -> anyhow::Result<()> { let routers = Arc::new(build_routers()); let http_server = Arc::new(Server::new(routers.clone())); - let mut tasks = vec![]; + let mut join_set = JoinSet::new(); for server_conf in &CONFIG.connect.server { let server_conf = server_conf.clone().into_tunnel_config(Identity::Server)?; @@ -47,7 +48,7 @@ async fn main() -> anyhow::Result<()> { let server = TunnelListener::bind(server_conf).await?; info!("tcp server started on {}", url); let http_server = http_server.clone(); - tasks.push(tokio::task::spawn(async move { + join_set.spawn(async move { loop { let endpoint = server .accept() @@ -59,20 +60,23 @@ async fn main() -> anyhow::Result<()> { continue; }; if let TunnelEndpoint::Server(connection) = endpoint { - let result = http_server.on_conn(connection).await; - if let Err(e) = result { - error!("Failed to handle connection: {}", e); - } + let http_server = http_server.clone(); + tokio::task::spawn(async move { + let result = http_server.on_conn(connection).await; + if let Err(e) = result { + error!("Failed to handle connection: {}", e); + } + }); } } - })); + }); } for client_conf in &CONFIG.connect.client { let client_conf = client_conf.clone().into_tunnel_config(Identity::Server); let http_server = http_server.clone(); info!("tcp client started for {}", client_conf.url); - tasks.push(tokio::task::spawn(async move { + join_set.spawn(async move { loop { let endpoint = connect_tunnel(client_conf.clone()) .await @@ -90,11 +94,12 @@ async fn main() -> anyhow::Result<()> { } sleep(Duration::from_secs(10)).await; } - })); + }); } - for task in tasks { - let _ = task.await.map_err(|e| error!("{}", e)); + while let Some(res) = join_set.join_next().await { + let Err(e) = res; + error!("{}", e); } Ok(())