Streams

stream 是一种异步的一系列的值。它的异步等效与 Rust 的 std::iter::Iterator 并且通过 Stream trait 来表示。Streams 能够在 async functions 中被迭代。它们也可以通过适配器被转换。Tokio 在 StreamExt trait 上提供了一些场景的适配器。

Tokio 在一个单独的 crate 提供了 stream 支持:tokio-stream

tokio-stream = "0.1"

目前,Tokio 的 stream 实用工具存在与 tokio-strean crate 中。一旦 Stream trait 在 Rust 标准库中稳定了,Tokio 的 stream 将会被移动到 tokio crate 。

Iteration

目前为止,Rust 编程语言没有支持异步的 for 循环。作为替代,迭代 stream 通过使用  StreamExt::next() 并配对一个 while let 循环来完成。

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);

    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

像迭代器一样,next() 方法返回 Option<T> ,T 是 stream 里的值的类型。接收到 None 表示这个 stream iteration 终止了。

Mini-Redis broadcast (Mini-Redis 广播)

让我们使用 Mini-Redis client 来回顾一个稍微更复杂的例子。

完整的代码可以在这里找到。

use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Publish some data
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}

一个任务被生成来发布消息到 Mini-Redis server 上的 "numbers" channel 。然后,在主要的任务中,我们订阅 "numbers" channel 并且打印接收到的消息。

订阅之后,我们在返回的 subscriber 上调用 into_stream() 。这个方法会消费掉这个 Subscriber ,返回一个能在消息到达的时候生成消息的 stream 。在我们开始迭代消息之前,注意这个 stream 通过tokio::pin! 被  pin 到了栈上。 在一个 stream 上调用 next() 要求这个 stream 是 pinned (这也是上面用 tokio::pin! 的原因)。into_stream() 函数返回一个没有被 pin 的 stream,为了迭代这个 stream ,我们必须显式地 pin 它。

当一个 Rust 的值不再能够在内存中被移动时,这个值就是 "pinned" 。a pinned value 的关键属性是指针可以取到 pinned data 并且调用者可以确信指针是有效的。这个特性被 async/await 用来支持跨 .await 点的借用数据。

如果我们忘记 pin the stream,我们会得到一个像这样的错误:

#![allow(unused)]
fn main() {
error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  --> streams/src/main.rs:29:36
   |
29 |     while let Some(msg) = messages.next().await {
   |                                    ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
   |
   = note: required because it appears within the type `impl Future`
   = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
   = note: required because it appears within the type `impl Stream`
   = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
}

如果你遇到了类似这样的错误信息,请尝试 pin 这个值!!!

在尝试运行这个之前,先把 Mini-Redis server 跑起来:

mini-redis-server

然后尝试运行上面的代码。我们将会看到消息被输出到了 STDOUT。

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

由于订阅和发布之间存在竞争,一些早期消息可能会被删除。该程序永远不会退出。只要服务器处于活动状态,对 Mini-Redis 频道的订阅就会保持活动状态。

让我们看看可以怎么来用 stream 拓展这个程序。

Adapters (适配器)

接收一个 Stream 并且返回另一个 Stream 的函数通常被称为 'stream adapters' ,因为它们是 'adapter pattern' 的一种形式。常见的 stream adapters 包括 map、 take 和 filter

让我们更新一下 Mini-Redis 来让它能够退出。在接收到 3 个消息后,停止迭代消息,使用 take 来完成这个目的。这个 adapter 限制 stream 生产至多 n 条消息(n 条消息后 while let 就拿不到 Some 了,程序就能退出了)。

#![allow(unused)]
fn main() {
let messages = subscriber
    .into_stream()
    .take(3);
}

再次运行程序,我们得到了:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

这次程序结束了。

现在,让我们把 stream 限制为个位数,我们将会通过检查消息的长度来确保此事。我们使用 filter adapter 来 drop 任何不匹配先决条件的消息。

#![allow(unused)]
fn main() {
let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);
}

再次运行程序,我们得到:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

请注意,adapter 的应用顺序很重要。先调用 filter 然后 take 是跟先 take 然后 filter 不一样的(这很好理解,先 take 的话,就会在前三个里找内容是个位数的消息)。

最后,我们将通过剥离 Ok(Message{...}) 部分来整理输出,这通过 map 来完成。因为这是在 filter 之后被应用的,我们能知道消息是 Ok,所以我们可以使用 unwrap()

#![allow(unused)]
fn main() {
let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);
}

现在,输出是:

got = b"1"
got = b"3"
got = b"6"

另一种选择是使用 filter_map 将 filter 和 map 两个步骤组合起来作为一个单次调用。

这里可以找到更多可用的 adapter。

Implementing Stream

 Stream trait 和 Future trait 非常类似。

#![allow(unused)]
fn main() {
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}
}

Stream::poll_next() 函数非常像 Future::poll ,除了它可以被反复调用来从 stream 接收许多值。正如我们在Async in depth了解到的一样,当一个 stream 没有准备好返回一个值的时候,Poll::Pending 会被返回。任务的 waker 会被注册,一旦 stream 应该被再次 poll 的时候,waker 会被通知。

这里的 size_hint() 方法的使用方式跟 iterators 里的一样,它会返回 stream 剩余长度是上下界,(0, None) 是它的默认实现,这对任何 stream 来说都是正确的。

通常来说,当手动实现一个 Stream 的时候,它是通过组合 future 和其它 stream 来完成的。作为一个示例,让我们重建在Async in depth实现的 Delay future,我们将会把它转换成一个以 10 ms 为间隔,生成 3 次 () 的 stream 。

#![allow(unused)]
fn main() {
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // No more delays
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
}

async-stream

手动用 Stream trait 来实现 stream 是非常冗长乏味的。不幸的是,Rust 编程语言还不支持 async/await 来定义 stream 。这项工作正在做,但是还没就绪。

async-stream crate 可以作为一个临时解决方案使用,这个 crate 提供了一个 stream! 宏,它能将输入转化成一个 stream。通过使用这个 crate,上面的 interval 可以像这样被实现:

#![allow(unused)]
fn main() {
use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}
}