diff --git a/client/src/main.rs b/client/src/main.rs index e963b69..79801df 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -143,7 +143,11 @@ async fn main() -> anyhow::Result<()> { let liveness_tx = liveness_tx.clone(); join_set.spawn(async move { let _guard = liveness_tx; + let mut inner_set = JoinSet::new(); loop { + if cancel_token.is_cancelled() { + return; + } let client = sender.recv(); if client.is_none() { continue; @@ -152,25 +156,37 @@ async fn main() -> anyhow::Result<()> { if cancel_token.is_cancelled() { return; } - loop { - if client.get_client().await.is_err() { - break; - } - if cancel_token.is_cancelled() { - return; - } - let permit = semaphore.clone().acquire_owned().await.unwrap(); - let result = run(client.clone(), profile.clone()).await; - match result { - Ok(true) => { - post_task(profile.clone(), permit, cancel_token.clone()).await; + let semaphore = semaphore.clone(); + let cancel_token = cancel_token.clone(); + let profile = profile.clone(); + inner_set.spawn(async move { + loop { + if client.get_client().await.is_err() { + return; } - Err(error) => { - error!("{}", error); + if cancel_token.is_cancelled() { + return; } - _ => {} + { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + if cancel_token.is_cancelled() { + return; + } + let result = run(client.clone(), profile.clone()).await; + match result { + Ok(true) => { + post_task(profile.clone(), permit, cancel_token.clone()) + .await; + } + Err(error) => { + error!("{}", error); + } + _ => {} + } + } + sleep(Duration::from_millis(100)).await; // let other client get semaphore } - } + }); } }); } @@ -202,17 +218,24 @@ async fn main() -> anyhow::Result<()> { if cancel_token.is_cancelled() { return; } - let permit = semaphore.clone().acquire_owned().await.unwrap(); - let result = run(client.clone(), profile.clone()).await; - match result { - Ok(true) => { - post_task(profile.clone(), permit, cancel_token.clone()).await; + { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + if cancel_token.is_cancelled() { + return; } - Err(error) => { - error!("{}", error); + let result = run(client.clone(), profile.clone()).await; + match result { + Ok(true) => { + post_task(profile.clone(), permit, cancel_token.clone()) + .await; + } + Err(error) => { + error!("{}", error); + } + _ => {} } - _ => {} } + sleep(Duration::from_millis(100)).await; // let other client get semaphore } } if cancel_token.is_cancelled() {