1
0
mirror of https://github.com/Bluemangoo/sekai-unpacker.git synced 2026-05-06 20:44:47 +08:00

fix multi client

This commit is contained in:
Bluemangoo 2026-04-10 20:10:54 +08:00
parent 0a4f6338a6
commit d9461d1ef0
Signed by: Bluemangoo
GPG Key ID: F2F7E46880A1C4CF
6 changed files with 179 additions and 62 deletions

View File

@ -18,4 +18,4 @@ simplelog = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
h2 = { workspace = true } h2 = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
tokio-util = "0.7.18" tokio-util = {workspace = true}

View File

@ -1,15 +1,17 @@
use crate::config::{ClientConfig, Profile}; use crate::config::{ClientConfig, Profile};
use crate::task::run; use crate::task::run;
use communicator::{Identity, TunnelEndpoint, TunnelListener, connect_tunnel}; use communicator::{ClientManager, Identity, TunnelEndpoint, TunnelListener, connect_tunnel};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use log::{LevelFilter, error, info}; use log::{LevelFilter, error, info};
use simplelog::{ColorChoice, Config, TermLogger, TerminalMode}; use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use std::collections::{HashMap, VecDeque};
use std::fs; use std::fs;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc};
use tokio::task::JoinSet;
use tokio::time::sleep; use tokio::time::sleep;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -60,7 +62,59 @@ async fn main() -> anyhow::Result<()> {
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
let mut tasks = vec![]; let mut join_set = JoinSet::new();
let mut senders: HashMap<String, Arc<Sender<Arc<ClientManager>>>> = HashMap::new();
for profile in &profiles {
senders.insert(profile.0.clone(), Arc::new(Sender::new()));
}
let (liveness_tx, mut liveness_rx) = mpsc::channel::<()>(1);
let senders = Arc::new(senders);
let cancel_token = CancellationToken::new();
{
let cancel_token = cancel_token.clone();
join_set.spawn(async move {
if liveness_rx.recv().await.is_none() {
cancel_token.cancel();
}
});
}
for server_conf in &CONFIG.connect.server {
let server_conf = server_conf.clone().into_tunnel_config(Identity::Client)?;
let url = server_conf.url.clone();
let server = TunnelListener::bind(server_conf).await?;
let senders = senders.clone();
let cancel_token = cancel_token.clone();
info!("tcp server started on {}", url);
join_set.spawn(async move {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("tcp server {} stopped", url);
break;
}
endpoint_res = server.accept() => {
let endpoint = match endpoint_res {
Ok(e) => e,
Err(e) => {
error!("Failed to accept connection: {}", e);
continue;
}
};
if let TunnelEndpoint::Client(client) = endpoint {
for (_, sender) in senders.iter() {
sender.send(client.clone());
}
}
}
}
}
});
}
for profile in profiles { for profile in profiles {
let profile = Arc::new(profile.clone()); let profile = Arc::new(profile.clone());
let semaphore = Arc::new(Semaphore::new(1)); let semaphore = Arc::new(Semaphore::new(1));
@ -80,48 +134,45 @@ async fn main() -> anyhow::Result<()> {
drop(permit); drop(permit);
} }
}; };
for server_conf in &CONFIG.connect.server {
let server_conf = server_conf.clone().into_tunnel_config(Identity::Client)?; {
let url = server_conf.url.clone(); let sender = senders.get(&profile.0).unwrap().clone();
let server = TunnelListener::bind(server_conf).await?;
let semaphore = semaphore.clone(); let semaphore = semaphore.clone();
let cancel_token = cancel_token.clone(); let cancel_token = cancel_token.clone();
let profile = profile.clone(); let profile = profile.clone();
info!("tcp server started on {}", url); let liveness_tx = liveness_tx.clone();
tasks.push(tokio::task::spawn(async move { join_set.spawn(async move {
let _guard = liveness_tx;
loop { loop {
let endpoint = server let client = sender.recv();
.accept() if client.is_none() {
.await
.map_err(|e| error!("Failed to accept connection: {}", e));
let endpoint = if let Ok(endpoint) = endpoint {
endpoint
} else {
continue; continue;
}; }
if let TunnelEndpoint::Client(client) = endpoint { let client = client.unwrap();
loop { if cancel_token.is_cancelled() {
if client.get_client().await.is_err() { return;
break; }
loop {
if client.get_client().await.is_err() {
break;
}
if cancel_token.is_cancelled() {
return;
}
let permit = semaphore.clone().acquire_owned().await.unwrap();
let result = run(client.clone(), profile.clone()).await;
match result {
Ok(true) => {
post_task(profile.clone(), permit, cancel_token.clone()).await;
} }
if cancel_token.is_cancelled() { Err(error) => {
return; error!("{}", error);
}
let permit = semaphore.clone().acquire_owned().await.unwrap();
let result = run(client.clone(), profile.clone()).await;
match result {
Ok(true) => {
post_task(profile.clone(), permit, cancel_token.clone()).await;
}
Err(error) => {
error!("{}", error);
}
_ => {}
} }
_ => {}
} }
} }
} }
})); });
} }
for client_conf in &CONFIG.connect.client { for client_conf in &CONFIG.connect.client {
@ -130,7 +181,7 @@ async fn main() -> anyhow::Result<()> {
let cancel_token = cancel_token.clone(); let cancel_token = cancel_token.clone();
let profile = profile.clone(); let profile = profile.clone();
info!("tcp client started for {}", client_conf.url); info!("tcp client started for {}", client_conf.url);
tasks.push(tokio::task::spawn(async move { join_set.spawn(async move {
loop { loop {
if cancel_token.is_cancelled() { if cancel_token.is_cancelled() {
return; return;
@ -164,18 +215,46 @@ async fn main() -> anyhow::Result<()> {
} }
} }
} }
if cancel_token.is_cancelled() {
return;
}
sleep(Duration::from_secs(10)).await; sleep(Duration::from_secs(10)).await;
} }
})); });
} }
} }
for task in tasks { drop(liveness_tx);
let _ = task.await.map_err(|e| error!("{}", e));
while let Some(res) = join_set.join_next().await {
if let Err(e) = res {
error!("{}", e);
}
} }
Ok(()) Ok(())
} }
struct Sender<T> {
inner: RwLock<VecDeque<T>>,
}
impl<T> Sender<T> {
pub fn new() -> Self {
Self {
inner: RwLock::new(VecDeque::new()),
}
}
pub fn send(&self, item: T) {
self.inner.write().unwrap().push_back(item);
}
pub fn recv(&self) -> Option<T> {
self.inner.write().unwrap().pop_front()
}
}
lazy_static! { lazy_static! {
pub static ref CONFIG: ClientConfig = { pub static ref CONFIG: ClientConfig = {
let raw = fs::read_to_string("sekai-unpacker-client.yaml").unwrap(); let raw = fs::read_to_string("sekai-unpacker-client.yaml").unwrap();

View File

@ -9,12 +9,14 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{RwLock, Semaphore}; use tokio::sync::{RwLock, Semaphore};
use tokio::task::JoinSet;
pub async fn run( pub async fn run(
client: Arc<ClientManager>, client: Arc<ClientManager>,
profile: Arc<(String, Profile)>, profile: Arc<(String, Profile)>,
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
info!("[{}]: Starting sync", profile.0); info!("[{}]: Starting sync", profile.0);
tokio::fs::create_dir_all(&profile.1.path).await?;
let sync_resp = sync(&mut client.get_client().await?, &profile.1).await?; let sync_resp = sync(&mut client.get_client().await?, &profile.1).await?;
let id = sync_resp.id; let id = sync_resp.id;
let local_manifest = Arc::new( let local_manifest = Arc::new(
@ -41,7 +43,7 @@ pub async fn run(
info!("[{}]: Collected {} tasks", profile.0, tasks.len()); info!("[{}]: Collected {} tasks", profile.0, tasks.len());
let n = 5; let n = 5;
let semaphore = Arc::new(Semaphore::new(n)); let semaphore = Arc::new(Semaphore::new(n));
let mut handles = Vec::new(); let mut join_set = JoinSet::new();
for task in tasks { for task in tasks {
let permit = semaphore.clone().acquire_owned().await?; let permit = semaphore.clone().acquire_owned().await?;
let client = client.clone(); let client = client.clone();
@ -49,7 +51,7 @@ pub async fn run(
let local_manifest = local_manifest.clone(); let local_manifest = local_manifest.clone();
let profile = profile.clone(); let profile = profile.clone();
handles.push(tokio::task::spawn(async move { join_set.spawn(async move {
let req = DownloadRequest { let req = DownloadRequest {
id: id.clone(), id: id.clone(),
task: task.clone(), task: task.clone(),
@ -68,12 +70,11 @@ pub async fn run(
.await .await
.unwrap(); .unwrap();
drop(permit); drop(permit);
})); });
} }
let mut succeed = 0; let mut succeed = 0;
let mut failed = 0; let mut failed = 0;
for handle in handles { while let Some(r) = join_set.join_next().await {
let r = handle.await;
if let Err(e) = r { if let Err(e) = r {
error!("{}", e); error!("{}", e);
failed += 1; failed += 1;

View File

@ -12,7 +12,7 @@ server:
profiles: profiles:
cn: cn:
region: cn region: cn
interval: 3 # interval: 3 # seconds
filters: filters:
start_app: start_app:
- "thumbnail" - "thumbnail"
@ -40,4 +40,36 @@ profiles:
convert_to_mp3: false convert_to_mp3: false
convert_to_flac: false convert_to_flac: false
remove_wav: false remove_wav: false
path: "./data/cn" path: "./data/cn"
jp:
region: jp
filters:
start_app:
- "thumbnail"
on_demand: [ ]
skip: [ ]
file_ext: [ ]
asset_version: 6.4.0.30
asset_hash: cce60d07-d60e-48dd-be22-12eac2e67950
export:
by_category: false
usm:
export: true
decode: true
acb:
export: true
decode: true
hca:
decode: true
images:
convert_to_webp: false
remove_png: false
video:
convert_to_mp4: false
direct_usm_to_mp4_with_ffmpeg: false
remove_m2v: false
audio:
convert_to_mp3: false
convert_to_flac: false
remove_wav: false
path: "./data/jp"

View File

@ -34,10 +34,10 @@ regions:
enabled: true enabled: true
provider: provider:
kind: colorful_palette kind: colorful_palette
asset_info_url_template: "" asset_info_url_template: "https://{env}-{hash}-assetbundle-info.sekai.colorfulpalette.org/api/version/{asset_version}/{asset_hash}/os/ios"
asset_bundle_url_template: "" asset_bundle_url_template: "https://{env}-{hash}-assetbundle.sekai.colorfulpalette.org/{asset_version}/{asset_hash}/ios/{bundle_path}"
profile: "production" profile: "production"
profile_hashes: { assetbundleHostHash: cf2d2388 } profile_hashes: { production: cf2d2388 }
required_cookies: true required_cookies: true
crypto: crypto:
aes_key_hex: "6732666343305a637a4e394d544a3631" aes_key_hex: "6732666343305a637a4e394d544a3631"

View File

@ -16,6 +16,7 @@ use std::fs;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::sleep; use tokio::time::sleep;
#[tokio::main] #[tokio::main]
@ -39,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
let routers = Arc::new(build_routers()); let routers = Arc::new(build_routers());
let http_server = Arc::new(Server::new(routers.clone())); let http_server = Arc::new(Server::new(routers.clone()));
let mut tasks = vec![]; let mut join_set = JoinSet::new();
for server_conf in &CONFIG.connect.server { for server_conf in &CONFIG.connect.server {
let server_conf = server_conf.clone().into_tunnel_config(Identity::Server)?; let server_conf = server_conf.clone().into_tunnel_config(Identity::Server)?;
@ -47,7 +48,7 @@ async fn main() -> anyhow::Result<()> {
let server = TunnelListener::bind(server_conf).await?; let server = TunnelListener::bind(server_conf).await?;
info!("tcp server started on {}", url); info!("tcp server started on {}", url);
let http_server = http_server.clone(); let http_server = http_server.clone();
tasks.push(tokio::task::spawn(async move { join_set.spawn(async move {
loop { loop {
let endpoint = server let endpoint = server
.accept() .accept()
@ -59,20 +60,23 @@ async fn main() -> anyhow::Result<()> {
continue; continue;
}; };
if let TunnelEndpoint::Server(connection) = endpoint { if let TunnelEndpoint::Server(connection) = endpoint {
let result = http_server.on_conn(connection).await; let http_server = http_server.clone();
if let Err(e) = result { tokio::task::spawn(async move {
error!("Failed to handle connection: {}", e); let result = http_server.on_conn(connection).await;
} if let Err(e) = result {
error!("Failed to handle connection: {}", e);
}
});
} }
} }
})); });
} }
for client_conf in &CONFIG.connect.client { for client_conf in &CONFIG.connect.client {
let client_conf = client_conf.clone().into_tunnel_config(Identity::Server); let client_conf = client_conf.clone().into_tunnel_config(Identity::Server);
let http_server = http_server.clone(); let http_server = http_server.clone();
info!("tcp client started for {}", client_conf.url); info!("tcp client started for {}", client_conf.url);
tasks.push(tokio::task::spawn(async move { join_set.spawn(async move {
loop { loop {
let endpoint = connect_tunnel(client_conf.clone()) let endpoint = connect_tunnel(client_conf.clone())
.await .await
@ -90,11 +94,12 @@ async fn main() -> anyhow::Result<()> {
} }
sleep(Duration::from_secs(10)).await; sleep(Duration::from_secs(10)).await;
} }
})); });
} }
for task in tasks { while let Some(res) = join_set.join_next().await {
let _ = task.await.map_err(|e| error!("{}", e)); let Err(e) = res;
error!("{}", e);
} }
Ok(()) Ok(())