|
此文章由 DDD888 原创或转贴,不代表本站立场和观点,版权归 oursteps.com.au 和作者 DDD888 所有!转贴必须注明作者、出处和本声明,并保持内容完整
好奇怪,我调用 tokio sleep,就掉在那sleep函数里了,但改成调用thread::sleep就可以
下面是完整代码
use tokio;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio: :sleep;
//use std::thread;
use std: uration;
use crate::manager::Manager;
use crossbeam_channel::Receiver;
pub async fn my_async_task(
the_manager: Arc<Mutex<Manager>>,
index: usize,
signal_receiver: Receiver<()>,
) -> String {
while true {
let result = the_manager.lock().await.get();
if result == 0 {
return "".to_string();
}
if result == 1 {
break;
}
// result = 2
let _ = signal_receiver.recv();
println!("debug my_async_task get recv signal {:#?}", index);
//sleep(Duration::from_millis(10000)).await;
}
println!("debug my_async_task start {:#?}", index);
sleep(Duration::from_millis(300)).await;
//thread::sleep(Duration::from_secs(2));
println!("debug my_async_task finished {:#?}", index);
the_manager.lock().await.release();
format!("{}", index)
}
+++++++++++++++++++++++++++++++++
use crossbeam_channel::Sender;
pub struct Manager {
count: usize,
total: usize,
job_max: usize,
signal_sender: Sender<()>,
}
impl Manager {
pub fn new(job_max: usize, total: usize, sender: Sender<()>) -> Manager {
Manager {
count: 0,
job_max,
total,
signal_sender: sender,
}
}
pub fn get(&mut self) -> usize {
if self.total < 1 {
return 0;
}
if self.count < self.job_max {
self.count = self.count + 1;
return 1;
}
2
}
pub fn release(&mut self) {
if self.count > 0 {
self.count = self.count - 1;
}
if self.total > 0 {
self.total = self.total - 1;
if self.total > 0 {
let _ = self.signal_sender.send(());
}
}
}
}
++++++++++++++++++++++++++++++++++
mod manager;
mod my_async_task;
use tokio;
use futures::stream;
use futures::stream::StreamExt;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::sync::Mutex;
use crate::{manager::Manager, my_async_task::my_async_task};
use crossbeam_channel::unbounded;
fn main() {
let job_max = 2;
let total = 6;
// Create a channel of unbounded capacity.
let (s, rx) = unbounded();
let manager = Arc::new(Mutex::new(Manager::new(job_max, total, s)));
let runtime = Builder::new_multi_thread()
.enable_all()
.thread_name("multi-thread")
.worker_threads(1)
.build()
.unwrap();
let v = runtime.block_on(async {
let mut tasks = vec![];
for index in 0..total {
let rx_clone = rx.clone();
tasks.push(tokio::spawn(my_async_task(
manager.clone(),
index,
rx_clone,
)));
}
let fetches = stream::iter(tasks)
.buffer_unordered(job_max)
.collect::<Vec<_>>();
fetches.await
});
println!("length={}", v.len());
}
|
|