use std::rc::Rc;
#[tokio::main]
async fn main() {
// `Rc` does not implement `Send`, and thus may not be sent between
// threads safely.
let nonsend_data = Rc::new("my nonsend data...");
let nonsend_data = nonsend_data.clone();
// Because the `async` block here moves `nonsend_data`, the future is `!Send`.
// Since `tokio::spawn` requires the spawned future to implement `Send`, this
// will not compile.
tokio::spawn(async move {
println!("{}", nonsend_data);
// ...
}).await.unwrap();
}
use std::rc::Rc;
use tokio::task;
#[tokio::main]
async fn main() {
let nonsend_data = Rc::new("my nonsend data...");
// Construct a local task set that can run `!Send` futures.
let local = task::LocalSet::new();
// Run the local task set.
local.run_until(async move {
let nonsend_data = nonsend_data.clone();
// `spawn_local` ensures that the future is spawned on the local
// task set.
task::spawn_local(async move {
println!("{}", nonsend_data);
// ...
}).await.unwrap();
}).await;
}
// 本身是一个 future
use tokio::{task, time};
use std::rc::Rc;
#[tokio::main]
async fn main() {
let nonsend_data = Rc::new("world");
let local = task::LocalSet::new();
let nonsend_data2 = nonsend_data.clone();
local.spawn_local(async move {
// ...
println!("hello {}", nonsend_data2)
});
local.spawn_local(async move {
time::sleep(time::Duration::from_millis(100)).await;
println!("goodbye {}", nonsend_data)
});
// ...
local.await;
}
什么是阻塞
异步是使用一种称为协作调度的机制实现的
异步代码不能在到达.await 的情况前花费很长时间
阻塞线程 (blocking the thread) 意味着阻止运行时切换当前任务
它阻塞了线程。在这种情况下,没有其他任务,所以这不是问题,但在实际程序中会有其他的任务
use std::time::Duration;
#[tokio::main]
async fn main() {
println!("Hello World!");
// No .await here!
std::thread::sleep(Duration::from_secs(5));
println!("Five seconds later...");
}
use std::time::Duration;
async fn sleep_then_print(timer: i32) {
println!("Start timer {}.", timer);
// No .await here!
std::thread::sleep(Duration::from_secs(1));
println!("Timer {} done.", timer);
}
#[tokio::main]
async fn main() {
// The join! macro lets you run multiple things concurrently.
tokio::join!(
sleep_then_print(1),
sleep_then_print(2),
sleep_then_print(3),
);
}
trait Sequencer {
fn generate(&self) -> Vec<i32>;
}
impl PlainSequencer {
async fn generate_async(&self)->Vec<i32>{
let mut res = vec![];
for i in 0..self.bound {
res.push(i);
tokio::time::sleep(Duration::from_millis(100)).await;
}
res
}
}
impl Sequencer for PlainSequencer {
fn generate(&self) -> Vec<i32> {
self.generate_async().await
}
}
注意这里的 generate 是一个同步的签名,却调用异步的实现
尝试一
impl Sequencer for PlainSequencer {
fn generate(&self) -> Vec<i32> {
RUNTIME.block_on(async{
self.generate_async().await
})
}
}
#[cfg(test)]
mod tests{
#[tokio::test]
async fn test_sync_method() {
let sequencer = PlainSequencer {
bound: 3
};
let vec = sequencer.generate();
println!("vec: {:?}", vec);
}
}
Runtime::block_on 方法,它会阻塞当前线程,直到 future 完成。
该错误说明不允许从*当前正在执行的运行时中启动另一个运行时。
Cannot start a runtime from within a runtime.
This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
thread 'tests::test_sync_method' panicked at 'Cannot start a runtime from within a runtime.
/Users/lei/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/runtime/enter.rs:39:9
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f(&mut cx) {
return t;
}
// Wait for a wakeup.
while !thread_notify.unparked.swap(false, Ordering::Acquire) {
// No wakeup occurred. It may occur now, right before parking,
// but in that case the token made available by `unpark()`
// is guaranteed to still be available and `park()` is a no-op.
thread::park();
}
}
})
}
impl Sequencer for PlainSequencer {
fn generate(&self) -> Vec<i32> {
let bound = self.bound;
futures::executor::block_on(async move {
RUNTIME.spawn(async move {
let mut res = vec![];
for i in 0..bound {
res.push(i);
tokio::time::sleep(Duration::from_millis(100)).await;
}
res
}).await.unwrap()
})
}
}
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
tokio::spawn(async move {
process(socket).await;
});
}
}
#[tokio::main]
async fn main() {
// 能拿到执行结果
let handle = tokio::spawn(async {
// Do some async work
"return value"
});
// Do some other work
// 返回一个 Result<T, Error>
let out = handle.await.unwrap();
println!("GOT {}", out);
}
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
产生的结果是
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
v 仍然被 task 之外拥有,println! 只是借用。能通过 async mov 的方式来使所有权变更
task::spawn(async move {
println!("Here's a vec: {:?}", v);
});
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
下面则不行
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;
println!("{}", rc);
});
}
锁
不要在锁住的时候,调用 await
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
这是因为 the std::sync::MutexGuard type is not Send,这就意味着 you can't send a mutex lock to another thread。你能这样
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
// 注意现在 rust 编译器还不是识别这种,仍然不能
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
channel
为啥需要 channel
use mini_redis::client;
#[tokio::main]
async fn main() {
// 创建到服务器的连接
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// 生成两个任务,一个用于获取 key, 一个用于设置 key
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
因为两个任务都需要去访问 client,但是 client 并没有实现 Copy 特征,
方法 set 和 get 都使用了 client 的可变引用 &mut self,由此还会造成同时借用两个可变引用的错误。
broadcast: 多生产者,多消费值。每个 consumer 都能收到发送的每一个值,类似于 close 的信号,一个使用的场景是用来进行优雅的关闭。
watch: 单生产者,多消费者。Many values can be sent, but no history is kept. Receivers only see the most recent value.
async_channel:多生产者和多消费者
mpsc 的使用
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
参考资料
(usage:: 什么是 tokio runtime )
tokio::main
来在后台创建运行时
,理解运行时是有一整套的异步代码,包括executor
,waker
这些运行时上下文中
,使用 tokio::spawn 函数产生其他任务。tokio::spawn
或者是用#\[tokio::main\]
注释的主函数。Future::poll
,驱动异步计算完成待处理队列
。从待处理队列移动到就绪队列
tokio::runtime::Builder
tokio::task::LocalSet
Send
的 future,因此无法在线程之间安全地 Send。下面代码不会编译
!Send
future 并安排它们在调用 Runtime::block_on 的线程上。run_until
方法只能在\#\[tokio::main\]
、\#\[tokio::test\]
或直接在对Runtime::block_on
的调用中使用。tokio::spawn
生成的任务中使用。LocalSet
本身实现了Future
,用于在LocalSet
上运行多个 futures 并驱动整个集合直到它们完成什么是阻塞
它阻塞了线程。在这种情况下,没有其他任务,所以这不是问题,但在实际程序中会有其他的任务
.await
处如果我想 block,该怎么办
tokio::task::spawn_blocking
函数rayon
cratestd::thread::spawn
生成专用线程。(usage:: 在同步代码中使用异步函数)
generate 是一个同步方法,.await 里面不能直接使用
尝试一
Runtime::block_on
方法,它会阻塞当前线程,直到 future 完成。尝试二
tokio::test
中挂起,而在tokio::main
中正常完成。generate
函数的一定是Tokio 的 executor。实际运行在 futures_executor::local_pool::run_executor
尝试三
futures::executor::block_on
中,额外的RUNTIME
用于生成异步代码。RUNTIME
,而不是为futures::executor::block_on
生成一个新线程,死锁问题就解决了,tokio::time::sleep
方法调用将抱怨“没有 reactor 正在运行”,因为 Tokio 功能需要运行时tokio::main 和 tokio::test 区别
tokio::main
没有挂起而tokio::test
挂起,它们使用不同的运行时。tokio::main
在多线程运行时运行,而tokio::test
在单线程运行时运行。futures::executor::block_on
阻塞时,用户提交的异步代码无法执行,导致前面提到的死锁。一些结论
常见的功能
scoped_thread_local!
并发编程(类似 goroutine)
spawn 用来产生一个 green thread,也能拿到该 thread 执行的结果。
'static 约束
注意'static 并不意味着生命周期一直到程序终结,能见rust 中的 lifetime
send
概念解释
下面则不行
锁
不要在锁住的时候,调用 await
这是因为 the std::sync::MutexGuard type is not Send,这就意味着 you can't send a mutex lock to another thread。你能这样
channel
为啥需要 channel
client
,但是client
并没有实现Copy
特征,set
和get
都使用了client
的可变引用&mut self
,由此还会造成同时借用两个可变引用的错误。std::sync::Mutex
无法被使用,同步锁无法在.await
调用过程中使用tokio::sync:Mutex
,答案是能用,但是同时就只能运行一个请求channel 的种类
mpsc 的使用
select
<async expression>
都会被汇总并同时执行。<pattern>
。<pattern>
就是变量名,异步表达式的结果能绑定到这个变量名上且\<pattern>
与异步计算的结果不匹配,则其余的异步表达式将继续并发执行直到下一个完成为止。这时,将相同的逻辑用于该结果。
取消
Drop去清理后台资源
。Sender 方能接收到这个通知并通过丢弃正在进行的操作来中止它。
错误
?
号操作符从表达式传播错误。?
号从异步表达式或处理程序中使用。?
在异步表达式中能将错误传播到异步表达式之外。Result
了。从一个处理程序使用
?
号能立即传播错误到select!
表达式之外。返回值
模式匹配 (Pattern matching)
<pattern>
使用了变量绑定。假设从多个 MPSC 通道接收,可能会执行以下操作:
借用
select!
宏没有这样的限制。或者单个异步表达式能可变的借用数据。
data
变量。Ok()
上进行了模式匹配,如果一个表达式失败,另外一个将继续执行。<handler>
时,select!
保证只有一个<handler>
运行。根据这一点,每一个
<handler>
能可变的借用同一个数据。循环
使用 multiple channels:
select!
宏会随机的选择分支来检查就绪情况。select!
没有随机的选择首先要检查的分支,rx1
. 如果rx1
始终都有新消息,则永远不会再检查其余的通道了恢复异步操作 (Resuming an async operation)
select!
宏中调用action(),它在循环外被调用。action()
的返回分配给operation,而不需要调用.await
。然后在operation
上调用tokio::pin!
.select!
循里面,不是传递operation而是传递&mut operation
.operation
变量正在跟踪异步操作。action()
的一次新的调用。select!
分支从通道中接收消息。如果消息是偶数,则循环完成。否则再次开始select!
..await
一个引用,必须固定引用的值或者实现Unpin
.每个任务的并发
tokio::spawn
与select!
都能运行并发异步操作。tokio::spawn
函数传入一个异步操作并产生一个新的任务去运行它。select!
宏能在同一个任务上同时运行所有分支
。select!
宏上的所有分支被同一个任务执行,它们永远不会同时运行。select!
宏的多路复用 步操作也在单个任务上运行。type/rust #type/networking #public