1
0
mirror of https://github.com/Bluemangoo/sekai-unpacker.git synced 2026-05-06 20:44:47 +08:00

Compare commits

..

No commits in common. "cdf617b5c333a2b1b13cfc7b5179b1de3fc6d028" and "de7b63d366ffb453d59c52bc8642890bb579572a" have entirely different histories.

8 changed files with 22 additions and 162 deletions

26
Cargo.lock generated
View File

@ -126,17 +126,6 @@ dependencies = [
"yaml_serde",
]
[[package]]
name = "async-http-proxy"
version = "1.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29faa5d4d308266048bd7505ba55484315a890102f9345b9ff4b87de64201592"
dependencies = [
"httparse",
"thiserror 1.0.69",
"tokio",
]
[[package]]
name = "async-lock"
version = "3.4.1"
@ -455,7 +444,6 @@ name = "communicator"
version = "0.1.0"
dependencies = [
"anyhow",
"async-http-proxy",
"bytes",
"futures",
"futures-util",
@ -467,9 +455,7 @@ dependencies = [
"serde_json",
"tokio",
"tokio-rustls",
"tokio-socks",
"tokio-util",
"url",
"webpki-roots",
]
@ -2729,18 +2715,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-socks"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4770b8024672c1101b3f6733eab95b18007dbe0847a8afe341fcf79e06043f"
dependencies = [
"either",
"futures-util",
"thiserror 1.0.69",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.18"

View File

@ -35,7 +35,3 @@ structopt = "0.3.26"
tokio-util = "0.7.18"
futures-util = "0.3.32"
twox-hash = "2.1.2"
futures = "0.3.32"
tokio-socks = "0.5.2"
url = "2.5.8"
async-http-proxy = "1.2.5"

View File

@ -183,6 +183,7 @@ async fn main() -> anyhow::Result<()> {
let liveness_tx = liveness_tx.clone();
join_set.spawn(async move {
let _guard = liveness_tx;
let mut inner_set = JoinSet::new();
loop {
if cancel_token.is_cancelled() {
return;
@ -204,7 +205,7 @@ async fn main() -> anyhow::Result<()> {
let local_manifest = local_manifest.clone();
let signal = signal.clone();
tokio::spawn(async move {
inner_set.spawn(async move {
loop {
if client.get_client().await.is_err() {
return;

View File

@ -103,23 +103,6 @@ impl<T> SharedQueue<T> {
})
}
pub fn clear(&self) {
let mut queue = self.inner.data.lock().unwrap();
let removed_count = queue.len();
if removed_count > 0 {
queue.clear();
let prev_pending = self.inner.pending.fetch_sub(removed_count, Ordering::SeqCst);
if prev_pending == removed_count {
self.inner.done_cond.notify_all();
}
}
}
/// 阻塞当前线程直到所有在途任务pending == 0全部处理完
pub fn wait_until_all_consumed(&self) {
let mut _queue_lock = self.inner.data.lock().unwrap();

View File

@ -55,7 +55,6 @@ pub async fn post_run(
return Ok(None);
}
cnt.reset();
queue.clear();
queue.push_all(tasks);
Ok(Some(id))
}
@ -122,6 +121,11 @@ pub async fn run_main(
match r {
Ok(Ok(())) => cnt.inc_success(),
Ok(Err(e)) => {
if e.to_string()
.contains("Session did not reconnect within 15s")
{
return Err(anyhow!(e));
}
error!("{}", e);
cnt.inc_failure()
}
@ -159,21 +163,12 @@ pub async fn run_side(
let n = p1.concurrent.unwrap_or(5);
let semaphore = Arc::new(Semaphore::new(n));
let mut join_set = JoinSet::new();
let cancel_token = CancellationToken::new();
while let Some(task) = queue.try_pop() {
if cancel_token.is_cancelled() {
break;
}
let permit = semaphore.clone().acquire_owned().await?;
if cancel_token.is_cancelled() {
break;
}
let client = client.clone();
let id = id.clone();
let local_manifest = manifest.clone();
let p1 = p1.clone();
let cancel_token = cancel_token.clone();
join_set.spawn(async move {
let guard = task;
@ -190,11 +185,6 @@ pub async fn run_side(
let mut retry_conn = client.get_client().await?;
result = download(&mut retry_conn, &req, &p1).await;
}
if let Err(e) = &result
&& e.downcast_ref::<h2::Error>().is_some()
{
cancel_token.cancel();
}
result?;
local_manifest

View File

@ -17,7 +17,4 @@ webpki-roots = { workspace = true }
anyhow = { workspace = true }
tokio-util = { workspace = true }
futures-util = { workspace = true }
futures = { workspace = true }
tokio-socks = { workspace = true }
url = {workspace = true}
async-http-proxy = {workspace = true, features = ["tokio", "runtime-tokio"]}
futures = "0.3.32"

View File

@ -34,7 +34,6 @@ pub struct TcpClientTunnelConfig {
pub host: Option<String>,
pub url: String,
pub token: String,
pub proxy: Option<String>,
}
impl TcpClientTunnelConfig {
@ -44,7 +43,6 @@ impl TcpClientTunnelConfig {
host: self.host,
url: self.url,
token: self.token,
proxy: self.proxy,
}
}
}

View File

@ -1,11 +1,10 @@
use anyhow::anyhow;
use async_http_proxy::http_connect_tokio;
use bytes::Bytes;
use h2::{RecvStream, client, server};
use http::Request;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
@ -18,8 +17,6 @@ use tokio::time::{Duration, sleep, timeout};
use tokio_rustls::rustls;
use tokio_rustls::rustls::pki_types::{CertificateDer, ServerName};
use tokio_rustls::{TlsAcceptor, TlsConnector};
use tokio_socks::tcp::Socks5Stream;
use url::Url;
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Identity {
@ -42,7 +39,6 @@ pub struct ClientTunnelConfig {
pub url: String,
pub token: String,
pub identity: Identity,
pub proxy: Option<String>,
}
impl Default for ClientTunnelConfig {
@ -52,7 +48,6 @@ impl Default for ClientTunnelConfig {
url: "127.0.0.1:3333".to_string(),
token: "super_secret_magic_token".to_string(),
identity: Identity::Client,
proxy: None,
}
}
}
@ -100,30 +95,6 @@ pub enum TunnelEndpoint {
Server(Arc<ServerManager>),
}
#[derive(Clone)]
pub enum WeakTunnelEndpoint {
Client(std::sync::Weak<ClientManager>),
Server(std::sync::Weak<ServerManager>),
}
impl TunnelEndpoint {
pub fn downgrade(&self) -> WeakTunnelEndpoint {
match self {
Self::Client(c) => WeakTunnelEndpoint::Client(Arc::downgrade(c)),
Self::Server(s) => WeakTunnelEndpoint::Server(Arc::downgrade(s)),
}
}
}
impl WeakTunnelEndpoint {
pub fn upgrade(&self) -> Option<TunnelEndpoint> {
match self {
Self::Client(c) => c.upgrade().map(TunnelEndpoint::Client),
Self::Server(s) => s.upgrade().map(TunnelEndpoint::Server),
}
}
}
pub struct ClientManager {
pub session_id: AtomicU64,
pub current_client: Mutex<Option<client::SendRequest<Bytes>>>,
@ -260,9 +231,9 @@ enum ResumeResult {
pub struct TunnelListener {
listener: TcpListener,
config: ServerTunnelConfig,
pending_plain_sessions: Mutex<HashMap<u64, std::time::Instant>>,
pending_plain_sessions: Mutex<HashSet<u64>>,
next_session_id: AtomicU64,
active_sessions: Mutex<HashMap<u64, WeakTunnelEndpoint>>,
active_sessions: Mutex<HashMap<u64, TunnelEndpoint>>,
}
impl TunnelListener {
@ -272,7 +243,7 @@ impl TunnelListener {
Ok(Self {
listener,
config,
pending_plain_sessions: Mutex::new(HashMap::new()),
pending_plain_sessions: Mutex::new(HashSet::new()),
next_session_id: AtomicU64::new(1),
active_sessions: Mutex::new(HashMap::new()),
})
@ -293,9 +264,7 @@ impl TunnelListener {
let mut stream = match self.try_resume_plain_session(stream, peer_addr).await? {
ResumeResult::NewSession(ep_raw, sid) => {
let ep = wrap_raw_endpoint(sid, ep_raw, None);
let mut sessions = self.active_sessions.lock().await;
sessions.retain(|_, weak_ep| weak_ep.upgrade().is_some());
sessions.insert(sid, ep.downgrade());
self.active_sessions.lock().await.insert(sid, ep.clone());
return Ok(ep);
}
ResumeResult::ResumedExisting => continue,
@ -321,9 +290,7 @@ impl TunnelListener {
let sid = self.next_session_id.fetch_add(1, Ordering::Relaxed);
let ep = wrap_raw_endpoint(sid, ep_raw, None);
let mut sessions = self.active_sessions.lock().await;
sessions.retain(|_, weak_ep| weak_ep.upgrade().is_some());
sessions.insert(sid, ep.downgrade());
self.active_sessions.lock().await.insert(sid, ep.clone());
return Ok(ep);
}
}
@ -353,8 +320,7 @@ impl TunnelListener {
let session_id = self.next_session_id.fetch_add(1, Ordering::Relaxed);
{
let mut pending = self.pending_plain_sessions.lock().await;
pending.retain(|_, time| time.elapsed() < Duration::from_secs(30));
pending.insert(session_id, std::time::Instant::now());
pending.insert(session_id);
}
tls_stream.write_all(TLS_BOOTSTRAP_MAGIC).await?;
@ -385,7 +351,7 @@ impl TunnelListener {
let is_pending = {
let mut pending = self.pending_plain_sessions.lock().await;
pending.remove(&session_id).is_some()
pending.remove(&session_id)
};
if is_pending {
@ -400,11 +366,7 @@ impl TunnelListener {
let ep_raw = upgrade_to_h2_raw(stream, self.config.identity)
.await
.map_err(|e| anyhow::anyhow!("{}", e))?;
update_endpoint(
&ep.upgrade().ok_or(anyhow!("Connection is cleared"))?,
ep_raw,
)
.await;
update_endpoint(&ep, ep_raw).await;
info!(
"[{}] Successfully resumed existing session {}",
peer_addr, session_id
@ -580,47 +542,6 @@ pub async fn connect_tunnel(config: ClientTunnelConfig) -> Result<TunnelEndpoint
Ok(wrap_raw_endpoint(sid, ep_raw, Some(config)))
}
pub async fn connect_with_auto_proxy(config: &ClientTunnelConfig) -> anyhow::Result<TcpStream> {
let Some(proxy) = &config.proxy else {
return Ok(TcpStream::connect(&config.url).await?);
};
let parsed_proxy = Url::parse(proxy)?;
match parsed_proxy.scheme() {
"socks5" => {
let host = parsed_proxy.host_str().unwrap();
let port = parsed_proxy.port().unwrap_or(1080);
let stream = Socks5Stream::connect((host, port), config.url.clone())
.await?
.into_inner();
Ok(stream)
}
"http" | "https" => {
let proxy_addr = format!(
"{}:{}",
parsed_proxy.host_str().unwrap(),
parsed_proxy.port().unwrap_or(80)
);
let mut stream = TcpStream::connect(proxy_addr).await?;
let target_url = Url::parse(&format!("tcp://{}", config.url))?;
http_connect_tokio(
&mut stream,
target_url.host_str().unwrap(),
target_url.port().unwrap_or(80),
)
.await?;
Ok(stream)
}
_ => Err(anyhow::anyhow!(
"Unsupported proxy scheme: {}",
parsed_proxy.scheme()
)),
}
}
async fn do_client_reconnect(
config: &ClientTunnelConfig,
current_sid: &mut u64,
@ -642,7 +563,7 @@ async fn do_client_reconnect(
*current_sid = sid;
resume_tunnel_client(config, sid).await
} else {
let mut stream = connect_with_auto_proxy(config).await?;
let mut stream = TcpStream::connect(&config.url).await?;
perform_client_handshake(&mut stream, &config.token, config.identity).await?;
let raw = upgrade_to_h2_raw(stream, config.identity).await?;
*current_sid = 0;
@ -655,7 +576,7 @@ async fn bootstrap_tls_and_get_sid(
host: &str,
) -> anyhow::Result<(u64, ())> {
let connector = build_client_tls_connector();
let tcp = connect_with_auto_proxy(config).await?;
let tcp = TcpStream::connect(&config.url).await?;
let server_name = ServerName::try_from(host.to_string())
.map_err(|_| anyhow!("Invalid TLS host: {}", host))?
.to_owned();
@ -683,7 +604,7 @@ async fn resume_tunnel_client(
config: &ClientTunnelConfig,
session_id: u64,
) -> anyhow::Result<TunnelEndpointRaw> {
let mut plain_stream = connect_with_auto_proxy(config).await?;
let mut plain_stream = TcpStream::connect(&config.url).await?;
plain_stream.write_all(RESUME_MAGIC).await?;
plain_stream.write_all(&session_id.to_be_bytes()).await?;
upgrade_to_h2_raw(plain_stream, config.identity).await