Update futures-util to 0.3.21 am: 737dc97288 am: b9180251b3 am: e2f2e216a9
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/2004175
Change-Id: Iacbebadeebe2be67366f7b2932d3b2619e3eb541
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index ffd4f55..b52386f 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
{
"git": {
- "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
- }
-}
+ "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+ },
+ "path_in_vcs": "futures-util"
+}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index b7a9667..9d3dea9 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.17",
+ cargo_pkg_version: "0.3.21",
srcs: ["src/lib.rs"],
test_suites: ["general-tests"],
auto_gen_config: true,
@@ -62,13 +62,10 @@
"futures-sink",
"io",
"memchr",
- "proc-macro-hack",
- "proc-macro-nested",
"sink",
"slab",
"std",
],
- cfgs: ["fn_like_proc_macro"],
rustlibs: [
"libfutures_channel",
"libfutures_core",
@@ -78,14 +75,10 @@
"libmemchr",
"libpin_project_lite",
"libpin_utils",
- "libproc_macro_nested",
"libslab",
"libtokio",
],
- proc_macros: [
- "libfutures_macro",
- "libproc_macro_hack",
- ],
+ proc_macros: ["libfutures_macro"],
}
rust_library {
@@ -93,7 +86,7 @@
host_supported: true,
crate_name: "futures_util",
cargo_env_compat: true,
- cargo_pkg_version: "0.3.17",
+ cargo_pkg_version: "0.3.21",
srcs: ["src/lib.rs"],
edition: "2018",
features: [
@@ -108,13 +101,10 @@
"futures-sink",
"io",
"memchr",
- "proc-macro-hack",
- "proc-macro-nested",
"sink",
"slab",
"std",
],
- cfgs: ["fn_like_proc_macro"],
rustlibs: [
"libfutures_channel",
"libfutures_core",
@@ -124,13 +114,9 @@
"libmemchr",
"libpin_project_lite",
"libpin_utils",
- "libproc_macro_nested",
"libslab",
],
- proc_macros: [
- "libfutures_macro",
- "libproc_macro_hack",
- ],
+ proc_macros: ["libfutures_macro"],
apex_available: [
"//apex_available:platform",
"com.android.bluetooth",
diff --git a/Cargo.toml b/Cargo.toml
index 90010e9..a148319 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,45 +11,51 @@
[package]
edition = "2018"
+rust-version = "1.45"
name = "futures-util"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "Common utilities and extension traits for the futures-rs library.\n"
+version = "0.3.21"
+description = """
+Common utilities and extension traits for the futures-rs library.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+
[dependencies.futures-channel]
-version = "0.3.17"
+version = "0.3.21"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.17"
+version = "0.3.21"
default-features = false
[dependencies.futures-io]
-version = "0.3.17"
+version = "0.3.21"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.17"
+version = "=0.3.21"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.17"
+version = "0.3.21"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.17"
+version = "0.3.21"
default-features = false
[dependencies.futures_01]
@@ -67,14 +73,6 @@
[dependencies.pin-utils]
version = "0.1.0"
-[dependencies.proc-macro-hack]
-version = "0.5.19"
-optional = true
-
-[dependencies.proc-macro-nested]
-version = "0.1.2"
-optional = true
-
[dependencies.slab]
version = "0.4.2"
optional = true
@@ -82,24 +80,54 @@
[dependencies.tokio-io]
version = "0.1.9"
optional = true
+
[dev-dependencies.tokio]
version = "0.1.11"
-[build-dependencies.autocfg]
-version = "1"
[features]
-alloc = ["futures-core/alloc", "futures-task/alloc"]
+alloc = [
+ "futures-core/alloc",
+ "futures-task/alloc",
+]
async-await = []
-async-await-macro = ["async-await", "futures-macro", "proc-macro-hack", "proc-macro-nested"]
+async-await-macro = [
+ "async-await",
+ "futures-macro",
+]
bilock = []
cfg-target-has-atomic = []
-channel = ["std", "futures-channel"]
-compat = ["std", "futures_01"]
-default = ["std", "async-await", "async-await-macro"]
-io = ["std", "futures-io", "memchr"]
-io-compat = ["io", "compat", "tokio-io"]
-read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
+channel = [
+ "std",
+ "futures-channel",
+]
+compat = [
+ "std",
+ "futures_01",
+]
+default = [
+ "std",
+ "async-await",
+ "async-await-macro",
+]
+io = [
+ "std",
+ "futures-io",
+ "memchr",
+]
+io-compat = [
+ "io",
+ "compat",
+ "tokio-io",
+]
sink = ["futures-sink"]
-std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
-unstable = ["futures-core/unstable", "futures-task/unstable"]
+std = [
+ "alloc",
+ "futures-core/std",
+ "futures-task/std",
+ "slab",
+]
+unstable = [
+ "futures-core/unstable",
+ "futures-task/unstable",
+]
write-all-vectored = ["io"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index a8e9362..46ec854 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,12 +1,11 @@
[package]
name = "futures-util"
+version = "0.3.21"
edition = "2018"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3"
description = """
Common utilities and extension traits for the futures-rs library.
"""
@@ -16,7 +15,7 @@
std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
alloc = ["futures-core/alloc", "futures-task/alloc"]
async-await = []
-async-await-macro = ["async-await", "futures-macro", "proc-macro-hack", "proc-macro-nested"]
+async-await-macro = ["async-await", "futures-macro"]
compat = ["std", "futures_01"]
io-compat = ["io", "compat", "tokio-io"]
sink = ["futures-sink"]
@@ -28,25 +27,19 @@
# `unstable` feature as an explicit opt-in to unstable API.
unstable = ["futures-core/unstable", "futures-task/unstable"]
bilock = []
-read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
write-all-vectored = ["io"]
# These features are no longer used.
# TODO: remove in the next major version.
cfg-target-has-atomic = []
-[build-dependencies]
-autocfg = "1"
-
[dependencies]
-futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.17", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.17", default-features = false, optional = true }
-proc-macro-hack = { version = "0.5.19", optional = true }
-proc-macro-nested = { version = "0.1.2", optional = true }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.21", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.21", default-features = false, optional = true }
slab = { version = "0.4.2", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
diff --git a/METADATA b/METADATA
index 0ace75a..ed41eb9 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
}
url {
type: ARCHIVE
- value: "https://static.crates.io/crates/futures-util/futures-util-0.3.17.crate"
+ value: "https://static.crates.io/crates/futures-util/futures-util-0.3.21.crate"
}
- version: "0.3.17"
+ version: "0.3.21"
license_type: NOTICE
last_upgrade_date {
- year: 2021
- month: 9
- day: 22
+ year: 2022
+ month: 3
+ day: 1
}
}
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6e0aaed
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+# futures-util
+
+Common utilities and extension traits for the futures-rs library.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures-util = "0.3"
+```
+
+The current `futures-util` requires Rust 1.45 or later.
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/benches/flatten_unordered.rs b/benches/flatten_unordered.rs
new file mode 100644
index 0000000..64d5f9a
--- /dev/null
+++ b/benches/flatten_unordered.rs
@@ -0,0 +1,66 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+use futures::stream::{self, StreamExt};
+use futures::task::Poll;
+use std::collections::VecDeque;
+use std::thread;
+
+#[bench]
+fn oneshot_streams(b: &mut Bencher) {
+ const STREAM_COUNT: usize = 10_000;
+ const STREAM_ITEM_COUNT: usize = 1;
+
+ b.iter(|| {
+ let mut txs = VecDeque::with_capacity(STREAM_COUNT);
+ let mut rxs = Vec::new();
+
+ for _ in 0..STREAM_COUNT {
+ let (tx, rx) = oneshot::channel();
+ txs.push_back(tx);
+ rxs.push(rx);
+ }
+
+ thread::spawn(move || {
+ let mut last = 1;
+ while let Some(tx) = txs.pop_front() {
+ let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
+ last += STREAM_ITEM_COUNT;
+ }
+ });
+
+ let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
+ async {
+ if let Some(next) = vals.next() {
+ let val = next.await.unwrap();
+ Some((val, vals))
+ } else {
+ None
+ }
+ }
+ .boxed()
+ })
+ .flatten_unordered(None);
+
+ block_on(future::poll_fn(move |cx| {
+ let mut count = 0;
+ loop {
+ match flatten.poll_next_unpin(cx) {
+ Poll::Ready(None) => break,
+ Poll::Ready(Some(_)) => {
+ count += 1;
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
+
+ Poll::Ready(())
+ }))
+ });
+}
diff --git a/build.rs b/build.rs
index f8aa5fe..05e0496 100644
--- a/build.rs
+++ b/build.rs
@@ -1,10 +1,3 @@
-#![warn(rust_2018_idioms, single_use_lifetimes)]
-
-use autocfg::AutoCfg;
-use std::env;
-
-include!("no_atomic_cas.rs");
-
// The rustc-cfg listed below are considered public API, but it is *unstable*
// and outside of the normal semver guarantees:
//
@@ -14,10 +7,15 @@
// need to enable it manually when building for custom targets or using
// non-cargo build systems that don't run the build script.
//
-// With the exceptions mentioned above, the rustc-cfg strings below are
-// *not* public API. Please let us know by opening a GitHub issue if your build
-// environment requires some way to enable these cfgs other than by executing
-// our build script.
+// With the exceptions mentioned above, the rustc-cfg emitted by the build
+// script are *not* public API.
+
+#![warn(rust_2018_idioms, single_use_lifetimes)]
+
+use std::env;
+
+include!("no_atomic_cas.rs");
+
fn main() {
let target = match env::var("TARGET") {
Ok(target) => target,
@@ -35,27 +33,9 @@
// `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
// run. This is needed for compatibility with non-cargo build systems that
// don't run the build script.
- if NO_ATOMIC_CAS_TARGETS.contains(&&*target) {
+ if NO_ATOMIC_CAS.contains(&&*target) {
println!("cargo:rustc-cfg=futures_no_atomic_cas");
}
- let cfg = match AutoCfg::new() {
- Ok(cfg) => cfg,
- Err(e) => {
- println!(
- "cargo:warning={}: unable to determine rustc version: {}",
- env!("CARGO_PKG_NAME"),
- e
- );
- return;
- }
- };
-
- // Function like procedural macros in expressions patterns statements stabilized in Rust 1.45:
- // https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements
- if cfg.probe_rustc_version(1, 45) {
- println!("cargo:rustc-cfg=fn_like_proc_macro");
- }
-
println!("cargo:rerun-if-changed=no_atomic_cas.rs");
}
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
index 4708bf8..9b05d4b 100644
--- a/no_atomic_cas.rs
+++ b/no_atomic_cas.rs
@@ -1,7 +1,7 @@
// This file is @generated by no_atomic_cas.sh.
// It is not intended for manual editing.
-const NO_ATOMIC_CAS_TARGETS: &[&str] = &[
+const NO_ATOMIC_CAS: &[&str] = &[
"avr-unknown-gnu-atmega328",
"bpfeb-unknown-none",
"bpfel-unknown-none",
diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs
index c5cdd9b..28f3b23 100644
--- a/src/async_await/join_mod.rs
+++ b/src/async_await/join_mod.rs
@@ -81,12 +81,10 @@
#[allow(unreachable_pub)]
#[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
pub use futures_macro::join_internal;
#[allow(unreachable_pub)]
#[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
pub use futures_macro::try_join_internal;
document_join_macro! {
diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs
index 37e938d..1d13067 100644
--- a/src/async_await/select_mod.rs
+++ b/src/async_await/select_mod.rs
@@ -29,9 +29,6 @@
/// It is also gated behind the `async-await` feature of this library, which is
/// activated by default.
///
- /// Note that `select!` relies on `proc-macro-hack`, and may require to set the
- /// compiler's recursion limit very high, e.g. `#![recursion_limit="1024"]`.
- ///
/// # Examples
///
/// ```
@@ -309,12 +306,10 @@
#[cfg(feature = "std")]
#[allow(unreachable_pub)]
#[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
pub use futures_macro::select_internal;
#[allow(unreachable_pub)]
#[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
pub use futures_macro::select_biased_internal;
document_select_macro! {
diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs
index 7743406..1c8002f 100644
--- a/src/async_await/stream_select_mod.rs
+++ b/src/async_await/stream_select_mod.rs
@@ -3,7 +3,6 @@
#[cfg(feature = "std")]
#[allow(unreachable_pub)]
#[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
pub use futures_macro::stream_select_internal;
/// Combines several streams, all producing the same `Item` type, into one stream.
@@ -13,10 +12,6 @@
///
/// If multiple streams are ready, one will be pseudo randomly selected at runtime.
///
-/// This macro is gated behind the `async-await` feature of this library, which is activated by default.
-/// Note that `stream_select!` relies on `proc-macro-hack`, and may require to set the compiler's recursion
-/// limit very high, e.g. `#![recursion_limit="1024"]`.
-///
/// # Examples
///
/// ```
diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs
index 17239a4..36de1da 100644
--- a/src/compat/compat01as03.rs
+++ b/src/compat/compat01as03.rs
@@ -64,6 +64,7 @@
/// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
/// # // feature issues
@@ -90,6 +91,7 @@
/// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::stream::StreamExt;
/// use futures_util::compat::Stream01CompatExt;
@@ -119,6 +121,7 @@
/// [`Sink<T, Error = E>`](futures_sink::Sink).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::{sink::SinkExt, stream::StreamExt};
/// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
@@ -351,8 +354,6 @@
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
mod io {
use super::*;
- #[cfg(feature = "read-initializer")]
- use futures_io::Initializer;
use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
use std::io::Error;
use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
@@ -364,6 +365,7 @@
/// [`AsyncRead`](futures_io::AsyncRead).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::io::AsyncReadExt;
/// use futures_util::compat::AsyncRead01CompatExt;
@@ -393,6 +395,7 @@
/// [`AsyncWrite`](futures_io::AsyncWrite).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::io::AsyncWriteExt;
/// use futures_util::compat::AsyncWrite01CompatExt;
@@ -416,16 +419,6 @@
impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}
impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- // check if `prepare_uninitialized_buffer` needs zeroing
- if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) {
- Initializer::zeroing()
- } else {
- Initializer::nop()
- }
- }
-
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs
index 2573fe7..5d3a6e9 100644
--- a/src/compat/compat03as01.rs
+++ b/src/compat/compat03as01.rs
@@ -236,17 +236,7 @@
}
}
- impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
- #[cfg(feature = "read-initializer")]
- unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
- let initializer = self.inner.initializer();
- let does_init = initializer.should_initialize();
- if does_init {
- initializer.initialize(buf);
- }
- does_init
- }
- }
+ impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}
impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
diff --git a/src/compat/executor.rs b/src/compat/executor.rs
index e25705b..ea0c67a 100644
--- a/src/compat/executor.rs
+++ b/src/compat/executor.rs
@@ -17,6 +17,7 @@
/// futures 0.3 [`Spawn`](futures_task::Spawn).
///
/// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::task::SpawnExt;
/// use futures::future::{FutureExt, TryFutureExt};
/// use futures_util::compat::Executor01CompatExt;
diff --git a/src/future/either.rs b/src/future/either.rs
index 35650da..9602de7 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -184,8 +184,6 @@
use core::pin::Pin;
use core::task::{Context, Poll};
- #[cfg(feature = "read-initializer")]
- use futures_io::Initializer;
use futures_io::{
AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom,
};
@@ -195,14 +193,6 @@
A: AsyncRead,
B: AsyncRead,
{
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- match self {
- Either::Left(x) => x.initializer(),
- Either::Right(x) => x.initializer(),
- }
- }
-
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs
index 1d13e0c..ec30ee3 100644
--- a/src/io/allow_std.rs
+++ b/src/io/allow_std.rs
@@ -1,6 +1,4 @@
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
use std::pin::Pin;
use std::{fmt, io};
@@ -121,10 +119,6 @@
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0.read_vectored(bufs)
}
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.0.initializer()
- }
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
self.0.read_to_end(buf)
}
@@ -155,11 +149,6 @@
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs))))
}
-
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.0.initializer()
- }
}
impl<T> io::Seek for AllowStdIo<T>
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 2d585a9..0334a9f 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -2,8 +2,6 @@
use futures_core::future::Future;
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
use pin_project_lite::pin_project;
use std::io::{self, Read};
@@ -144,12 +142,6 @@
self.consume(nread);
Poll::Ready(Ok(nread))
}
-
- // we can't skip unconditionally because of the large buffer case in read.
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.inner.initializer()
- }
}
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs
index f292b87..cb74863 100644
--- a/src/io/buf_writer.rs
+++ b/src/io/buf_writer.rs
@@ -6,6 +6,7 @@
use std::fmt;
use std::io::{self, Write};
use std::pin::Pin;
+use std::ptr;
pin_project! {
/// Wraps a writer and buffers its output.
@@ -49,7 +50,7 @@
Self { inner, buf: Vec::with_capacity(cap), written: 0 }
}
- fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut this = self.project();
let len = this.buf.len();
@@ -83,6 +84,68 @@
pub fn buffer(&self) -> &[u8] {
&self.buf
}
+
+ /// Capacity of `buf`. how many chars can be held in buffer
+ pub(super) fn capacity(&self) -> usize {
+ self.buf.capacity()
+ }
+
+ /// Remaining number of bytes to reach `buf` 's capacity
+ #[inline]
+ pub(super) fn spare_capacity(&self) -> usize {
+ self.buf.capacity() - self.buf.len()
+ }
+
+ /// Write a byte slice directly into buffer
+ ///
+ /// Will truncate the number of bytes written to `spare_capacity()` so you want to
+ /// calculate the size of your slice to avoid losing bytes
+ ///
+ /// Based on `std::io::BufWriter`
+ pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize {
+ let available = self.spare_capacity();
+ let amt_to_buffer = available.min(buf.len());
+
+ // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction.
+ unsafe {
+ self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
+ }
+
+ amt_to_buffer
+ }
+
+ /// Write byte slice directly into `self.buf`
+ ///
+ /// Based on `std::io::BufWriter`
+ #[inline]
+ unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) {
+ debug_assert!(buf.len() <= self.spare_capacity());
+ let this = self.project();
+ let old_len = this.buf.len();
+ let buf_len = buf.len();
+ let src = buf.as_ptr();
+ let dst = this.buf.as_mut_ptr().add(old_len);
+ ptr::copy_nonoverlapping(src, dst, buf_len);
+ this.buf.set_len(old_len + buf_len);
+ }
+
+ /// Write directly using `inner`, bypassing buffering
+ pub(super) fn inner_poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write(cx, buf)
+ }
+
+ /// Write directly using `inner`, bypassing buffering
+ pub(super) fn inner_poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write_vectored(cx, bufs)
+ }
}
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
diff --git a/src/io/chain.rs b/src/io/chain.rs
index a35c50d..728a3d2 100644
--- a/src/io/chain.rs
+++ b/src/io/chain.rs
@@ -1,7 +1,5 @@
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
use pin_project_lite::pin_project;
use std::fmt;
@@ -111,16 +109,6 @@
}
this.second.poll_read_vectored(cx, bufs)
}
-
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- let initializer = self.first.initializer();
- if initializer.should_initialize() {
- initializer
- } else {
- self.second.initializer()
- }
- }
}
impl<T, U> AsyncBufRead for Chain<T, U>
diff --git a/src/io/empty.rs b/src/io/empty.rs
index ab2395a..02f6103 100644
--- a/src/io/empty.rs
+++ b/src/io/empty.rs
@@ -1,6 +1,4 @@
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead};
use std::fmt;
use std::io;
@@ -43,12 +41,6 @@
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
}
-
- #[cfg(feature = "read-initializer")]
- #[inline]
- unsafe fn initializer(&self) -> Initializer {
- Initializer::nop()
- }
}
impl AsyncBufRead for Empty {
diff --git a/src/io/line_writer.rs b/src/io/line_writer.rs
new file mode 100644
index 0000000..71cd668
--- /dev/null
+++ b/src/io/line_writer.rs
@@ -0,0 +1,155 @@
+use super::buf_writer::BufWriter;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use futures_io::IoSlice;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+
+pin_project! {
+/// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines
+///
+/// This was written based on `std::io::LineWriter` which goes into further details
+/// explaining the code.
+///
+/// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter`
+/// to write on-each-line.
+#[derive(Debug)]
+pub struct LineWriter<W: AsyncWrite> {
+ #[pin]
+ buf_writer: BufWriter<W>,
+}
+}
+
+impl<W: AsyncWrite> LineWriter<W> {
+ /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB
+ /// which was taken from `std::io::LineWriter`
+ pub fn new(inner: W) -> LineWriter<W> {
+ LineWriter::with_capacity(1024, inner)
+ }
+
+ /// Creates a new `LineWriter` with the specified buffer capacity.
+ pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
+ LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) }
+ }
+
+ /// Flush `buf_writer` if last char is "new line"
+ fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let this = self.project();
+ match this.buf_writer.buffer().last().copied() {
+ Some(b'\n') => this.buf_writer.flush_buf(cx),
+ _ => Poll::Ready(Ok(())),
+ }
+ }
+
+ /// Returns a reference to `buf_writer`'s internally buffered data.
+ pub fn buffer(&self) -> &[u8] {
+ self.buf_writer.buffer()
+ }
+
+ /// Acquires a reference to the underlying sink or stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &W {
+ self.buf_writer.get_ref()
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for LineWriter<W> {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let mut this = self.as_mut().project();
+ let newline_index = match memchr::memrchr(b'\n', buf) {
+ None => {
+ ready!(self.as_mut().flush_if_completed_line(cx)?);
+ return self.project().buf_writer.poll_write(cx, buf);
+ }
+ Some(newline_index) => newline_index + 1,
+ };
+
+ ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+ let lines = &buf[..newline_index];
+
+ let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? };
+
+ if flushed == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let tail = if flushed >= newline_index {
+ &buf[flushed..]
+ } else if newline_index - flushed <= this.buf_writer.capacity() {
+ &buf[flushed..newline_index]
+ } else {
+ let scan_area = &buf[flushed..];
+ let scan_area = &scan_area[..this.buf_writer.capacity()];
+ match memchr::memrchr(b'\n', scan_area) {
+ Some(newline_index) => &scan_area[..newline_index + 1],
+ None => scan_area,
+ }
+ };
+
+ let buffered = this.buf_writer.as_mut().write_to_buf(tail);
+ Poll::Ready(Ok(flushed + buffered))
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let mut this = self.as_mut().project();
+ // `is_write_vectored()` is handled in original code, but not in this crate
+ // see https://github.com/rust-lang/rust/issues/70436
+
+ let last_newline_buf_idx = bufs
+ .iter()
+ .enumerate()
+ .rev()
+ .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
+ let last_newline_buf_idx = match last_newline_buf_idx {
+ None => {
+ ready!(self.as_mut().flush_if_completed_line(cx)?);
+ return self.project().buf_writer.poll_write_vectored(cx, bufs);
+ }
+ Some(i) => i,
+ };
+
+ ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+ let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
+
+ let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? };
+ if flushed == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let lines_len = lines.iter().map(|buf| buf.len()).sum();
+ if flushed < lines_len {
+ return Poll::Ready(Ok(flushed));
+ }
+
+ let buffered: usize = tail
+ .iter()
+ .filter(|buf| !buf.is_empty())
+ .map(|buf| this.buf_writer.as_mut().write_to_buf(buf))
+ .take_while(|&n| n > 0)
+ .sum();
+
+ Poll::Ready(Ok(flushed + buffered))
+ }
+
+ /// Forward to `buf_writer` 's `BufWriter::poll_flush()`
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.as_mut().project().buf_writer.poll_flush(cx)
+ }
+
+ /// Forward to `buf_writer` 's `BufWriter::poll_close()`
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.as_mut().project().buf_writer.poll_close(cx)
+ }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 16cf5a7..4dd2e02 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -26,10 +26,6 @@
// Re-export some types from `std::io` so that users don't have to deal
// with conflicts when `use`ing `futures::io` and `std::io`.
#[doc(no_inline)]
-#[cfg(feature = "read-initializer")]
-#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
-pub use std::io::Initializer;
-#[doc(no_inline)]
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
@@ -40,15 +36,9 @@
/// Initializes a buffer if necessary.
///
-/// A buffer is always initialized if `read-initializer` feature is disabled.
+/// A buffer is currently always initialized.
#[inline]
unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
- #[cfg(feature = "read-initializer")]
- {
- if !_reader.initializer().should_initialize() {
- return;
- }
- }
ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
}
@@ -61,6 +51,9 @@
mod buf_writer;
pub use self::buf_writer::BufWriter;
+mod line_writer;
+pub use self::line_writer::LineWriter;
+
mod chain;
pub use self::chain::Chain;
diff --git a/src/io/repeat.rs b/src/io/repeat.rs
index 4cefcb2..2828bf0 100644
--- a/src/io/repeat.rs
+++ b/src/io/repeat.rs
@@ -1,7 +1,5 @@
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncRead, IoSliceMut};
use std::fmt;
use std::io;
@@ -59,12 +57,6 @@
}
Poll::Ready(Ok(nwritten))
}
-
- #[cfg(feature = "read-initializer")]
- #[inline]
- unsafe fn initializer(&self) -> Initializer {
- Initializer::nop()
- }
}
impl fmt::Debug for Repeat {
diff --git a/src/io/take.rs b/src/io/take.rs
index 0583020..2c49480 100644
--- a/src/io/take.rs
+++ b/src/io/take.rs
@@ -1,7 +1,5 @@
use futures_core::ready;
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead};
use pin_project_lite::pin_project;
use std::pin::Pin;
@@ -100,11 +98,6 @@
*this.limit -= n as u64;
Poll::Ready(Ok(n))
}
-
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> Initializer {
- self.inner.initializer()
- }
}
impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
diff --git a/src/lib.rs b/src/lib.rs
index 76d3799..9a10c93 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,6 @@
//! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
//! and the `AsyncRead` and `AsyncWrite` traits.
-#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
#![cfg_attr(not(feature = "std"), no_std)]
#![warn(
@@ -23,9 +22,6 @@
#[cfg(all(feature = "bilock", not(feature = "unstable")))]
compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
-compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
#[cfg(feature = "alloc")]
extern crate alloc;
@@ -148,11 +144,6 @@
#[cfg(feature = "std")]
macro_rules! delegate_async_read {
($field:ident) => {
- #[cfg(feature = "read-initializer")]
- unsafe fn initializer(&self) -> $crate::io::Initializer {
- self.$field.initializer()
- }
-
fn poll_read(
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 4a05d88..fdbd53d 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -121,8 +121,9 @@
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Weak::new(),
+ woken: AtomicBool::new(false),
});
- let stub_ptr = &*stub as *const Task<Fut>;
+ let stub_ptr = Arc::as_ptr(&stub);
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
waker: AtomicWaker::new(),
head: AtomicPtr::new(stub_ptr as *mut _),
@@ -167,6 +168,7 @@
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
+ woken: AtomicBool::new(false),
});
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -375,7 +377,7 @@
// The `ReadyToRunQueue` stub is never inserted into the `head_all`
// list, and its pointer value will remain valid for the lifetime of
// this `FuturesUnordered`, so we can make use of its value here.
- &*self.ready_to_run_queue.stub as *const _ as *mut _
+ Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _
}
}
@@ -383,25 +385,12 @@
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- // Variable to determine how many times it is allowed to poll underlying
- // futures without yielding.
- //
- // A single call to `poll_next` may potentially do a lot of work before
- // yielding. This happens in particular if the underlying futures are awoken
- // frequently but continue to return `Pending`. This is problematic if other
- // tasks are waiting on the executor, since they do not get to run. This value
- // caps the number of calls to `poll` on underlying futures a single call to
- // `poll_next` is allowed to make.
- //
- // The value is the length of FuturesUnordered. This ensures that each
- // future is polled only once at most per iteration.
- //
- // See also https://github.com/rust-lang/futures-rs/issues/2047.
- let yield_every = self.len();
+ let len = self.len();
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
+ let mut yielded = 0;
// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
@@ -512,7 +501,11 @@
// the internal allocation, appropriately accessing fields and
// deallocating the task if need be.
let res = {
- let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
+ let task = bomb.task.as_ref().unwrap();
+ // We are only interested in whether the future is awoken before it
+ // finishes polling, so reset the flag here.
+ task.woken.store(false, Relaxed);
+ let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);
// Safety: We won't move the future ever again
@@ -525,12 +518,17 @@
match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
+ // If the future was awoken during polling, we assume
+ // the future wanted to explicitly yield.
+ yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);
- if polled == yield_every {
- // We have polled a large number of futures in a row without yielding.
- // To ensure we do not starve other tasks waiting on the executor,
- // we yield here, but immediately wake ourselves up to continue.
+ // If a future yields, we respect it and yield here.
+ // If all futures have been polled, we also yield here to
+ // avoid starving other tasks waiting on the executor.
+ // (polling the same future twice per iteration may cause
+ // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
+ if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}
diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs
index 5ef6cde..4518705 100644
--- a/src/stream/futures_unordered/ready_to_run_queue.rs
+++ b/src/stream/futures_unordered/ready_to_run_queue.rs
@@ -83,7 +83,7 @@
}
pub(super) fn stub(&self) -> *const Task<Fut> {
- &*self.stub
+ Arc::as_ptr(&self.stub)
}
// Clear the queue of tasks.
diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs
index da2cd67..ec2114e 100644
--- a/src/stream/futures_unordered/task.rs
+++ b/src/stream/futures_unordered/task.rs
@@ -1,6 +1,6 @@
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
-use core::sync::atomic::Ordering::{self, SeqCst};
+use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};
use super::abort::abort;
@@ -31,6 +31,11 @@
// Whether or not this task is currently in the ready to run queue
pub(super) queued: AtomicBool,
+
+ // Whether the future was awoken during polling
+ // It is possible for this flag to be set to true after the polling,
+ // but it will be ignored.
+ pub(super) woken: AtomicBool,
}
// `Task` can be sent across threads safely because it ensures that
@@ -48,6 +53,8 @@
None => return,
};
+ arc_self.woken.store(true, Relaxed);
+
// It's our job to enqueue this task it into the ready to run queue. To
// do this we set the `queued` flag, and if successful we then do the
// actual queueing operation, ensuring that we're only queued once.
@@ -62,7 +69,7 @@
// still.
let prev = arc_self.queued.swap(true, SeqCst);
if !prev {
- inner.enqueue(&**arc_self);
+ inner.enqueue(Arc::as_ptr(arc_self));
inner.waker.wake();
}
}
diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs
new file mode 100644
index 0000000..513cab7
--- /dev/null
+++ b/src/stream/stream/count.rs
@@ -0,0 +1,53 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+ /// Future for the [`count`](super::StreamExt::count) method.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Count<St> {
+ #[pin]
+ stream: St,
+ count: usize
+ }
+}
+
+impl<St> fmt::Debug for Count<St>
+where
+ St: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish()
+ }
+}
+
+impl<St: Stream> Count<St> {
+ pub(super) fn new(stream: St) -> Self {
+ Self { stream, count: 0 }
+ }
+}
+
+impl<St: FusedStream> FusedFuture for Count<St> {
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+impl<St: Stream> Future for Count<St> {
+ type Output = usize;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+
+ Poll::Ready(loop {
+ match ready!(this.stream.as_mut().poll_next(cx)) {
+ Some(_) => *this.count += 1,
+ None => break *this.count,
+ }
+ })
+ }
+}
diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs
new file mode 100644
index 0000000..07f971c
--- /dev/null
+++ b/src/stream/stream/flatten_unordered.rs
@@ -0,0 +1,509 @@
+use alloc::sync::Arc;
+use core::{
+ cell::UnsafeCell,
+ convert::identity,
+ fmt,
+ num::NonZeroUsize,
+ pin::Pin,
+ sync::atomic::{AtomicU8, Ordering},
+};
+
+use pin_project_lite::pin_project;
+
+use futures_core::{
+ future::Future,
+ ready,
+ stream::{FusedStream, Stream},
+ task::{Context, Poll, Waker},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use futures_task::{waker, ArcWake};
+
+use crate::stream::FuturesUnordered;
+
+/// There is nothing to poll and stream isn't being
+/// polled or waking at the moment.
+const NONE: u8 = 0;
+
+/// Inner streams need to be polled.
+const NEED_TO_POLL_INNER_STREAMS: u8 = 1;
+
+/// The base stream needs to be polled.
+const NEED_TO_POLL_STREAM: u8 = 0b10;
+
+/// It needs to poll base stream and inner streams.
+const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;
+
+/// The current stream is being polled at the moment.
+const POLLING: u8 = 0b100;
+
+/// Inner streams are being woken at the moment.
+const WAKING_INNER_STREAMS: u8 = 0b1000;
+
+/// The base stream is being woken at the moment.
+const WAKING_STREAM: u8 = 0b10000;
+
+/// The base stream and inner streams are being woken at the moment.
+const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;
+
+/// The stream was waked and will be polled.
+const WOKEN: u8 = 0b100000;
+
+/// Determines what needs to be polled, and is stream being polled at the
+/// moment or not.
+#[derive(Clone, Debug)]
+struct SharedPollState {
+ state: Arc<AtomicU8>,
+}
+
+impl SharedPollState {
+ /// Constructs new `SharedPollState` with the given state.
+ fn new(value: u8) -> SharedPollState {
+ SharedPollState { state: Arc::new(AtomicU8::new(value)) }
+ }
+
+ /// Attempts to start polling, returning stored state in case of success.
+ /// Returns `None` if some waker is waking at the moment.
+ fn start_polling(
+ &self,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ if value & WAKING_ALL == NONE {
+ Some(POLLING)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+ let bomb = PollStateBomb::new(self, SharedPollState::reset);
+
+ Some((value, bomb))
+ }
+
+ /// Starts the waking process and performs bitwise or with the given value.
+ fn start_waking(
+ &self,
+ to_poll: u8,
+ waking: u8,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ // Waking process for this waker already started
+ if value & waking != NONE {
+ return None;
+ }
+ let mut next_value = value | to_poll;
+ // Only start the waking process if we're not in the polling phase and the stream isn't woken already
+ if value & (WOKEN | POLLING) == NONE {
+ next_value |= waking;
+ }
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+
+ if value & (WOKEN | POLLING) == NONE {
+ let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));
+
+ Some((value, bomb))
+ } else {
+ None
+ }
+ }
+
+ /// Sets current state to
+ /// - `!POLLING` allowing to use wakers
+ /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,
+ /// or `will_be_woken` flag supplied
+ /// - `!WAKING_ALL` as
+ /// * Wakers called during the `POLLING` phase won't propagate their calls
+ /// * `POLLING` phase can't start if some of the wakers are active
+ /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
+ fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 {
+ self.state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| {
+ let mut next_value = to_poll;
+
+ value &= NEED_TO_POLL_ALL;
+ if value != NONE || will_be_woken {
+ next_value |= WOKEN;
+ }
+ next_value |= value;
+
+ Some(next_value & !POLLING & !WAKING_ALL)
+ })
+ .unwrap()
+ }
+
+ /// Toggles state to non-waking, allowing to start polling.
+ fn stop_waking(&self, waking: u8) -> u8 {
+ self.state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ let mut next_value = value & !waking;
+ // Waker will be called only if the current waking state is the same as the specified waker state
+ if value & WAKING_ALL == waking {
+ next_value |= WOKEN;
+ }
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .unwrap_or_else(identity)
+ }
+
+ /// Resets current state allowing to poll the stream and wake up wakers.
+ fn reset(&self) -> u8 {
+ self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
+ }
+}
+
+/// Used to execute some function on the given state when dropped.
+struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> {
+ state: &'a SharedPollState,
+ drop: Option<F>,
+}
+
+impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
+ /// Constructs new bomb with the given state.
+ fn new(state: &'a SharedPollState, drop: F) -> Self {
+ Self { state, drop: Some(drop) }
+ }
+
+ /// Deactivates bomb, forces it to not call provided function when dropped.
+ fn deactivate(mut self) {
+ self.drop.take();
+ }
+
+ /// Manually fires the bomb, returning supplied state.
+ fn fire(mut self) -> Option<u8> {
+ self.drop.take().map(|drop| (drop)(self.state))
+ }
+}
+
+impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
+ fn drop(&mut self) {
+ if let Some(drop) = self.drop.take() {
+ (drop)(self.state);
+ }
+ }
+}
+
+/// Will update state with the provided value on `wake_by_ref` call
+/// and then, if there is a need, call `inner_waker`.
+struct InnerWaker {
+ inner_waker: UnsafeCell<Option<Waker>>,
+ poll_state: SharedPollState,
+ need_to_poll: u8,
+}
+
+unsafe impl Send for InnerWaker {}
+unsafe impl Sync for InnerWaker {}
+
+impl InnerWaker {
+ /// Replaces given waker's inner_waker for polling stream/futures which will
+ /// update poll state on `wake_by_ref` call. Use only if you need several
+ /// contexts.
+ ///
+ /// ## Safety
+ ///
+ /// This function will modify waker's `inner_waker` via `UnsafeCell`, so
+ /// it should be used only during `POLLING` phase.
+ unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) -> Waker {
+ *self_arc.inner_waker.get() = cx.waker().clone().into();
+ waker(self_arc.clone())
+ }
+
+ /// Attempts to start the waking process for the waker with the given value.
+ /// If succeeded, then the stream isn't yet woken and not being polled at the moment.
+ fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ self.poll_state.start_waking(self.need_to_poll, self.waking_state())
+ }
+
+ /// Returns the corresponding waking state toggled by this waker.
+ fn waking_state(&self) -> u8 {
+ self.need_to_poll << 3
+ }
+}
+
+impl ArcWake for InnerWaker {
+ fn wake_by_ref(self_arc: &Arc<Self>) {
+ if let Some((_, state_bomb)) = self_arc.start_waking() {
+ // Safety: now state is not `POLLING`
+ let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };
+
+ if let Some(inner_waker) = waker_opt.clone() {
+ // Stop waking to allow polling stream
+ let poll_state_value = state_bomb.fire().unwrap();
+
+ // Here we want to call waker only if stream isn't woken yet and
+ // also to optimize the case when two wakers are called at the same time.
+ //
+ // In this case the best strategy will be to propagate only the latest waker's awake,
+ // and then poll both entities in a single `poll_next` call
+ if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() {
+ // Wake up inner waker
+ inner_waker.wake();
+ }
+ }
+ }
+ }
+}
+
+pin_project! {
+ /// Future which contains optional stream.
+ ///
+ /// If it's `Some`, it will attempt to call `poll_next` on it,
+ /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))`
+ /// or `None` in case of `Poll::Ready(None)`.
+ ///
+ /// If `poll_next` will return `Poll::Pending`, it will be forwarded to
+ /// the future and current task will be notified by waker.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ struct PollStreamFut<St> {
+ #[pin]
+ stream: Option<St>,
+ }
+}
+
+impl<St> PollStreamFut<St> {
+ /// Constructs new `PollStreamFut` using given `stream`.
+ fn new(stream: impl Into<Option<St>>) -> Self {
+ Self { stream: stream.into() }
+ }
+}
+
+impl<St: Stream + Unpin> Future for PollStreamFut<St> {
+ type Output = Option<(St::Item, PollStreamFut<St>)>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut stream = self.project().stream;
+
+ let item = if let Some(stream) = stream.as_mut().as_pin_mut() {
+ ready!(stream.poll_next(cx))
+ } else {
+ None
+ };
+ let next_item_fut = PollStreamFut::new(stream.get_mut().take());
+ let out = item.map(|item| (item, next_item_fut));
+
+ Poll::Ready(out)
+ }
+}
+
+pin_project! {
+ /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+ /// method.
+ #[project = FlattenUnorderedProj]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct FlattenUnordered<St> where St: Stream {
+ #[pin]
+ inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
+ #[pin]
+ stream: St,
+ poll_state: SharedPollState,
+ limit: Option<NonZeroUsize>,
+ is_stream_done: bool,
+ inner_streams_waker: Arc<InnerWaker>,
+ stream_waker: Arc<InnerWaker>,
+ }
+}
+
+impl<St> fmt::Debug for FlattenUnordered<St>
+where
+ St: Stream + fmt::Debug,
+ St::Item: Stream + fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FlattenUnordered")
+ .field("poll_state", &self.poll_state)
+ .field("inner_streams", &self.inner_streams)
+ .field("limit", &self.limit)
+ .field("stream", &self.stream)
+ .field("is_stream_done", &self.is_stream_done)
+ .finish()
+ }
+}
+
+impl<St> FlattenUnordered<St>
+where
+ St: Stream,
+ St::Item: Stream + Unpin,
+{
+ pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
+ let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
+
+ FlattenUnordered {
+ inner_streams: FuturesUnordered::new(),
+ stream,
+ is_stream_done: false,
+ limit: limit.and_then(NonZeroUsize::new),
+ inner_streams_waker: Arc::new(InnerWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_INNER_STREAMS,
+ }),
+ stream_waker: Arc::new(InnerWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_STREAM,
+ }),
+ poll_state,
+ }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+impl<St> FlattenUnorderedProj<'_, St>
+where
+ St: Stream,
+{
+ /// Checks if current `inner_streams` size is less than optional limit.
+ fn is_exceeded_limit(&self) -> bool {
+ self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
+ }
+}
+
+impl<St> FusedStream for FlattenUnordered<St>
+where
+ St: FusedStream,
+ St::Item: FusedStream + Unpin,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated() && self.inner_streams.is_empty()
+ }
+}
+
+impl<St> Stream for FlattenUnordered<St>
+where
+ St: Stream,
+ St::Item: Stream + Unpin,
+{
+ type Item = <St::Item as Stream>::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut next_item = None;
+ let mut need_to_poll_next = NONE;
+
+ let mut this = self.as_mut().project();
+
+ let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() {
+ Some(value) => value,
+ _ => {
+ // Waker was called, just wait for the next poll
+ return Poll::Pending;
+ }
+ };
+
+ if poll_state_value & NEED_TO_POLL_STREAM != NONE {
+ // Safety: now state is `POLLING`.
+ let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };
+
+ // Here we need to poll the base stream.
+ //
+ // To improve performance, we will attempt to place as many items as we can
+ // to the `FuturesUnordered` bucket before polling inner streams
+ loop {
+ if this.is_exceeded_limit() || *this.is_stream_done {
+ // We either exceeded the limit or the stream is exhausted
+ if !*this.is_stream_done {
+ // The stream needs to be polled in the next iteration
+ need_to_poll_next |= NEED_TO_POLL_STREAM;
+ }
+
+ break;
+ } else {
+ match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) {
+ Poll::Ready(Some(inner_stream)) => {
+ // Add new stream to the inner streams bucket
+ this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream));
+ // Inner streams must be polled afterward
+ poll_state_value |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(None) => {
+ // Mark the stream as done
+ *this.is_stream_done = true;
+ }
+ Poll::Pending => {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE {
+ // Safety: now state is `POLLING`.
+ let inner_streams_waker =
+ unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) };
+
+ match this
+ .inner_streams
+ .as_mut()
+ .poll_next(&mut Context::from_waker(&inner_streams_waker))
+ {
+ Poll::Ready(Some(Some((item, next_item_fut)))) => {
+ // Push next inner stream item future to the list of inner streams futures
+ this.inner_streams.as_mut().push(next_item_fut);
+ // Take the received item
+ next_item = Some(item);
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(Some(None)) => {
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ _ => {}
+ }
+ }
+
+ // We didn't have any `poll_next` panic, so it's time to deactivate the bomb
+ state_bomb.deactivate();
+
+ let mut force_wake =
+ // we need to poll the stream and didn't reach the limit yet
+ need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
+ // or we need to poll inner streams again
+ || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;
+
+ // Stop polling and swap the latest state
+ poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake);
+ // If state was changed during `POLLING` phase, need to manually call a waker
+ force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;
+
+ let is_done = *this.is_stream_done && this.inner_streams.is_empty();
+
+ if next_item.is_some() || is_done {
+ Poll::Ready(next_item)
+ } else {
+ if force_wake {
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item> Sink<Item> for FlattenUnordered<St>
+where
+ St: Stream + Sink<Item>,
+{
+ type Error = St::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 86997f4..642b91e 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -40,6 +40,10 @@
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::concat::Concat;
+mod count;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::count::Count;
+
mod cycle;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::cycle::Cycle;
@@ -195,6 +199,25 @@
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
+mod flatten_unordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)]
+pub use self::flatten_unordered::FlattenUnordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+delegate_all!(
+ /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
+ FlatMapUnordered<St, U, F>(
+ FlattenUnordered<Map<St, F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
+ where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
+);
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
mod for_each_concurrent;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
@@ -386,9 +409,9 @@
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
+ /// let events = stream.filter(|x| future::ready(x % 2 == 0));
///
- /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
@@ -418,11 +441,11 @@
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter_map(|x| async move {
+ /// let events = stream.filter_map(|x| async move {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
///
- /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
@@ -576,6 +599,38 @@
assert_future::<Self::Item, _>(Concat::new(self))
}
+ /// Drives the stream to completion, counting the number of items.
+ ///
+ /// # Overflow Behavior
+ ///
+ /// The method does no guarding against overflows, so counting elements of a
+ /// stream with more than [`usize::MAX`] elements either produces the wrong
+ /// result or panics. If debug assertions are enabled, a panic is guaranteed.
+ ///
+ /// # Panics
+ ///
+ /// This function might panic if the iterator has more than [`usize::MAX`]
+ /// elements.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..=10);
+ /// let count = stream.count().await;
+ ///
+ /// assert_eq!(count, 10);
+ /// # });
+ /// ```
+ fn count(self) -> Count<Self>
+ where
+ Self: Sized,
+ {
+ assert_future::<usize, _>(Count::new(self))
+ }
+
/// Repeats a stream endlessly.
///
/// The stream never terminates. Note that you likely want to avoid
@@ -718,13 +773,57 @@
assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
}
+ /// Flattens a stream of streams into just one continuous stream. Polls
+ /// inner streams concurrently.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::StreamExt;
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(1).unwrap();
+ /// tx1.unbounded_send(2).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(3).unwrap();
+ /// tx2.unbounded_send(4).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(rx1).unwrap();
+ /// tx3.unbounded_send(rx2).unwrap();
+ /// });
+ ///
+ /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
+ /// output.sort();
+ ///
+ /// assert_eq!(output, vec![1, 2, 3, 4]);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
+ where
+ Self::Item: Stream + Unpin,
+ Self: Sized,
+ {
+ FlattenUnordered::new(self, limit.into())
+ }
+
/// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
///
/// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
/// you would have to chain combinators like `.map(f).flatten()` while this
/// combinator provides ability to write `.flat_map(f)` instead of chaining.
///
- /// The provided closure which produce inner streams is executed over all elements
+ /// The provided closure which produces inner streams is executed over all elements
/// of stream as last inner stream is terminated and next stream item is available.
///
/// Note that this function consumes the stream passed into it and returns a
@@ -752,6 +851,59 @@
assert_stream::<U::Item, _>(FlatMap::new(self, f))
}
+ /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
+ /// and polls them concurrently, yielding items in any order, as they made
+ /// available.
+ ///
+ /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
+ /// instead, and you need to poll all of them concurrently, you would
+ /// have to use something like `for_each_concurrent` and merge values
+ /// by hand. This combinator provides ability to collect all values
+ /// from concurrently polled streams into one stream.
+ ///
+ /// The first argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled concurrently. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// The provided closure which produces inner streams is executed over
+ /// all elements of stream as next stream item is available and limit
+ /// of concurrently processed streams isn't exceeded.
+ ///
+ /// Note that this function consumes the stream passed into it and
+ /// returns a wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..5);
+ /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
+ /// let mut values = stream.collect::<Vec<_>>().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flat_map_unordered<U, F>(
+ self,
+ limit: impl Into<Option<usize>>,
+ f: F,
+ ) -> FlatMapUnordered<Self, U, F>
+ where
+ U: Stream + Unpin,
+ F: FnMut(Self::Item) -> U,
+ Self: Sized,
+ {
+ FlatMapUnordered::new(self, limit.into(), f)
+ }
+
/// Combinator similar to [`StreamExt::fold`] that holds internal state
/// and produces a new stream.
///
diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs
index 8724145..f5cfde9 100644
--- a/src/stream/stream/scan.rs
+++ b/src/stream/stream/scan.rs
@@ -118,11 +118,11 @@
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
-impl<S, Fut, F, Item> Sink<Item> for Scan<S, S, Fut, F>
+impl<St, S, Fut, F, Item> Sink<Item> for Scan<St, S, Fut, F>
where
- S: Stream + Sink<Item>,
+ St: Stream + Sink<Item>,
{
- type Error = S::Error;
+ type Error = St::Error;
delegate_sink!(stream, Item);
}
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 455ddca..6bf2cb7 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -736,17 +736,21 @@
/// thread::spawn(move || {
/// tx2.unbounded_send(Ok(2)).unwrap();
/// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
/// });
/// thread::spawn(move || {
/// tx3.unbounded_send(Ok(rx1)).unwrap();
/// tx3.unbounded_send(Ok(rx2)).unwrap();
- /// tx3.unbounded_send(Err(4)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
/// });
///
/// let mut stream = rx3.try_flatten();
/// assert_eq!(stream.next().await, Some(Ok(1)));
/// assert_eq!(stream.next().await, Some(Ok(2)));
/// assert_eq!(stream.next().await, Some(Err(3)));
+ /// assert_eq!(stream.next().await, Some(Ok(4)));
+ /// assert_eq!(stream.next().await, Some(Err(5)));
+ /// assert_eq!(stream.next().await, None);
/// # });
/// ```
fn try_flatten(self) -> TryFlatten<Self>
@@ -1001,6 +1005,7 @@
/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
/// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::future::{FutureExt, TryFutureExt};
/// # let (tx, rx) = futures::channel::oneshot::channel();
///
diff --git a/src/task/mod.rs b/src/task/mod.rs
index eff6d48..0a31eea 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -16,7 +16,6 @@
pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj};
pub use futures_task::noop_waker;
-#[cfg(feature = "std")]
pub use futures_task::noop_waker_ref;
#[cfg(not(futures_no_atomic_cas))]
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index f877923..87ca360 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -34,6 +34,7 @@
/// today. Feel free to use this method in the meantime.
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
/// use futures::executor::ThreadPool;
/// use futures::task::SpawnExt;
///
@@ -58,6 +59,7 @@
/// resolves to the output of the spawned future.
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
/// use futures::executor::{block_on, ThreadPool};
/// use futures::future;
/// use futures::task::SpawnExt;
@@ -136,6 +138,7 @@
/// resolves to the output of the spawned future.
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
/// use futures::executor::LocalPool;
/// use futures::task::LocalSpawnExt;
///