diff --git a/client/src/queue.rs b/client/src/queue.rs index d7bbbb0..5b43728 100644 --- a/client/src/queue.rs +++ b/client/src/queue.rs @@ -103,6 +103,23 @@ impl SharedQueue { }) } + pub fn clear(&self) { + let mut queue = self.inner.data.lock().unwrap(); + + let removed_count = queue.len(); + + if removed_count > 0 { + queue.clear(); + + let prev_pending = self.inner.pending.fetch_sub(removed_count, Ordering::SeqCst); + + if prev_pending == removed_count { + self.inner.done_cond.notify_all(); + } + + } + } + /// 阻塞当前线程,直到所有在途任务(pending == 0)全部处理完 pub fn wait_until_all_consumed(&self) { let mut _queue_lock = self.inner.data.lock().unwrap(); diff --git a/client/src/task.rs b/client/src/task.rs index 760037c..cd00cc1 100644 --- a/client/src/task.rs +++ b/client/src/task.rs @@ -55,6 +55,7 @@ pub async fn post_run( return Ok(None); } cnt.reset(); + queue.clear(); queue.push_all(tasks); Ok(Some(id)) }