fn spawn<F>(fut: F) -> Receiver<F::Output>
where F: Future + Send + 'static,
F::Output: Send,
{
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut fut = pin!(fut);
let t = thread::current();
let waker = Arc::new(ThreadWaker(t)).into();
let mut cx = Context::from_waker(&waker);
loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(res) => {
let _ = tx.send(res);
break;
}
Poll::Pending => thread::park(),
}
}
});
rx
}
fn main() {
println!("Start {}", chrono::Utc::now());
let r1 = spawn(MyFuture {
id: 1,
start: Instant::now(),
duration: Duration::from_secs(10),
});
let r2 = spawn(MyFuture {
id: 2,
start: Instant::now(),
duration: Duration::from_secs(10),
});
let h1 = thread::spawn(move || {
r1.recv().unwrap();
println!("Finish 1 {}", chrono::Utc::now());
});
let h2 = thread::spawn(move || {
r2.recv().unwrap();
println!("Finish 2 {}", chrono::Utc::now());
});
h1.join().unwrap();
h2.join().unwrap();
}
Send + 'static
Future 多了一个 Send + 'static,让其能在多个线程中传递
如果想 Future 的产生和计算都在同一个线程中
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
let local = tokio::task::LocalSet::new();
let rc = Rc::new(42);
local.run_until(async move {
println!("rc={rc}");
}).await;
});
use crossbeam::channel;
use std::sync::Arc;
struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `future` at any given time. The
// `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
// 引入 futures
use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
当上面的计时器线程调用 waker.wake() 时,任务被推送到通道中。
接下来在 MiniTokio::run() 函数中实现任务的接收和执行。
impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the future
let mut future = self.future.try_lock().unwrap();
// Poll the future
let _ = future.as_mut().poll(&mut cx);
}
// Spawns a new task with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}
参考资料
async runtime
[tokio:main] 实际是一个过程宏,将 async fn main 展开成
std::future::Future 的定义
示例
多个 futures
前面的 future 只有一个能运行,改动 block 的逻辑,让其能同时运行多个 future,改变 block_on 的函数名称
Send + 'static
如果想 Future 的产生和计算都在同一个线程中
Waker API
在使用 async fn main 下生成的 Future 如下
Poll::Pending
。Poll::Pending
向调用者表明 Future 将在稍后完成,调用者应稍后再次调用poll
。执行者 (Executors)
poll
调用它们来转换它们的状态。poll
?tokio::spawn
或者是用#[tokio::main]
注释的主函数。这导致将生成的最外层 Future 提交给Tokio 执行者。执行者负责在外部未来 Future 调用
Future::poll
,驱动异步计算完成。Wakers
这是一个资源能够通知等待任务该资源已准备好继续某些操作
poll
的Context
参数有一个waker()
方法。此方法返回绑定到当前任务的Waker
。Waker
有一个wake()
方法。调用此方法会向执行程序 (exectutors) 发出信号,表明应安排执行相关任务。
资源在转换到就绪状态时调用
wake()
以通知执行者轮询任务将能够取得进展。Waker 实际是一个回调闭包,使用 wake 和 wake_by_ref 来通知异步运行时再次当前的 Future 计算
最开始的实现:
在返回 Poll::Pending 之前,调用了 cx.waker().wake_by_ref()
通过返回 Poll::Pending,负责向唤醒者发出信号。
下一步更新 Mini Tokio 以接收唤醒通知。
希望 Executors 只在任务被唤醒时运行任务,为此,MiniTokio 将提供自己的 waker。
当 waker 被调用时,其相关任务被排队等待执行
当上面的计时器线程调用 waker.wake() 时,任务被推送到通道中。
接下来在 MiniTokio::run() 函数中实现任务的接收和执行。
type/rust #public