For a single item, use `FutureExt::into_stream`:
use futures::prelude::*; // 0.3.1
fn outer() -> impl Stream<Item = i32> {
inner().into_stream()
}
/// Placeholder asynchronous computation: completes without awaiting anything
/// and resolves to `42`.
async fn inner() -> i32 {
    const ANSWER: i32 = 42;
    ANSWER
}
For a stream built from a number of futures generated by a closure, use `stream::unfold`:
use futures::prelude::*; // 0.3.1
/// Builds a stream whose every item comes from awaiting a fresh `inner()` call.
///
/// `stream::unfold` is driven with a unit state; the closure always returns
/// `Some`, so the stream never signals completion on its own.
fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async {
        let item = inner().await;
        // Hand the item out and pass the (unit) state on to the next round.
        Some((item, ()))
    })
}
/// Placeholder asynchronous computation used as the per-item future:
/// resolves to `42` without awaiting anything.
async fn inner() -> i32 {
    let value = 42;
    value
}
In your case, you can use `stream::unfold`, threading the socket through as the state:
use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1
/// Turns a UDP socket into a stream of received datagrams.
///
/// The socket is the `stream::unfold` state: each step awaits `read_one` on
/// it and then hands the socket back for the next step. Every result of
/// `read_one`, `Ok` or `Err`, is yielded as an item, and the closure always
/// returns `Some`, so the stream does not end by itself.
fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |socket| async move {
        let datagram = read_one(&socket).await;
        Some((datagram, socket))
    })
}
/// Receives one datagram from `s` and returns its payload.
///
/// # Errors
///
/// Propagates any I/O error reported by `recv_from`.
async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    // Size of the receive buffer.
    // NOTE(review): payloads longer than this presumably do not fit — confirm
    // against the `recv_from` documentation for the target platform.
    const BUFFER_LEN: usize = 1024;

    let mut buffer = vec![0; BUFFER_LEN];
    // The sender address is not needed here.
    let (received, _sender) = s.recv_from(&mut buffer).await?;
    // Keep only the bytes that were actually written by `recv_from`.
    buffer.truncate(received);
    Ok(buffer)
}
/// Binds a UDP socket on port 9876 and prints every datagram it receives.
///
/// Payloads that are valid UTF-8 are printed as text; anything else is
/// printed as a hex debug dump. Receive errors are reported on stderr and
/// the loop carries on with the next item.
///
/// # Errors
///
/// Returns the I/O error from binding the socket.
#[async_std::main]
async fn main() -> io::Result<()> {
    let socket = UdpSocket::bind("0.0.0.0:9876").await?;
    let datagrams = read_many(socket);

    datagrams
        .for_each(|received| async move {
            // Guard: report a failed receive and finish this item early.
            let bytes = match received {
                Ok(bytes) => bytes,
                Err(e) => {
                    eprintln!("Error: {}", e);
                    return;
                }
            };

            if let Ok(text) = std::str::from_utf8(&bytes) {
                println!("{}", text);
            } else {
                println!("{:x?}", bytes);
            }
        })
        .await;

    Ok(())
}
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…