1
0
mirror of https://github.com/Bluemangoo/sekai-unpacker.git synced 2026-05-06 20:44:47 +08:00

Initial Commit

This commit is contained in:
Bluemangoo 2026-04-08 19:40:36 +08:00
commit 7accb43048
Signed by: Bluemangoo
GPG Key ID: F2F7E46880A1C4CF
40 changed files with 7950 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target
/.idea
/data

3629
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

36
Cargo.toml Normal file
View File

@ -0,0 +1,36 @@
[workspace]
resolver = "3"
members = ["client", "common", "server", "communicator", "assets-updater"]
[workspace.dependencies]
bytes = "1.11.1"
h2 = "0.4.13"
http = "1.4.0"
log = "0.4.29"
serde = "1.0.228"
serde_json = "1.0.149"
simplelog = "0.12.2"
tokio = "1.51.0"
tokio-rustls = "0.26.4"
rustls-pemfile = "2.2.0"
webpki-roots = "1.0.6"
aes = "0.8.4"
rmp-serde = "1.3.1"
hex = "0.4.3"
cbc = "0.1.2"
tempfile = "3.27.0"
chrono = "0.4.44"
reqwest = "0.13.2"
regex = "1.12.3"
yaml_serde = "0.10.4"
anyhow = "1.0.102"
image = "0.25.10"
cridecoder = "0.1.1"
thiserror = "2.0.18"
cipher = "0.4.4"
moka = "0.12.15"
uuid = "1.23.0"
lazy_static = "1.5.0"
structopt = "0.3.26"
tokio-util = "0.7.18"
futures-util = "0.3.32"

7
README.md Normal file
View File

@ -0,0 +1,7 @@
## server
run `server`
## client
run `client -p <profile>` (multiple profiles are supported, e.g. `client -p profile1 -p profile2`)

24
assets-updater/Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "assets-updater"
version = "0.1.0"
edition = "2024"
[dependencies]
common = { path = "../common" }
aes = { workspace = true }
tokio = { workspace = true, features = ["process"] }
rmp-serde = { workspace = true }
hex = { workspace = true }
cbc = { workspace = true }
tempfile = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
regex = { workspace = true }
yaml_serde = { workspace = true }
image = { workspace = true }
cridecoder = { workspace = true }
thiserror = { workspace = true }
log = { workspace = true }
cipher = { workspace = true, features = ["block-padding"] }

View File

@ -0,0 +1,576 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use crate::core::config::{AppConfig, RegionConfig, RegionProviderConfig};
use crate::core::errors::AssetExecutionError;
use crate::core::export_pipeline::extract_unity_asset_bundle;
use crate::core::regions::{compile_patterns, matches_any};
use crate::core::retry::retry_async;
use aes::cipher::block_padding::Pkcs7;
use aes::cipher::{BlockDecryptMut, KeyIvInit};
use chrono::FixedOffset;
use common::updater::{AssetCategory, DownloadTask, SyncContext};
use reqwest::header::{
ACCEPT, ACCEPT_ENCODING, ACCEPT_LANGUAGE, CONNECTION, COOKIE, HeaderMap, HeaderValue,
SET_COOKIE, USER_AGENT,
};
use serde::{Deserialize, Serialize};
type Aes128CbcDec = cbc::Decryptor<aes::Aes128>;
type Aes192CbcDec = cbc::Decryptor<aes::Aes192>;
type Aes256CbcDec = cbc::Decryptor<aes::Aes256>;
/// Deserializes a msgpack/JSON null or missing value as an empty String.
/// Go silently coerces nil → zero value for non-pointer types; this helper
/// mirrors that behavior for String fields.
fn de_null_as_empty_string<'de, D>(deserializer: D) -> Result<String, D::Error>
where
    D: serde::Deserializer<'de>,
{
    // Decode into an Option first, then collapse None to "".
    let maybe_value = Option::<String>::deserialize(deserializer)?;
    Ok(maybe_value.unwrap_or_else(String::new))
}
/// One bundle entry from the remote asset-bundle manifest.
///
/// Several string fields use `de_null_as_empty_string` because the server
/// (ported from a Go implementation) may send nil where Rust expects "".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssetBundleDetail {
    // Logical bundle name (also the manifest map key).
    #[serde(rename = "bundleName", deserialize_with = "de_null_as_empty_string")]
    pub bundle_name: String,
    // On-device cache file name.
    #[serde(rename = "cacheFileName", deserialize_with = "de_null_as_empty_string")]
    pub cache_file_name: String,
    // On-device cache directory name.
    #[serde(
        rename = "cacheDirectoryName",
        deserialize_with = "de_null_as_empty_string"
    )]
    pub cache_directory_name: String,
    // nuverse regions use `crc` instead of `hash`; the server may send nil here.
    #[serde(rename = "hash", deserialize_with = "de_null_as_empty_string")]
    pub hash: String,
    // StartApp / OnDemand / other download category.
    #[serde(rename = "category")]
    pub category: AssetCategory,
    // Integrity value used by nuverse regions in place of `hash`.
    #[serde(rename = "crc")]
    pub crc: i64,
    // Size of the bundle file in bytes.
    #[serde(rename = "fileSize")]
    pub file_size: i64,
    // Names of bundles this one depends on.
    #[serde(rename = "dependencies")]
    pub dependencies: Vec<String>,
    // Asset paths inside the bundle; may be absent in the manifest.
    #[serde(rename = "paths", default)]
    pub paths: Vec<String>,
    // True when the bundle ships with the app rather than via CDN.
    #[serde(rename = "isBuiltin")]
    pub is_builtin: bool,
    // Optional flags / metadata that only some providers send.
    #[serde(rename = "isRelocate")]
    pub is_relocate: Option<bool>,
    #[serde(rename = "md5Hash")]
    pub md5_hash: Option<String>,
    // nuverse-only: CDN path prefix for this bundle.
    #[serde(rename = "downloadPath")]
    pub download_path: Option<String>,
}
/// Decrypted asset-bundle manifest: version metadata plus a map of
/// bundle name → bundle detail.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssetBundleInfo {
    // Manifest asset version, when the provider includes it.
    #[serde(rename = "version")]
    pub version: Option<String>,
    // Target OS string, when present.
    #[serde(rename = "os")]
    pub os: Option<String>,
    // Keyed by bundle name.
    #[serde(rename = "bundles")]
    pub bundles: HashMap<String, AssetBundleDetail>,
}
/// Per-region download/export session: HTTP client, region config, retry
/// policy, and state resolved at runtime (cookies, asset version).
#[derive(Debug, Clone)]
pub struct AssetExecutionContext {
    // Pre-configured with game-client headers; cheap to clone (Arc inside).
    client: reqwest::Client,
    region_name: String,
    region: RegionConfig,
    retry: crate::core::config::RetryConfig,
    // Cookie captured from the bootstrap endpoint, if the provider needs one.
    runtime_cookie: Option<String>,
    // Asset version either taken from the sync request or fetched (nuverse).
    resolved_asset_version: Option<String>,
    pub sync_context: SyncContext,
}
impl AssetExecutionContext {
pub fn new(
app_config: &AppConfig,
sync_context: &SyncContext,
region: &RegionConfig,
) -> Result<Self, AssetExecutionError> {
let mut headers = HeaderMap::new();
headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
headers.insert(
USER_AGENT,
HeaderValue::from_static("ProductName/134 CFNetwork/1408.0.4 Darwin/22.5.0"),
);
headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
headers.insert(
ACCEPT_ENCODING,
HeaderValue::from_static("gzip, deflate, br"),
);
headers.insert(
ACCEPT_LANGUAGE,
HeaderValue::from_static("zh-CN,zh-Hans;q=0.9"),
);
headers.insert(
"X-Unity-Version",
HeaderValue::from_str(&region.runtime.unity_version)
.map_err(|err| AssetExecutionError::HttpClient(err.to_string()))?,
);
let mut builder = reqwest::Client::builder()
.default_headers(headers)
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(180))
.pool_max_idle_per_host(100)
.tcp_keepalive(Duration::from_secs(30));
if let Some(proxy) = &app_config.execution.proxy
&& !proxy.is_empty()
{
builder = builder.proxy(
reqwest::Proxy::all(proxy)
.map_err(|err| AssetExecutionError::HttpClient(err.to_string()))?,
);
}
Ok(Self {
client: builder
.build()
.map_err(|err| AssetExecutionError::HttpClient(err.to_string()))?,
sync_context: sync_context.clone(),
region_name: sync_context.region.clone(),
region: region.clone(),
retry: app_config.execution.retry.clone(),
runtime_cookie: None,
resolved_asset_version: sync_context.asset_version.clone(),
})
}
pub async fn fetch_tasks(&mut self) -> Result<Vec<DownloadTask>, AssetExecutionError> {
if self.requires_cookies() {
self.fetch_runtime_cookies().await?;
}
let info = self.fetch_asset_bundle_info().await?;
let tasks = self.build_download_tasks(&info);
Ok(tasks)
}
pub async fn download(
&self,
task: &DownloadTask,
app_config: &AppConfig,
) -> Result<PathBuf, AssetExecutionError> {
let ctx = self.clone();
ctx.download_and_export_bundle(app_config, task).await
}
fn requires_cookies(&self) -> bool {
match &self.region.provider {
RegionProviderConfig::ColorfulPalette {
required_cookies, ..
} => *required_cookies,
RegionProviderConfig::Nuverse {
required_cookies, ..
} => *required_cookies,
}
}
async fn fetch_runtime_cookies(&mut self) -> Result<(), AssetExecutionError> {
let url = match &self.region.provider {
RegionProviderConfig::ColorfulPalette {
cookie_bootstrap_url,
..
}
| RegionProviderConfig::Nuverse {
cookie_bootstrap_url,
..
} => cookie_bootstrap_url.clone().unwrap_or_else(|| {
"https://issue.sekai.colorfulpalette.org/api/signature".to_string()
}),
};
self.runtime_cookie = retry_async(
&self.retry,
"cookie bootstrap",
|_| async {
let response = self.client.post(&url).send().await?;
if response.status().is_success() {
Ok(response
.headers()
.get(SET_COOKIE)
.and_then(|value| value.to_str().ok())
.map(str::to_string))
} else {
Err(AssetExecutionError::HttpStatus {
url: url.clone(),
status: response.status().as_u16(),
})
}
},
is_retryable_http_error,
)
.await?;
Ok(())
}
async fn fetch_asset_bundle_info(&mut self) -> Result<AssetBundleInfo, AssetExecutionError> {
let url = self.render_asset_info_url().await?;
let body = self.get_with_retry(&url).await?;
decrypt_asset_bundle_info(
self.region.crypto.aes_key_hex.as_deref().ok_or_else(|| {
AssetExecutionError::MissingCryptoConfig {
region: self.region_name.clone(),
}
})?,
self.region.crypto.aes_iv_hex.as_deref().ok_or_else(|| {
AssetExecutionError::MissingCryptoConfig {
region: self.region_name.clone(),
}
})?,
&body,
)
}
async fn render_asset_info_url(&mut self) -> Result<String, AssetExecutionError> {
match &self.region.provider {
RegionProviderConfig::ColorfulPalette {
asset_info_url_template,
profile,
profile_hashes,
..
} => {
let asset_version =
self.sync_context.asset_version.as_deref().ok_or_else(|| {
AssetExecutionError::MissingAssetVersionOrHash {
region: self.region_name.clone(),
}
})?;
let asset_hash = self.sync_context.asset_hash.as_deref().ok_or_else(|| {
AssetExecutionError::MissingAssetVersionOrHash {
region: self.region_name.clone(),
}
})?;
let profile_hash = profile_hashes.get(profile).ok_or_else(|| {
AssetExecutionError::MissingProfileHash {
region: self.region_name.clone(),
profile: profile.clone(),
}
})?;
Ok(asset_info_url_template
.replace("{env}", profile)
.replace("{hash}", profile_hash)
.replace("{asset_version}", asset_version)
.replace("{asset_hash}", asset_hash)
+ &time_arg_jst())
}
RegionProviderConfig::Nuverse {
asset_version_url,
app_version,
asset_info_url_template,
..
} => {
// For nuverse, always fetch the version from asset_version_url.
// The incoming request.asset_version is intentionally ignored here
// to match Go reference behavior.
let version_url = asset_version_url.replace("{app_version}", app_version);
let resolved_version =
String::from_utf8_lossy(&self.get_with_retry(&version_url).await?)
.trim()
.to_string();
self.resolved_asset_version = Some(resolved_version.clone());
Ok(asset_info_url_template
.replace("{app_version}", app_version)
.replace("{asset_version}", &resolved_version)
+ &time_arg_jst())
}
}
}
fn render_bundle_url(&self, task: &DownloadTask) -> Result<String, AssetExecutionError> {
match &self.region.provider {
RegionProviderConfig::ColorfulPalette {
asset_bundle_url_template,
profile,
profile_hashes,
..
} => {
let asset_version =
self.sync_context.asset_version.as_deref().ok_or_else(|| {
AssetExecutionError::MissingAssetVersionOrHash {
region: self.region_name.clone(),
}
})?;
let asset_hash = self.sync_context.asset_hash.as_deref().ok_or_else(|| {
AssetExecutionError::MissingAssetVersionOrHash {
region: self.region_name.clone(),
}
})?;
let profile_hash = profile_hashes.get(profile).ok_or_else(|| {
AssetExecutionError::MissingProfileHash {
region: self.region_name.clone(),
profile: profile.clone(),
}
})?;
Ok(asset_bundle_url_template
.replace("{bundle_path}", &task.download_path)
.replace("{asset_version}", asset_version)
.replace("{asset_hash}", asset_hash)
.replace("{env}", profile)
.replace("{hash}", profile_hash)
+ &time_arg_jst())
}
RegionProviderConfig::Nuverse {
asset_bundle_url_template,
app_version,
..
} => {
let asset_version = self
.resolved_asset_version
.as_deref()
.unwrap_or("<resolved-asset-version>");
Ok(asset_bundle_url_template
.replace("{bundle_path}", &task.download_path)
.replace("{app_version}", app_version)
.replace("{asset_version}", asset_version)
+ &time_arg_jst())
}
}
}
async fn get_with_retry(&self, url: &str) -> Result<Vec<u8>, AssetExecutionError> {
retry_async(
&self.retry,
"http get",
|_| async {
let mut request = self.client.get(url);
if let Some(cookie) = &self.runtime_cookie {
request = request.header(COOKIE, cookie);
}
match request.send().await {
Ok(response) if response.status().is_success() => {
Ok(response.bytes().await?.to_vec())
}
Ok(response) => Err(AssetExecutionError::HttpStatus {
url: url.to_string(),
status: response.status().as_u16(),
}),
Err(err) => Err(AssetExecutionError::Http(err)),
}
},
is_retryable_http_error,
)
.await
}
fn build_download_tasks(&self, info: &AssetBundleInfo) -> Vec<DownloadTask> {
let skip_patterns = compile_patterns(&self.sync_context.filters.skip);
let start_app_patterns = compile_patterns(&self.sync_context.filters.start_app);
let on_demand_patterns = compile_patterns(&self.sync_context.filters.on_demand);
let mut tasks = Vec::new();
for (bundle_name, detail) in &info.bundles {
if matches_any(&skip_patterns, bundle_name) {
continue;
}
let category_patterns = match &detail.category {
AssetCategory::StartApp => &start_app_patterns,
AssetCategory::OnDemand => &on_demand_patterns,
AssetCategory::Other(_) => continue,
};
if category_patterns.is_empty() || !matches_any(category_patterns, bundle_name) {
continue;
}
let bundle_hash = match self.region.provider {
RegionProviderConfig::Nuverse { .. } => detail.crc.to_string(),
RegionProviderConfig::ColorfulPalette { .. } => detail.hash.clone(),
};
tasks.push(DownloadTask {
download_path: download_path_for_region(&self.region.provider, bundle_name, detail),
bundle_path: bundle_name.clone(),
bundle_hash,
category: detail.category.clone(),
});
}
tasks.sort_by(|a, b| a.bundle_path.cmp(&b.bundle_path));
tasks
}
async fn download_and_export_bundle(
&self,
app_config: &AppConfig,
task: &DownloadTask,
) -> Result<PathBuf, AssetExecutionError> {
let bundle_url = self.render_bundle_url(task)?;
let body = self.get_with_retry(&bundle_url).await?;
let deobfuscated = deobfuscate(&body);
let temp_file = std::env::temp_dir()
.join("sekai-updater")
.join("obf")
.join(&self.region_name)
.join(&task.bundle_path);
if let Some(parent) = temp_file.parent() {
std::fs::create_dir_all(parent).map_err(|source| {
AssetExecutionError::CreateTempDir {
path: parent.to_path_buf(),
source,
}
})?;
}
std::fs::write(&temp_file, deobfuscated).map_err(|source| {
AssetExecutionError::WriteTempFile {
path: temp_file.clone(),
source,
}
})?;
let category = match task.category {
AssetCategory::StartApp => "StartApp",
AssetCategory::OnDemand => "OnDemand",
AssetCategory::Other(_) => "OnDemand",
};
let export_result = extract_unity_asset_bundle(
app_config,
&self.sync_context,
&self.region,
&temp_file,
&task.bundle_path,
category,
)
.await;
let _ = std::fs::remove_file(&temp_file);
export_result.map_err(Into::into)
}
}
/// Convenience wrapper: builds a one-shot context for `region`, performs the
/// cookie bootstrap when the provider requires it, and returns the decrypted
/// live manifest.
pub async fn fetch_live_asset_bundle_info(
    app_config: &AppConfig,
    sync_context: &SyncContext,
    region: &RegionConfig,
) -> Result<AssetBundleInfo, AssetExecutionError> {
    let mut ctx = AssetExecutionContext::new(app_config, sync_context, region)?;
    if ctx.requires_cookies() {
        ctx.fetch_runtime_cookies().await?;
    }
    ctx.fetch_asset_bundle_info().await
}
/// Retry policy predicate: transport errors always retry; HTTP responses
/// retry only on 5xx server errors. Everything else is terminal.
fn is_retryable_http_error(err: &AssetExecutionError) -> bool {
    if matches!(err, AssetExecutionError::Http(_)) {
        return true;
    }
    if let AssetExecutionError::HttpStatus { status, .. } = err {
        return *status >= 500;
    }
    false
}
pub fn decrypt_asset_bundle_info(
aes_key_hex: &str,
aes_iv_hex: &str,
content: &[u8],
) -> Result<AssetBundleInfo, AssetExecutionError> {
if content.is_empty() {
return Err(AssetExecutionError::EmptyEncryptedContent);
}
if !content.len().is_multiple_of(16) {
return Err(AssetExecutionError::InvalidEncryptedBlockSize);
}
let key = hex::decode(aes_key_hex)
.map_err(|err| AssetExecutionError::InvalidAesKeyHex(err.to_string()))?;
let iv = hex::decode(aes_iv_hex)
.map_err(|err| AssetExecutionError::InvalidAesIvHex(err.to_string()))?;
if iv.len() != 16 {
return Err(AssetExecutionError::InvalidAesIvLength { got: iv.len() });
}
let mut buf = content.to_vec();
let decrypted = match key.len() {
16 => Aes128CbcDec::new_from_slices(&key, &iv)
.map_err(|err| AssetExecutionError::AssetInfoDecode(err.to_string()))?
.decrypt_padded_mut::<Pkcs7>(&mut buf)
.map_err(|err| AssetExecutionError::AssetInfoDecode(err.to_string()))?,
24 => Aes192CbcDec::new_from_slices(&key, &iv)
.map_err(|err| AssetExecutionError::AssetInfoDecode(err.to_string()))?
.decrypt_padded_mut::<Pkcs7>(&mut buf)
.map_err(|err| AssetExecutionError::AssetInfoDecode(err.to_string()))?,
32 => Aes256CbcDec::new_from_slices(&key, &iv)
.map_err(|err| AssetExecutionError::AssetInfoDecode(err.to_string()))?
.decrypt_padded_mut::<Pkcs7>(&mut buf)
.map_err(|err| AssetExecutionError::AssetInfoDecode(err.to_string()))?,
_ => {
return Err(AssetExecutionError::AssetInfoDecode(format!(
"unsupported AES key length {}",
key.len()
)));
}
};
rmp_serde::from_slice::<AssetBundleInfo>(decrypted)
.map_err(|err| AssetExecutionError::AssetInfoDecode(err.to_string()))
}
/// Strips the provider's bundle obfuscation.
///
/// Two magic headers are recognized:
/// * `20 00 00 00` — the remainder is plain data; just drop the header.
/// * `10 00 00 00` — the first 128 bytes after the header are XOR-masked
///   with a repeating 8-byte pattern; the rest is untouched. Bodies shorter
///   than 128 bytes are returned as-is (matching the original behavior).
///
/// Data without a known header is returned unchanged.
pub fn deobfuscate(data: &[u8]) -> Vec<u8> {
    const PLAIN_MAGIC: [u8; 4] = [0x20, 0x00, 0x00, 0x00];
    const XOR_MAGIC: [u8; 4] = [0x10, 0x00, 0x00, 0x00];
    const XOR_PATTERN: [u8; 8] = [0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00];
    if let Some(rest) = data.strip_prefix(&PLAIN_MAGIC) {
        return rest.to_vec();
    }
    if let Some(rest) = data.strip_prefix(&XOR_MAGIC) {
        if rest.len() < 128 {
            return rest.to_vec();
        }
        // Unmask the 128-byte prologue, then append the untouched tail.
        let mut output: Vec<u8> = rest[..128]
            .iter()
            .zip(XOR_PATTERN.iter().cycle())
            .map(|(byte, mask)| byte ^ mask)
            .collect();
        output.extend_from_slice(&rest[128..]);
        return output;
    }
    data.to_vec()
}
/// Returns true when `bundle_name` matches the sync request's filter set for
/// its category. Unknown categories and empty filter lists never match.
pub fn should_download_bundle(
    sync_context: &SyncContext,
    bundle_name: &str,
    category: &AssetCategory,
) -> bool {
    let raw_patterns = match category {
        AssetCategory::StartApp => &sync_context.filters.start_app,
        AssetCategory::OnDemand => &sync_context.filters.on_demand,
        AssetCategory::Other(_) => return false,
    };
    let compiled = compile_patterns(raw_patterns);
    !compiled.is_empty() && matches_any(&compiled, bundle_name)
}
/// Builds the CDN-relative download path for a bundle. nuverse manifests may
/// supply a per-bundle path prefix; everything else downloads by bare name.
fn download_path_for_region(
    provider: &RegionProviderConfig,
    bundle_name: &str,
    detail: &AssetBundleDetail,
) -> String {
    if let RegionProviderConfig::Nuverse { .. } = provider {
        if let Some(prefix) = detail.download_path.as_deref() {
            return format!("{prefix}/{bundle_name}");
        }
    }
    bundle_name.to_string()
}
/// Returns a `?t=YYYYmmddHHMMSS` cache-busting query suffix rendered in JST
/// (UTC+9), matching the timestamps the game client sends.
fn time_arg_jst() -> String {
    // A +9h offset is always representable, so the unwrap cannot fire.
    let jst = FixedOffset::east_opt(9 * 3600).unwrap();
    let stamp = chrono::Utc::now().with_timezone(&jst).format("%Y%m%d%H%M%S");
    format!("?t={stamp}")
}

View File

@ -0,0 +1,89 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use cridecoder::{extract_acb_from_file, extract_usm_file, HcaDecoder};
use serde::Serialize;
use crate::core::errors::CodecError;
/// Identifier of the audio/video codec backend in use (crates.io crate + version).
pub const CODEC_BACKEND: &str = "crates.io:cridecoder@0.1.1";
/// Serializable capability report for the codec backend, e.g. for a
/// diagnostics/status endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct CodecSummary {
    // Backend identifier string (see CODEC_BACKEND).
    pub backend: &'static str,
    pub supports_acb: bool,
    pub supports_usm: bool,
    pub supports_hca_to_wav: bool,
    pub supports_usm_metadata: bool,
}
/// Reports the (statically known) capabilities of the bundled codec backend.
/// All four features are supported by the cridecoder wrapper in this module.
pub fn codec_summary() -> CodecSummary {
    CodecSummary {
        supports_acb: true,
        supports_usm: true,
        supports_hca_to_wav: true,
        supports_usm_metadata: true,
        backend: CODEC_BACKEND,
    }
}
/// Extracts the audio tracks of an ACB container into `output_dir`,
/// wrapping any backend failure as `CodecError::Acb`.
pub fn export_acb(input: &Path, output_dir: &Path) -> Result<Option<Vec<String>>, CodecError> {
    match extract_acb_from_file(input, output_dir) {
        Ok(tracks) => Ok(tracks),
        Err(err) => Err(CodecError::Acb(err.to_string())),
    }
}
/// Extracts the streams of a USM file into `output_dir` and renames the
/// outputs so they share the input file's stem.
pub fn export_usm(input: &Path, output_dir: &Path) -> Result<Vec<PathBuf>, CodecError> {
    let raw_outputs = match extract_usm_file(input, output_dir, None, false) {
        Ok(paths) => paths,
        Err(err) => return Err(CodecError::Usm(err.to_string())),
    };
    normalize_usm_output_names(input, raw_outputs)
}
/// Reads USM container metadata (e.g. frame-rate info) without extracting
/// any streams.
pub fn read_usm_metadata(input: &Path) -> Result<cridecoder::usm::Metadata, CodecError> {
    cridecoder::usm::read_metadata_file(input)
        .map_err(|err| CodecError::Metadata(err.to_string()))
}
/// Decodes an HCA audio file to a WAV file at `output`.
///
/// The backend's file API takes `&str`, so a non-UTF-8 input path is
/// rejected up front with `CodecError::NonUtf8Path`.
pub fn decode_hca_to_wav(input: &Path, output: &Path) -> Result<(), CodecError> {
    let Some(input_str) = input.to_str() else {
        return Err(CodecError::NonUtf8Path(input.to_path_buf()));
    };
    let mut decoder =
        HcaDecoder::from_file(input_str).map_err(|err| CodecError::Hca(err.to_string()))?;
    let mut wav_out = File::create(output).map_err(|source| CodecError::Io {
        path: output.to_path_buf(),
        source,
    })?;
    decoder
        .decode_to_wav(&mut wav_out)
        .map_err(|err| CodecError::Hca(err.to_string()))
}
/// Renames every extracted output to `<input-stem>.<ext>` in place on disk,
/// returning the (possibly renamed) paths in the original order.
///
/// NOTE(review): an output with no extension surfaces as `NonUtf8Path`,
/// which is a slightly misleading error variant — confirm whether that case
/// can occur with the cridecoder backend.
fn normalize_usm_output_names(
    input: &Path,
    outputs: Vec<PathBuf>,
) -> Result<Vec<PathBuf>, CodecError> {
    let Some(stem) = input.file_stem().and_then(|s| s.to_str()) else {
        return Err(CodecError::NonUtf8Path(input.to_path_buf()));
    };
    let mut renamed = Vec::with_capacity(outputs.len());
    for path in outputs {
        let Some(ext) = path.extension().and_then(|e| e.to_str()) else {
            return Err(CodecError::NonUtf8Path(path));
        };
        let desired = path.with_file_name(format!("{stem}.{ext}"));
        if desired == path {
            // Already has the right name; keep it untouched.
            renamed.push(path);
        } else {
            std::fs::rename(&path, &desired).map_err(|source| CodecError::Io {
                path: desired.clone(),
                source,
            })?;
            renamed.push(desired);
        }
    }
    Ok(renamed)
}

View File

@ -0,0 +1,207 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use crate::core::errors::ConfigError;
/// Top-level application configuration, deserialized from YAML. All fields
/// fall back to their `Default` values when absent.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct AppConfig {
    pub execution: ExecutionConfig,
    pub tools: ToolsConfig,
    pub concurrency: ConcurrencyConfig,
    // Keyed by region name; keys must be lowercase (see `validate`).
    pub regions: BTreeMap<String, RegionConfig>,
}
impl AppConfig {
    /// Validates the config, rejecting the first region key that is not
    /// entirely lowercase.
    pub fn validate(&self) -> Result<(), ConfigError> {
        match self
            .regions
            .keys()
            .find(|name| name.as_str() != name.to_lowercase())
        {
            Some(bad_name) => Err(ConfigError::InvalidRegionName(bad_name.clone())),
            None => Ok(()),
        }
    }
    /// Names of all regions whose `enabled` flag is set, in key order.
    pub fn enabled_regions(&self) -> Vec<String> {
        let mut names = Vec::new();
        for (name, region) in &self.regions {
            if region.enabled {
                names.push(name.clone());
            }
        }
        names
    }
}
/// HTTP access-log settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AccessLogConfig {
    pub enabled: bool,
    // Template with ${time}/${status}/${method}/${path}/${latency} tokens.
    pub format: String,
    // Log file path; `None` presumably means log to stdout — confirm at caller.
    pub file: Option<String>,
}
impl Default for AccessLogConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            format: "[${time}] ${status} - ${method} ${path} ${latency}\n".to_string(),
            file: None,
        }
    }
}
/// Paths to the external tools the export pipeline shells out to.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ToolsConfig {
    // ffmpeg binary; defaults to resolving "ffmpeg" from PATH.
    pub ffmpeg_path: String,
    // AssetStudio CLI binary; when `None`, Unity extraction is skipped.
    pub asset_studio_cli_path: Option<String>,
}
/// Run-wide execution settings (networking, cancellation, persistence).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ExecutionConfig {
    // Optional outbound proxy URL; empty/absent means direct connection.
    pub proxy: Option<String>,
    pub timeout_seconds: u64,
    pub allow_cancel: bool,
    /// How many successful downloads to accumulate before flushing the download
    /// record to disk mid-run. Set to `0` to disable mid-run flushing (record
    /// is only written once at the end). Mirrors Go's `batchSaveSize`.
    pub batch_save_size: usize,
    pub retry: RetryConfig,
}
impl Default for ExecutionConfig {
    fn default() -> Self {
        Self {
            proxy: None,
            timeout_seconds: 300,
            allow_cancel: true,
            batch_save_size: 50,
            retry: RetryConfig::default(),
        }
    }
}
/// Retry policy: attempt count plus exponential-backoff bounds in
/// milliseconds (consumed by `retry_async`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RetryConfig {
    pub attempts: usize,
    pub initial_backoff_ms: u64,
    pub max_backoff_ms: u64,
}
impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            attempts: 4,
            initial_backoff_ms: 1_000,
            max_backoff_ms: 4_000,
        }
    }
}
impl Default for ToolsConfig {
fn default() -> Self {
Self {
ffmpeg_path: "ffmpeg".to_string(),
asset_studio_cli_path: None,
}
}
}
/// Per-stage parallelism limits for the download/export pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ConcurrencyConfig {
    pub download: usize,
    pub upload: usize,
    // Audio (ACB), video (USM), and HCA-decode worker counts.
    pub acb: usize,
    pub usm: usize,
    pub hca: usize,
}
impl Default for ConcurrencyConfig {
    fn default() -> Self {
        Self {
            download: 4,
            upload: 4,
            acb: 8,
            usm: 4,
            hca: 16,
        }
    }
}
/// Configuration for one game region: provider endpoints, decryption keys,
/// and client runtime details.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RegionConfig {
    pub enabled: bool,
    pub provider: RegionProviderConfig,
    pub crypto: CryptoConfig,
    pub runtime: RegionRuntimeConfig,
}
/// Provider-specific endpoint configuration, tagged in YAML via `kind`
/// (`colorful_palette` or `nuverse`). URL templates use `{placeholder}`
/// tokens substituted at request time.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum RegionProviderConfig {
    ColorfulPalette {
        // Templates take {env}, {hash}, {asset_version}, {asset_hash},
        // and (bundle URL only) {bundle_path}.
        asset_info_url_template: String,
        asset_bundle_url_template: String,
        profile: String,
        // Per-profile hash substituted for {hash}.
        profile_hashes: BTreeMap<String, String>,
        #[serde(default)]
        required_cookies: bool,
        #[serde(default)]
        cookie_bootstrap_url: Option<String>,
    },
    Nuverse {
        // Endpoint returning the current asset version; takes {app_version}.
        asset_version_url: String,
        app_version: String,
        // Templates take {app_version}, {asset_version}, {bundle_path}.
        asset_info_url_template: String,
        asset_bundle_url_template: String,
        #[serde(default)]
        required_cookies: bool,
        #[serde(default)]
        cookie_bootstrap_url: Option<String>,
    },
}
impl Default for RegionProviderConfig {
    // Default is an empty ColorfulPalette config, so `RegionConfig` can
    // derive `Default`; a usable region must override these fields.
    fn default() -> Self {
        Self::ColorfulPalette {
            asset_info_url_template: String::new(),
            asset_bundle_url_template: String::new(),
            profile: String::new(),
            profile_hashes: BTreeMap::new(),
            required_cookies: false,
            cookie_bootstrap_url: None,
        }
    }
}
/// AES-CBC key/IV (hex-encoded) used to decrypt the asset-bundle manifest.
/// Both must be present for manifest decryption to succeed.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct CryptoConfig {
    pub aes_key_hex: Option<String>,
    pub aes_iv_hex: Option<String>,
}
/// Client runtime details sent with asset requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionRuntimeConfig {
    // Sent as the `X-Unity-Version` request header.
    pub unity_version: String,
}
impl Default for RegionRuntimeConfig {
    fn default() -> Self {
        Self {
            unity_version: "2022.3.21f1".to_string(),
        }
    }
}
/// Optional per-region filesystem locations for saved assets and the
/// downloaded-asset record file.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RegionPathsConfig {
    pub asset_save_dir: Option<String>,
    pub downloaded_asset_record_file: Option<String>,
}

View File

@ -0,0 +1,142 @@
use std::path::PathBuf;
use thiserror::Error;
/// Errors raised while locating, reading, parsing, or validating the
/// application config file.
#[derive(Debug, Error)]
pub enum ConfigError {
    /// Reading the config file from disk failed.
    #[error("failed to read config file {path}: {source}")]
    Read {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// The file was read but could not be parsed as YAML.
    #[error("failed to parse config file {path}: {source}")]
    Parse {
        path: PathBuf,
        #[source]
        source: yaml_serde::Error,
    },
    /// The config declared an unsupported `config_version`.
    #[error("config_version must be 2, got {0}")]
    UnsupportedVersion(u32),
    /// No candidate config file exists; the payload lists the paths tried.
    #[error("no v2 config file found; tried: {0}")]
    MissingConfigFile(String),
    /// Region map keys must be lowercase (enforced by `AppConfig::validate`).
    #[error("invalid region key `{0}`; region keys must be lowercase")]
    InvalidRegionName(String),
    /// A `${ENV}`-style reference pointed at an unset environment variable.
    #[error("missing required environment variable `{name}` referenced by `{field}`")]
    MissingEnvironmentVariable { field: String, name: String },
}
/// Errors for region lookup: the region either does not exist in config or
/// exists but is disabled.
#[derive(Debug, Error)]
pub enum RegionError {
    #[error("region `{0}` not found")]
    NotFound(String),
    #[error("region `{0}` is disabled")]
    Disabled(String),
}
/// Errors from the cridecoder-backed audio/video codec wrappers.
/// Backend errors are stringified because the backend's error types are not
/// re-exported here.
#[derive(Debug, Error)]
pub enum CodecError {
    /// cridecoder's file APIs take `&str`, so non-UTF-8 paths are rejected.
    #[error("path {0} is not valid UTF-8 for cridecoder file APIs")]
    NonUtf8Path(PathBuf),
    /// Filesystem error with the path it occurred on.
    #[error("io error at {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    #[error("ACB extraction failed: {0}")]
    Acb(String),
    #[error("USM extraction failed: {0}")]
    Usm(String),
    #[error("USM metadata read failed: {0}")]
    Metadata(String),
    #[error("HCA decode failed: {0}")]
    Hca(String),
}
/// Errors from the asset export pipeline (codec steps, filesystem work,
/// image re-encoding, and external tool invocations).
#[derive(Debug, Error)]
pub enum ExportPipelineError {
    /// A codec step failed; forwarded transparently.
    #[error(transparent)]
    Codec(#[from] CodecError),
    #[error("io error at {path}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// Decoding or encoding an image file failed.
    #[error("image codec error at {path}: {source}")]
    Image {
        path: PathBuf,
        #[source]
        source: image::ImageError,
    },
    /// An external tool (e.g. ffmpeg, AssetStudio CLI) could not be started.
    #[error("failed to spawn command `{program}`: {source}")]
    Spawn {
        program: String,
        #[source]
        source: std::io::Error,
    },
    /// An external tool ran but exited unsuccessfully; stderr is captured.
    #[error("command `{program}` failed with status {status}: {stderr}")]
    CommandFailed {
        program: String,
        status: String,
        stderr: String,
    },
    /// A worker task could not be spawned.
    #[error("failed to spawn worker `{worker}`: {source}")]
    WorkerSpawn {
        worker: String,
        #[source]
        source: std::io::Error,
    },
    /// A worker task panicked; the panic message is preserved.
    #[error("worker `{worker}` panicked: {message}")]
    WorkerPanic { worker: String, message: String },
}
/// Top-level error for an asset sync run: wraps region/export errors and
/// adds HTTP, configuration, crypto, and temp-file failures.
#[derive(Debug, Error)]
pub enum AssetExecutionError {
    #[error(transparent)]
    Region(#[from] RegionError),
    #[error(transparent)]
    ExportPipeline(#[from] ExportPipelineError),
    /// Transport-level failure from reqwest (DNS, TLS, timeout, ...).
    #[error("http request failed: {0}")]
    Http(#[from] reqwest::Error),
    /// Building the HTTP client (headers, proxy) failed.
    #[error("failed to initialize HTTP client: {0}")]
    HttpClient(String),
    /// The server responded with a non-success status code.
    #[error("HTTP request to {url} returned status {status}")]
    HttpStatus { url: String, status: u16 },
    #[error("region `{region}` is missing asset_save_dir")]
    MissingAssetSaveDir { region: String },
    /// Colorful Palette URLs need both an asset version and an asset hash.
    #[error("colorful_palette region `{region}` requires asset_version and asset_hash")]
    MissingAssetVersionOrHash { region: String },
    /// No `profile_hashes` entry exists for the configured profile.
    #[error("colorful_palette region `{region}` is missing profile hash for `{profile}`")]
    MissingProfileHash { region: String, profile: String },
    /// Manifest decryption requires both `aes_key_hex` and `aes_iv_hex`.
    #[error("region `{region}` is missing AES key or IV configuration")]
    MissingCryptoConfig { region: String },
    #[error("invalid AES key hex: {0}")]
    InvalidAesKeyHex(String),
    #[error("invalid AES IV hex: {0}")]
    InvalidAesIvHex(String),
    #[error("invalid AES IV length: got {got}, want 16")]
    InvalidAesIvLength { got: usize },
    #[error("encrypted content cannot be empty")]
    EmptyEncryptedContent,
    #[error("encrypted content length is not a multiple of AES block size")]
    InvalidEncryptedBlockSize,
    /// Decryption or msgpack decoding of the manifest failed.
    #[error("failed to decrypt or deserialize asset info: {0}")]
    AssetInfoDecode(String),
    #[error("failed to create temp directory for {path}: {source}")]
    CreateTempDir {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    #[error("failed to write temp file {path}: {source}")]
    WriteTempFile {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// The run was cancelled by the user/controller.
    #[error("job execution cancelled")]
    Cancelled,
}

View File

@ -0,0 +1,790 @@
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use crate::core::codec;
use crate::core::config::{AppConfig, RegionConfig};
use crate::core::errors::ExportPipelineError;
use crate::core::media::{
FrameRate, convert_m2v_to_mp4, convert_usm_to_mp4, convert_wav_to_flac, convert_wav_to_mp3,
};
use crate::core::retry::retry_async;
use common::updater::SyncContext;
use image::codecs::webp::WebPEncoder;
use image::{ExtendedColorType, ImageReader};
use log::debug;
use serde::Serialize;
use tokio::process::Command;
/// Feature flags detected from the installed AssetStudio CLI build, used to
/// pick compatible command-line arguments.
#[derive(Debug, Clone, Copy)]
struct AssetStudioCliCapabilities {
    filter_exclude_mode: bool,
    filter_blacklist_mode: bool,
    sekai_keep_single_container_filename: bool,
}
/// Summary of a post-processing pass: where the export landed and which
/// files were produced.
#[derive(Debug, Clone, Serialize, Default)]
pub struct PostProcessSummary {
    pub export_root: PathBuf,
    pub generated_files: Vec<PathBuf>,
}
/// Classifies an export path into an AssetStudio grouping mode.
///
/// Paths under a handful of known prefixes (compared case-insensitively,
/// with `\` normalized to `/` and leading slashes stripped) get
/// `"containerFull"`; everything else — including the empty path — gets
/// `"container"`.
pub fn get_export_group(export_path: &str) -> &'static str {
    // Prefixes whose assets need the full container path preserved.
    const FULL_EXPORT_PREFIXES: [&str; 5] = [
        "event/center",
        "event/thumbnail",
        "gacha/icon",
        "fix_prefab/mc_new",
        "mysekai/character/",
    ];
    if export_path.is_empty() {
        return "container";
    }
    let normalized = export_path
        .replace('\\', "/")
        .trim_start_matches('/')
        .to_lowercase();
    if FULL_EXPORT_PREFIXES
        .iter()
        .any(|prefix| normalized.starts_with(prefix))
    {
        "containerFull"
    } else {
        "container"
    }
}
/// Extracts a Unity asset bundle via the external AssetStudio CLI (with
/// retry), then runs media post-processing on the exported directory.
///
/// When no AssetStudio CLI is configured, extraction is skipped and the
/// bundle file's parent directory is returned as-is.
pub async fn extract_unity_asset_bundle(
    app_config: &AppConfig,
    sync_context: &SyncContext,
    region: &RegionConfig,
    asset_bundle_file: &Path,
    export_path: &str,
    category: &str,
) -> Result<PathBuf, ExportPipelineError> {
    let output_dir = std::env::temp_dir()
        .join("sekai-updater")
        .join("extract")
        .join(&sync_context.region);
    // NOTE(review): `parent().unwrap()` panics if the bundle path has no
    // parent; callers appear to pass temp-dir paths, so this should be safe
    // in practice — confirm.
    let Some(asset_studio_cli_path) = app_config.tools.asset_studio_cli_path.as_deref() else {
        return Ok(asset_bundle_file.parent().unwrap().to_path_buf());
    };
    // Container prefix stripped from exported paths; depends on export mode
    // and on whether the bundle is a `mysekai` on-demand asset.
    let exclude_path_prefix = if sync_context.export.by_category {
        "assets/sekai/assetbundle/resources".to_string()
    } else if export_path.starts_with("mysekai") {
        "assets/sekai/assetbundle/resources/ondemand".to_string()
    } else {
        format!(
            "assets/sekai/assetbundle/resources/{}",
            category.to_lowercase()
        )
    };
    // Where the exported files are expected to land after the CLI run.
    let actual_export_path = if sync_context.export.by_category {
        output_dir.join(category.to_lowercase()).join(export_path)
    } else {
        output_dir.join(export_path)
    };
    debug!("{}", actual_export_path.to_string_lossy());
    // Probe the installed CLI build so we only pass flags it understands.
    let capabilities = detect_assetstudio_cli_capabilities(asset_studio_cli_path);
    let args = build_assetstudio_export_args(
        asset_bundle_file,
        output_dir.as_path(),
        export_path,
        &exclude_path_prefix,
        region,
        sync_context,
        capabilities,
    );
    // Run the CLI with the configured retry policy; non-zero exit is
    // surfaced with captured stderr.
    retry_async(
        &app_config.execution.retry,
        "assetstudio export",
        |_| async {
            let output = Command::new(asset_studio_cli_path)
                .args(&args)
                .output()
                .await
                .map_err(|source| ExportPipelineError::Spawn {
                    program: asset_studio_cli_path.to_string(),
                    source,
                })?;
            if output.status.success() {
                Ok(())
            } else {
                Err(ExportPipelineError::CommandFailed {
                    program: asset_studio_cli_path.to_string(),
                    status: output.status.to_string(),
                    stderr: String::from_utf8_lossy(&output.stderr).trim().to_string(),
                })
            }
        },
        is_retryable_command_error,
    )
    .await?;
    // Convert/transcode any exported media in place.
    post_process_exported_files(app_config, sync_context, region, &actual_export_path).await?;
    Ok(actual_export_path)
}
/// Runs media post-processing over an export directory: USM video handling,
/// ACB audio extraction, then PNG conversion. A missing directory (nothing
/// was exported) is treated as success.
pub async fn post_process_exported_files(
    app_config: &AppConfig,
    sync_context: &SyncContext,
    region: &RegionConfig,
    export_path: &Path,
) -> Result<(), ExportPipelineError> {
    if !export_path.exists() {
        return Ok(());
    }
    handle_usm_files(
        export_path,
        sync_context,
        region,
        &app_config.tools.ffmpeg_path,
        &app_config.execution.retry,
    )
    .await?;
    handle_acb_files(
        export_path,
        sync_context,
        region,
        &app_config.tools.ffmpeg_path,
        &app_config.execution.retry,
        app_config.concurrency.acb,
        app_config.concurrency.hca,
    )
    .await?;
    handle_png_conversion(export_path, sync_context, region).await?;
    Ok(())
}
/// Finds `.usm` video containers under `export_path` and processes them.
///
/// Returns an empty Vec when USM export/decoding is disabled or no `.usm`
/// files exist. When several `.usm` files are present they are concatenated
/// into one container first (see `merge_usm_files`) and processed as a single
/// input.
async fn handle_usm_files(
    export_path: &Path,
    sync_context: &SyncContext,
    region: &RegionConfig,
    ffmpeg_path: &str,
    retry: &crate::core::config::RetryConfig,
) -> Result<Vec<PathBuf>, ExportPipelineError> {
    let usm_files = find_files_by_extension(export_path, "usm")?;
    if !sync_context.export.usm.export || !sync_context.export.usm.decode || usm_files.is_empty() {
        return Ok(Vec::new());
    }
    let usm_input = if usm_files.len() == 1 {
        usm_files[0].clone()
    } else {
        merge_usm_files(export_path, &usm_files)?
    };
    process_usm_file(
        &usm_input,
        export_path,
        sync_context,
        region,
        ffmpeg_path,
        retry,
    )
    .await
}
/// Decodes a single (possibly merged) `.usm` container in place.
///
/// Two paths exist:
/// * direct mode — hand the `.usm` straight to ffmpeg and produce an `.mp4`;
/// * demux mode — extract the elementary streams, then optionally convert the
///   `.m2v` video stream to `.mp4`, reusing the frame rate read from the USM
///   metadata when available.
///
/// The source `.usm` is deleted afterwards in both paths; returns the files
/// that remain.
async fn process_usm_file(
    usm_file: &Path,
    export_path: &Path,
    sync_context: &SyncContext,
    _: &RegionConfig,
    ffmpeg_path: &str,
    retry: &crate::core::config::RetryConfig,
) -> Result<Vec<PathBuf>, ExportPipelineError> {
    // Output files inherit the USM's base name.
    let output_name = usm_file
        .file_stem()
        .and_then(|stem| stem.to_str())
        .ok_or_else(|| ExportPipelineError::Io {
            path: usm_file.to_path_buf(),
            source: std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid usm file name"),
        })?
        .to_string();
    if sync_context.export.video.convert_to_mp4
        && sync_context.export.video.direct_usm_to_mp4_with_ffmpeg
    {
        // Direct mode: ffmpeg reads the USM container itself.
        let mp4 = export_path.join(format!("{output_name}.mp4"));
        convert_usm_to_mp4(usm_file, &mp4, ffmpeg_path, retry).await?;
        remove_file_if_exists(usm_file)?;
        return Ok(vec![mp4]);
    }
    // Demux mode: read the frame rate from container metadata (best effort),
    // discarding invalid zero/negative denominators.
    let metadata = codec::read_usm_metadata(usm_file).ok();
    let frame_rate = metadata
        .as_ref()
        .and_then(|metadata| metadata.video_frame_rate())
        .filter(|(_, denominator)| *denominator > 0)
        .map(FrameRate::from_tuple);
    let extracted = codec::export_usm(usm_file, export_path)?;
    let mut generated = extracted.clone();
    if sync_context.export.video.convert_to_mp4 {
        for extracted_file in extracted {
            if extracted_file
                .extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| ext.eq_ignore_ascii_case("m2v"))
                .unwrap_or(false)
            {
                let mp4 = export_path.join(format!("{output_name}.mp4"));
                convert_m2v_to_mp4(
                    &extracted_file,
                    &mp4,
                    sync_context.export.video.remove_m2v,
                    ffmpeg_path,
                    frame_rate,
                    retry,
                )
                .await?;
                generated.push(mp4);
                if sync_context.export.video.remove_m2v {
                    // The converter deleted the .m2v; drop it from the result
                    // list as well.
                    generated.retain(|path| path != &extracted_file);
                }
            }
        }
    }
    remove_file_if_exists(usm_file)?;
    Ok(generated)
}
/// Finds `.acb` audio banks under `export_path` and extracts/decodes them.
///
/// Processes up to `acb_concurrency` banks in parallel on dedicated worker
/// threads; each bank's HCA tracks are in turn decoded with up to
/// `hca_concurrency` workers. Returns an empty Vec when ACB handling is
/// disabled or nothing was found.
async fn handle_acb_files(
    export_path: &Path,
    sync_context: &SyncContext,
    region: &RegionConfig,
    ffmpeg_path: &str,
    retry: &crate::core::config::RetryConfig,
    acb_concurrency: usize,
    hca_concurrency: usize,
) -> Result<Vec<PathBuf>, ExportPipelineError> {
    let acb_files = find_files_by_extension(export_path, "acb")?;
    if !sync_context.export.acb.export || !sync_context.export.acb.decode || acb_files.is_empty() {
        return Ok(Vec::new());
    }
    // Clone everything the worker closure captures so it can be 'static.
    let export_path = export_path.to_path_buf();
    let region = region.clone();
    let sync_context = sync_context.clone();
    let ffmpeg_path = ffmpeg_path.to_string();
    let retry = retry.clone();
    run_path_tasks(acb_files, acb_concurrency, move |acb_file| {
        process_acb_file(
            &acb_file,
            &export_path,
            &sync_context,
            &region,
            &ffmpeg_path,
            &retry,
            hca_concurrency,
        )
    })
}
/// Extracts one `.acb` bank into a temporary directory, decodes its HCA
/// tracks, moves the resulting audio into `output_dir`, and deletes the
/// original `.acb`.
fn process_acb_file(
    acb_file: &Path,
    output_dir: &Path,
    sync_context: &SyncContext,
    region: &RegionConfig,
    ffmpeg_path: &str,
    retry: &crate::core::config::RetryConfig,
    hca_concurrency: usize,
) -> Result<Vec<PathBuf>, ExportPipelineError> {
    let parent_dir = acb_file.parent().ok_or_else(|| ExportPipelineError::Io {
        path: acb_file.to_path_buf(),
        source: std::io::Error::new(std::io::ErrorKind::NotFound, "missing parent directory"),
    })?;
    // Extract into a sibling temp dir; it is cleaned up automatically on drop.
    let extract_dir = tempfile::Builder::new()
        .prefix("acb-extract-")
        .tempdir_in(parent_dir)
        .map_err(|source| ExportPipelineError::Io {
            path: parent_dir.to_path_buf(),
            source,
        })?;
    let _ = codec::export_acb(acb_file, extract_dir.path())?;
    let mut hca_files = find_files_by_extension(extract_dir.path(), "hca")?;
    // Normalize separators so the substring check also works on Windows paths.
    let acb_path_lower = acb_file.to_string_lossy().replace('\\', "/").to_lowercase();
    if acb_path_lower.contains("music/long") {
        // Full-length songs ship extra "_vr"/"_screen" mixes; skip those variants.
        hca_files.retain(|path| {
            let lower = path
                .file_name()
                .and_then(|name| name.to_str())
                .unwrap_or_default()
                .to_lowercase();
            !(lower.ends_with("_vr.hca") || lower.ends_with("_screen.hca"))
        });
    }
    if !sync_context.export.hca.decode {
        // Decoding disabled: just drop the bank and keep nothing.
        remove_file_if_exists(acb_file)?;
        return Ok(Vec::new());
    }
    // Clone captures so the per-HCA worker closure can be 'static.
    let extract_output_dir = extract_dir.path().to_path_buf();
    let region = region.clone();
    let sync_context = sync_context.clone();
    let ffmpeg_path = ffmpeg_path.to_string();
    let retry = retry.clone();
    let generated = run_path_tasks(hca_files, hca_concurrency, move |hca_file| {
        process_hca_file(
            &hca_file,
            &extract_output_dir,
            &sync_context,
            &region,
            &ffmpeg_path,
            &retry,
        )
    })?;
    // Move the decoded audio out of the temp dir before it is dropped.
    let final_outputs = move_result_files(output_dir, &generated)?;
    remove_file_if_exists(acb_file)?;
    Ok(final_outputs)
}
/// Decodes a single `.hca` track to WAV and optionally transcodes it.
///
/// Conversion priority: MP3 takes precedence over FLAC when both are enabled.
/// The intermediate WAV may be kept or removed per the profile settings; the
/// surviving files are moved into `output_dir` and returned.
fn process_hca_file(
    hca_file: &Path,
    output_dir: &Path,
    sync_context: &SyncContext,
    _: &RegionConfig,
    ffmpeg_path: &str,
    retry: &crate::core::config::RetryConfig,
) -> Result<Vec<PathBuf>, ExportPipelineError> {
    let base_name = hca_file
        .file_stem()
        .and_then(|stem| stem.to_str())
        .ok_or_else(|| ExportPipelineError::Io {
            path: hca_file.to_path_buf(),
            source: std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hca file name"),
        })?;
    let wav_file = hca_file.with_extension("wav");
    codec::decode_hca_to_wav(hca_file, &wav_file)?;
    // The source .hca is no longer needed once the WAV exists.
    remove_file_if_exists(hca_file)?;
    let mut generated = vec![wav_file.clone()];
    if sync_context.export.audio.convert_to_mp3 {
        let mp3 = output_dir.join(format!("{base_name}.mp3"));
        convert_wav_to_mp3(&wav_file, &mp3, ffmpeg_path, retry)?;
        if sync_context.export.audio.remove_wav {
            remove_file_if_exists(&wav_file)?;
            generated.retain(|path| path != &wav_file);
        }
        generated.push(mp3);
    } else if sync_context.export.audio.convert_to_flac {
        let flac = output_dir.join(format!("{base_name}.flac"));
        convert_wav_to_flac(&wav_file, &flac, ffmpeg_path, retry)?;
        if sync_context.export.audio.remove_wav {
            remove_file_if_exists(&wav_file)?;
            generated.retain(|path| path != &wav_file);
        }
        generated.push(flac);
    } else if sync_context.export.audio.remove_wav {
        // No transcode requested and WAV removal enabled: nothing survives.
        remove_file_if_exists(&wav_file)?;
        generated.clear();
    }
    let final_outputs = move_result_files(output_dir, &generated)?;
    Ok(final_outputs)
}
/// Converts every exported PNG under `export_path` to lossless WebP when the
/// profile enables it, optionally deleting the source PNGs afterwards.
///
/// Returns the list of WebP files written; returns an empty Vec (no-op) when
/// WebP conversion is disabled in the sync context.
async fn handle_png_conversion(
    export_path: &Path,
    sync_context: &SyncContext,
    _: &RegionConfig,
) -> Result<Vec<PathBuf>, ExportPipelineError> {
    if !sync_context.export.images.convert_to_webp {
        return Ok(Vec::new());
    }
    let png_files = find_files_by_extension(export_path, "png")?;
    // One WebP per PNG; reserve up front.
    let mut generated = Vec::with_capacity(png_files.len());
    for png_file in png_files {
        let webp = png_file.with_extension("webp");
        convert_png_to_webp(&png_file, &webp)?;
        if sync_context.export.images.remove_png {
            remove_file_if_exists(&png_file)?;
        }
        // `webp` is not used again, so move it instead of cloning.
        generated.push(webp);
    }
    Ok(generated)
}
/// Re-encodes a PNG as lossless WebP (RGBA8).
///
/// Decode/encode failures surface as `ExportPipelineError::Image`; filesystem
/// failures as `ExportPipelineError::Io`.
fn convert_png_to_webp(png_file: &Path, webp_file: &Path) -> Result<(), ExportPipelineError> {
    let decoded = ImageReader::open(png_file)
        .map_err(|source| ExportPipelineError::Io {
            path: png_file.to_path_buf(),
            source,
        })?
        .decode()
        .map_err(|source| ExportPipelineError::Image {
            path: png_file.to_path_buf(),
            source,
        })?;
    // Normalize to RGBA8 so the encoder input layout is fixed.
    let pixels = decoded.to_rgba8();
    let (width, height) = pixels.dimensions();
    let file = std::fs::File::create(webp_file).map_err(|source| ExportPipelineError::Io {
        path: webp_file.to_path_buf(),
        source,
    })?;
    let sink = std::io::BufWriter::new(file);
    let encoder = WebPEncoder::new_lossless(sink);
    encoder
        .encode(pixels.as_raw(), width, height, ExtendedColorType::Rgba8)
        .map_err(|source| ExportPipelineError::Image {
            path: webp_file.to_path_buf(),
            source,
        })
}
/// Decides whether a failed external command is worth retrying.
///
/// Non-zero exits are always retried; spawn failures only for transient
/// I/O error kinds. Everything else (pipeline errors, image errors, …) is
/// treated as permanent.
fn is_retryable_command_error(err: &ExportPipelineError) -> bool {
    use std::io::ErrorKind;
    match err {
        // The tool ran but exited non-zero — may succeed on a second try.
        ExportPipelineError::CommandFailed { .. } => true,
        ExportPipelineError::Spawn { source, .. } => matches!(
            source.kind(),
            ErrorKind::Interrupted
                | ErrorKind::TimedOut
                | ErrorKind::WouldBlock
                | ErrorKind::BrokenPipe
                | ErrorKind::ConnectionReset
                | ErrorKind::ConnectionAborted
                | ErrorKind::ConnectionRefused
        ),
        _ => false,
    }
}
/// Builds the argument list for one AssetStudio CLI export invocation.
///
/// Filter flags are chosen from the probed `capabilities` so both older
/// (`--filter-blacklist-mode`) and newer (`--filter-exclude-mode`) CLI builds
/// are supported. When USM/ACB export is disabled, a name regex excludes those
/// files from extraction.
fn build_assetstudio_export_args(
    asset_bundle_file: &Path,
    output_dir: &Path,
    export_path: &str,
    exclude_path_prefix: &str,
    region: &RegionConfig,
    sync_context: &SyncContext,
    capabilities: AssetStudioCliCapabilities,
) -> Vec<String> {
    let mut args = vec![
        asset_bundle_file.to_string_lossy().to_string(),
        "-m".to_string(),
        "export".to_string(),
        // Asset types to extract.
        "-t".to_string(),
        "monoBehaviour,textAsset,tex2d,tex2dArray,audio".to_string(),
        "-g".to_string(),
        get_export_group(export_path).to_string(),
        "-f".to_string(),
        "assetName".to_string(),
        "-o".to_string(),
        output_dir.to_string_lossy().to_string(),
        "--strip-path-prefix".to_string(),
        exclude_path_prefix.to_string(),
        // Bare flag; NOTE(review): confirm "-r" semantics against the
        // AssetStudio CLI help — it takes no value here.
        "-r".to_string(),
    ];
    if capabilities.filter_exclude_mode {
        args.push("--filter-exclude-mode".to_string());
    } else if capabilities.filter_blacklist_mode {
        args.push("--filter-blacklist-mode".to_string());
    }
    args.push("--filter-with-regex".to_string());
    if capabilities.sekai_keep_single_container_filename {
        args.push("--sekai-keep-single-container-filename".to_string());
    }
    if !region.runtime.unity_version.is_empty() {
        // Pin the Unity version when the region config specifies one.
        args.push("--unity-version".to_string());
        args.push(region.runtime.unity_version.clone());
    }
    // Build an exclusion regex for media types the profile does not export.
    let mut excluded_exts = Vec::new();
    if !sync_context.export.usm.export {
        excluded_exts.push("usm");
    }
    if !sync_context.export.acb.export {
        excluded_exts.push("acb");
    }
    if !excluded_exts.is_empty() {
        args.push("--filter-by-name".to_string());
        args.push(format!(r".*\.({})$", excluded_exts.join("|")));
    }
    args
}
/// Probes `--help` of the AssetStudio CLI binary to discover which optional
/// flags it supports; results are memoized per binary path for the process
/// lifetime.
///
/// If the binary cannot be executed at all, optimistic defaults assuming the
/// newer flag names are used. NOTE(review): two threads probing the same path
/// concurrently may both run `--help`; the race is benign since both insert
/// the same value.
fn detect_assetstudio_cli_capabilities(asset_studio_cli_path: &str) -> AssetStudioCliCapabilities {
    static CACHE: std::sync::OnceLock<
        Mutex<std::collections::HashMap<String, AssetStudioCliCapabilities>>,
    > = std::sync::OnceLock::new();
    let cache = CACHE.get_or_init(|| Mutex::new(std::collections::HashMap::new()));
    if let Some(cached) = cache.lock().unwrap().get(asset_studio_cli_path).copied() {
        return cached;
    }
    // Used when the binary cannot be spawned (e.g. missing file).
    let fallback = AssetStudioCliCapabilities {
        filter_exclude_mode: true,
        filter_blacklist_mode: false,
        sekai_keep_single_container_filename: true,
    };
    let detected = match std::process::Command::new(asset_studio_cli_path)
        .arg("--help")
        .output()
    {
        Ok(output) => {
            // Some CLIs print help to stderr — scan both streams.
            let help = format!(
                "{}\n{}",
                String::from_utf8_lossy(&output.stdout),
                String::from_utf8_lossy(&output.stderr)
            );
            AssetStudioCliCapabilities {
                filter_exclude_mode: help.contains("--filter-exclude-mode"),
                filter_blacklist_mode: help.contains("--filter-blacklist-mode"),
                sekai_keep_single_container_filename: help
                    .contains("--sekai-keep-single-container-filename"),
            }
        }
        Err(_) => fallback,
    };
    cache
        .lock()
        .unwrap()
        .insert(asset_studio_cli_path.to_string(), detected);
    detected
}
/// Moves each generated file into `output_dir` (flat, keyed by file name) and
/// returns the final locations.
///
/// Entries already at their destination are kept only if they still exist;
/// entries without a file name are skipped.
fn move_result_files(
    output_dir: &Path,
    generated: &[PathBuf],
) -> Result<Vec<PathBuf>, ExportPipelineError> {
    let mut outputs = Vec::with_capacity(generated.len());
    for source_path in generated {
        let Some(file_name) = source_path.file_name() else {
            continue;
        };
        let destination = output_dir.join(file_name);
        if source_path == &destination {
            // Already in place; include it only if it actually exists.
            if destination.exists() {
                outputs.push(destination);
            }
            continue;
        }
        std::fs::rename(source_path, &destination).map_err(|source| ExportPipelineError::Io {
            path: destination.clone(),
            source,
        })?;
        outputs.push(destination);
    }
    Ok(outputs)
}
/// Runs `task` over `paths` on a small pool of dedicated OS threads and
/// gathers every output path the tasks produce.
///
/// * `concurrency` is clamped to `1..=paths.len()`.
/// * Fail-fast: only the first error is kept and returned; workers stop at
///   the next item boundary once it is recorded. Output order is unspecified.
/// * A panicking worker surfaces as `ExportPipelineError::WorkerPanic`.
/// * Workers get a 32 MiB stack; NOTE(review): presumably sized for deep
///   codec call stacks — confirm before shrinking.
fn run_path_tasks<F>(
    paths: Vec<PathBuf>,
    concurrency: usize,
    task: F,
) -> Result<Vec<PathBuf>, ExportPipelineError>
where
    F: Fn(PathBuf) -> Result<Vec<PathBuf>, ExportPipelineError> + Send + Sync + 'static,
{
    if paths.is_empty() {
        return Ok(Vec::new());
    }
    let worker_count = concurrency.max(1).min(paths.len());
    // Shared work queue, result sink, and first-error slot.
    let queue = Arc::new(Mutex::new(VecDeque::from(paths)));
    let results = Arc::new(Mutex::new(Vec::<PathBuf>::new()));
    let first_error = Arc::new(Mutex::new(None::<ExportPipelineError>));
    let task = Arc::new(task);
    let mut handles = Vec::with_capacity(worker_count);
    const WORKER_STACK_SIZE: usize = 32 * 1024 * 1024;
    for _ in 0..worker_count {
        let queue = queue.clone();
        let results = results.clone();
        let first_error = first_error.clone();
        let task = task.clone();
        let worker_name = "export-task".to_string();
        let handle = std::thread::Builder::new()
            .name(worker_name.clone())
            .stack_size(WORKER_STACK_SIZE)
            .spawn(move || {
                loop {
                    // Stop early once any worker has recorded an error.
                    if first_error.lock().unwrap().is_some() {
                        break;
                    }
                    let next_path = queue.lock().unwrap().pop_front();
                    let Some(path) = next_path else {
                        break;
                    };
                    match task(path) {
                        Ok(mut generated) => results.lock().unwrap().append(&mut generated),
                        Err(err) => {
                            // Keep only the first error; later ones are dropped.
                            let mut first = first_error.lock().unwrap();
                            if first.is_none() {
                                *first = Some(err);
                            }
                            break;
                        }
                    }
                }
            })
            .map_err(|source| ExportPipelineError::WorkerSpawn {
                worker: worker_name,
                source,
            })?;
        handles.push(handle);
    }
    for handle in handles {
        handle
            .join()
            .map_err(|panic| ExportPipelineError::WorkerPanic {
                worker: "export task".to_string(),
                message: panic_message(panic),
            })?;
    }
    if let Some(err) = first_error.lock().unwrap().take() {
        return Err(err);
    }
    let mut results = results.lock().unwrap();
    Ok(std::mem::take(&mut *results))
}
/// Extracts a human-readable message from a worker thread's panic payload.
///
/// `JoinHandle::join` yields the payload as a boxed `Any`; panics raised via
/// `panic!("…")` carry either a `&str` or a `String`. Anything else is
/// reported with a generic placeholder.
fn panic_message(panic: Box<dyn std::any::Any + Send>) -> String {
    match panic.downcast::<String>() {
        Ok(message) => *message,
        Err(payload) => payload
            .downcast_ref::<&str>()
            .map(|message| (*message).to_string())
            .unwrap_or_else(|| "unknown worker panic".to_string()),
    }
}
/// Concatenates multiple `.usm` files into `<dir name>.usm` inside `dir`,
/// deleting each source file after it has been appended.
///
/// NOTE(review): files are appended in the order `usm_files` was collected
/// (directory-walk order) — confirm this matches the intended segment order.
fn merge_usm_files(dir: &Path, usm_files: &[PathBuf]) -> Result<PathBuf, ExportPipelineError> {
    let dir_name = dir
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or("merged");
    let merged_file = dir.join(format!("{dir_name}.usm"));
    let mut target =
        std::fs::File::create(&merged_file).map_err(|source| ExportPipelineError::Io {
            path: merged_file.clone(),
            source,
        })?;
    for source_path in usm_files {
        // Never append the target to itself if a file already used that name.
        if *source_path == merged_file {
            continue;
        }
        let mut source =
            std::fs::File::open(source_path).map_err(|source| ExportPipelineError::Io {
                path: source_path.clone(),
                source,
            })?;
        std::io::copy(&mut source, &mut target).map_err(|source| ExportPipelineError::Io {
            path: source_path.clone(),
            source,
        })?;
        // Close the handle before deleting the file (Windows cannot delete
        // open files by default).
        drop(source);
        remove_file_if_exists(source_path)?;
    }
    Ok(merged_file)
}
/// Recursively collects every file path under `dir` (directories excluded).
pub fn find_files(dir: &Path) -> Result<Vec<PathBuf>, ExportPipelineError> {
    let mut collected = Vec::new();
    walk(dir, &mut |path| collected.push(path.to_path_buf()))?;
    Ok(collected)
}
/// Recursively collects every file under `dir` whose extension equals `ext`
/// (ASCII case-insensitive).
pub fn find_files_by_extension(dir: &Path, ext: &str) -> Result<Vec<PathBuf>, ExportPipelineError> {
    let wanted = ext.to_lowercase();
    let mut matches = Vec::new();
    walk(dir, &mut |path| {
        let hit = path
            .extension()
            .and_then(|value| value.to_str())
            .is_some_and(|value| value.eq_ignore_ascii_case(&wanted));
        if hit {
            matches.push(path.to_path_buf());
        }
    })?;
    Ok(matches)
}
pub fn find_files_by_extensions<P>(
dir: &Path,
ext: &[P],
) -> Result<Vec<PathBuf>, ExportPipelineError>
where
P: AsRef<str>,
{
let target_ext = ext
.iter()
.map(|x| x.as_ref().to_lowercase())
.collect::<Vec<_>>();
let mut files = Vec::new();
walk(dir, &mut |path| {
if path
.extension()
.and_then(|value| value.to_str())
.map(|value| target_ext.contains(&value.to_lowercase()))
.unwrap_or(false)
{
files.push(path.to_path_buf());
}
})?;
Ok(files)
}
/// Depth-first directory traversal: invokes `f` on every non-directory entry
/// under `dir`.
///
/// NOTE(review): `DirEntry::file_type` does not follow symlinks, so a symlink
/// to a directory is passed to `f` as a file — confirm that is intended.
fn walk(dir: &Path, f: &mut dyn FnMut(&Path)) -> Result<(), ExportPipelineError> {
    for entry in std::fs::read_dir(dir).map_err(|source| ExportPipelineError::Io {
        path: dir.to_path_buf(),
        source,
    })? {
        let entry = entry.map_err(|source| ExportPipelineError::Io {
            path: dir.to_path_buf(),
            source,
        })?;
        let path = entry.path();
        let file_type = entry
            .file_type()
            .map_err(|source| ExportPipelineError::Io {
                path: path.clone(),
                source,
            })?;
        if file_type.is_dir() {
            walk(&path, f)?;
        } else {
            f(&path);
        }
    }
    Ok(())
}
/// Deletes `path`, treating a missing file as success.
///
/// Attempts the removal directly and swallows `NotFound` instead of doing an
/// exists-then-remove dance: the previous check/remove pair was racy when a
/// concurrent worker deleted the file in between, and cost an extra stat.
fn remove_file_if_exists(path: &Path) -> Result<(), ExportPipelineError> {
    match std::fs::remove_file(path) {
        Ok(()) => Ok(()),
        // Idempotent: a file that is already gone is not an error.
        Err(source) if source.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(source) => Err(ExportPipelineError::Io {
            path: path.to_path_buf(),
            source,
        }),
    }
}

View File

@ -0,0 +1,229 @@
use std::fmt::{Display, Formatter};
use std::path::Path;
use tokio::process::Command;
use crate::core::config::RetryConfig;
use crate::core::errors::ExportPipelineError;
use crate::core::retry::{retry_async, retry_sync};
/// A video frame rate expressed as the rational `numerator / denominator`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FrameRate {
    pub numerator: i32,
    pub denominator: i32,
}

impl FrameRate {
    /// Builds a `FrameRate` from a `(numerator, denominator)` pair, e.g. the
    /// tuple read out of USM metadata.
    pub fn from_tuple((numerator, denominator): (i32, i32)) -> Self {
        Self {
            numerator,
            denominator,
        }
    }
}

impl Display for FrameRate {
    /// Renders the rate in the form ffmpeg's `-r` flag accepts: a bare
    /// integer when the denominator is trivial (<= 1), `num/den` otherwise.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self.denominator {
            d if d <= 1 => write!(f, "{}", self.numerator),
            d => write!(f, "{}/{}", self.numerator, d),
        }
    }
}
/// Re-encodes a CRI USM container straight to MP4 (H.264 video + 192k AAC
/// audio), retrying per the configured retry policy.
pub async fn convert_usm_to_mp4(
    usm_file: &Path,
    mp4_file: &Path,
    ffmpeg_path: &str,
    retry: &RetryConfig,
) -> Result<(), ExportPipelineError> {
    retry_async(
        retry,
        "ffmpeg usm->mp4",
        |_| async {
            run_ffmpeg(
                ffmpeg_path,
                &[
                    "-i",
                    &usm_file.to_string_lossy(),
                    "-c:v",
                    "libx264",
                    "-c:a",
                    "aac",
                    "-b:a",
                    "192k",
                    // Move the moov atom to the front for progressive playback.
                    "-movflags",
                    "+faststart",
                    "-y",
                    &mp4_file.to_string_lossy(),
                ],
            )
            .run_async()
            .await
        },
        is_retryable_command_error,
    )
    .await
}
/// Converts a demuxed `.m2v` elementary video stream to MP4 (H.264),
/// optionally deleting the source `.m2v` on success.
///
/// `frame_rate` (read from USM metadata) is passed twice: before `-i` as an
/// input-rate hint and after `-c:v` to pin the output rate — presumably to
/// keep timing consistent with the source; confirm against ffmpeg docs.
pub async fn convert_m2v_to_mp4(
    m2v_file: &Path,
    mp4_file: &Path,
    delete_original: bool,
    ffmpeg_path: &str,
    frame_rate: Option<FrameRate>,
    retry: &RetryConfig,
) -> Result<(), ExportPipelineError> {
    // The argument vec is built once and reused across retry attempts.
    let mut args = Vec::new();
    if let Some(rate) = frame_rate {
        args.push("-r".to_string());
        args.push(rate.to_string());
    }
    args.push("-i".to_string());
    args.push(m2v_file.to_string_lossy().to_string());
    args.push("-c:v".to_string());
    args.push("libx264".to_string());
    if let Some(rate) = frame_rate {
        args.push("-r".to_string());
        args.push(rate.to_string());
    }
    args.push("-y".to_string());
    args.push(mp4_file.to_string_lossy().to_string());
    retry_async(
        retry,
        "ffmpeg m2v->mp4",
        |_| async {
            let refs: Vec<&str> = args.iter().map(String::as_str).collect();
            run_ffmpeg(ffmpeg_path, &refs).run_async().await
        },
        is_retryable_command_error,
    )
    .await?;
    if delete_original && m2v_file.exists() {
        std::fs::remove_file(m2v_file).map_err(|source| ExportPipelineError::Io {
            path: m2v_file.to_path_buf(),
            source,
        })?;
    }
    Ok(())
}
/// Transcodes a WAV file to a 320 kbit/s MP3 with ffmpeg, retrying per the
/// configured retry policy.
pub fn convert_wav_to_mp3(
    wav_file: &Path,
    mp3_file: &Path,
    ffmpeg_path: &str,
    retry: &RetryConfig,
) -> Result<(), ExportPipelineError> {
    let input = wav_file.to_string_lossy();
    let output = mp3_file.to_string_lossy();
    let args = ["-i", &input, "-b:a", "320k", "-y", &output];
    retry_sync(
        retry,
        "ffmpeg wav->mp3",
        |_| run_ffmpeg_sync(ffmpeg_path, &args),
        is_retryable_command_error,
    )
}
/// Transcodes a WAV file to FLAC at maximum compression (level 12) with
/// ffmpeg, retrying per the configured retry policy.
pub fn convert_wav_to_flac(
    wav_file: &Path,
    flac_file: &Path,
    ffmpeg_path: &str,
    retry: &RetryConfig,
) -> Result<(), ExportPipelineError> {
    let input = wav_file.to_string_lossy();
    let output = flac_file.to_string_lossy();
    let args = ["-i", &input, "-compression_level", "12", "-y", &output];
    retry_sync(
        retry,
        "ffmpeg wav->flac",
        |_| run_ffmpeg_sync(ffmpeg_path, &args),
        is_retryable_command_error,
    )
}
/// Bundles the ffmpeg binary path with its argument list into a pending
/// invocation; nothing runs until `run_async` is called.
fn run_ffmpeg<'a>(ffmpeg_path: &'a str, args: &'a [&'a str]) -> FfmpegCommand<'a> {
    FfmpegCommand { args, ffmpeg_path }
}
/// Runs ffmpeg synchronously with captured output, mapping failures to
/// `ExportPipelineError` (`Spawn` when the process can't start,
/// `CommandFailed` on a non-zero exit).
fn run_ffmpeg_sync(ffmpeg_path: &str, args: &[&str]) -> Result<(), ExportPipelineError> {
    match std::process::Command::new(ffmpeg_path).args(args).output() {
        Ok(output) => map_command_output(ffmpeg_path, output),
        Err(source) => Err(ExportPipelineError::Spawn {
            program: ffmpeg_path.to_string(),
            source,
        }),
    }
}
/// An ffmpeg invocation (binary path + full argument list) awaiting execution.
struct FfmpegCommand<'a> {
    // Path or name of the ffmpeg binary to spawn.
    ffmpeg_path: &'a str,
    // Complete argument list, already rendered as strings.
    args: &'a [&'a str],
}
impl<'a> FfmpegCommand<'a> {
    /// Spawns ffmpeg asynchronously, captures its output, and maps failures
    /// to `ExportPipelineError` (`Spawn` when the process can't start,
    /// `CommandFailed` on a non-zero exit status).
    async fn run_async(self) -> Result<(), ExportPipelineError> {
        let output = Command::new(self.ffmpeg_path)
            .args(self.args)
            .output()
            .await
            .map_err(|source| ExportPipelineError::Spawn {
                program: self.ffmpeg_path.to_string(),
                source,
            })?;
        map_command_output(self.ffmpeg_path, output)
    }
}
/// Decides whether a failed ffmpeg invocation is worth retrying: non-zero
/// exits always, spawn failures only for transient I/O error kinds.
///
/// NOTE(review): this duplicates the identically-named helper in the export
/// pipeline module — consider sharing one definition.
fn is_retryable_command_error(err: &ExportPipelineError) -> bool {
    use std::io::ErrorKind;
    match err {
        // The tool ran but exited non-zero — may succeed on a second try.
        ExportPipelineError::CommandFailed { .. } => true,
        ExportPipelineError::Spawn { source, .. } => matches!(
            source.kind(),
            ErrorKind::Interrupted
                | ErrorKind::TimedOut
                | ErrorKind::WouldBlock
                | ErrorKind::BrokenPipe
                | ErrorKind::ConnectionReset
                | ErrorKind::ConnectionAborted
                | ErrorKind::ConnectionRefused
        ),
        _ => false,
    }
}
/// Maps a captured process `Output` to `Ok(())` on success or
/// `ExportPipelineError::CommandFailed` (with status and trimmed stderr)
/// on a non-zero exit.
fn map_command_output(
    program: &str,
    output: std::process::Output,
) -> Result<(), ExportPipelineError> {
    if !output.status.success() {
        return Err(ExportPipelineError::CommandFailed {
            program: program.to_string(),
            status: output.status.to_string(),
            stderr: String::from_utf8_lossy(&output.stderr).trim().to_string(),
        });
    }
    Ok(())
}

View File

@ -0,0 +1,8 @@
//! Core building blocks of the assets updater: asset execution, configuration,
//! the export pipeline, region selection, retry policy, media conversion,
//! codecs and error types.
pub mod asset_execution;
pub mod config;
pub mod export_pipeline;
pub mod regions;
pub mod retry;
pub mod media;
pub mod codec;
pub mod errors;

View File

@ -0,0 +1,30 @@
use regex::Regex;
use crate::core::config::{AppConfig, RegionConfig};
use crate::core::errors::RegionError;
/// Looks up the region named `name` in the app configuration.
///
/// # Errors
/// * `RegionError::NotFound` for an unknown name.
/// * `RegionError::Disabled` when the region exists but is switched off.
pub fn select_region<'a>(
    config: &'a AppConfig,
    name: &str,
) -> Result<&'a RegionConfig, RegionError> {
    match config.regions.get(name) {
        None => Err(RegionError::NotFound(name.to_string())),
        Some(region) if !region.enabled => Err(RegionError::Disabled(name.to_string())),
        Some(region) => Ok(region),
    }
}
/// Compiles a list of regex pattern strings.
///
/// NOTE(review): invalid patterns are silently dropped by `filter_map` — a
/// typo in the config disables that filter with no diagnostic; consider
/// logging or failing loudly.
pub(crate) fn compile_patterns(patterns: &[String]) -> Vec<Regex> {
    patterns
        .iter()
        .filter_map(|pattern| Regex::new(pattern).ok())
        .collect()
}
/// True when `bundle_name` matches at least one of the compiled patterns.
pub(crate) fn matches_any(patterns: &[Regex], bundle_name: &str) -> bool {
    for regex in patterns {
        if regex.is_match(bundle_name) {
            return true;
        }
    }
    false
}

View File

@ -0,0 +1,83 @@
use std::fmt::Display;
use std::future::Future;
use std::time::Duration;
use log::warn;
use tokio::time::sleep;
use crate::core::config::RetryConfig;
/// Runs the async operation `op` up to `config.attempts` times (minimum 1).
///
/// `op` receives the 1-based attempt number. A failure is retried only while
/// attempts remain AND `should_retry(&err)` approves; the delay between tries
/// grows exponentially via `backoff_delay`. The final error is returned
/// unmodified.
pub async fn retry_async<T, E, Op, Fut, ShouldRetry>(
    config: &RetryConfig,
    operation: &str,
    mut op: Op,
    should_retry: ShouldRetry,
) -> Result<T, E>
where
    E: Display,
    Op: FnMut(usize) -> Fut,
    Fut: Future<Output = Result<T, E>>,
    ShouldRetry: Fn(&E) -> bool,
{
    let attempts = config.attempts.max(1);
    for attempt in 1..=attempts {
        match op(attempt).await {
            Ok(value) => return Ok(value),
            Err(err) if attempt < attempts && should_retry(&err) => {
                let delay = backoff_delay(config, attempt);
                warn!(
                    "operation '{}' failed after {} attempt(s) with error: {}, retrying in {} ms",
                    operation,
                    attempt,
                    err,
                    delay.as_millis()
                );
                sleep(delay).await;
            }
            // Out of attempts or non-retryable error: propagate as-is.
            Err(err) => return Err(err),
        }
    }
    // attempts >= 1 and the last iteration always returns.
    unreachable!("retry_async must return from within the attempt loop")
}
/// Blocking counterpart of `retry_async`: runs `op` up to `config.attempts`
/// times (minimum 1), sleeping with exponential backoff between retryable
/// failures. `op` receives the 1-based attempt number; the final error is
/// returned unmodified.
pub fn retry_sync<T, E, Op, ShouldRetry>(
    config: &RetryConfig,
    operation: &str,
    mut op: Op,
    should_retry: ShouldRetry,
) -> Result<T, E>
where
    E: Display,
    Op: FnMut(usize) -> Result<T, E>,
    ShouldRetry: Fn(&E) -> bool,
{
    let attempts = config.attempts.max(1);
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) => {
                // Give up when out of attempts or the error is not retryable.
                if attempt >= attempts || !should_retry(&err) {
                    return Err(err);
                }
                let delay = backoff_delay(config, attempt);
                warn!(
                    "operation '{}' failed after {} attempt(s) with error: {}, retrying in {} ms",
                    operation,
                    attempt,
                    err,
                    delay.as_millis()
                );
                std::thread::sleep(delay);
            }
        }
        attempt += 1;
    }
}
/// Exponential backoff: `initial_backoff_ms * 2^(attempt - 1)` milliseconds,
/// clamped to `max_backoff_ms`. Saturates instead of overflowing for very
/// large attempt numbers.
fn backoff_delay(config: &RetryConfig, attempt: usize) -> Duration {
    let base = config.initial_backoff_ms.max(1);
    // The cap is never allowed below the base delay.
    let cap = config.max_backoff_ms.max(base);
    let shift = attempt.saturating_sub(1) as u32;
    let multiplier = match 1u64.checked_shl(shift) {
        Some(value) => value,
        None => u64::MAX,
    };
    Duration::from_millis(base.saturating_mul(multiplier).min(cap))
}

View File

@ -0,0 +1 @@
//! Shared library surface of the assets updater.
pub mod core;

21
client/Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "client"
version = "0.1.0"
edition = "2024"
[dependencies]
common = { path = "../common" }
communicator = { path = "../communicator" }
bytes = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
structopt = { workspace = true }
lazy_static = { workspace = true }
yaml_serde = { workspace = true }
log = { workspace = true }
simplelog = { workspace = true }
anyhow = { workspace = true }
h2 = { workspace = true }
serde_json = { workspace = true }
# Inherit the workspace-pinned version ([workspace.dependencies] already
# declares tokio-util) instead of a crate-local pin, so the version cannot
# drift from the other workspace members.
tokio-util = { workspace = true }

20
client/src/config.rs Normal file
View File

@ -0,0 +1,20 @@
use common::updater::SyncContext;
use communicator::ConnectConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Top-level client configuration, loaded from `sekai-unpacker-client.yaml`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ClientConfig {
    /// Log level name (e.g. "INFO", "DEBUG"); defaults to INFO when absent
    /// or unparsable.
    pub log_level: Option<String>,
    /// Connection settings, flattened into the top level of the YAML file.
    #[serde(flatten, default)]
    pub connect: ConnectConfig,
    /// Named sync profiles, selectable via the `-p` CLI flag.
    pub profiles: HashMap<String, Profile>,
}
/// A single named sync profile.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Profile {
    /// Sync parameters sent to the server, flattened into the profile entry.
    #[serde(flatten)]
    pub sync_context: SyncContext,
    /// Local directory where downloaded assets (and `manifest.json`) live.
    pub path: String,
    /// Seconds to wait between sync rounds; `None` means run once and stop.
    pub interval: Option<u64>
}

58
client/src/http.rs Normal file
View File

@ -0,0 +1,58 @@
use crate::config::Profile;
use anyhow::anyhow;
use bytes::Bytes;
use common::http::{CloseRequest, DownloadRequest, SyncResponse};
use common::stream::client_receive;
use communicator::http::{json_from_response, request, response_to_async_read, text_from_response};
use h2::client;
use std::path::Path;
use std::pin::pin;
/// Sends the profile's sync context to the server's `/sync` endpoint and
/// returns the parsed response (session id plus download task list).
///
/// # Errors
/// Fails when the request cannot be sent, the server answers with a
/// non-success status (the response body is included in the error), or the
/// body is not valid JSON.
pub async fn sync(
    client: &mut client::SendRequest<Bytes>,
    profile: &Profile,
) -> anyhow::Result<SyncResponse> {
    let mut response = request(
        client,
        "/sync",
        serde_json::to_string(&profile.sync_context)?,
    )
    .await?;
    if !response.status().is_success() {
        let body = text_from_response(&mut response).await?;
        // `Err(...)?` obscured this unconditional early exit; return directly.
        return Err(anyhow!("Failed to request '/sync': {}", body));
    }
    json_from_response(&mut response).await
}
/// Downloads one bundle via `/download` and streams the response body into
/// the profile's target directory.
///
/// # Errors
/// Fails when the request cannot be sent, the server answers with a
/// non-success status (the response body is included in the error), or the
/// streamed payload cannot be received/written.
pub async fn download(
    client: &mut client::SendRequest<Bytes>,
    req: &DownloadRequest,
    profile: &Profile,
) -> anyhow::Result<()> {
    let mut response = request(client, "/download", serde_json::to_string(req)?).await?;
    if !response.status().is_success() {
        let body = text_from_response(&mut response).await?;
        // `Err(...)?` obscured this unconditional early exit; return directly.
        return Err(anyhow!("Failed to request '/download': {}", body));
    }
    client_receive(
        pin!(response_to_async_read(response)),
        Path::new(&profile.path),
    )
    .await
}
/// Tells the server via `/close` that the sync session `req.id` is finished.
///
/// # Errors
/// Fails when the request cannot be sent or the server answers with a
/// non-success status (the response body is included in the error).
pub async fn close(
    client: &mut client::SendRequest<Bytes>,
    req: &CloseRequest,
) -> anyhow::Result<()> {
    let mut response = request(client, "/close", serde_json::to_string(req)?).await?;
    if response.status().is_success() {
        Ok(())
    } else {
        let body = text_from_response(&mut response).await?;
        Err(anyhow!("Failed to close request: {}", body))
    }
}

175
client/src/main.rs Normal file
View File

@ -0,0 +1,175 @@
use crate::config::{ClientConfig, Profile};
use crate::task::run;
use communicator::{Identity, TunnelEndpoint, TunnelListener, connect_tunnel};
use lazy_static::lazy_static;
use log::{LevelFilter, error, info};
use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use std::fs;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use structopt::StructOpt;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
mod config;
mod http;
mod task;
// Command-line options. (Plain `//` comments are used on purpose: structopt
// turns `///` field docs into CLI help text, which would change behavior.)
#[derive(StructOpt)]
struct CommandOpt {
    // Profile name(s) from the config file; repeatable (`-p a -p b`).
    #[structopt(short = "p", long)]
    pub profile: Vec<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Resolve the log level from config, falling back to INFO when absent or
    // unparsable.
    let log_level = LevelFilter::from_str(
        CONFIG
            .log_level
            .clone()
            .unwrap_or("INFO".to_string())
            .as_str(),
    )
    .unwrap_or(LevelFilter::Info);
    TermLogger::init(
        log_level,
        Config::default(),
        TerminalMode::Mixed,
        ColorChoice::Auto,
    )?;
    let command_opts = CommandOpt::from_args();
    if command_opts.profile.is_empty() {
        error!("No profile specified. Use -p or --profile to specify at least one profile.");
        std::process::exit(1);
    }
    // Resolve every requested profile name against the config up front so a
    // typo fails fast before any connections are opened.
    let profiles = command_opts
        .profile
        .iter()
        .map(|p| {
            let profile = CONFIG.profiles.get(p.as_str());
            match profile {
                Some(profile) => Ok((p.to_string(), profile.clone())),
                None => Err(anyhow::anyhow!("Profile `{}` not found in config", p)),
            }
        })
        .collect::<Result<Vec<_>, _>>()?;
    let mut tasks = vec![];
    for profile in profiles {
        let profile = Arc::new(profile.clone());
        // One permit per profile: at most one sync run at a time, even when
        // several transports (listen + connect) serve the same profile.
        let semaphore = Arc::new(Semaphore::new(1));
        let cancel_token = CancellationToken::new();
        // Runs after a successful sync while still holding the permit:
        // either sleep for the configured interval, or — with no interval —
        // cancel the profile's loops (one-shot mode).
        let post_task = {
            async |profile: Arc<(String, Profile)>,
                   permit: OwnedSemaphorePermit,
                   cancel_token: CancellationToken| {
                match profile.1.interval {
                    None => {
                        cancel_token.cancel();
                    }
                    Some(interval) => {
                        sleep(Duration::from_secs(interval)).await;
                    }
                }
                drop(permit);
            }
        };
        // Listening transports: bind locally and wait for peers to connect in.
        for server_conf in &CONFIG.connect.server {
            let server_conf = server_conf.clone().into_tunnel_config(Identity::Client)?;
            let url = server_conf.url.clone();
            let server = TunnelListener::bind(server_conf).await?;
            let semaphore = semaphore.clone();
            let cancel_token = cancel_token.clone();
            let profile = profile.clone();
            info!("tcp server started on {}", url);
            tasks.push(tokio::task::spawn(async move {
                loop {
                    let endpoint = server
                        .accept()
                        .await
                        .map_err(|e| error!("Failed to accept connection: {}", e));
                    let endpoint = if let Ok(endpoint) = endpoint {
                        endpoint
                    } else {
                        continue;
                    };
                    // Only client-role endpoints trigger a sync run.
                    if let TunnelEndpoint::Client(client) = endpoint {
                        if cancel_token.is_cancelled() {
                            return;
                        }
                        let permit = semaphore.clone().acquire_owned().await.unwrap();
                        let result = run(client, profile.clone()).await;
                        match result {
                            Ok(true) => {
                                post_task(profile.clone(), permit, cancel_token.clone()).await;
                            }
                            Err(error) => {
                                error!("{}", error);
                            }
                            // Ok(false): some downloads failed — release the
                            // permit immediately and wait for the next peer.
                            _ => {}
                        }
                    }
                }
            }));
        }
        // Outgoing transports: dial the configured servers in a loop.
        for client_conf in &CONFIG.connect.client {
            let client_conf = client_conf.clone().into_tunnel_config(Identity::Client);
            let semaphore = semaphore.clone();
            let cancel_token = cancel_token.clone();
            let profile = profile.clone();
            info!("tcp client started for {}", client_conf.url);
            tasks.push(tokio::task::spawn(async move {
                loop {
                    if cancel_token.is_cancelled() {
                        return;
                    }
                    let endpoint = connect_tunnel(client_conf.clone())
                        .await
                        .map_err(|e| error!("Failed to accept connection: {}", e));
                    let endpoint = if let Ok(endpoint) = endpoint {
                        endpoint
                    } else {
                        continue;
                    };
                    if let TunnelEndpoint::Client(client) = endpoint {
                        if cancel_token.is_cancelled() {
                            return;
                        }
                        let permit = semaphore.clone().acquire_owned().await.unwrap();
                        let result = run(client, profile.clone()).await;
                        match result {
                            Ok(true) => {
                                post_task(profile.clone(), permit, cancel_token.clone()).await;
                            }
                            Err(error) => {
                                error!("{}", error);
                            }
                            _ => {}
                        }
                    }
                    // Pause before reconnecting.
                    sleep(Duration::from_secs(10)).await;
                }
            }));
        }
    }
    // Wait for all transport loops; individual join errors are only logged.
    for task in tasks {
        let _ = task.await.map_err(|e| error!("{}", e));
    }
    Ok(())
}
lazy_static! {
    /// Global client configuration, read once from
    /// `sekai-unpacker-client.yaml` in the working directory.
    /// NOTE(review): both `unwrap()`s panic on a missing or malformed config
    /// file, and the first access happens before logging is initialized.
    pub static ref CONFIG: ClientConfig = {
        let raw = fs::read_to_string("sekai-unpacker-client.yaml").unwrap();
        let config: ClientConfig = yaml_serde::from_str(raw.as_str()).unwrap();
        config
    };
}

146
client/src/task.rs Normal file
View File

@ -0,0 +1,146 @@
use crate::config::Profile;
use crate::http::{close, download, sync};
use common::http::{CloseRequest, DownloadRequest};
use communicator::ClientManager;
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{RwLock, Semaphore};
pub async fn run(
client: Arc<ClientManager>,
profile: Arc<(String, Profile)>,
) -> anyhow::Result<bool> {
info!("[{}]: Starting sync", profile.0);
let sync_resp = sync(&mut client.get_client().await?, &profile.1).await?;
let id = sync_resp.id;
let local_manifest = Arc::new(
AutoSaveManifest::new(
5,
Path::new(&profile.1.path)
.join("manifest.json")
.to_path_buf(),
)
.await?,
);
let manifest_snapshot = { local_manifest.manifest.read().await.clone() };
let tasks = sync_resp
.tasks
.into_iter()
.filter(|task| {
let bundle_name = &task.bundle_path;
match manifest_snapshot.bundles.get(bundle_name) {
Some(local_hash) => local_hash != &task.bundle_hash,
None => true,
}
})
.collect::<Vec<_>>();
info!("[{}]: Collected {} tasks", profile.0, tasks.len());
let n = 5;
let semaphore = Arc::new(Semaphore::new(n));
let mut handles = Vec::new();
for task in tasks {
let permit = semaphore.clone().acquire_owned().await?;
let client = client.clone();
let id = id.clone();
let local_manifest = local_manifest.clone();
let profile = profile.clone();
handles.push(tokio::task::spawn(async move {
let req = DownloadRequest {
id: id.clone(),
task: task.clone(),
};
let result = download(&mut client.get_client().await.unwrap(), &req, &profile.1).await;
if let Err(e) = result
&& let Some(_) = e.downcast_ref::<h2::Error>()
{
download(&mut client.get_client().await.unwrap(), &req, &profile.1)
.await
.unwrap();
}
local_manifest
.add_bundle(task.bundle_path.clone(), task.bundle_hash.clone())
.await
.unwrap();
drop(permit);
}));
}
let mut succeed = 0;
let mut failed = 0;
for handle in handles {
let r = handle.await;
if let Err(e) = r {
error!("{}", e);
failed += 1;
} else {
succeed += 1;
}
}
local_manifest.save().await?;
info!(
"[{}]: Sync finished with {} succeed, {} failed",
profile.0, succeed, failed
);
let req = CloseRequest { id: id.clone() };
close(&mut client.get_client().await?, &req).await?;
Ok(failed == 0)
}
/// On-disk schema of `manifest.json`: bundle path -> content hash of the last
/// successfully downloaded version.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Manifest {
    // A missing key in the JSON deserializes to an empty map.
    #[serde(default)]
    bundles: HashMap<String, String>,
}
/// Manifest wrapper that persists itself every `save_interval` insertions.
pub struct AutoSaveManifest {
    // Shared so concurrent download tasks can read/write through one handle.
    manifest: Arc<RwLock<Manifest>>,
    // Total add_bundle calls since construction (not since the last save).
    counter: AtomicUsize,
    save_interval: usize,
    storage_path: PathBuf,
}
impl AutoSaveManifest {
    /// Loads the manifest from `path`, falling back to an empty manifest when
    /// the file cannot be read (e.g. first run). `interval` is the number of
    /// `add_bundle` calls between automatic saves.
    pub async fn new(interval: usize, path: PathBuf) -> anyhow::Result<Self> {
        // unwrap_or_else avoids allocating the fallback string on the happy
        // path (clippy::or_fun_call).
        let raw = tokio::fs::read_to_string(&path)
            .await
            .unwrap_or_else(|_| "{}".to_owned());
        Ok(Self {
            manifest: Arc::new(RwLock::new(serde_json::from_str(&raw)?)),
            counter: AtomicUsize::new(0),
            save_interval: interval,
            storage_path: path,
        })
    }
    /// Records `key` -> `value` in the bundle map and persists the manifest
    /// every `save_interval` insertions.
    pub async fn add_bundle(&self, key: String, value: String) -> anyhow::Result<()> {
        {
            let mut w = self.manifest.write().await;
            w.bundles.insert(key, value);
        }
        let current_count = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
        if current_count.is_multiple_of(self.save_interval) {
            self.save().await?;
        }
        Ok(())
    }
    /// Serializes the manifest and writes it to `storage_path`.
    /// NOTE(review): the write is not atomic; a crash mid-write can leave a
    /// truncated file — consider write-to-temp + rename if that matters.
    pub async fn save(&self) -> anyhow::Result<()> {
        let data = {
            let r = self.manifest.read().await;
            serde_json::to_vec(&*r)?
        };
        tokio::fs::write(&self.storage_path, data).await?;
        Ok(())
    }
}

11
common/Cargo.toml Normal file
View File

@ -0,0 +1,11 @@
[package]
name = "common"
version = "0.1.0"
edition = "2024"
[dependencies]
tokio = { workspace = true, features = ["io-util", "fs"] }
anyhow = { workspace = true }
h2 = { workspace = true }
bytes = { workspace = true }
serde = { workspace = true, features = ["derive"] }

19
common/src/http.rs Normal file
View File

@ -0,0 +1,19 @@
use crate::updater::DownloadTask;
use serde::{Deserialize, Serialize};
/// Server reply to a sync request: a session id plus the bundles the client
/// may need to download.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
    pub id: String,
    pub tasks: Vec<DownloadTask>,
}
/// Client request for one bundle, scoped to the session from `SyncResponse`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadRequest {
    pub id: String,
    pub task: DownloadTask,
}
/// Tells the server a sync session is finished and can be released.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CloseRequest {
    pub id: String,
}

3
common/src/lib.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod stream;
pub mod updater;
pub mod http;

85
common/src/stream.rs Normal file
View File

@ -0,0 +1,85 @@
use bytes::{BufMut, Bytes, BytesMut};
use h2::SendStream;
use std::path::Path;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncReadExt};
/// Streams `files` over an HTTP/2 send stream.
///
/// Wire format: a u32 file count, then per file a u16 name length, the
/// base-relative UTF-8 name, a u64 byte size, and the raw file bytes. The
/// peer decodes this with `client_receive`.
pub async fn server_send_files<P: AsRef<Path>>(
    mut send_stream: SendStream<Bytes>,
    base: P,
    files: &[P],
) -> Result<(), Box<dyn std::error::Error>> {
    let mut header = BytesMut::with_capacity(4);
    header.put_u32(files.len() as u32);
    // Fix: with no files the header is the whole payload, so end the stream
    // here — the old code never set end-of-stream in that case.
    send_stream.send_data(header.freeze(), files.is_empty())?;
    for (i, path_ref) in files.iter().enumerate() {
        let path = path_ref.as_ref();
        let mut file = File::open(path).await?;
        let metadata = file.metadata().await?;
        let file_name = path
            .strip_prefix(base.as_ref())
            .map(|n| n.to_string_lossy().to_string())?;
        let name_bytes = file_name.as_bytes();
        let file_size = metadata.len();
        let is_last_file = i == files.len() - 1;
        let mut meta_buf = BytesMut::with_capacity(2 + name_bytes.len() + 8);
        meta_buf.put_u16(name_bytes.len() as u16);
        meta_buf.put_slice(name_bytes);
        meta_buf.put_u64(file_size);
        // Fix: a zero-length final file sends no data chunks, so end-of-stream
        // must go out with its metadata frame instead.
        send_stream.send_data(meta_buf.freeze(), is_last_file && file_size == 0)?;
        let mut buffer = vec![0u8; 8192];
        let mut sent_size = 0u64;
        while sent_size < file_size {
            let n = file.read(&mut buffer).await?;
            if n == 0 {
                // The file shrank after metadata() was read — the receiver
                // would hang waiting for the advertised bytes, so fail loudly
                // instead of breaking out silently.
                return Err(format!("file {} truncated while sending", file_name).into());
            }
            let chunk = Bytes::copy_from_slice(&buffer[..n]);
            let is_last_chunk = is_last_file && (sent_size + n as u64 == file_size);
            send_stream.send_data(chunk, is_last_chunk)?;
            sent_size += n as u64;
        }
    }
    Ok(())
}
/// Decodes the stream produced by `server_send_files`, writing each file
/// under `file_root` (creating parent directories as needed).
pub async fn client_receive(
    mut response_body_reader: impl AsyncRead + Unpin,
    file_root: &Path,
) -> anyhow::Result<()> {
    let file_count = response_body_reader.read_u32().await?;
    for _ in 0..file_count {
        let name_len = response_body_reader.read_u16().await?;
        let mut name_buf = vec![0u8; name_len as usize];
        response_body_reader.read_exact(&mut name_buf).await?;
        let file_name = String::from_utf8(name_buf)?;
        let data_len = response_body_reader.read_u64().await?;
        let path = file_root.join(&file_name);
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let mut file = File::create(&path).await?;
        let mut take = response_body_reader.take(data_len);
        // Fix: `copy` stops silently at EOF, so verify we actually received
        // every advertised byte — otherwise a dropped connection leaves a
        // truncated file behind that looks complete.
        let copied = tokio::io::copy(&mut take, &mut file).await?;
        if copied != data_len {
            anyhow::bail!(
                "stream ended early for {}: got {} of {} bytes",
                file_name,
                copied,
                data_len
            );
        }
        response_body_reader = take.into_inner();
    }
    Ok(())
}

165
common/src/updater.rs Normal file
View File

@ -0,0 +1,165 @@
use serde::{Deserialize, Serialize};
/// Per-sync request context sent by the client: which region to sync plus the
/// filtering/export options applied server-side.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncContext {
    pub region: String,
    #[serde(default)]
    pub filters: RegionFiltersConfig,
    #[serde(default)]
    pub export: RegionExportConfig,
    // Optional pin to a specific asset version/hash; `None` means latest.
    #[serde(default)]
    pub asset_version: Option<String>,
    #[serde(default)]
    pub asset_hash: Option<String>,
}
/// Bundle-name filters per asset category; empty lists mean "no filtering".
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RegionFiltersConfig {
    #[serde(default)]
    pub start_app: Vec<String>,
    #[serde(default)]
    pub on_demand: Vec<String>,
    #[serde(default)]
    pub skip: Vec<String>,
    #[serde(default)]
    pub file_ext: Vec<String>,
}
/// Per-format export/decode toggles for a region.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RegionExportConfig {
    // When true, exported files are grouped by asset category.
    #[serde(default)]
    pub by_category: bool,
    #[serde(default)]
    pub usm: UsmExportConfig,
    #[serde(default)]
    pub acb: AcbExportConfig,
    #[serde(default)]
    pub hca: HcaExportConfig,
    #[serde(default)]
    pub images: ImageExportConfig,
    #[serde(default)]
    pub video: VideoExportConfig,
    #[serde(default)]
    pub audio: AudioExportConfig,
}
/// USM (CRI video container) export options; both default to enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct UsmExportConfig {
    pub export: bool,
    pub decode: bool,
}
impl Default for UsmExportConfig {
    fn default() -> Self {
        Self {
            export: true,
            decode: true,
        }
    }
}
/// ACB (CRI audio archive) export options; both default to enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AcbExportConfig {
    pub export: bool,
    pub decode: bool,
}
impl Default for AcbExportConfig {
    fn default() -> Self {
        Self {
            export: true,
            decode: true,
        }
    }
}
/// HCA (CRI audio) decode option; defaults to enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HcaExportConfig {
    pub decode: bool,
}
impl Default for HcaExportConfig {
    fn default() -> Self {
        Self { decode: true }
    }
}
/// Image post-processing; both conversions default to off (derived Default).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ImageExportConfig {
    pub convert_to_webp: bool,
    pub remove_png: bool,
}
/// Video post-processing options.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct VideoExportConfig {
    pub convert_to_mp4: bool,
    // Skip the intermediate m2v step and let ffmpeg read the USM directly.
    pub direct_usm_to_mp4_with_ffmpeg: bool,
    pub remove_m2v: bool,
}
impl Default for VideoExportConfig {
    fn default() -> Self {
        Self {
            convert_to_mp4: true,
            direct_usm_to_mp4_with_ffmpeg: false,
            remove_m2v: true,
        }
    }
}
/// Audio post-processing options.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AudioExportConfig {
    pub convert_to_mp3: bool,
    pub convert_to_flac: bool,
    pub remove_wav: bool,
}
impl Default for AudioExportConfig {
    fn default() -> Self {
        Self {
            convert_to_mp3: true,
            convert_to_flac: false,
            remove_wav: true,
        }
    }
}
/// One downloadable bundle as advertised by the server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownloadTask {
    // Remote path used to fetch the bundle.
    pub download_path: String,
    // Local/logical bundle path (key in the client manifest).
    pub bundle_path: String,
    // Content hash used for change detection against the local manifest.
    pub bundle_hash: String,
    pub category: AssetCategory,
}
/// Asset category; unknown categories are preserved verbatim in `Other`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub enum AssetCategory {
    StartApp,
    OnDemand,
    Other(String),
}
impl<'de> Deserialize<'de> for AssetCategory {
    /// Accepts both PascalCase and camelCase spellings; any other string maps
    /// to `Other`.
    /// NOTE(review): asymmetric with the derived `Serialize` — `Other(s)`
    /// serializes as an externally tagged value, which this string-based impl
    /// would not read back as the same variant; confirm round-tripping is not
    /// required.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Treat nil/null as Other("") — matches Go's zero-value coercion.
        let raw = Option::<String>::deserialize(deserializer)?.unwrap_or_default();
        Ok(match raw.as_str() {
            "StartApp" | "startApp" => Self::StartApp,
            "OnDemand" | "onDemand" => Self::OnDemand,
            other => Self::Other(other.to_string()),
        })
    }
}

20
communicator/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "communicator"
version = "0.1.0"
edition = "2024"
[dependencies]
bytes = { workspace = true }
h2 = { workspace = true }
http = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "io-util", "time"] }
tokio-rustls = { workspace = true }
rustls-pemfile = { workspace = true }
webpki-roots = { workspace = true }
anyhow = { workspace = true }
tokio-util = { workspace = true }
futures-util = { workspace = true }
futures = "0.3.32"

View File

@ -0,0 +1,48 @@
use crate::stream::Identity;
use crate::{ClientTunnelConfig, ServerTunnelConfig, SslConfig};
use serde::{Deserialize, Serialize};
/// Tunnel endpoints parsed from the YAML config: listeners to bind (`server`)
/// and peers to dial (`client`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConnectConfig {
    #[serde(default)]
    pub server: Vec<TcpServerTunnelConfig>,
    #[serde(default)]
    pub client: Vec<TcpClientTunnelConfig>,
}
/// Listener-side tunnel settings as they appear in the config file (identity
/// is supplied separately at runtime).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TcpServerTunnelConfig {
    pub url: String,
    // Optional TLS cert/key; flattened, so `cert`/`key` appear at this level.
    #[serde(flatten)]
    pub cert: Option<SslConfig>,
    pub token: String,
}
impl TcpServerTunnelConfig {
pub fn into_tunnel_config(self, identity: Identity) -> anyhow::Result<ServerTunnelConfig> {
Ok(ServerTunnelConfig {
identity,
url: self.url,
cert: self.cert,
token: self.token,
})
}
}
/// Dialing-side tunnel settings as they appear in the config file (identity
/// is supplied separately at runtime).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TcpClientTunnelConfig {
    // TLS SNI host; when set, the connection bootstraps over TLS.
    pub host: Option<String>,
    pub url: String,
    pub token: String,
}
impl TcpClientTunnelConfig {
pub fn into_tunnel_config(self, identity: Identity) -> ClientTunnelConfig {
ClientTunnelConfig {
identity,
host: self.host,
url: self.url,
token: self.token,
}
}
}

216
communicator/src/http.rs Normal file
View File

@ -0,0 +1,216 @@
use crate::stream::ServerManager;
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::stream::{self};
use h2::server::SendResponse;
use h2::{RecvStream, client};
use http::{Request, Response, StatusCode};
use log::{debug, error};
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use tokio_util::io::StreamReader;
// Boxed future so handlers with different concrete future types can share one
// route table entry.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
// A route handler: takes the h2 request plus the response handle and drives
// the exchange to completion itself.
type Handler = Arc<
    dyn Fn(Request<RecvStream>, SendResponse<Bytes>) -> BoxFuture<Result<(), h2::Error>>
        + Send
        + Sync,
>;
/// Exact-path request router for the tunneled HTTP/2 server (no wildcards).
pub struct Router {
    routes: HashMap<String, Handler>,
}
impl Default for Router {
    fn default() -> Self {
        Self::new()
    }
}
impl Router {
pub fn new() -> Self {
Self {
routes: HashMap::new(),
}
}
pub fn add_route<F, Fut>(&mut self, path: &str, handler: F)
where
F: Fn(Request<RecvStream>, SendResponse<Bytes>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), h2::Error>> + Send + 'static,
{
self.routes.insert(
path.to_string(),
Arc::new(move |req, res| Box::pin(handler(req, res))),
);
}
pub async fn dispatch(
&self,
req: Request<RecvStream>,
mut res: SendResponse<Bytes>,
) -> Result<(), h2::Error> {
let path = req.uri().path();
debug!("Received request for path: {}", path);
if let Some(handler) = self.routes.get(path) {
handler(req, res).await
} else {
// 404
let response = Response::builder().status(404).body(()).unwrap();
res.send_response(response, true)?;
Ok(())
}
}
}
pub struct Server {
router: Arc<Router>,
}
impl Server {
pub fn new(router: Arc<Router>) -> Self {
Self { router }
}
pub async fn on_conn(&self, server: Arc<ServerManager>) -> anyhow::Result<()> {
let router = self.router.clone();
while let Some(result) = server.accept().await {
let (request, respond) = result?;
let r = router.clone();
tokio::spawn(async move {
if let Err(e) = r.dispatch(request, respond).await {
error!("Handler error: {:?}", e);
}
});
}
Ok(())
}
}
/// Sends a complete response with the given status, content type and body.
/// Transport errors are ignored (best effort).
pub fn send(mut res: SendResponse<Bytes>, status: u16, content_type: &str, body: String) {
    let head = Response::builder()
        .status(status)
        .header("content-type", content_type)
        .body(())
        .unwrap();
    if let Ok(mut data_stream) = res.send_response(head, false) {
        let _ = data_stream.send_data(Bytes::from(body), true);
    }
}
/// Replies with HTTP 500 and a plain-text rendering of `error`, swallowing
/// any transport failure.
pub fn send_error(mut res: SendResponse<Bytes>, error: anyhow::Error) {
    let head = Response::builder()
        .status(StatusCode::INTERNAL_SERVER_ERROR)
        .header("content-type", "text/plain")
        .body(())
        .unwrap();
    let Ok(mut data_stream) = res.send_response(head, false) else {
        return;
    };
    let body = format!("Internal Server Error: {}", error);
    let _ = data_stream.send_data(Bytes::from(body), true);
}
/// Buffers the whole request body (releasing h2 flow-control capacity as it
/// goes) and deserializes it as JSON into `T`. Errors on an empty body.
pub async fn json_from_request<T>(req: &mut Request<RecvStream>) -> anyhow::Result<T>
where
    T: DeserializeOwned,
{
    let body = req.body_mut();
    let mut collected = BytesMut::new();
    while let Some(chunk) = body.data().await {
        let bytes = chunk?;
        let n = bytes.len();
        collected.put(bytes);
        // Give the window back so the peer can keep sending.
        body.flow_control().release_capacity(n)?;
    }
    if collected.is_empty() {
        return Err(anyhow::anyhow!("Request body is empty"));
    }
    Ok(serde_json::from_slice(&collected)?)
}
/// Issues a POST with a JSON string body over the tunneled h2 client and
/// returns the response head once it arrives.
pub async fn request(
    client: &mut client::SendRequest<Bytes>,
    path: &str,
    body: String,
) -> anyhow::Result<Response<RecvStream>> {
    let req = Request::builder()
        .method("POST")
        // The authority is a placeholder: routing happens on the path only.
        .uri(format!("http://0.0.0.0{}", path))
        .header("content-type", "application/json")
        .body(())?;
    let (response, mut body_stream) = client.send_request(req, false)?;
    body_stream.send_data(Bytes::from(body), true)?;
    Ok(response.await?)
}
/// Drains a response body into one contiguous buffer, releasing h2
/// flow-control capacity for every chunk consumed.
async fn bytes_from_response(response: &mut Response<RecvStream>) -> anyhow::Result<Bytes> {
    let body = response.body_mut();
    let mut collected = BytesMut::new();
    while let Some(chunk) = body.data().await {
        let bytes = chunk?;
        let n = bytes.len();
        collected.put(bytes);
        body.flow_control().release_capacity(n)?;
    }
    Ok(collected.freeze())
}
/// Reads the full response body as (lossily decoded) UTF-8 text; an empty
/// body yields an empty string.
pub async fn text_from_response(response: &mut Response<RecvStream>) -> anyhow::Result<String> {
    let raw = bytes_from_response(response).await?;
    if raw.is_empty() {
        return Ok(String::new());
    }
    Ok(String::from_utf8_lossy(&raw).to_string())
}
/// Reads the full response body and deserializes it as JSON into `T`.
///
/// Errors when the body is empty or is not valid JSON for `T`.
pub async fn json_from_response<T>(response: &mut Response<RecvStream>) -> anyhow::Result<T>
where
    T: DeserializeOwned,
{
    let buf = bytes_from_response(response).await?;
    if buf.is_empty() {
        // Fixed copy-pasted message: this is the *response* side.
        return Err(anyhow::anyhow!("Response body is empty"));
    }
    let result = serde_json::from_slice(&buf)?;
    Ok(result)
}
/// Adapts an h2 response body into an `AsyncRead` so callers can stream it
/// (e.g. into `common::stream::client_receive`) without buffering it whole.
pub fn response_to_async_read(res: Response<RecvStream>) -> impl tokio::io::AsyncRead {
    let body = res.into_body();
    // Unfold the pull-based h2 body into a Stream of byte chunks, releasing
    // flow-control capacity for each chunk so the sender's window refills.
    let byte_stream = stream::unfold(body, |mut body| async move {
        match body.data().await {
            Some(Ok(bytes)) => {
                let len = bytes.len();
                // A flow-control failure is surfaced as an io::Error chunk.
                if let Err(e) = body.flow_control().release_capacity(len) {
                    return Some((Err(io::Error::other(e)), body));
                }
                Some((Ok::<_, io::Error>(bytes), body))
            }
            Some(Err(e)) => Some((Err(io::Error::other(e)), body)),
            None => None,
        }
    });
    StreamReader::new(byte_stream)
}

9
communicator/src/lib.rs Normal file
View File

@ -0,0 +1,9 @@
mod config;
pub mod http;
mod stream;
pub use config::{ConnectConfig, TcpClientTunnelConfig, TcpServerTunnelConfig};
pub use stream::{
ClientManager, ClientTunnelConfig, Identity, ServerManager, ServerTunnelConfig, SslConfig,
TunnelEndpoint, TunnelListener, connect_tunnel,
};

611
communicator/src/stream.rs Normal file
View File

@ -0,0 +1,611 @@
use anyhow::anyhow;
use bytes::Bytes;
use h2::{RecvStream, client, server};
use http::Request;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, Notify};
use tokio::time::{Duration, sleep, timeout};
use tokio_rustls::rustls;
use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName};
use tokio_rustls::{TlsAcceptor, TlsConnector};
/// Which role this process plays over the tunnel; exchanged during the
/// handshake so both peers can detect a misconfiguration.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Identity {
    Server,
    Client,
}
impl Identity {
    /// Single-byte wire encoding used by the handshake.
    pub fn as_u8(&self) -> u8 {
        match self {
            Self::Server => b'S',
            Self::Client => b'C',
        }
    }
}
/// Configuration for the side that dials out to the tunnel listener.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientTunnelConfig {
    // TLS SNI host name; when set, session bootstrap goes through TLS.
    pub host: Option<String>,
    // host:port of the tunnel listener.
    pub url: String,
    // Shared secret checked during the handshake.
    pub token: String,
    pub identity: Identity,
}
impl Default for ClientTunnelConfig {
    fn default() -> Self {
        Self {
            host: None,
            url: "127.0.0.1:3333".to_string(),
            token: "super_secret_magic_token".to_string(),
            identity: Identity::Client,
        }
    }
}
unsafe impl Send for ClientTunnelConfig {}
/// PEM certificate-chain / private-key paths for the TLS bootstrap listener.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SslConfig {
    pub cert: String,
    pub key: String,
}
/// Configuration for the side that binds and accepts tunnel connections.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerTunnelConfig {
    pub url: String,
    // When present, TLS bootstrap connections are accepted with this cert.
    pub cert: Option<SslConfig>,
    pub token: String,
    pub identity: Identity,
}
impl Default for ServerTunnelConfig {
    fn default() -> Self {
        Self {
            url: "127.0.0.1:3333".to_string(),
            cert: None,
            token: "super_secret_magic_token".to_string(),
            // NOTE(review): defaults to Client even though this is the server
            // config — presumably `identity` describes the h2 role rather than
            // the TCP role; confirm.
            identity: Identity::Client,
        }
    }
}
// Fixed handshake markers; both sides must use identical byte strings.
static CLIENT_HELLO: &[u8] = b"Sekai unpacker client hello";
static SERVER_HELLO: &[u8] = b"Sekai unpacker server hello";
// Marker exchanged over TLS when bootstrapping a plain-H2 session id.
static TLS_BOOTSTRAP_MAGIC: &[u8] = b"SPTLSP1";
// Prefix announcing "resume session <id>" on a fresh plain TCP connection.
static RESUME_MAGIC: &[u8] = b"SPRESM1";
/// A freshly upgraded h2 endpoint, before it is wrapped in a manager.
pub enum TunnelEndpointRaw {
    Client(Box<client::SendRequest<Bytes>>),
    Server(Box<server::Connection<TcpStream, Bytes>>),
}
/// A resumable tunnel endpoint; cheap to clone and share across tasks.
#[derive(Clone)]
pub enum TunnelEndpoint {
    Client(Arc<ClientManager>),
    Server(Arc<ServerManager>),
}
/// Holds the live h2 client plus everything needed to transparently
/// reconnect/resume when the physical connection drops.
pub struct ClientManager {
    // Session id from the TLS bootstrap; 0 means "no resumable session".
    pub session_id: AtomicU64,
    // `None` while the connection is down and this side cannot rebuild it.
    pub current_client: Mutex<Option<client::SendRequest<Bytes>>>,
    // Present on the dialing side only; `None` on endpoints created by the
    // listener, which instead wait for the peer to resume (see `notify`).
    pub config: Option<ClientTunnelConfig>,
    // Signalled when a resumed connection is installed by the listener.
    pub notify: Arc<Notify>,
}
impl ClientManager {
    /// Returns a ready-to-use h2 `SendRequest`, transparently reconnecting
    /// (dialing side) or waiting up to 15s for the listener to install a
    /// resumed connection (accepting side).
    pub async fn get_client(&self) -> anyhow::Result<client::SendRequest<Bytes>> {
        loop {
            // Fast path: clone the cached handle and check it is still live.
            let cached = { self.current_client.lock().await.clone() };
            if let Some(mut c) = cached {
                if futures::future::poll_fn(|cx| c.poll_ready(cx))
                    .await
                    .is_ok()
                {
                    return Ok(c);
                }
                debug!("Client physical connection dead, preparing to recover...");
            }
            let mut lock = self.current_client.lock().await;
            // Another task may have recovered the connection while we waited
            // for the lock; if so, loop back to the fast path.
            if lock.is_some()
                && futures::future::poll_fn(|cx| lock.as_mut().unwrap().poll_ready(cx))
                    .await
                    .is_ok()
            {
                continue;
            }
            if let Some(config) = &self.config {
                // Dialing side: reconnect ourselves with exponential backoff.
                let mut retry_delay = Duration::from_secs(1);
                let mut sid = self.session_id.load(Ordering::Relaxed);
                let new_client = loop {
                    match do_client_reconnect(config, &mut sid).await {
                        Ok(TunnelEndpointRaw::Client(c)) => {
                            self.session_id.store(sid, Ordering::Relaxed);
                            break c;
                        }
                        Ok(_) => return Err(anyhow::anyhow!("Identity mismatch on reconnect")),
                        Err(e) => {
                            error!("Reconnect failed: {}. Retrying in {:?}...", e, retry_delay);
                            drop(e);
                            sleep(retry_delay).await;
                            // Double the delay each attempt, capped at 30s.
                            retry_delay = std::cmp::min(retry_delay * 2, Duration::from_secs(30));
                        }
                    }
                };
                *lock = Some(*new_client.clone());
                return Ok(*new_client);
            } else {
                // Accepting side: we cannot dial out, so wait for the listener
                // to resume this session and notify us.
                *lock = None;
                drop(lock);
                match timeout(Duration::from_secs(15), self.notify.notified()).await {
                    Ok(_) => continue,
                    Err(_) => {
                        anyhow::bail!("Session did not reconnect within 15s. Throwing error!")
                    }
                }
            }
        }
    }
}
/// Server-side counterpart of `ClientManager`: owns the h2 server connection
/// plus the state needed to survive a physical reconnect.
pub struct ServerManager {
    pub session_id: AtomicU64,
    pub current_server: Mutex<Option<server::Connection<TcpStream, Bytes>>>,
    // NOTE(review): typed as ClientTunnelConfig because reconnection always
    // dials out like a TCP client, even when the h2 role is server.
    pub config: Option<ClientTunnelConfig>,
    pub notify: Arc<Notify>,
}
impl ServerManager {
    /// Yields the next inbound h2 request, reconnecting (or waiting for a
    /// resume) when the underlying connection ends. Returns `None` when
    /// recovery is impossible.
    pub async fn accept(
        &self,
    ) -> Option<Result<(Request<RecvStream>, server::SendResponse<Bytes>), h2::Error>> {
        loop {
            // NOTE(review): the lock is held across `conn.accept().await`, so
            // `update_endpoint` cannot install a resumed connection while an
            // accept is pending — confirm this ordering is intentional.
            let mut conn_guard = self.current_server.lock().await;
            if let Some(conn) = conn_guard.as_mut() {
                if let Some(res) = conn.accept().await {
                    return Some(res);
                }
                // Connection finished or died: drop it and recover below.
                *conn_guard = None;
                log::warn!("Server physical connection dropped, waiting for recovery...");
            }
            drop(conn_guard);
            if let Some(config) = &self.config {
                // Dialing side: reconnect with exponential backoff.
                let mut retry_delay = Duration::from_secs(1);
                let mut sid = self.session_id.load(Ordering::Relaxed);
                let new_server = loop {
                    match do_client_reconnect(config, &mut sid).await {
                        Ok(TunnelEndpointRaw::Server(s)) => {
                            self.session_id.store(sid, Ordering::Relaxed);
                            break s;
                        }
                        Ok(_) => {
                            error!("Identity mismatch on reconnect");
                            return None;
                        }
                        Err(e) => {
                            error!("Reconnect failed: {}. Retrying in {:?}...", e, retry_delay);
                            drop(e);
                            sleep(retry_delay).await;
                            retry_delay = std::cmp::min(retry_delay * 2, Duration::from_secs(30));
                        }
                    }
                };
                *self.current_server.lock().await = Some(*new_server);
                continue;
            } else {
                // Accepting side: wait for the listener to resume the session.
                match timeout(Duration::from_secs(15), self.notify.notified()).await {
                    Ok(_) => continue,
                    Err(_) => {
                        error!(
                            "Session did not reconnect within 15s. Throwing error (returning None)!"
                        );
                        return None;
                    }
                }
            }
        }
    }
}
/// Outcome of inspecting a fresh plain connection for a resume attempt.
enum ResumeResult {
    // Not a resume frame; continue with the normal plaintext handshake.
    NotResume(TcpStream),
    // First plain connection for a TLS-issued session id: a new endpoint.
    NewSession(TunnelEndpointRaw, u64),
    // Reattached to an endpoint already present in `active_sessions`.
    ResumedExisting,
    // Unknown session id; the connection is dropped.
    Invalid,
}
/// Accepts tunnel connections, multiplexing TLS bootstrap, session resume and
/// plain handshakes on a single port.
pub struct TunnelListener {
    listener: TcpListener,
    config: ServerTunnelConfig,
    // Session ids issued over TLS that have not yet seen their plain-H2
    // follow-up connection.
    pending_plain_sessions: Mutex<HashSet<u64>>,
    // Monotonic id generator; starts at 1 so 0 can mean "no session".
    next_session_id: AtomicU64,
    active_sessions: Mutex<HashMap<u64, TunnelEndpoint>>,
}
impl TunnelListener {
    /// Binds the plain TCP listener on `config.url`.
    pub async fn bind(config: ServerTunnelConfig) -> anyhow::Result<Self> {
        let listener = TcpListener::bind(&config.url).await?;
        info!("TCP tunnel listener bound to {}", &config.url);
        Ok(Self {
            listener,
            config,
            pending_plain_sessions: Mutex::new(HashSet::new()),
            next_session_id: AtomicU64::new(1),
            active_sessions: Mutex::new(HashMap::new()),
        })
    }
    /// Accept loop. Three kinds of inbound connection share one port:
    /// a TLS ClientHello (handled as a bootstrap that issues a session id,
    /// then the loop continues), a plain connection starting with
    /// RESUME_MAGIC (attached to a pending or active session), or anything
    /// else (full plaintext handshake + h2 upgrade, returning a new endpoint).
    pub async fn accept(&self) -> anyhow::Result<TunnelEndpoint> {
        loop {
            let (stream, peer_addr) = self.listener.accept().await?;
            debug!("[{}] Connected on tcp", peer_addr);
            if is_tls_client_hello(&stream).await {
                if let Err(e) = self.handle_tls_bootstrap(stream, peer_addr).await {
                    error!("[{}] TLS bootstrap failed: {}", peer_addr, e);
                }
                continue;
            }
            let mut stream = match self.try_resume_plain_session(stream, peer_addr).await? {
                ResumeResult::NewSession(ep_raw, sid) => {
                    // First plain connection for a TLS-issued session id.
                    let ep = wrap_raw_endpoint(sid, ep_raw, None);
                    self.active_sessions.lock().await.insert(sid, ep.clone());
                    return Ok(ep);
                }
                // Resumed into an existing endpoint (or bogus id): nothing new
                // to hand to the caller.
                ResumeResult::ResumedExisting => continue,
                ResumeResult::Invalid => continue,
                ResumeResult::NotResume(s) => s,
            };
            if perform_server_handshake(&mut stream, &self.config.token, self.config.identity)
                .await
                .is_err()
            {
                debug!("[{}] Plain handshake failed", peer_addr);
                continue;
            }
            debug!(
                "[{}] Plain handshake completed, upgrading to H2...",
                peer_addr
            );
            let ep_raw = upgrade_to_h2_raw(stream, self.config.identity)
                .await
                .map_err(|e| anyhow::anyhow!("{}", e))?;
            let sid = self.next_session_id.fetch_add(1, Ordering::Relaxed);
            let ep = wrap_raw_endpoint(sid, ep_raw, None);
            self.active_sessions.lock().await.insert(sid, ep.clone());
            return Ok(ep);
        }
    }
    /// Runs the handshake over TLS, then issues the peer a session id it can
    /// use to open the actual plain-H2 connection later. The TLS connection
    /// itself carries no further traffic.
    async fn handle_tls_bootstrap(
        &self,
        stream: TcpStream,
        peer_addr: std::net::SocketAddr,
    ) -> Result<(), Box<dyn Error>> {
        let cert_cfg = self
            .config
            .cert
            .as_ref()
            .ok_or("TLS is not enabled on server")?;
        let acceptor = TlsAcceptor::from(build_server_tls_config(cert_cfg)?);
        let mut tls_stream = acceptor.accept(stream).await?;
        perform_server_handshake(&mut tls_stream, &self.config.token, self.config.identity).await?;
        let mut magic = vec![0u8; TLS_BOOTSTRAP_MAGIC.len()];
        tls_stream.read_exact(&mut magic).await?;
        if magic != TLS_BOOTSTRAP_MAGIC {
            return Err("TLS bootstrap marker mismatch".into());
        }
        let session_id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
        {
            let mut pending = self.pending_plain_sessions.lock().await;
            pending.insert(session_id);
        }
        tls_stream.write_all(TLS_BOOTSTRAP_MAGIC).await?;
        tls_stream.write_all(&session_id.to_be_bytes()).await?;
        debug!(
            "[{}] TLS bootstrap done, issued plain-H2 session {}",
            peer_addr, session_id
        );
        Ok(())
    }
    /// Checks whether a plain connection opens with a resume frame and, if so,
    /// attaches it to the pending or active session it names.
    async fn try_resume_plain_session(
        &self,
        mut stream: TcpStream,
        peer_addr: std::net::SocketAddr,
    ) -> anyhow::Result<ResumeResult> {
        // Peek first so a non-resume connection keeps its bytes unread.
        let mut peek_buf = vec![0u8; RESUME_MAGIC.len()];
        let n = stream.peek(&mut peek_buf).await?;
        if n < RESUME_MAGIC.len() || peek_buf != RESUME_MAGIC {
            return Ok(ResumeResult::NotResume(stream));
        }
        // Actually consume the magic + session id now.
        let mut magic = vec![0u8; RESUME_MAGIC.len()];
        stream.read_exact(&mut magic).await?;
        let mut sid_buf = [0u8; 8];
        stream.read_exact(&mut sid_buf).await?;
        let session_id = u64::from_be_bytes(sid_buf);
        let is_pending = {
            let mut pending = self.pending_plain_sessions.lock().await;
            pending.remove(&session_id)
        };
        if is_pending {
            // First plain connection for a freshly bootstrapped session.
            let ep_raw = upgrade_to_h2_raw(stream, self.config.identity)
                .await
                .map_err(|e| anyhow::anyhow!("{}", e))?;
            return Ok(ResumeResult::NewSession(ep_raw, session_id));
        }
        let active = self.active_sessions.lock().await.get(&session_id).cloned();
        if let Some(ep) = active {
            // Replace the dead physical connection inside the existing
            // endpoint and wake anything waiting on it.
            let ep_raw = upgrade_to_h2_raw(stream, self.config.identity)
                .await
                .map_err(|e| anyhow::anyhow!("{}", e))?;
            update_endpoint(&ep, ep_raw).await;
            info!(
                "[{}] Successfully resumed existing session {}",
                peer_addr, session_id
            );
            return Ok(ResumeResult::ResumedExisting);
        }
        error!("[{}] Invalid plain-H2 session {}", peer_addr, session_id);
        Ok(ResumeResult::Invalid)
    }
}
/// Peeks at the first bytes to guess whether this is a TLS ClientHello
/// (record type 0x16, major version 0x03, minor 0x01..=0x04).
/// NOTE(review): `peek` can return fewer than 3 bytes before the peer's hello
/// has fully arrived, in which case a TLS client would be misclassified as
/// plaintext — confirm whether that race matters here.
async fn is_tls_client_hello(stream: &TcpStream) -> bool {
    let mut header = [0u8; 3];
    match stream.peek(&mut header).await {
        Ok(n) if n >= 3 => header[0] == 0x16 && header[1] == 0x03 && (1..=4).contains(&header[2]),
        _ => false,
    }
}
/// Server half of the application handshake: verify CLIENT_HELLO and the
/// shared token, answer with SERVER_HELLO, then exchange identity bytes and
/// reject a peer that claims the same identity as ours.
/// NOTE(review): the token comparison is not constant-time; likely fine for a
/// private tunnel, worth revisiting if ever exposed publicly.
async fn perform_server_handshake<S>(
    stream: &mut S,
    token: &str,
    identity: Identity,
) -> Result<(), Box<dyn Error>>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let mut client_hello_buf = vec![0u8; CLIENT_HELLO.len()];
    stream.read_exact(&mut client_hello_buf).await?;
    if client_hello_buf != CLIENT_HELLO {
        return Err("Client Hello mismatch".into());
    }
    let mut token_buf = vec![0u8; token.len()];
    stream.read_exact(&mut token_buf).await?;
    if String::from_utf8_lossy(&token_buf) != token {
        return Err("Wrong token".into());
    }
    stream.write_all(SERVER_HELLO).await?;
    // Read the peer's identity byte, then announce ours.
    let mut peer_id_buf = [0u8; 1];
    stream.read_exact(&mut peer_id_buf).await?;
    stream.write_all(&[identity.as_u8()]).await?;
    if peer_id_buf[0] == identity.as_u8() {
        return Err("Identity collision with peer".into());
    }
    Ok(())
}
/// Client half of the application handshake: send CLIENT_HELLO plus the
/// shared token, expect SERVER_HELLO back, then exchange identity bytes and
/// reject a server claiming the same identity as ours.
async fn perform_client_handshake<S>(
    stream: &mut S,
    token: &str,
    identity: Identity,
) -> anyhow::Result<()>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    stream.write_all(CLIENT_HELLO).await?;
    stream.write_all(token.as_bytes()).await?;
    let mut hello = vec![0u8; SERVER_HELLO.len()];
    stream.read_exact(&mut hello).await?;
    if hello != SERVER_HELLO {
        return Err(anyhow!("Server Hello mismatch"));
    }
    // Announce our identity, then read the peer's.
    stream.write_all(&[identity.as_u8()]).await?;
    let mut peer = [0u8; 1];
    stream.read_exact(&mut peer).await?;
    if peer[0] == identity.as_u8() {
        return Err(anyhow!("Identity collision with server"));
    }
    Ok(())
}
/// Builds a rustls server config from the PEM cert chain and private key
/// paths in `cert_cfg`; no client certificates are requested.
fn build_server_tls_config(
    cert_cfg: &SslConfig,
) -> Result<Arc<rustls::ServerConfig>, Box<dyn Error>> {
    let cert_file = File::open(&cert_cfg.cert)?;
    let mut cert_reader = BufReader::new(cert_file);
    let cert_chain: Vec<CertificateDer<'static>> =
        rustls_pemfile::certs(&mut cert_reader).collect::<Result<Vec<_>, _>>()?;
    if cert_chain.is_empty() {
        return Err("No certificate found in PEM".into());
    }
    let key_file = File::open(&cert_cfg.key)?;
    let mut key_reader = BufReader::new(key_file);
    let private_key =
        rustls_pemfile::private_key(&mut key_reader)?.ok_or("No private key found in PEM")?;
    let server_config = rustls::ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(cert_chain, private_key)?;
    Ok(Arc::new(server_config))
}
/// TLS connector trusting the bundled webpki root set, with no client auth.
fn build_client_tls_connector() -> TlsConnector {
    let roots = rustls::RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
    let client_config = rustls::ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    TlsConnector::from(Arc::new(client_config))
}
/// Upgrades a raw TCP stream to HTTP/2 in the role given by `identity`.
/// For the client role the connection driver is spawned in the background;
/// its termination just ends the session.
async fn upgrade_to_h2_raw(
    stream: TcpStream,
    identity: Identity,
) -> anyhow::Result<TunnelEndpointRaw> {
    if identity == Identity::Client {
        let (send_request, connection) = client::handshake(stream).await?;
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                debug!("H2 connection driver finished/error: {:?}", e);
            }
        });
        Ok(TunnelEndpointRaw::Client(Box::new(send_request)))
    } else {
        let connection = server::handshake(stream).await?;
        Ok(TunnelEndpointRaw::Server(Box::new(connection)))
    }
}
/// Wraps a raw endpoint in its resumable manager, seeding it with `sid` and
/// (for dialing endpoints) the config needed to reconnect on its own.
fn wrap_raw_endpoint(
    sid: u64,
    raw: TunnelEndpointRaw,
    config: Option<ClientTunnelConfig>,
) -> TunnelEndpoint {
    match raw {
        TunnelEndpointRaw::Client(client) => {
            let manager = ClientManager {
                session_id: AtomicU64::new(sid),
                current_client: Mutex::new(Some(*client)),
                config,
                notify: Arc::new(Notify::new()),
            };
            TunnelEndpoint::Client(Arc::new(manager))
        }
        TunnelEndpointRaw::Server(server) => {
            let manager = ServerManager {
                session_id: AtomicU64::new(sid),
                current_server: Mutex::new(Some(*server)),
                config,
                notify: Arc::new(Notify::new()),
            };
            TunnelEndpoint::Server(Arc::new(manager))
        }
    }
}
/// Installs a resumed physical connection into an existing endpoint and wakes
/// tasks waiting on it; logs and drops the connection on a role mismatch.
async fn update_endpoint(ep: &TunnelEndpoint, ep_raw: TunnelEndpointRaw) {
    match (ep, ep_raw) {
        (TunnelEndpoint::Client(mgr), TunnelEndpointRaw::Client(new_client)) => {
            let mut slot = mgr.current_client.lock().await;
            *slot = Some(*new_client);
            drop(slot);
            mgr.notify.notify_waiters();
        }
        (TunnelEndpoint::Server(mgr), TunnelEndpointRaw::Server(new_server)) => {
            let mut slot = mgr.current_server.lock().await;
            *slot = Some(*new_server);
            drop(slot);
            mgr.notify.notify_waiters();
        }
        _ => error!("Identity mismatch during session resume!"),
    }
}
/// Dials the tunnel described by `config` and returns a managed endpoint that
/// knows how to reconnect itself later.
pub async fn connect_tunnel(config: ClientTunnelConfig) -> Result<TunnelEndpoint, Box<dyn Error>> {
    info!("Connecting to tunnel at {}", &config.url);
    let mut session_id = 0;
    let raw = do_client_reconnect(&config, &mut session_id).await?;
    Ok(wrap_raw_endpoint(session_id, raw, Some(config)))
}
/// (Re)establishes the dialing side of the tunnel.
/// Tries to resume `current_sid` first; on failure falls back to a full
/// bootstrap: over TLS when `config.host` is set (obtaining a fresh session
/// id), otherwise a plain handshake (leaving `current_sid` at 0, i.e. not
/// resumable).
async fn do_client_reconnect(
    config: &ClientTunnelConfig,
    current_sid: &mut u64,
) -> anyhow::Result<TunnelEndpointRaw> {
    if *current_sid != 0 {
        match resume_tunnel_client(config, *current_sid).await {
            Ok(ep) => {
                info!("Resumed existing session {}", current_sid);
                return Ok(ep);
            }
            Err(e) => {
                log::warn!("Resume failed: {}. Falling back to full bootstrap.", e);
            }
        }
    }
    if let Some(host) = config.host.clone() {
        // TLS bootstrap issues a session id; the real traffic then flows over
        // a plain connection opened via the resume path.
        let (sid, _) = bootstrap_tls_and_get_sid(config, &host).await?;
        *current_sid = sid;
        resume_tunnel_client(config, sid).await
    } else {
        let mut stream = TcpStream::connect(&config.url).await?;
        perform_client_handshake(&mut stream, &config.token, config.identity).await?;
        let raw = upgrade_to_h2_raw(stream, config.identity).await?;
        *current_sid = 0;
        Ok(raw)
    }
}
/// Performs the TLS handshake + token exchange and obtains a plain-H2 session
/// id from the server. The TLS connection itself is then discarded; only the
/// id is used afterwards (by `resume_tunnel_client`).
/// NOTE(review): the `(u64, ())` return looks vestigial — the unit slot is
/// never used by the caller.
async fn bootstrap_tls_and_get_sid(
    config: &ClientTunnelConfig,
    host: &str,
) -> anyhow::Result<(u64, ())> {
    let connector = build_client_tls_connector();
    let tcp = TcpStream::connect(&config.url).await?;
    let server_name = ServerName::try_from(host.to_string())
        .map_err(|_| anyhow!("Invalid TLS host: {}", host))?
        .to_owned();
    let mut tls_stream = connector
        .connect(server_name, tcp)
        .await
        .map_err(|e| anyhow!("TLS handshake failed (server may not support TLS): {}", e))?;
    perform_client_handshake(&mut tls_stream, &config.token, config.identity).await?;
    tls_stream.write_all(TLS_BOOTSTRAP_MAGIC).await?;
    let mut magic = vec![0u8; TLS_BOOTSTRAP_MAGIC.len()];
    tls_stream.read_exact(&mut magic).await?;
    if magic != TLS_BOOTSTRAP_MAGIC {
        return Err(anyhow!("TLS bootstrap response mismatch"));
    }
    let mut sid_buf = [0u8; 8];
    tls_stream.read_exact(&mut sid_buf).await?;
    Ok((u64::from_be_bytes(sid_buf), ()))
}
/// Opens a fresh plain TCP connection, announces the session id to resume,
/// and upgrades straight to h2 (no token handshake on this path — the
/// listener validates only the session id).
async fn resume_tunnel_client(
    config: &ClientTunnelConfig,
    session_id: u64,
) -> anyhow::Result<TunnelEndpointRaw> {
    let mut stream = TcpStream::connect(&config.url).await?;
    stream.write_all(RESUME_MAGIC).await?;
    stream.write_all(&session_id.to_be_bytes()).await?;
    upgrade_to_h2_raw(stream, config.identity).await
}

View File

@ -0,0 +1,39 @@
log_level: "DEBUG"
client:
- url: "127.0.0.1:3333"
token: abc
host: "local.bluemangoo.net"
profiles:
cn:
region: cn
interval: 3
filters:
start_app:
- "thumbnail"
on_demand: [ ]
skip: [ ]
file_ext: [ ]
export:
by_category: false
usm:
export: true
decode: true
acb:
export: true
decode: true
hca:
decode: true
images:
convert_to_webp: false
remove_png: false
video:
convert_to_mp4: false
direct_usm_to_mp4_with_ffmpeg: false
remove_m2v: false
audio:
convert_to_mp3: false
convert_to_flac: false
remove_wav: false
path: "./data/cn"

View File

@ -0,0 +1,59 @@
log_level: "DEBUG"
server:
- url: 127.0.0.1:3333
token: abc
cert: "D:\\WorkDir\\Nginx\\cert\\_.bluemangoo.net\\_.bluemangoo.net-chain.pem"
key: "D:\\WorkDir\\Nginx\\cert\\_.bluemangoo.net\\_.bluemangoo.net-key.pem"
execution:
proxy: ""
timeout_seconds: 300
allow_cancel: true
batch_save_size: 50
retry:
attempts: 4
initial_backoff_ms: 1000
max_backoff_ms: 4000
tools:
ffmpeg_path: "ffmpeg"
asset_studio_cli_path: "D:\\Workspace\\AssetStudio\\AssetStudioCLI\\bin\\Release\\net9.0\\AssetStudioModCLI.exe"
concurrency:
download: 4
upload: 4
acb: 8
usm: 4
hca: 16
regions:
jp:
enabled: true
provider:
kind: colorful_palette
asset_info_url_template: ""
asset_bundle_url_template: ""
profile: "production"
profile_hashes: { assetbundleHostHash: cf2d2388 }
required_cookies: true
crypto:
aes_key_hex: "6732666343305a637a4e394d544a3631"
aes_iv_hex: "6d737833495630693958453575595a31"
runtime:
unity_version: "2022.3.21f1"
cn:
enabled: true
provider:
kind: nuverse
asset_version_url: "https://lf3-mkcncdn-tos.dailygn.com/obj/rt-game-lf/gdl_app_5236/Mainland/{app_version}/Release/cn_online/ios/version"
app_version: "5.2.0"
asset_info_url_template: "https://lf3-mkcncdn-tos.dailygn.com/obj/sf-game-lf/gdl_app_5236/AssetBundle/{app_version}/Release/cn_online/ios{asset_version}/AssetBundleInfoNew.json"
asset_bundle_url_template: "https://lf3-mkcncdn-tos.dailygn.com/obj/sf-game-lf/gdl_app_5236/AssetBundle/{app_version}/Release/cn_online/{bundle_path}"
required_cookies: false
crypto:
aes_key_hex: "6732666343305a637a4e394d544a3631"
aes_iv_hex: "6d737833495630693958453575595a31"
runtime:
unity_version: "2022.3.21f1"

23
server/Cargo.toml Normal file
View File

@ -0,0 +1,23 @@
[package]
name = "server"
version = "0.1.0"
edition = "2024"
[dependencies]
common = { path = "../common" }
communicator = { path = "../communicator" }
assets-updater = { path = "../assets-updater" }
tokio = { workspace = true, features = ["rt-multi-thread", "rt", "macros"] }
anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
yaml_serde = { workspace = true }
moka = { workspace = true, features = ["future"] }
uuid = { workspace = true, features = ["v4"] }
h2 = { workspace = true }
bytes = { workspace = true }
http = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
simplelog = { workspace = true }

12
server/src/config.rs Normal file
View File

@ -0,0 +1,12 @@
use assets_updater::core::config::AppConfig;
use communicator::ConnectConfig;
use serde::{Deserialize, Serialize};
/// Top-level server configuration, deserialized from `sekai-unpacker-server.yaml`.
///
/// Both nested structs are `#[serde(flatten)]`ed, so their fields appear at the
/// root of the YAML document alongside `log_level`.
/// NOTE(review): flattened structs must not share key names with each other or
/// with `log_level`, otherwise (de)serialization is ambiguous — confirm against
/// `ConnectConfig` and `AppConfig`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ServerConfig {
    /// Optional log-level name (e.g. "DEBUG"); consumers fall back to INFO when absent.
    pub log_level: Option<String>,
    /// Tunnel listen/dial endpoints (flattened into the document root).
    #[serde(flatten, default)]
    pub connect: ConnectConfig,
    /// Asset-updater settings (flattened into the document root).
    #[serde(flatten, default)]
    pub updater_config: AppConfig,
}

115
server/src/main.rs Normal file
View File

@ -0,0 +1,115 @@
mod config;
mod router;
mod session;
use crate::config::ServerConfig;
use crate::router::build_routers;
use crate::session::SessionStore;
use assets_updater::core::asset_execution::AssetExecutionContext;
use communicator::http::Server;
use communicator::{Identity, TunnelEndpoint, TunnelListener, connect_tunnel};
use lazy_static::lazy_static;
use log::{LevelFilter, error, info};
use moka::future::Cache;
use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use std::fs;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let log_level = LevelFilter::from_str(
CONFIG
.log_level
.clone()
.unwrap_or("INFO".to_string())
.as_str(),
)
.unwrap_or(LevelFilter::Info);
TermLogger::init(
log_level,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
)?;
let routers = Arc::new(build_routers());
let http_server = Arc::new(Server::new(routers.clone()));
let mut tasks = vec![];
for server_conf in &CONFIG.connect.server {
let server_conf = server_conf.clone().into_tunnel_config(Identity::Server)?;
let url = server_conf.url.clone();
let server = TunnelListener::bind(server_conf).await?;
info!("tcp server started on {}", url);
let http_server = http_server.clone();
tasks.push(tokio::task::spawn(async move {
loop {
let endpoint = server
.accept()
.await
.map_err(|e| error!("Failed to accept connection: {}", e));
let endpoint = if let Ok(endpoint) = endpoint {
endpoint
} else {
continue;
};
if let TunnelEndpoint::Server(connection) = endpoint {
let result = http_server.on_conn(connection).await;
if let Err(e) = result {
error!("Failed to handle connection: {}", e);
}
}
}
}));
}
for client_conf in &CONFIG.connect.client {
let client_conf = client_conf.clone().into_tunnel_config(Identity::Server);
let http_server = http_server.clone();
info!("tcp client started for {}", client_conf.url);
tasks.push(tokio::task::spawn(async move {
loop {
let endpoint = connect_tunnel(client_conf.clone())
.await
.map_err(|e| error!("Failed to accept connection: {}", e));
let endpoint = if let Ok(endpoint) = endpoint {
endpoint
} else {
continue;
};
if let TunnelEndpoint::Server(connection) = endpoint {
let result = http_server.on_conn(connection).await;
if let Err(e) = result {
error!("Failed to handle connection: {}", e);
}
}
sleep(Duration::from_secs(10)).await;
}
}));
}
for task in tasks {
let _ = task.await.map_err(|e| error!("{}", e));
}
Ok(())
}
lazy_static! {
    /// Session-id -> execution-context store shared by all routes.
    /// Entries are evicted after 3 hours of inactivity (time-to-idle),
    /// capped at 10000 live sessions.
    pub static ref SESSION_STORE: SessionStore<AssetExecutionContext> = SessionStore::new(
        Cache::builder()
            .time_to_idle(Duration::from_hours(3))
            .max_capacity(10000),
    );
    /// Global configuration, loaded once on first access from
    /// `sekai-unpacker-server.yaml` in the working directory.
    /// Panics (via `unwrap`) if the file is missing, malformed YAML, or
    /// fails `validate()` — intentional fail-fast at startup.
    pub static ref CONFIG: ServerConfig = {
        let raw = fs::read_to_string("sekai-unpacker-server.yaml").unwrap();
        let config: ServerConfig = yaml_serde::from_str(raw.as_str()).unwrap();
        config.updater_config.validate().unwrap();
        config
    };
}

View File

@ -0,0 +1,46 @@
use crate::SESSION_STORE;
use bytes::Bytes;
use common::http::CloseRequest;
use communicator::http::{json_from_request, send, send_error};
use h2::RecvStream;
use h2::server::SendResponse;
use http::Request;
/// Handles `/close`: tears down the session identified by the request body.
///
/// Responds 200 when a session was found and removed, 500 otherwise.
pub async fn close(
    mut request: Request<RecvStream>,
    send_response: SendResponse<Bytes>,
) -> Result<(), h2::Error> {
    // Decode the JSON body; malformed input is reported and the stream ends.
    let req_body: CloseRequest = match json_from_request(&mut request).await {
        Ok(body) => body,
        Err(error) => {
            send_error(send_response, error);
            return Ok(());
        }
    };
    // Removing the entry both validates the id and releases the context.
    let removed = SESSION_STORE.remove(&req_body.id).await;
    let (status, msg) = match removed {
        Some(_) => (200, "OK"),
        None => (500, "invalid session id"),
    };
    send(
        send_response,
        status,
        "application/json",
        serde_json::json!({ "msg": msg }).to_string(),
    );
    Ok(())
}

View File

@ -0,0 +1,91 @@
use crate::{CONFIG, SESSION_STORE};
use assets_updater::core::export_pipeline::{find_files, find_files_by_extensions};
use bytes::Bytes;
use common::http::DownloadRequest;
use common::stream::server_send_files;
use communicator::http::{json_from_request, send, send_error};
use h2::RecvStream;
use h2::server::SendResponse;
use http::{Request, Response};
/// Handles `/download`: downloads and extracts one asset-bundle task for an
/// existing session, then streams the resulting files back to the client.
pub async fn download(
    mut request: Request<RecvStream>,
    mut send_response: SendResponse<Bytes>,
) -> Result<(), h2::Error> {
    // Decode the JSON request body; on failure report the error and stop.
    let body = json_from_request(&mut request).await;
    if let Err(error) = body {
        send_error(send_response, error);
        return Ok(());
    }
    let req_body: DownloadRequest = body.unwrap();
    let id = req_body.id;
    // Look up the execution context created by a prior /sync call.
    let context = SESSION_STORE.get(&id).await;
    if context.is_none() {
        // NOTE(review): an unknown session id answers with HTTP 200 here while
        // /close answers 500 for the same condition — confirm which status the
        // client actually checks.
        send(
            send_response,
            200,
            "application/json",
            serde_json::json!({
                "msg": "invalid session id"
            })
            .to_string(),
        );
        return Ok(());
    }
    let context = context.unwrap();
    // Download (and extract) the requested bundle; `dir` is either a single
    // extracted file or a directory of extracted files.
    let dir = context
        .download(&req_body.task, &CONFIG.updater_config)
        .await;
    if let Err(error) = dir {
        send_error(send_response, error.into());
        return Ok(());
    }
    let dir = dir.unwrap();
    // Collect the files to send, honoring the session's lowercase extension
    // filter; an empty filter means "send everything".
    let files = if !dir.is_dir() {
        if context.sync_context.filters.file_ext.is_empty()
            || dir.extension().is_some_and(|t| {
                context
                    .sync_context
                    .filters
                    .file_ext
                    .contains(&t.to_str().unwrap().to_lowercase())
            })
        {
            vec![dir.clone()]
        } else {
            vec![]
        }
    } else {
        let files = if context.sync_context.filters.file_ext.is_empty() {
            find_files(&dir)
        } else {
            find_files_by_extensions(&dir, &context.sync_context.filters.file_ext)
        };
        if let Err(error) = files {
            send_error(send_response, error.into());
            return Ok(());
        }
        files.unwrap()
    };
    // Stream the files over a custom content type; paths are sent relative to
    // this region's extraction directory under the OS temp dir.
    let response = Response::builder()
        .status(200)
        .header("content-type", "application/x-sekai-stream")
        .body(())
        .unwrap();
    let dir_base = std::env::temp_dir()
        .join("sekai-updater")
        .join("extract")
        .join(&context.sync_context.region);
    if let Ok(send_stream) = send_response.send_response(response, false) {
        let _ = server_send_files(send_stream, dir_base, &files).await;
    }
    // Best-effort cleanup. NOTE(review): remove_file silently fails when `dir`
    // is a directory (the extracted-directory branch above) — confirm whether
    // directories are meant to be kept or should use remove_dir_all.
    let _ = std::fs::remove_file(dir);
    Ok(())
}

18
server/src/router/mod.rs Normal file
View File

@ -0,0 +1,18 @@
mod close;
mod download;
mod sync;
use crate::router::close::close;
use crate::router::download::download;
use crate::router::sync::sync_route;
use communicator::http::Router;
/// Assembles the HTTP router exposing every endpoint served over the tunnel.
pub fn build_routers() -> Router {
    let mut routes = Router::new();
    routes.add_route("/sync", sync_route);
    routes.add_route("/download", download);
    routes.add_route("/close", close);
    routes
}

55
server/src/router/sync.rs Normal file
View File

@ -0,0 +1,55 @@
use crate::{CONFIG, SESSION_STORE};
use assets_updater::core::asset_execution::{
should_download_bundle, AssetExecutionContext,
};
use assets_updater::core::regions::select_region;
use bytes::Bytes;
use common::http::SyncResponse;
use common::updater::SyncContext;
use communicator::http::{json_from_request, send, send_error};
use h2::server::SendResponse;
use h2::RecvStream;
use http::Request;
/// Handles `/sync`: resolves the requested region, builds an execution
/// context, fetches the filtered task list, and returns it together with a
/// fresh session id for subsequent `/download` and `/close` calls.
pub async fn sync_route(
    mut request: Request<RecvStream>,
    send_response: SendResponse<Bytes>,
) -> Result<(), h2::Error> {
    // Idiom: `match` replaces the repeated `if let Err … / unwrap()` pattern.
    let sync_context: SyncContext = match json_from_request(&mut request).await {
        Ok(body) => body,
        Err(error) => {
            send_error(send_response, error);
            return Ok(());
        }
    };
    // Resolve the requested region against server configuration.
    let region = match select_region(&CONFIG.updater_config, &sync_context.region) {
        Ok(region) => region,
        Err(error) => {
            send_error(send_response, error.into());
            return Ok(());
        }
    };
    let mut exec = match AssetExecutionContext::new(&CONFIG.updater_config, &sync_context, region) {
        Ok(exec) => exec,
        Err(error) => {
            send_error(send_response, error.into());
            return Ok(());
        }
    };
    // Fetch the provider's full task list, then keep only bundles that the
    // client's filters actually want downloaded.
    let tasks = match exec.fetch_tasks().await {
        Ok(tasks) => tasks,
        Err(error) => {
            send_error(send_response, error.into());
            return Ok(());
        }
    };
    let tasks = tasks
        .into_iter()
        .filter(|task| should_download_bundle(&sync_context, &task.download_path, &task.category))
        .collect::<Vec<_>>();
    // Persist the context so later /download and /close calls can find it.
    let id = SESSION_STORE.put(exec).await;
    let resp = serde_json::to_string(&SyncResponse { id, tasks }).unwrap();
    send(send_response, 200, "application/json", resp);
    Ok(())
}

31
server/src/session.rs Normal file
View File

@ -0,0 +1,31 @@
use moka::future::{Cache, CacheBuilder};
use std::hash::RandomState;
use uuid::Uuid;
/// A thin generic wrapper around a moka async cache that hands out UUID keys
/// for stored values, acting as an in-memory session store. Expiry and
/// capacity are whatever the caller configures on the builder.
pub struct SessionStore<T> {
    // Underlying async cache; values must be Clone because `get` returns copies.
    cache: Cache<String, T>,
}
impl<T> SessionStore<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Builds the store from a pre-configured cache builder so the caller
    /// controls eviction policy (TTL/TTI/max capacity).
    pub fn new(builder: CacheBuilder<String, T, Cache<String, T, RandomState>>) -> Self {
        let cache = builder.build();
        Self { cache }
    }
    /// Stores `data` under a freshly generated v4 UUID and returns that id.
    pub async fn put(&self, data: T) -> String {
        let id = Uuid::new_v4().to_string(); // generate a unique UUID as the session id
        self.cache.insert(id.clone(), data).await;
        id
    }
    /// Returns a clone of the stored value for `id`, or None if absent/expired.
    pub async fn get(&self, id: &str) -> Option<T> {
        self.cache.get(id).await
    }
    /// Removes and returns the value for `id`, ending the session.
    pub async fn remove(&self, id: &str) -> Option<T> {
        self.cache.remove(id).await
    }
}