Update futures-util to 0.3.21 am: 737dc97288 am: b9180251b3 am: e2f2e216a9

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/2004175

Change-Id: Iacbebadeebe2be67366f7b2932d3b2619e3eb541
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index ffd4f55..b52386f 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
 {
   "git": {
-    "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
-  }
-}
+    "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+  },
+  "path_in_vcs": "futures-util"
+}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index b7a9667..9d3dea9 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@
     host_supported: true,
     crate_name: "futures_util",
     cargo_env_compat: true,
-    cargo_pkg_version: "0.3.17",
+    cargo_pkg_version: "0.3.21",
     srcs: ["src/lib.rs"],
     test_suites: ["general-tests"],
     auto_gen_config: true,
@@ -62,13 +62,10 @@
         "futures-sink",
         "io",
         "memchr",
-        "proc-macro-hack",
-        "proc-macro-nested",
         "sink",
         "slab",
         "std",
     ],
-    cfgs: ["fn_like_proc_macro"],
     rustlibs: [
         "libfutures_channel",
         "libfutures_core",
@@ -78,14 +75,10 @@
         "libmemchr",
         "libpin_project_lite",
         "libpin_utils",
-        "libproc_macro_nested",
         "libslab",
         "libtokio",
     ],
-    proc_macros: [
-        "libfutures_macro",
-        "libproc_macro_hack",
-    ],
+    proc_macros: ["libfutures_macro"],
 }
 
 rust_library {
@@ -93,7 +86,7 @@
     host_supported: true,
     crate_name: "futures_util",
     cargo_env_compat: true,
-    cargo_pkg_version: "0.3.17",
+    cargo_pkg_version: "0.3.21",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
@@ -108,13 +101,10 @@
         "futures-sink",
         "io",
         "memchr",
-        "proc-macro-hack",
-        "proc-macro-nested",
         "sink",
         "slab",
         "std",
     ],
-    cfgs: ["fn_like_proc_macro"],
     rustlibs: [
         "libfutures_channel",
         "libfutures_core",
@@ -124,13 +114,9 @@
         "libmemchr",
         "libpin_project_lite",
         "libpin_utils",
-        "libproc_macro_nested",
         "libslab",
     ],
-    proc_macros: [
-        "libfutures_macro",
-        "libproc_macro_hack",
-    ],
+    proc_macros: ["libfutures_macro"],
     apex_available: [
         "//apex_available:platform",
         "com.android.bluetooth",
diff --git a/Cargo.toml b/Cargo.toml
index 90010e9..a148319 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,45 +11,51 @@
 
 [package]
 edition = "2018"
+rust-version = "1.45"
 name = "futures-util"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "Common utilities and extension traits for the futures-rs library.\n"
+version = "0.3.21"
+description = """
+Common utilities and extension traits for the futures-rs library.
+"""
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3"
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
+
 [package.metadata.docs.rs]
 all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+    "--cfg",
+    "docsrs",
+]
+
 [dependencies.futures-channel]
-version = "0.3.17"
+version = "0.3.21"
 features = ["std"]
 optional = true
 default-features = false
 
 [dependencies.futures-core]
-version = "0.3.17"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-io]
-version = "0.3.17"
+version = "0.3.21"
 features = ["std"]
 optional = true
 default-features = false
 
 [dependencies.futures-macro]
-version = "=0.3.17"
+version = "=0.3.21"
 optional = true
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.17"
+version = "0.3.21"
 optional = true
 default-features = false
 
 [dependencies.futures-task]
-version = "0.3.17"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures_01]
@@ -67,14 +73,6 @@
 [dependencies.pin-utils]
 version = "0.1.0"
 
-[dependencies.proc-macro-hack]
-version = "0.5.19"
-optional = true
-
-[dependencies.proc-macro-nested]
-version = "0.1.2"
-optional = true
-
 [dependencies.slab]
 version = "0.4.2"
 optional = true
@@ -82,24 +80,54 @@
 [dependencies.tokio-io]
 version = "0.1.9"
 optional = true
+
 [dev-dependencies.tokio]
 version = "0.1.11"
-[build-dependencies.autocfg]
-version = "1"
 
 [features]
-alloc = ["futures-core/alloc", "futures-task/alloc"]
+alloc = [
+    "futures-core/alloc",
+    "futures-task/alloc",
+]
 async-await = []
-async-await-macro = ["async-await", "futures-macro", "proc-macro-hack", "proc-macro-nested"]
+async-await-macro = [
+    "async-await",
+    "futures-macro",
+]
 bilock = []
 cfg-target-has-atomic = []
-channel = ["std", "futures-channel"]
-compat = ["std", "futures_01"]
-default = ["std", "async-await", "async-await-macro"]
-io = ["std", "futures-io", "memchr"]
-io-compat = ["io", "compat", "tokio-io"]
-read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
+channel = [
+    "std",
+    "futures-channel",
+]
+compat = [
+    "std",
+    "futures_01",
+]
+default = [
+    "std",
+    "async-await",
+    "async-await-macro",
+]
+io = [
+    "std",
+    "futures-io",
+    "memchr",
+]
+io-compat = [
+    "io",
+    "compat",
+    "tokio-io",
+]
 sink = ["futures-sink"]
-std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
-unstable = ["futures-core/unstable", "futures-task/unstable"]
+std = [
+    "alloc",
+    "futures-core/std",
+    "futures-task/std",
+    "slab",
+]
+unstable = [
+    "futures-core/unstable",
+    "futures-task/unstable",
+]
 write-all-vectored = ["io"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index a8e9362..46ec854 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,12 +1,11 @@
 [package]
 name = "futures-util"
+version = "0.3.21"
 edition = "2018"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3"
 description = """
 Common utilities and extension traits for the futures-rs library.
 """
@@ -16,7 +15,7 @@
 std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
 alloc = ["futures-core/alloc", "futures-task/alloc"]
 async-await = []
-async-await-macro = ["async-await", "futures-macro", "proc-macro-hack", "proc-macro-nested"]
+async-await-macro = ["async-await", "futures-macro"]
 compat = ["std", "futures_01"]
 io-compat = ["io", "compat", "tokio-io"]
 sink = ["futures-sink"]
@@ -28,25 +27,19 @@
 # `unstable` feature as an explicit opt-in to unstable API.
 unstable = ["futures-core/unstable", "futures-task/unstable"]
 bilock = []
-read-initializer = ["io", "futures-io/read-initializer", "futures-io/unstable"]
 write-all-vectored = ["io"]
 
 # These features are no longer used.
 # TODO: remove in the next major version.
 cfg-target-has-atomic = []
 
-[build-dependencies]
-autocfg = "1"
-
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.17", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.17", default-features = false, optional = true }
-proc-macro-hack = { version = "0.5.19", optional = true }
-proc-macro-nested = { version = "0.1.2", optional = true }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.21", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.21", default-features = false, optional = true }
 slab = { version = "0.4.2", optional = true }
 memchr = { version = "2.2", optional = true }
 futures_01 = { version = "0.1.25", optional = true, package = "futures" }
diff --git a/METADATA b/METADATA
index 0ace75a..ed41eb9 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures-util/futures-util-0.3.17.crate"
+    value: "https://static.crates.io/crates/futures-util/futures-util-0.3.21.crate"
   }
-  version: "0.3.17"
+  version: "0.3.21"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2021
-    month: 9
-    day: 22
+    year: 2022
+    month: 3
+    day: 1
   }
 }
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6e0aaed
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+# futures-util
+
+Common utilities and extension traits for the futures-rs library.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures-util = "0.3"
+```
+
+The current `futures-util` requires Rust 1.45 or later.
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/benches/flatten_unordered.rs b/benches/flatten_unordered.rs
new file mode 100644
index 0000000..64d5f9a
--- /dev/null
+++ b/benches/flatten_unordered.rs
@@ -0,0 +1,66 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+use futures::stream::{self, StreamExt};
+use futures::task::Poll;
+use std::collections::VecDeque;
+use std::thread;
+
+#[bench]
+fn oneshot_streams(b: &mut Bencher) {
+    const STREAM_COUNT: usize = 10_000;
+    const STREAM_ITEM_COUNT: usize = 1;
+
+    b.iter(|| {
+        let mut txs = VecDeque::with_capacity(STREAM_COUNT);
+        let mut rxs = Vec::new();
+
+        for _ in 0..STREAM_COUNT {
+            let (tx, rx) = oneshot::channel();
+            txs.push_back(tx);
+            rxs.push(rx);
+        }
+
+        thread::spawn(move || {
+            let mut last = 1;
+            while let Some(tx) = txs.pop_front() {
+                let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
+                last += STREAM_ITEM_COUNT;
+            }
+        });
+
+        let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
+            async {
+                if let Some(next) = vals.next() {
+                    let val = next.await.unwrap();
+                    Some((val, vals))
+                } else {
+                    None
+                }
+            }
+            .boxed()
+        })
+        .flatten_unordered(None);
+
+        block_on(future::poll_fn(move |cx| {
+            let mut count = 0;
+            loop {
+                match flatten.poll_next_unpin(cx) {
+                    Poll::Ready(None) => break,
+                    Poll::Ready(Some(_)) => {
+                        count += 1;
+                    }
+                    _ => {}
+                }
+            }
+            assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
+
+            Poll::Ready(())
+        }))
+    });
+}
diff --git a/build.rs b/build.rs
index f8aa5fe..05e0496 100644
--- a/build.rs
+++ b/build.rs
@@ -1,10 +1,3 @@
-#![warn(rust_2018_idioms, single_use_lifetimes)]
-
-use autocfg::AutoCfg;
-use std::env;
-
-include!("no_atomic_cas.rs");
-
 // The rustc-cfg listed below are considered public API, but it is *unstable*
 // and outside of the normal semver guarantees:
 //
@@ -14,10 +7,15 @@
 //      need to enable it manually when building for custom targets or using
 //      non-cargo build systems that don't run the build script.
 //
-// With the exceptions mentioned above, the rustc-cfg strings below are
-// *not* public API. Please let us know by opening a GitHub issue if your build
-// environment requires some way to enable these cfgs other than by executing
-// our build script.
+// With the exceptions mentioned above, the rustc-cfg emitted by the build
+// script are *not* public API.
+
+#![warn(rust_2018_idioms, single_use_lifetimes)]
+
+use std::env;
+
+include!("no_atomic_cas.rs");
+
 fn main() {
     let target = match env::var("TARGET") {
         Ok(target) => target,
@@ -35,27 +33,9 @@
     // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
     // run. This is needed for compatibility with non-cargo build systems that
     // don't run the build script.
-    if NO_ATOMIC_CAS_TARGETS.contains(&&*target) {
+    if NO_ATOMIC_CAS.contains(&&*target) {
         println!("cargo:rustc-cfg=futures_no_atomic_cas");
     }
 
-    let cfg = match AutoCfg::new() {
-        Ok(cfg) => cfg,
-        Err(e) => {
-            println!(
-                "cargo:warning={}: unable to determine rustc version: {}",
-                env!("CARGO_PKG_NAME"),
-                e
-            );
-            return;
-        }
-    };
-
-    // Function like procedural macros in expressions patterns statements stabilized in Rust 1.45:
-    // https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements
-    if cfg.probe_rustc_version(1, 45) {
-        println!("cargo:rustc-cfg=fn_like_proc_macro");
-    }
-
     println!("cargo:rerun-if-changed=no_atomic_cas.rs");
 }
diff --git a/no_atomic_cas.rs b/no_atomic_cas.rs
index 4708bf8..9b05d4b 100644
--- a/no_atomic_cas.rs
+++ b/no_atomic_cas.rs
@@ -1,7 +1,7 @@
 // This file is @generated by no_atomic_cas.sh.
 // It is not intended for manual editing.
 
-const NO_ATOMIC_CAS_TARGETS: &[&str] = &[
+const NO_ATOMIC_CAS: &[&str] = &[
     "avr-unknown-gnu-atmega328",
     "bpfeb-unknown-none",
     "bpfel-unknown-none",
diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs
index c5cdd9b..28f3b23 100644
--- a/src/async_await/join_mod.rs
+++ b/src/async_await/join_mod.rs
@@ -81,12 +81,10 @@
 
 #[allow(unreachable_pub)]
 #[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
 pub use futures_macro::join_internal;
 
 #[allow(unreachable_pub)]
 #[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
 pub use futures_macro::try_join_internal;
 
 document_join_macro! {
diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs
index 37e938d..1d13067 100644
--- a/src/async_await/select_mod.rs
+++ b/src/async_await/select_mod.rs
@@ -29,9 +29,6 @@
         /// It is also gated behind the `async-await` feature of this library, which is
         /// activated by default.
         ///
-        /// Note that `select!` relies on `proc-macro-hack`, and may require to set the
-        /// compiler's recursion limit very high, e.g. `#![recursion_limit="1024"]`.
-        ///
         /// # Examples
         ///
         /// ```
@@ -309,12 +306,10 @@
 #[cfg(feature = "std")]
 #[allow(unreachable_pub)]
 #[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
 pub use futures_macro::select_internal;
 
 #[allow(unreachable_pub)]
 #[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
 pub use futures_macro::select_biased_internal;
 
 document_select_macro! {
diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs
index 7743406..1c8002f 100644
--- a/src/async_await/stream_select_mod.rs
+++ b/src/async_await/stream_select_mod.rs
@@ -3,7 +3,6 @@
 #[cfg(feature = "std")]
 #[allow(unreachable_pub)]
 #[doc(hidden)]
-#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
 pub use futures_macro::stream_select_internal;
 
 /// Combines several streams, all producing the same `Item` type, into one stream.
@@ -13,10 +12,6 @@
 ///
 /// If multiple streams are ready, one will be pseudo randomly selected at runtime.
 ///
-/// This macro is gated behind the `async-await` feature of this library, which is activated by default.
-/// Note that `stream_select!` relies on `proc-macro-hack`, and may require to set the compiler's recursion
-/// limit very high, e.g. `#![recursion_limit="1024"]`.
-///
 /// # Examples
 ///
 /// ```
diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs
index 17239a4..36de1da 100644
--- a/src/compat/compat01as03.rs
+++ b/src/compat/compat01as03.rs
@@ -64,6 +64,7 @@
     /// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
     ///
     /// ```
+    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
     /// # futures::executor::block_on(async {
     /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
     /// # // feature issues
@@ -90,6 +91,7 @@
     /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
     ///
     /// ```
+    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
     /// # futures::executor::block_on(async {
     /// use futures::stream::StreamExt;
     /// use futures_util::compat::Stream01CompatExt;
@@ -119,6 +121,7 @@
     /// [`Sink<T, Error = E>`](futures_sink::Sink).
     ///
     /// ```
+    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
     /// # futures::executor::block_on(async {
     /// use futures::{sink::SinkExt, stream::StreamExt};
     /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
@@ -351,8 +354,6 @@
 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
 mod io {
     use super::*;
-    #[cfg(feature = "read-initializer")]
-    use futures_io::Initializer;
     use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
     use std::io::Error;
     use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
@@ -364,6 +365,7 @@
         /// [`AsyncRead`](futures_io::AsyncRead).
         ///
         /// ```
+        /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
         /// # futures::executor::block_on(async {
         /// use futures::io::AsyncReadExt;
         /// use futures_util::compat::AsyncRead01CompatExt;
@@ -393,6 +395,7 @@
         /// [`AsyncWrite`](futures_io::AsyncWrite).
         ///
         /// ```
+        /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
         /// # futures::executor::block_on(async {
         /// use futures::io::AsyncWriteExt;
         /// use futures_util::compat::AsyncWrite01CompatExt;
@@ -416,16 +419,6 @@
     impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}
 
     impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
-        #[cfg(feature = "read-initializer")]
-        unsafe fn initializer(&self) -> Initializer {
-            // check if `prepare_uninitialized_buffer` needs zeroing
-            if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) {
-                Initializer::zeroing()
-            } else {
-                Initializer::nop()
-            }
-        }
-
         fn poll_read(
             mut self: Pin<&mut Self>,
             cx: &mut Context<'_>,
diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs
index 2573fe7..5d3a6e9 100644
--- a/src/compat/compat03as01.rs
+++ b/src/compat/compat03as01.rs
@@ -236,17 +236,7 @@
         }
     }
 
-    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
-        #[cfg(feature = "read-initializer")]
-        unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
-            let initializer = self.inner.initializer();
-            let does_init = initializer.should_initialize();
-            if does_init {
-                initializer.initialize(buf);
-            }
-            does_init
-        }
-    }
+    impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {}
 
     impl<W: AsyncWrite03 + Unpin> std::io::Write for Compat<W> {
         fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
diff --git a/src/compat/executor.rs b/src/compat/executor.rs
index e25705b..ea0c67a 100644
--- a/src/compat/executor.rs
+++ b/src/compat/executor.rs
@@ -17,6 +17,7 @@
     /// futures 0.3 [`Spawn`](futures_task::Spawn).
     ///
     /// ```
+    /// # if cfg!(miri) { return; } // Miri does not support epoll
     /// use futures::task::SpawnExt;
     /// use futures::future::{FutureExt, TryFutureExt};
     /// use futures_util::compat::Executor01CompatExt;
diff --git a/src/future/either.rs b/src/future/either.rs
index 35650da..9602de7 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -184,8 +184,6 @@
 
     use core::pin::Pin;
     use core::task::{Context, Poll};
-    #[cfg(feature = "read-initializer")]
-    use futures_io::Initializer;
     use futures_io::{
         AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom,
     };
@@ -195,14 +193,6 @@
         A: AsyncRead,
         B: AsyncRead,
     {
-        #[cfg(feature = "read-initializer")]
-        unsafe fn initializer(&self) -> Initializer {
-            match self {
-                Either::Left(x) => x.initializer(),
-                Either::Right(x) => x.initializer(),
-            }
-        }
-
         fn poll_read(
             self: Pin<&mut Self>,
             cx: &mut Context<'_>,
diff --git a/src/io/allow_std.rs b/src/io/allow_std.rs
index 1d13e0c..ec30ee3 100644
--- a/src/io/allow_std.rs
+++ b/src/io/allow_std.rs
@@ -1,6 +1,4 @@
 use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
 use std::pin::Pin;
 use std::{fmt, io};
@@ -121,10 +119,6 @@
     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
         self.0.read_vectored(bufs)
     }
-    #[cfg(feature = "read-initializer")]
-    unsafe fn initializer(&self) -> Initializer {
-        self.0.initializer()
-    }
     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
         self.0.read_to_end(buf)
     }
@@ -155,11 +149,6 @@
     ) -> Poll<io::Result<usize>> {
         Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs))))
     }
-
-    #[cfg(feature = "read-initializer")]
-    unsafe fn initializer(&self) -> Initializer {
-        self.0.initializer()
-    }
 }
 
 impl<T> io::Seek for AllowStdIo<T>
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 2d585a9..0334a9f 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -2,8 +2,6 @@
 use futures_core::future::Future;
 use futures_core::ready;
 use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
 use pin_project_lite::pin_project;
 use std::io::{self, Read};
@@ -144,12 +142,6 @@
         self.consume(nread);
         Poll::Ready(Ok(nread))
     }
-
-    // we can't skip unconditionally because of the large buffer case in read.
-    #[cfg(feature = "read-initializer")]
-    unsafe fn initializer(&self) -> Initializer {
-        self.inner.initializer()
-    }
 }
 
 impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs
index f292b87..cb74863 100644
--- a/src/io/buf_writer.rs
+++ b/src/io/buf_writer.rs
@@ -6,6 +6,7 @@
 use std::fmt;
 use std::io::{self, Write};
 use std::pin::Pin;
+use std::ptr;
 
 pin_project! {
     /// Wraps a writer and buffers its output.
@@ -49,7 +50,7 @@
         Self { inner, buf: Vec::with_capacity(cap), written: 0 }
     }
 
-    fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+    pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
         let mut this = self.project();
 
         let len = this.buf.len();
@@ -83,6 +84,68 @@
     pub fn buffer(&self) -> &[u8] {
         &self.buf
     }
+
+    /// Capacity of `buf`, i.e. how many bytes can be held in the buffer.
+    pub(super) fn capacity(&self) -> usize {
+        self.buf.capacity()
+    }
+
+    /// Number of bytes that can still be written before `buf` reaches its capacity.
+    #[inline]
+    pub(super) fn spare_capacity(&self) -> usize {
+        self.buf.capacity() - self.buf.len()
+    }
+
+    /// Writes a byte slice directly into the buffer.
+    ///
+    /// At most `spare_capacity()` bytes are copied; the number of bytes actually buffered
+    /// is returned, so size the slice accordingly to avoid dropping the remainder.
+    ///
+    /// Based on `std::io::BufWriter`
+    pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize {
+        let available = self.spare_capacity();
+        let amt_to_buffer = available.min(buf.len());
+
+        // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction.
+        unsafe {
+            self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
+        }
+
+        amt_to_buffer
+    }
+
+    /// Write byte slice directly into `self.buf`
+    ///
+    /// Based on `std::io::BufWriter`
+    #[inline]
+    unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) {
+        debug_assert!(buf.len() <= self.spare_capacity());
+        let this = self.project();
+        let old_len = this.buf.len();
+        let buf_len = buf.len();
+        let src = buf.as_ptr();
+        let dst = this.buf.as_mut_ptr().add(old_len);
+        ptr::copy_nonoverlapping(src, dst, buf_len);
+        this.buf.set_len(old_len + buf_len);
+    }
+
+    /// Write directly using `inner`, bypassing buffering
+    pub(super) fn inner_poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        self.project().inner.poll_write(cx, buf)
+    }
+
+    /// Write directly using `inner`, bypassing buffering
+    pub(super) fn inner_poll_write_vectored(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &[IoSlice<'_>],
+    ) -> Poll<io::Result<usize>> {
+        self.project().inner.poll_write_vectored(cx, bufs)
+    }
 }
 
 impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
diff --git a/src/io/chain.rs b/src/io/chain.rs
index a35c50d..728a3d2 100644
--- a/src/io/chain.rs
+++ b/src/io/chain.rs
@@ -1,7 +1,5 @@
 use futures_core::ready;
 use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
 use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
 use pin_project_lite::pin_project;
 use std::fmt;
@@ -111,16 +109,6 @@
         }
         this.second.poll_read_vectored(cx, bufs)
     }
-
-    #[cfg(feature = "read-initializer")]
-    unsafe fn initializer(&self) -> Initializer {
-        let initializer = self.first.initializer();
-        if initializer.should_initialize() {
-            initializer
-        } else {
-            self.second.initializer()
-        }
-    }
 }
 
 impl<T, U> AsyncBufRead for Chain<T, U>
diff --git a/src/io/empty.rs b/src/io/empty.rs
index ab2395a..02f6103 100644
--- a/src/io/empty.rs
+++ b/src/io/empty.rs
@@ -1,6 +1,4 @@
 use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
 use futures_io::{AsyncBufRead, AsyncRead};
 use std::fmt;
 use std::io;
@@ -43,12 +41,6 @@
     ) -> Poll<io::Result<usize>> {
         Poll::Ready(Ok(0))
     }
-
-    #[cfg(feature = "read-initializer")]
-    #[inline]
-    unsafe fn initializer(&self) -> Initializer {
-        Initializer::nop()
-    }
 }
 
 impl AsyncBufRead for Empty {
diff --git a/src/io/line_writer.rs b/src/io/line_writer.rs
new file mode 100644
index 0000000..71cd668
--- /dev/null
+++ b/src/io/line_writer.rs
@@ -0,0 +1,155 @@
+use super::buf_writer::BufWriter;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use futures_io::IoSlice;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+
+pin_project! {
+/// Wraps a writer, like [`BufWriter`] does, but prioritizes buffering lines.
+///
+/// This was written based on `std::io::LineWriter` which goes into further details
+/// explaining the code.
+///
+/// Buffering is actually done by the wrapped `BufWriter`; this type drives that `BufWriter`
+/// so that output is written out on each line.
+#[derive(Debug)]
+pub struct LineWriter<W: AsyncWrite> {
+    #[pin]
+    buf_writer: BufWriter<W>,
+}
+}
+
+impl<W: AsyncWrite> LineWriter<W> {
+    /// Creates a new `LineWriter` with the default buffer capacity. The default is currently
+    /// 1024 bytes, a value taken from `std::io::LineWriter`.
+    pub fn new(inner: W) -> LineWriter<W> {
+        LineWriter::with_capacity(1024, inner)
+    }
+
+    /// Creates a new `LineWriter` with the specified buffer capacity.
+    pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
+        LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) }
+    }
+
+    /// Flushes `buf_writer` if the last buffered byte is a newline (`b'\n'`).
+    fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        let this = self.project();
+        match this.buf_writer.buffer().last().copied() {
+            Some(b'\n') => this.buf_writer.flush_buf(cx),
+            _ => Poll::Ready(Ok(())),
+        }
+    }
+
+    /// Returns a reference to `buf_writer`'s internally buffered data.
+    pub fn buffer(&self) -> &[u8] {
+        self.buf_writer.buffer()
+    }
+
+    /// Acquires a reference to the underlying writer that this combinator is
+    /// writing to.
+    pub fn get_ref(&self) -> &W {
+        self.buf_writer.get_ref()
+    }
+}
+
+impl<W: AsyncWrite> AsyncWrite for LineWriter<W> {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        let mut this = self.as_mut().project();
+        let newline_index = match memchr::memrchr(b'\n', buf) {
+            None => {
+                ready!(self.as_mut().flush_if_completed_line(cx)?);
+                return self.project().buf_writer.poll_write(cx, buf);
+            }
+            Some(newline_index) => newline_index + 1,
+        };
+
+        ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+        let lines = &buf[..newline_index];
+
+        let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? };
+
+        if flushed == 0 {
+            return Poll::Ready(Ok(0));
+        }
+
+        let tail = if flushed >= newline_index {
+            &buf[flushed..]
+        } else if newline_index - flushed <= this.buf_writer.capacity() {
+            &buf[flushed..newline_index]
+        } else {
+            let scan_area = &buf[flushed..];
+            let scan_area = &scan_area[..this.buf_writer.capacity()];
+            match memchr::memrchr(b'\n', scan_area) {
+                Some(newline_index) => &scan_area[..newline_index + 1],
+                None => scan_area,
+            }
+        };
+
+        let buffered = this.buf_writer.as_mut().write_to_buf(tail);
+        Poll::Ready(Ok(flushed + buffered))
+    }
+
+    fn poll_write_vectored(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &[IoSlice<'_>],
+    ) -> Poll<io::Result<usize>> {
+        let mut this = self.as_mut().project();
+        // `is_write_vectored()` is handled in original code, but not in this crate
+        // see https://github.com/rust-lang/rust/issues/70436
+
+        let last_newline_buf_idx = bufs
+            .iter()
+            .enumerate()
+            .rev()
+            .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
+        let last_newline_buf_idx = match last_newline_buf_idx {
+            None => {
+                ready!(self.as_mut().flush_if_completed_line(cx)?);
+                return self.project().buf_writer.poll_write_vectored(cx, bufs);
+            }
+            Some(i) => i,
+        };
+
+        ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+        let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
+
+        let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? };
+        if flushed == 0 {
+            return Poll::Ready(Ok(0));
+        }
+
+        let lines_len = lines.iter().map(|buf| buf.len()).sum();
+        if flushed < lines_len {
+            return Poll::Ready(Ok(flushed));
+        }
+
+        let buffered: usize = tail
+            .iter()
+            .filter(|buf| !buf.is_empty())
+            .map(|buf| this.buf_writer.as_mut().write_to_buf(buf))
+            .take_while(|&n| n > 0)
+            .sum();
+
+        Poll::Ready(Ok(flushed + buffered))
+    }
+
+    /// Forwards to `buf_writer`'s `BufWriter::poll_flush()`.
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.as_mut().project().buf_writer.poll_flush(cx)
+    }
+
+    /// Forwards to `buf_writer`'s `BufWriter::poll_close()`.
+    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        self.as_mut().project().buf_writer.poll_close(cx)
+    }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 16cf5a7..4dd2e02 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -26,10 +26,6 @@
 // Re-export some types from `std::io` so that users don't have to deal
 // with conflicts when `use`ing `futures::io` and `std::io`.
 #[doc(no_inline)]
-#[cfg(feature = "read-initializer")]
-#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
-pub use std::io::Initializer;
-#[doc(no_inline)]
 pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
 
 pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
@@ -40,15 +36,9 @@
 
 /// Initializes a buffer if necessary.
 ///
-/// A buffer is always initialized if `read-initializer` feature is disabled.
+/// A buffer is currently always initialized.
 #[inline]
 unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
-    #[cfg(feature = "read-initializer")]
-    {
-        if !_reader.initializer().should_initialize() {
-            return;
-        }
-    }
     ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
 }
 
@@ -61,6 +51,9 @@
 mod buf_writer;
 pub use self::buf_writer::BufWriter;
 
+mod line_writer;
+pub use self::line_writer::LineWriter;
+
 mod chain;
 pub use self::chain::Chain;
 
diff --git a/src/io/repeat.rs b/src/io/repeat.rs
index 4cefcb2..2828bf0 100644
--- a/src/io/repeat.rs
+++ b/src/io/repeat.rs
@@ -1,7 +1,5 @@
 use futures_core::ready;
 use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
 use futures_io::{AsyncRead, IoSliceMut};
 use std::fmt;
 use std::io;
@@ -59,12 +57,6 @@
         }
         Poll::Ready(Ok(nwritten))
     }
-
-    #[cfg(feature = "read-initializer")]
-    #[inline]
-    unsafe fn initializer(&self) -> Initializer {
-        Initializer::nop()
-    }
 }
 
 impl fmt::Debug for Repeat {
diff --git a/src/io/take.rs b/src/io/take.rs
index 0583020..2c49480 100644
--- a/src/io/take.rs
+++ b/src/io/take.rs
@@ -1,7 +1,5 @@
 use futures_core::ready;
 use futures_core::task::{Context, Poll};
-#[cfg(feature = "read-initializer")]
-use futures_io::Initializer;
 use futures_io::{AsyncBufRead, AsyncRead};
 use pin_project_lite::pin_project;
 use std::pin::Pin;
@@ -100,11 +98,6 @@
         *this.limit -= n as u64;
         Poll::Ready(Ok(n))
     }
-
-    #[cfg(feature = "read-initializer")]
-    unsafe fn initializer(&self) -> Initializer {
-        self.inner.initializer()
-    }
 }
 
 impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
diff --git a/src/lib.rs b/src/lib.rs
index 76d3799..9a10c93 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,6 @@
 //! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
 //! and the `AsyncRead` and `AsyncWrite` traits.
 
-#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
 #![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
 #![cfg_attr(not(feature = "std"), no_std)]
 #![warn(
@@ -23,9 +22,6 @@
 #[cfg(all(feature = "bilock", not(feature = "unstable")))]
 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
 
-#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
-compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
 #[cfg(feature = "alloc")]
 extern crate alloc;
 
@@ -148,11 +144,6 @@
 #[cfg(feature = "std")]
 macro_rules! delegate_async_read {
     ($field:ident) => {
-        #[cfg(feature = "read-initializer")]
-        unsafe fn initializer(&self) -> $crate::io::Initializer {
-            self.$field.initializer()
-        }
-
         fn poll_read(
             self: core::pin::Pin<&mut Self>,
             cx: &mut core::task::Context<'_>,
diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs
index 4a05d88..fdbd53d 100644
--- a/src/stream/futures_unordered/mod.rs
+++ b/src/stream/futures_unordered/mod.rs
@@ -121,8 +121,9 @@
             next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
             queued: AtomicBool::new(true),
             ready_to_run_queue: Weak::new(),
+            woken: AtomicBool::new(false),
         });
-        let stub_ptr = &*stub as *const Task<Fut>;
+        let stub_ptr = Arc::as_ptr(&stub);
         let ready_to_run_queue = Arc::new(ReadyToRunQueue {
             waker: AtomicWaker::new(),
             head: AtomicPtr::new(stub_ptr as *mut _),
@@ -167,6 +168,7 @@
             next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
             queued: AtomicBool::new(true),
             ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
+            woken: AtomicBool::new(false),
         });
 
         // Reset the `is_terminated` flag if we've previously marked ourselves
@@ -375,7 +377,7 @@
         // The `ReadyToRunQueue` stub is never inserted into the `head_all`
         // list, and its pointer value will remain valid for the lifetime of
         // this `FuturesUnordered`, so we can make use of its value here.
-        &*self.ready_to_run_queue.stub as *const _ as *mut _
+        Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _
     }
 }
 
@@ -383,25 +385,12 @@
     type Item = Fut::Output;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        // Variable to determine how many times it is allowed to poll underlying
-        // futures without yielding.
-        //
-        // A single call to `poll_next` may potentially do a lot of work before
-        // yielding. This happens in particular if the underlying futures are awoken
-        // frequently but continue to return `Pending`. This is problematic if other
-        // tasks are waiting on the executor, since they do not get to run. This value
-        // caps the number of calls to `poll` on underlying futures a single call to
-        // `poll_next` is allowed to make.
-        //
-        // The value is the length of FuturesUnordered. This ensures that each
-        // future is polled only once at most per iteration.
-        //
-        // See also https://github.com/rust-lang/futures-rs/issues/2047.
-        let yield_every = self.len();
+        let len = self.len();
 
         // Keep track of how many child futures we have polled,
         // in case we want to forcibly yield.
         let mut polled = 0;
+        let mut yielded = 0;
 
         // Ensure `parent` is correctly set.
         self.ready_to_run_queue.waker.register(cx.waker());
@@ -512,7 +501,11 @@
             // the internal allocation, appropriately accessing fields and
             // deallocating the task if need be.
             let res = {
-                let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
+                let task = bomb.task.as_ref().unwrap();
+                // We are only interested in whether the future is awoken before it
+                // finishes polling, so reset the flag here.
+                task.woken.store(false, Relaxed);
+                let waker = Task::waker_ref(task);
                 let mut cx = Context::from_waker(&waker);
 
                 // Safety: We won't move the future ever again
@@ -525,12 +518,17 @@
             match res {
                 Poll::Pending => {
                     let task = bomb.task.take().unwrap();
+                    // If the future was awoken during polling, we assume
+                    // the future wanted to explicitly yield.
+                    yielded += task.woken.load(Relaxed) as usize;
                     bomb.queue.link(task);
 
-                    if polled == yield_every {
-                        // We have polled a large number of futures in a row without yielding.
-                        // To ensure we do not starve other tasks waiting on the executor,
-                        // we yield here, but immediately wake ourselves up to continue.
+                    // If a future yields, we respect it and yield here.
+                    // If all futures have been polled, we also yield here to
+                    // avoid starving other tasks waiting on the executor.
+                    // (polling the same future twice per iteration may cause
+                    // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
+                    if yielded >= 2 || polled == len {
                         cx.waker().wake_by_ref();
                         return Poll::Pending;
                     }
diff --git a/src/stream/futures_unordered/ready_to_run_queue.rs b/src/stream/futures_unordered/ready_to_run_queue.rs
index 5ef6cde..4518705 100644
--- a/src/stream/futures_unordered/ready_to_run_queue.rs
+++ b/src/stream/futures_unordered/ready_to_run_queue.rs
@@ -83,7 +83,7 @@
     }
 
     pub(super) fn stub(&self) -> *const Task<Fut> {
-        &*self.stub
+        Arc::as_ptr(&self.stub)
     }
 
     // Clear the queue of tasks.
diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs
index da2cd67..ec2114e 100644
--- a/src/stream/futures_unordered/task.rs
+++ b/src/stream/futures_unordered/task.rs
@@ -1,6 +1,6 @@
 use alloc::sync::{Arc, Weak};
 use core::cell::UnsafeCell;
-use core::sync::atomic::Ordering::{self, SeqCst};
+use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
 use core::sync::atomic::{AtomicBool, AtomicPtr};
 
 use super::abort::abort;
@@ -31,6 +31,11 @@
 
     // Whether or not this task is currently in the ready to run queue
     pub(super) queued: AtomicBool,
+
+    // Whether the future was awoken during polling
+    // It is possible for this flag to be set to true after the polling,
+    // but it will be ignored.
+    pub(super) woken: AtomicBool,
 }
 
 // `Task` can be sent across threads safely because it ensures that
@@ -48,6 +53,8 @@
             None => return,
         };
 
+        arc_self.woken.store(true, Relaxed);
+
         // It's our job to enqueue this task it into the ready to run queue. To
         // do this we set the `queued` flag, and if successful we then do the
         // actual queueing operation, ensuring that we're only queued once.
@@ -62,7 +69,7 @@
         // still.
         let prev = arc_self.queued.swap(true, SeqCst);
         if !prev {
-            inner.enqueue(&**arc_self);
+            inner.enqueue(Arc::as_ptr(arc_self));
             inner.waker.wake();
         }
     }
diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs
new file mode 100644
index 0000000..513cab7
--- /dev/null
+++ b/src/stream/stream/count.rs
@@ -0,0 +1,53 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::{FusedFuture, Future};
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future for the [`count`](super::StreamExt::count) method.
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct Count<St> {
+        #[pin]
+        stream: St,
+        count: usize
+    }
+}
+
+impl<St> fmt::Debug for Count<St>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Count").field("stream", &self.stream).field("count", &self.count).finish()
+    }
+}
+
+impl<St: Stream> Count<St> {
+    pub(super) fn new(stream: St) -> Self {
+        Self { stream, count: 0 }
+    }
+}
+
+impl<St: FusedStream> FusedFuture for Count<St> {
+    fn is_terminated(&self) -> bool {
+        self.stream.is_terminated()
+    }
+}
+
+impl<St: Stream> Future for Count<St> {
+    type Output = usize;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut this = self.project();
+
+        Poll::Ready(loop {
+            match ready!(this.stream.as_mut().poll_next(cx)) {
+                Some(_) => *this.count += 1,
+                None => break *this.count,
+            }
+        })
+    }
+}
diff --git a/src/stream/stream/flatten_unordered.rs b/src/stream/stream/flatten_unordered.rs
new file mode 100644
index 0000000..07f971c
--- /dev/null
+++ b/src/stream/stream/flatten_unordered.rs
@@ -0,0 +1,509 @@
+use alloc::sync::Arc;
+use core::{
+    cell::UnsafeCell,
+    convert::identity,
+    fmt,
+    num::NonZeroUsize,
+    pin::Pin,
+    sync::atomic::{AtomicU8, Ordering},
+};
+
+use pin_project_lite::pin_project;
+
+use futures_core::{
+    future::Future,
+    ready,
+    stream::{FusedStream, Stream},
+    task::{Context, Poll, Waker},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use futures_task::{waker, ArcWake};
+
+use crate::stream::FuturesUnordered;
+
+/// There is nothing to poll and stream isn't being
+/// polled or waking at the moment.
+const NONE: u8 = 0;
+
+/// Inner streams need to be polled.
+const NEED_TO_POLL_INNER_STREAMS: u8 = 1;
+
+/// The base stream needs to be polled.
+const NEED_TO_POLL_STREAM: u8 = 0b10;
+
+/// It needs to poll base stream and inner streams.
+const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;
+
+/// The current stream is being polled at the moment.
+const POLLING: u8 = 0b100;
+
+/// Inner streams are being woken at the moment.
+const WAKING_INNER_STREAMS: u8 = 0b1000;
+
+/// The base stream is being woken at the moment.
+const WAKING_STREAM: u8 = 0b10000;
+
+/// The base stream and inner streams are being woken at the moment.
+const WAKING_ALL: u8 = WAKING_STREAM | WAKING_INNER_STREAMS;
+
+/// The stream was woken and will be polled.
+const WOKEN: u8 = 0b100000;
+
+/// Determines what needs to be polled, and whether the stream is being
+/// polled at the moment.
+#[derive(Clone, Debug)]
+struct SharedPollState {
+    state: Arc<AtomicU8>,
+}
+
+impl SharedPollState {
+    /// Constructs new `SharedPollState` with the given state.
+    fn new(value: u8) -> SharedPollState {
+        SharedPollState { state: Arc::new(AtomicU8::new(value)) }
+    }
+
+    /// Attempts to start polling, returning stored state in case of success.
+    /// Returns `None` if some waker is waking at the moment.
+    fn start_polling(
+        &self,
+    ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+        let value = self
+            .state
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+                if value & WAKING_ALL == NONE {
+                    Some(POLLING)
+                } else {
+                    None
+                }
+            })
+            .ok()?;
+        let bomb = PollStateBomb::new(self, SharedPollState::reset);
+
+        Some((value, bomb))
+    }
+
+    /// Starts the waking process and performs bitwise or with the given value.
+    fn start_waking(
+        &self,
+        to_poll: u8,
+        waking: u8,
+    ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+        let value = self
+            .state
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+                // Waking process for this waker already started
+                if value & waking != NONE {
+                    return None;
+                }
+                let mut next_value = value | to_poll;
+                // Only start the waking process if we're not in the polling phase and the stream isn't woken already
+                if value & (WOKEN | POLLING) == NONE {
+                    next_value |= waking;
+                }
+
+                if next_value != value {
+                    Some(next_value)
+                } else {
+                    None
+                }
+            })
+            .ok()?;
+
+        if value & (WOKEN | POLLING) == NONE {
+            let bomb = PollStateBomb::new(self, move |state| state.stop_waking(waking));
+
+            Some((value, bomb))
+        } else {
+            None
+        }
+    }
+
+    /// Sets current state to
+    /// - `!POLLING` allowing to use wakers
+    /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,
+    ///   or `will_be_woken` flag supplied
+    /// - `!WAKING_ALL` as
+    ///   * Wakers called during the `POLLING` phase won't propagate their calls
+    ///   * `POLLING` phase can't start if some of the wakers are active
+    ///   So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
+    fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 {
+        self.state
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| {
+                let mut next_value = to_poll;
+
+                value &= NEED_TO_POLL_ALL;
+                if value != NONE || will_be_woken {
+                    next_value |= WOKEN;
+                }
+                next_value |= value;
+
+                Some(next_value & !POLLING & !WAKING_ALL)
+            })
+            .unwrap()
+    }
+
+    /// Toggles state to non-waking, allowing to start polling.
+    fn stop_waking(&self, waking: u8) -> u8 {
+        self.state
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+                let mut next_value = value & !waking;
+                // Waker will be called only if the current waking state is the same as the specified waker state
+                if value & WAKING_ALL == waking {
+                    next_value |= WOKEN;
+                }
+
+                if next_value != value {
+                    Some(next_value)
+                } else {
+                    None
+                }
+            })
+            .unwrap_or_else(identity)
+    }
+
+    /// Resets current state allowing to poll the stream and wake up wakers.
+    fn reset(&self) -> u8 {
+        self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
+    }
+}
+
+/// Used to execute some function on the given state when dropped.
+struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> {
+    state: &'a SharedPollState,
+    drop: Option<F>,
+}
+
+impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
+    /// Constructs new bomb with the given state.
+    fn new(state: &'a SharedPollState, drop: F) -> Self {
+        Self { state, drop: Some(drop) }
+    }
+
+    /// Deactivates bomb, forces it to not call provided function when dropped.
+    fn deactivate(mut self) {
+        self.drop.take();
+    }
+
+    /// Manually fires the bomb, returning supplied state.
+    fn fire(mut self) -> Option<u8> {
+        self.drop.take().map(|drop| (drop)(self.state))
+    }
+}
+
+impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
+    fn drop(&mut self) {
+        if let Some(drop) = self.drop.take() {
+            (drop)(self.state);
+        }
+    }
+}
+
+/// Will update state with the provided value on `wake_by_ref` call
+/// and then, if there is a need, call `inner_waker`.
+struct InnerWaker {
+    inner_waker: UnsafeCell<Option<Waker>>,
+    poll_state: SharedPollState,
+    need_to_poll: u8,
+}
+
+unsafe impl Send for InnerWaker {}
+unsafe impl Sync for InnerWaker {}
+
+impl InnerWaker {
+    /// Replaces given waker's inner_waker for polling stream/futures which will
+    /// update poll state on `wake_by_ref` call. Use only if you need several
+    /// contexts.
+    ///
+    /// ## Safety
+    ///
+    /// This function will modify waker's `inner_waker` via `UnsafeCell`, so
+    /// it should be used only during `POLLING` phase.
+    unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) -> Waker {
+        *self_arc.inner_waker.get() = cx.waker().clone().into();
+        waker(self_arc.clone())
+    }
+
+    /// Attempts to start the waking process for the waker with the given value.
+    /// If succeeded, then the stream isn't yet woken and not being polled at the moment.
+    fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+        self.poll_state.start_waking(self.need_to_poll, self.waking_state())
+    }
+
+    /// Returns the corresponding waking state toggled by this waker.
+    fn waking_state(&self) -> u8 {
+        self.need_to_poll << 3
+    }
+}
+
+impl ArcWake for InnerWaker {
+    fn wake_by_ref(self_arc: &Arc<Self>) {
+        if let Some((_, state_bomb)) = self_arc.start_waking() {
+            // Safety: now state is not `POLLING`
+            let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };
+
+            if let Some(inner_waker) = waker_opt.clone() {
+                // Stop waking to allow polling stream
+                let poll_state_value = state_bomb.fire().unwrap();
+
+                // Here we want to call waker only if stream isn't woken yet and
+                // also to optimize the case when two wakers are called at the same time.
+                //
+                // In this case the best strategy will be to propagate only the latest waker's awake,
+                // and then poll both entities in a single `poll_next` call
+                if poll_state_value & (WOKEN | WAKING_ALL) == self_arc.waking_state() {
+                    // Wake up inner waker
+                    inner_waker.wake();
+                }
+            }
+        }
+    }
+}
+
+pin_project! {
+    /// Future which contains optional stream.
+    ///
+    /// If it's `Some`, it will attempt to call `poll_next` on it,
+    /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))`
+    /// or `None` in case of `Poll::Ready(None)`.
+    ///
+    /// If `poll_next` returns `Poll::Pending`, it will be forwarded to
+    /// the future and current task will be notified by waker.
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    struct PollStreamFut<St> {
+        #[pin]
+        stream: Option<St>,
+    }
+}
+
+impl<St> PollStreamFut<St> {
+    /// Constructs new `PollStreamFut` using given `stream`.
+    fn new(stream: impl Into<Option<St>>) -> Self {
+        Self { stream: stream.into() }
+    }
+}
+
+impl<St: Stream + Unpin> Future for PollStreamFut<St> {
+    type Output = Option<(St::Item, PollStreamFut<St>)>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut stream = self.project().stream;
+
+        let item = if let Some(stream) = stream.as_mut().as_pin_mut() {
+            ready!(stream.poll_next(cx))
+        } else {
+            None
+        };
+        let next_item_fut = PollStreamFut::new(stream.get_mut().take());
+        let out = item.map(|item| (item, next_item_fut));
+
+        Poll::Ready(out)
+    }
+}
+
+pin_project! {
+    /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+    /// method.
+    #[project = FlattenUnorderedProj]
+    #[must_use = "streams do nothing unless polled"]
+    pub struct FlattenUnordered<St> where St: Stream {
+        #[pin]
+        inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
+        #[pin]
+        stream: St,
+        poll_state: SharedPollState,
+        limit: Option<NonZeroUsize>,
+        is_stream_done: bool,
+        inner_streams_waker: Arc<InnerWaker>,
+        stream_waker: Arc<InnerWaker>,
+    }
+}
+
+impl<St> fmt::Debug for FlattenUnordered<St>
+where
+    St: Stream + fmt::Debug,
+    St::Item: Stream + fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FlattenUnordered")
+            .field("poll_state", &self.poll_state)
+            .field("inner_streams", &self.inner_streams)
+            .field("limit", &self.limit)
+            .field("stream", &self.stream)
+            .field("is_stream_done", &self.is_stream_done)
+            .finish()
+    }
+}
+
+impl<St> FlattenUnordered<St>
+where
+    St: Stream,
+    St::Item: Stream + Unpin,
+{
+    pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
+        let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
+
+        FlattenUnordered {
+            inner_streams: FuturesUnordered::new(),
+            stream,
+            is_stream_done: false,
+            limit: limit.and_then(NonZeroUsize::new),
+            inner_streams_waker: Arc::new(InnerWaker {
+                inner_waker: UnsafeCell::new(None),
+                poll_state: poll_state.clone(),
+                need_to_poll: NEED_TO_POLL_INNER_STREAMS,
+            }),
+            stream_waker: Arc::new(InnerWaker {
+                inner_waker: UnsafeCell::new(None),
+                poll_state: poll_state.clone(),
+                need_to_poll: NEED_TO_POLL_STREAM,
+            }),
+            poll_state,
+        }
+    }
+
+    delegate_access_inner!(stream, St, ());
+}
+
+impl<St> FlattenUnorderedProj<'_, St>
+where
+    St: Stream,
+{
+    /// Returns `true` if a limit is set and the current `inner_streams` size has reached it.
+    fn is_exceeded_limit(&self) -> bool {
+        self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
+    }
+}
+
+impl<St> FusedStream for FlattenUnordered<St>
+where
+    St: FusedStream,
+    St::Item: FusedStream + Unpin,
+{
+    fn is_terminated(&self) -> bool {
+        self.stream.is_terminated() && self.inner_streams.is_empty()
+    }
+}
+
+impl<St> Stream for FlattenUnordered<St>
+where
+    St: Stream,
+    St::Item: Stream + Unpin,
+{
+    type Item = <St::Item as Stream>::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut next_item = None;
+        let mut need_to_poll_next = NONE;
+
+        let mut this = self.as_mut().project();
+
+        let (mut poll_state_value, state_bomb) = match this.poll_state.start_polling() {
+            Some(value) => value,
+            _ => {
+                // Waker was called, just wait for the next poll
+                return Poll::Pending;
+            }
+        };
+
+        if poll_state_value & NEED_TO_POLL_STREAM != NONE {
+            // Safety: now state is `POLLING`.
+            let stream_waker = unsafe { InnerWaker::replace_waker(this.stream_waker, cx) };
+
+            // Here we need to poll the base stream.
+            //
+            // To improve performance, we will attempt to place as many items as we can
+            // to the `FuturesUnordered` bucket before polling inner streams
+            loop {
+                if this.is_exceeded_limit() || *this.is_stream_done {
+                    // We either exceeded the limit or the stream is exhausted
+                    if !*this.is_stream_done {
+                        // The stream needs to be polled in the next iteration
+                        need_to_poll_next |= NEED_TO_POLL_STREAM;
+                    }
+
+                    break;
+                } else {
+                    match this.stream.as_mut().poll_next(&mut Context::from_waker(&stream_waker)) {
+                        Poll::Ready(Some(inner_stream)) => {
+                            // Add new stream to the inner streams bucket
+                            this.inner_streams.as_mut().push(PollStreamFut::new(inner_stream));
+                            // Inner streams must be polled afterward
+                            poll_state_value |= NEED_TO_POLL_INNER_STREAMS;
+                        }
+                        Poll::Ready(None) => {
+                            // Mark the stream as done
+                            *this.is_stream_done = true;
+                        }
+                        Poll::Pending => {
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE {
+            // Safety: now state is `POLLING`.
+            let inner_streams_waker =
+                unsafe { InnerWaker::replace_waker(this.inner_streams_waker, cx) };
+
+            match this
+                .inner_streams
+                .as_mut()
+                .poll_next(&mut Context::from_waker(&inner_streams_waker))
+            {
+                Poll::Ready(Some(Some((item, next_item_fut)))) => {
+                    // Push next inner stream item future to the list of inner streams futures
+                    this.inner_streams.as_mut().push(next_item_fut);
+                    // Take the received item
+                    next_item = Some(item);
+                    // On the next iteration, inner streams must be polled again
+                    need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+                }
+                Poll::Ready(Some(None)) => {
+                    // On the next iteration, inner streams must be polled again
+                    need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+                }
+                _ => {}
+            }
+        }
+
+        // We didn't have any `poll_next` panic, so it's time to deactivate the bomb
+        state_bomb.deactivate();
+
+        let mut force_wake =
+            // we need to poll the stream and didn't reach the limit yet
+            need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
+            // or we need to poll inner streams again
+            || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;
+
+        // Stop polling and swap the latest state
+        poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake);
+        // If state was changed during `POLLING` phase, need to manually call a waker
+        force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;
+
+        let is_done = *this.is_stream_done && this.inner_streams.is_empty();
+
+        if next_item.is_some() || is_done {
+            Poll::Ready(next_item)
+        } else {
+            if force_wake {
+                cx.waker().wake_by_ref();
+            }
+
+            Poll::Pending
+        }
+    }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item> Sink<Item> for FlattenUnordered<St>
+where
+    St: Stream + Sink<Item>,
+{
+    type Error = St::Error;
+
+    delegate_sink!(stream, Item);
+}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 86997f4..642b91e 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -40,6 +40,10 @@
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::concat::Concat;
 
+mod count;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::count::Count;
+
 mod cycle;
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::cycle::Cycle;
@@ -195,6 +199,25 @@
 
 #[cfg(not(futures_no_atomic_cas))]
 #[cfg(feature = "alloc")]
+mod flatten_unordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)]
+pub use self::flatten_unordered::FlattenUnordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+delegate_all!(
+    /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
+    FlatMapUnordered<St, U, F>(
+        FlattenUnordered<Map<St, F>>
+    ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
+    where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
+);
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
 mod for_each_concurrent;
 #[cfg(not(futures_no_atomic_cas))]
 #[cfg(feature = "alloc")]
@@ -386,9 +409,9 @@
     /// use futures::stream::{self, StreamExt};
     ///
     /// let stream = stream::iter(1..=10);
-    /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
+    /// let events = stream.filter(|x| future::ready(x % 2 == 0));
     ///
-    /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
+    /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
     /// # });
     /// ```
     fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
@@ -418,11 +441,11 @@
     /// use futures::stream::{self, StreamExt};
     ///
     /// let stream = stream::iter(1..=10);
-    /// let evens = stream.filter_map(|x| async move {
+    /// let evens = stream.filter_map(|x| async move {
     ///     if x % 2 == 0 { Some(x + 1) } else { None }
     /// });
     ///
-    /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
+    /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
     /// # });
     /// ```
     fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
@@ -576,6 +599,38 @@
         assert_future::<Self::Item, _>(Concat::new(self))
     }
 
+    /// Drives the stream to completion, counting the number of items.
+    ///
+    /// # Overflow Behavior
+    ///
+    /// The method does no guarding against overflows, so counting elements of a
+    /// stream with more than [`usize::MAX`] elements either produces the wrong
+    /// result or panics. If debug assertions are enabled, a panic is guaranteed.
+    ///
+    /// # Panics
+    ///
+    /// This function might panic if the stream has more than [`usize::MAX`]
+    /// elements.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # futures::executor::block_on(async {
+    /// use futures::stream::{self, StreamExt};
+    ///
+    /// let stream = stream::iter(1..=10);
+    /// let count = stream.count().await;
+    ///
+    /// assert_eq!(count, 10);
+    /// # });
+    /// ```
+    fn count(self) -> Count<Self>
+    where
+        Self: Sized,
+    {
+        assert_future::<usize, _>(Count::new(self))
+    }
+
     /// Repeats a stream endlessly.
     ///
     /// The stream never terminates. Note that you likely want to avoid
@@ -718,13 +773,57 @@
         assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
     }
 
+    /// Flattens a stream of streams into just one continuous stream. Polls
+    /// inner streams concurrently.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # futures::executor::block_on(async {
+    /// use futures::channel::mpsc;
+    /// use futures::stream::StreamExt;
+    /// use std::thread;
+    ///
+    /// let (tx1, rx1) = mpsc::unbounded();
+    /// let (tx2, rx2) = mpsc::unbounded();
+    /// let (tx3, rx3) = mpsc::unbounded();
+    ///
+    /// thread::spawn(move || {
+    ///     tx1.unbounded_send(1).unwrap();
+    ///     tx1.unbounded_send(2).unwrap();
+    /// });
+    /// thread::spawn(move || {
+    ///     tx2.unbounded_send(3).unwrap();
+    ///     tx2.unbounded_send(4).unwrap();
+    /// });
+    /// thread::spawn(move || {
+    ///     tx3.unbounded_send(rx1).unwrap();
+    ///     tx3.unbounded_send(rx2).unwrap();
+    /// });
+    ///
+    /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
+    /// output.sort();
+    ///
+    /// assert_eq!(output, vec![1, 2, 3, 4]);
+    /// # });
+    /// ```
+    #[cfg(not(futures_no_atomic_cas))]
+    #[cfg(feature = "alloc")]
+    fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
+    where
+        Self::Item: Stream + Unpin,
+        Self: Sized,
+    {
+        FlattenUnordered::new(self, limit.into())
+    }
+
     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
     ///
     /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
     /// you would have to chain combinators like `.map(f).flatten()` while this
     /// combinator provides ability to write `.flat_map(f)` instead of chaining.
     ///
-    /// The provided closure which produce inner streams is executed over all elements
+    /// The provided closure which produces inner streams is executed over all elements
     /// of stream as last inner stream is terminated and next stream item is available.
     ///
     /// Note that this function consumes the stream passed into it and returns a
@@ -752,6 +851,59 @@
         assert_stream::<U::Item, _>(FlatMap::new(self, f))
     }
 
+    /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
+    /// and polls them concurrently, yielding items in any order, as they are
+    /// made available.
+    ///
+    /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
+    /// instead, and you need to poll all of them concurrently, you would
+    /// have to use something like `for_each_concurrent` and merge values
+    /// by hand. This combinator provides the ability to collect all values
+    /// from concurrently polled streams into one stream.
+    ///
+    /// The first argument is an optional limit on the number of concurrently
+    /// polled streams. If this limit is not `None`, no more than `limit` streams
+    /// will be polled concurrently. The `limit` argument is of type
+    /// `Into<Option<usize>>`, and so can be provided as either `None`,
+    /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+    /// no limit at all, and will have the same result as passing in `None`.
+    ///
+    /// The provided closure which produces inner streams is executed over
+    /// all elements of the stream as the next stream item becomes available
+    /// and the limit of concurrently processed streams isn't exceeded.
+    ///
+    /// Note that this function consumes the stream passed into it and
+    /// returns a wrapped version of it.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # futures::executor::block_on(async {
+    /// use futures::stream::{self, StreamExt};
+    ///
+    /// let stream = stream::iter(1..5);
+    /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
+    /// let mut values = stream.collect::<Vec<_>>().await;
+    /// values.sort();
+    ///
+    /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
+    /// # });
+    /// ```
+    #[cfg(not(futures_no_atomic_cas))]
+    #[cfg(feature = "alloc")]
+    fn flat_map_unordered<U, F>(
+        self,
+        limit: impl Into<Option<usize>>,
+        f: F,
+    ) -> FlatMapUnordered<Self, U, F>
+    where
+        U: Stream + Unpin,
+        F: FnMut(Self::Item) -> U,
+        Self: Sized,
+    {
+        FlatMapUnordered::new(self, limit.into(), f)
+    }
+
     /// Combinator similar to [`StreamExt::fold`] that holds internal state
     /// and produces a new stream.
     ///
diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs
index 8724145..f5cfde9 100644
--- a/src/stream/stream/scan.rs
+++ b/src/stream/stream/scan.rs
@@ -118,11 +118,11 @@
 
 // Forwarding impl of Sink from the underlying stream
 #[cfg(feature = "sink")]
-impl<S, Fut, F, Item> Sink<Item> for Scan<S, S, Fut, F>
+impl<St, S, Fut, F, Item> Sink<Item> for Scan<St, S, Fut, F>
 where
-    S: Stream + Sink<Item>,
+    St: Stream + Sink<Item>,
 {
-    type Error = S::Error;
+    type Error = St::Error;
 
     delegate_sink!(stream, Item);
 }
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 455ddca..6bf2cb7 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -736,17 +736,21 @@
     /// thread::spawn(move || {
     ///     tx2.unbounded_send(Ok(2)).unwrap();
     ///     tx2.unbounded_send(Err(3)).unwrap();
+    ///     tx2.unbounded_send(Ok(4)).unwrap();
     /// });
     /// thread::spawn(move || {
     ///     tx3.unbounded_send(Ok(rx1)).unwrap();
     ///     tx3.unbounded_send(Ok(rx2)).unwrap();
-    ///     tx3.unbounded_send(Err(4)).unwrap();
+    ///     tx3.unbounded_send(Err(5)).unwrap();
     /// });
     ///
     /// let mut stream = rx3.try_flatten();
     /// assert_eq!(stream.next().await, Some(Ok(1)));
     /// assert_eq!(stream.next().await, Some(Ok(2)));
     /// assert_eq!(stream.next().await, Some(Err(3)));
+    /// assert_eq!(stream.next().await, Some(Ok(4)));
+    /// assert_eq!(stream.next().await, Some(Err(5)));
+    /// assert_eq!(stream.next().await, None);
     /// # });
     /// ```
     fn try_flatten(self) -> TryFlatten<Self>
@@ -1001,6 +1005,7 @@
     /// Wraps a [`TryStream`] into a stream compatible with libraries using
     /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
     /// ```
+    /// # if cfg!(miri) { return; } // Miri does not support epoll
     /// use futures::future::{FutureExt, TryFutureExt};
     /// # let (tx, rx) = futures::channel::oneshot::channel();
     ///
diff --git a/src/task/mod.rs b/src/task/mod.rs
index eff6d48..0a31eea 100644
--- a/src/task/mod.rs
+++ b/src/task/mod.rs
@@ -16,7 +16,6 @@
 pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj};
 
 pub use futures_task::noop_waker;
-#[cfg(feature = "std")]
 pub use futures_task::noop_waker_ref;
 
 #[cfg(not(futures_no_atomic_cas))]
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index f877923..87ca360 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -34,6 +34,7 @@
     /// today. Feel free to use this method in the meantime.
     ///
     /// ```
+    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
     /// use futures::executor::ThreadPool;
     /// use futures::task::SpawnExt;
     ///
@@ -58,6 +59,7 @@
     /// resolves to the output of the spawned future.
     ///
     /// ```
+    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
     /// use futures::executor::{block_on, ThreadPool};
     /// use futures::future;
     /// use futures::task::SpawnExt;
@@ -136,6 +138,7 @@
     /// resolves to the output of the spawned future.
     ///
     /// ```
+    /// # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
     /// use futures::executor::LocalPool;
     /// use futures::task::LocalSpawnExt;
     ///