1#[cfg(feature = "alloc")]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(feature = "alloc")]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
/// Turns `stream` into a [`BlockOn`] wrapper.
///
/// The wrapper's [`Iterator`] implementation drives the stream through
/// `crate::future::block_on`. Only compiled with the `std` feature.
#[cfg(feature = "std")]
pub fn block_on<S>(stream: S) -> BlockOn<S>
where
    S: Stream + Unpin,
{
    BlockOn(stream)
}
56
/// Iterator adapter produced by [`block_on()`].
///
/// Each iterator method builds the matching `StreamExt` future on the wrapped
/// stream and hands it to `crate::future::block_on` to obtain the result.
#[derive(Debug)]
pub struct BlockOn<S>(S);

#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
    type Item = S::Item;

    /// Runs the stream's `next()` future to completion.
    fn next(&mut self) -> Option<Self::Item> {
        let fut = self.0.next();
        crate::future::block_on(fut)
    }

    /// Forwards the wrapped stream's size hint unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let BlockOn(stream) = self;
        stream.size_hint()
    }

    /// Consumes the stream and runs its `count()` future.
    fn count(self) -> usize {
        let fut = self.0.count();
        crate::future::block_on(fut)
    }

    /// Consumes the stream and runs its `last()` future.
    fn last(self) -> Option<Self::Item> {
        let fut = self.0.last();
        crate::future::block_on(fut)
    }

    /// Runs the stream's `nth(n)` future.
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let fut = self.0.nth(n);
        crate::future::block_on(fut)
    }

    /// Consumes the stream and runs its `fold(init, f)` future.
    fn fold<B, F>(self, init: B, f: F) -> B
    where
        F: FnMut(B, Self::Item) -> B,
    {
        let fut = self.0.fold(init, f);
        crate::future::block_on(fut)
    }

    /// Consumes the stream and runs its `for_each(f)` future.
    fn for_each<F>(self, f: F) -> F::Output
    where
        F: FnMut(Self::Item),
    {
        let fut = self.0.for_each(f);
        crate::future::block_on(fut)
    }

    /// Runs the stream's `all(f)` future.
    fn all<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let fut = self.0.all(f);
        crate::future::block_on(fut)
    }

    /// Runs the stream's `any(f)` future.
    fn any<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let fut = self.0.any(f);
        crate::future::block_on(fut)
    }

    /// Runs the stream's `find(predicate)` future.
    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
    where
        P: FnMut(&Self::Item) -> bool,
    {
        let fut = self.0.find(predicate);
        crate::future::block_on(fut)
    }

    /// Runs the stream's `find_map(f)` future.
    fn find_map<B, F>(&mut self, f: F) -> Option<B>
    where
        F: FnMut(Self::Item) -> Option<B>,
    {
        let fut = self.0.find_map(f);
        crate::future::block_on(fut)
    }

    /// Runs the stream's `position(predicate)` future.
    fn position<P>(&mut self, predicate: P) -> Option<usize>
    where
        P: FnMut(Self::Item) -> bool,
    {
        let fut = self.0.position(predicate);
        crate::future::block_on(fut)
    }
}
134
135pub fn empty<T>() -> Empty<T> {
148    Empty {
149        _marker: PhantomData,
150    }
151}
152
153#[derive(Clone, Debug)]
155#[must_use = "streams do nothing unless polled"]
156pub struct Empty<T> {
157    _marker: PhantomData<T>,
158}
159
160impl<T> Unpin for Empty<T> {}
161
162impl<T> Stream for Empty<T> {
163    type Item = T;
164
165    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166        Poll::Ready(None)
167    }
168
169    fn size_hint(&self) -> (usize, Option<usize>) {
170        (0, Some(0))
171    }
172}
173
174pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
190    Iter {
191        iter: iter.into_iter(),
192    }
193}
194
195#[derive(Clone, Debug)]
197#[must_use = "streams do nothing unless polled"]
198pub struct Iter<I> {
199    iter: I,
200}
201
202impl<I> Unpin for Iter<I> {}
203
204impl<I: Iterator> Stream for Iter<I> {
205    type Item = I::Item;
206
207    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
208        Poll::Ready(self.iter.next())
209    }
210
211    fn size_hint(&self) -> (usize, Option<usize>) {
212        self.iter.size_hint()
213    }
214}
215
216pub fn once<T>(t: T) -> Once<T> {
231    Once { value: Some(t) }
232}
233
234pin_project! {
235    #[derive(Clone, Debug)]
237    #[must_use = "streams do nothing unless polled"]
238    pub struct Once<T> {
239        value: Option<T>,
240    }
241}
242
243impl<T> Stream for Once<T> {
244    type Item = T;
245
246    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
247        Poll::Ready(self.project().value.take())
248    }
249
250    fn size_hint(&self) -> (usize, Option<usize>) {
251        if self.value.is_some() {
252            (1, Some(1))
253        } else {
254            (0, Some(0))
255        }
256    }
257}
258
259pub fn pending<T>() -> Pending<T> {
273    Pending {
274        _marker: PhantomData,
275    }
276}
277
278#[derive(Clone, Debug)]
280#[must_use = "streams do nothing unless polled"]
281pub struct Pending<T> {
282    _marker: PhantomData<T>,
283}
284
285impl<T> Unpin for Pending<T> {}
286
287impl<T> Stream for Pending<T> {
288    type Item = T;
289
290    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
291        Poll::Pending
292    }
293
294    fn size_hint(&self) -> (usize, Option<usize>) {
295        (0, Some(0))
296    }
297}
298
299pub fn poll_fn<T, F>(f: F) -> PollFn<F>
316where
317    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
318{
319    PollFn { f }
320}
321
322#[derive(Clone)]
324#[must_use = "streams do nothing unless polled"]
325pub struct PollFn<F> {
326    f: F,
327}
328
329impl<F> Unpin for PollFn<F> {}
330
331impl<F> fmt::Debug for PollFn<F> {
332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333        f.debug_struct("PollFn").finish()
334    }
335}
336
337impl<T, F> Stream for PollFn<F>
338where
339    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
340{
341    type Item = T;
342
343    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
344        (&mut self.f)(cx)
345    }
346}
347
348pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
363    Repeat { item }
364}
365
366#[derive(Clone, Debug)]
368#[must_use = "streams do nothing unless polled"]
369pub struct Repeat<T> {
370    item: T,
371}
372
373impl<T> Unpin for Repeat<T> {}
374
375impl<T: Clone> Stream for Repeat<T> {
376    type Item = T;
377
378    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379        Poll::Ready(Some(self.item.clone()))
380    }
381
382    fn size_hint(&self) -> (usize, Option<usize>) {
383        (usize::max_value(), None)
384    }
385}
386
387pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
402where
403    F: FnMut() -> T,
404{
405    RepeatWith { f: repeater }
406}
407
408#[derive(Clone, Debug)]
410#[must_use = "streams do nothing unless polled"]
411pub struct RepeatWith<F> {
412    f: F,
413}
414
415impl<F> Unpin for RepeatWith<F> {}
416
417impl<T, F> Stream for RepeatWith<F>
418where
419    F: FnMut() -> T,
420{
421    type Item = T;
422
423    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
424        let item = (&mut self.f)();
425        Poll::Ready(Some(item))
426    }
427
428    fn size_hint(&self) -> (usize, Option<usize>) {
429        (usize::max_value(), None)
430    }
431}
432
433pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
455where
456    F: FnMut(T) -> Fut,
457    Fut: Future<Output = Option<(Item, T)>>,
458{
459    Unfold {
460        f,
461        state: Some(seed),
462        fut: None,
463    }
464}
465
466pin_project! {
467    #[derive(Clone)]
469    #[must_use = "streams do nothing unless polled"]
470    pub struct Unfold<T, F, Fut> {
471        f: F,
472        state: Option<T>,
473        #[pin]
474        fut: Option<Fut>,
475    }
476}
477
478impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
479where
480    T: fmt::Debug,
481    Fut: fmt::Debug,
482{
483    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484        f.debug_struct("Unfold")
485            .field("state", &self.state)
486            .field("fut", &self.fut)
487            .finish()
488    }
489}
490
491impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
492where
493    F: FnMut(T) -> Fut,
494    Fut: Future<Output = Option<(Item, T)>>,
495{
496    type Item = Item;
497
498    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499        let mut this = self.project();
500
501        if let Some(state) = this.state.take() {
502            this.fut.set(Some((this.f)(state)));
503        }
504
505        let step = ready!(this
506            .fut
507            .as_mut()
508            .as_pin_mut()
509            .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
510            .poll(cx));
511        this.fut.set(None);
512
513        if let Some((item, next_state)) = step {
514            *this.state = Some(next_state);
515            Poll::Ready(Some(item))
516        } else {
517            Poll::Ready(None)
518        }
519    }
520}
521
522pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
544where
545    F: FnMut(T) -> Fut,
546    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
547{
548    TryUnfold {
549        f,
550        state: Some(init),
551        fut: None,
552    }
553}
554
555pin_project! {
556    #[derive(Clone)]
558    #[must_use = "streams do nothing unless polled"]
559    pub struct TryUnfold<T, F, Fut> {
560        f: F,
561        state: Option<T>,
562        #[pin]
563        fut: Option<Fut>,
564    }
565}
566
567impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
568where
569    T: fmt::Debug,
570    Fut: fmt::Debug,
571{
572    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573        f.debug_struct("TryUnfold")
574            .field("state", &self.state)
575            .field("fut", &self.fut)
576            .finish()
577    }
578}
579
580impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
581where
582    F: FnMut(T) -> Fut,
583    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
584{
585    type Item = Result<Item, E>;
586
587    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
588        let mut this = self.project();
589
590        if let Some(state) = this.state.take() {
591            this.fut.set(Some((this.f)(state)));
592        }
593
594        match this.fut.as_mut().as_pin_mut() {
595            None => {
596                Poll::Ready(None)
598            }
599            Some(future) => {
600                let step = ready!(future.poll(cx));
601                this.fut.set(None);
602
603                match step {
604                    Ok(Some((item, next_state))) => {
605                        *this.state = Some(next_state);
606                        Poll::Ready(Some(Ok(item)))
607                    }
608                    Ok(None) => Poll::Ready(None),
609                    Err(e) => Poll::Ready(Some(Err(e))),
610                }
611            }
612        }
613    }
614}
615
616pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
631    OnceFuture {
632        future: Some(future),
633    }
634}
635
636pin_project! {
637    #[derive(Debug)]
639    #[must_use = "futures do nothing unless you `.await` or poll them"]
640    pub struct OnceFuture<F> {
641        #[pin]
642        future: Option<F>,
643    }
644}
645
646impl<F: Future> Stream for OnceFuture<F> {
647    type Item = F::Output;
648
649    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
650        let mut this = self.project();
651
652        match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
653            Some(Poll::Ready(t)) => {
654                this.future.set(None);
655                Poll::Ready(Some(t))
656            }
657            Some(Poll::Pending) => Poll::Pending,
658            None => Poll::Ready(None),
659        }
660    }
661}
662
/// Extension trait that adds combinators and future-returning helpers to [`Stream`].
///
/// Every method has a default body; most only construct the adapter or future
/// named in their return type. Methods taking `&mut self` require
/// `Self: Unpin`, and methods taking `self` by value require `Self: Sized`.
pub trait StreamExt: Stream {
    /// Polls for the next item without the caller having to pin the stream.
    ///
    /// Forwards to [`Stream::poll_next`] on `Pin::new(self)`.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
    where
        Self: Unpin,
    {
        Stream::poll_next(Pin::new(self), cx)
    }

    /// Returns a [`NextFuture`] that resolves to the stream's next item, or
    /// `None` once the stream reports completion.
    fn next(&mut self) -> NextFuture<'_, Self>
    where
        Self: Unpin,
    {
        NextFuture { stream: self }
    }

    /// Returns a [`TryNextFuture`] for a stream of `Result`s.
    ///
    /// The future resolves to `Ok(Some(item))` for a successful item,
    /// `Ok(None)` at the end of the stream, or `Err(e)` for a failed item.
    fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
    where
        Self: Stream<Item = Result<T, E>> + Unpin,
    {
        TryNextFuture { stream: self }
    }

    /// Consumes the stream and returns a [`CountFuture`] that resolves to the
    /// number of items it produced.
    fn count(self) -> CountFuture<Self>
    where
        Self: Sized,
    {
        CountFuture {
            stream: self,
            count: 0,
        }
    }

    /// Returns a [`Map`] stream that applies `f` to every item.
    fn map<T, F>(self, f: F) -> Map<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> T,
    {
        Map { stream: self, f }
    }

    /// Maps every item to a stream with `f` and yields the items of those
    /// streams, one inner stream at a time (see [`FlatMap`]).
    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
    where
        Self: Sized,
        U: Stream,
        F: FnMut(Self::Item) -> U,
    {
        FlatMap {
            stream: self.map(f),
            inner_stream: None,
        }
    }

    /// Flattens a stream of streams: yields the items of each inner stream in
    /// turn (see [`Flatten`]).
    fn flatten(self) -> Flatten<Self>
    where
        Self: Sized,
        Self::Item: Stream,
    {
        Flatten {
            stream: self,
            inner_stream: None,
        }
    }

    /// Returns a [`Then`] adapter holding this stream, the closure `f`, and an
    /// initially empty future slot.
    ///
    /// NOTE(review): presumably each item is passed to `f` and the resulting
    /// future's output is yielded — confirm against `Then`'s `Stream` impl.
    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> Fut,
        Fut: Future,
    {
        Then {
            stream: self,
            future: None,
            f,
        }
    }

    /// Returns a [`Filter`] adapter holding this stream and `predicate`.
    ///
    /// NOTE(review): presumed to mirror `Iterator::filter` — confirm against
    /// `Filter`'s `Stream` impl.
    fn filter<P>(self, predicate: P) -> Filter<Self, P>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        Filter {
            stream: self,
            predicate,
        }
    }

    /// Returns a [`FilterMap`] adapter holding this stream and `f`.
    ///
    /// NOTE(review): presumed to mirror `Iterator::filter_map` — confirm
    /// against `FilterMap`'s `Stream` impl.
    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> Option<T>,
    {
        FilterMap { stream: self, f }
    }

    /// Returns a [`Take`] adapter holding this stream and the count `n`.
    ///
    /// NOTE(review): presumed to yield at most `n` items, like
    /// `Iterator::take` — confirm against `Take`'s `Stream` impl.
    fn take(self, n: usize) -> Take<Self>
    where
        Self: Sized,
    {
        Take { stream: self, n }
    }

    /// Returns a [`TakeWhile`] adapter holding this stream and `predicate`.
    ///
    /// NOTE(review): presumed to mirror `Iterator::take_while` — confirm
    /// against `TakeWhile`'s `Stream` impl.
    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        TakeWhile {
            stream: self,
            predicate,
        }
    }

    /// Returns a [`Skip`] adapter holding this stream and the count `n`.
    ///
    /// NOTE(review): presumed to drop the first `n` items, like
    /// `Iterator::skip` — confirm against `Skip`'s `Stream` impl.
    fn skip(self, n: usize) -> Skip<Self>
    where
        Self: Sized,
    {
        Skip { stream: self, n }
    }

    /// Returns a [`SkipWhile`] adapter holding this stream and
    /// `Some(predicate)`.
    ///
    /// NOTE(review): presumed to mirror `Iterator::skip_while` — confirm
    /// against `SkipWhile`'s `Stream` impl.
    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        SkipWhile {
            stream: self,
            predicate: Some(predicate),
        }
    }

    /// Returns a [`StepBy`] adapter holding this stream, `step`, and a counter
    /// starting at zero.
    ///
    /// NOTE(review): presumed to mirror `Iterator::step_by` — confirm against
    /// `StepBy`'s `Stream` impl.
    ///
    /// # Panics
    ///
    /// Panics if `step` is zero.
    fn step_by(self, step: usize) -> StepBy<Self>
    where
        Self: Sized,
    {
        assert!(step > 0, "`step` must be greater than zero");
        StepBy {
            stream: self,
            step,
            i: 0,
        }
    }

    /// Returns a [`Chain`] of this stream and `other`, both wrapped with
    /// [`StreamExt::fuse`] first.
    ///
    /// NOTE(review): presumed to yield all of `self` and then all of `other`
    /// — confirm against `Chain`'s `Stream` impl.
    fn chain<U>(self, other: U) -> Chain<Self, U>
    where
        Self: Sized,
        U: Stream<Item = Self::Item> + Sized,
    {
        Chain {
            first: self.fuse(),
            second: other.fuse(),
        }
    }

    /// Returns a [`Cloned`] adapter for a stream of references to `Clone` items.
    ///
    /// NOTE(review): presumed to yield a clone of each referenced item —
    /// confirm against `Cloned`'s `Stream` impl.
    fn cloned<'a, T>(self) -> Cloned<Self>
    where
        Self: Stream<Item = &'a T> + Sized,
        T: Clone + 'a,
    {
        Cloned { stream: self }
    }

    /// Returns a [`Copied`] adapter for a stream of references to `Copy` items.
    ///
    /// NOTE(review): presumed to yield a copy of each referenced item —
    /// confirm against `Copied`'s `Stream` impl.
    fn copied<'a, T>(self) -> Copied<Self>
    where
        Self: Stream<Item = &'a T> + Sized,
        T: Copy + 'a,
    {
        Copied { stream: self }
    }

    /// Consumes the stream and returns a [`CollectFuture`] that gathers every
    /// item into a collection `C`, starting from `C::default()`.
    fn collect<C>(self) -> CollectFuture<Self, C>
    where
        Self: Sized,
        C: Default + Extend<Self::Item>,
    {
        CollectFuture {
            stream: self,
            collection: Default::default(),
        }
    }

    /// Consumes a stream of `Result`s and returns a [`TryCollectFuture`].
    ///
    /// The future resolves to `Ok(collection)` of all `Ok` items, or to the
    /// first `Err` encountered.
    fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
    where
        Self: Stream<Item = Result<T, E>> + Sized,
        C: Default + Extend<T>,
    {
        TryCollectFuture {
            stream: self,
            items: Default::default(),
        }
    }

    /// Consumes the stream and returns a [`PartitionFuture`] that splits the
    /// items into two collections.
    ///
    /// Items for which `predicate` returns `true` go into the first
    /// collection of the resulting pair, all others into the second.
    fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
    where
        Self: Sized,
        B: Default + Extend<Self::Item>,
        P: FnMut(&Self::Item) -> bool,
    {
        PartitionFuture {
            stream: self,
            predicate,
            res: Some(Default::default()),
        }
    }

    /// Consumes the stream and returns a [`FoldFuture`] that folds every item
    /// into an accumulator, starting from `init`.
    fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
    where
        Self: Sized,
        F: FnMut(T, Self::Item) -> T,
    {
        FoldFuture {
            stream: self,
            f,
            acc: Some(init),
        }
    }

    /// Returns a [`TryFoldFuture`] that folds the `Ok` items of a stream of
    /// `Result`s, starting from `init`.
    ///
    /// The future stops at the first error, whether it comes from the stream
    /// or from `f`. The stream is only borrowed, so it stays usable afterwards.
    fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
    where
        Self: Stream<Item = Result<T, E>> + Unpin + Sized,
        F: FnMut(B, T) -> Result<B, E>,
    {
        TryFoldFuture {
            stream: self,
            f,
            acc: Some(init),
        }
    }

    /// Returns a [`Scan`] stream that threads mutable state through `f`.
    ///
    /// For each item, `f` is called with `&mut` state and the item; the
    /// `Option` it returns becomes the result of that poll, so `None` is
    /// reported downstream as the end of the stream.
    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
    where
        Self: Sized,
        F: FnMut(&mut St, Self::Item) -> Option<B>,
    {
        Scan {
            stream: self,
            state_f: (initial_state, f),
        }
    }

    /// Returns a [`Fuse`] stream that keeps returning `None` once the
    /// underlying stream has returned `None`, without polling it again.
    fn fuse(self) -> Fuse<Self>
    where
        Self: Sized,
    {
        Fuse {
            stream: self,
            done: false,
        }
    }

    /// Returns a [`Cycle`] adapter that stores a clone of this stream as
    /// `orig` next to the stream itself.
    ///
    /// NOTE(review): presumed to restart from `orig` whenever the stream
    /// ends, like `Iterator::cycle` — confirm against `Cycle`'s `Stream` impl.
    fn cycle(self) -> Cycle<Self>
    where
        Self: Clone + Sized,
    {
        Cycle {
            orig: self.clone(),
            stream: self,
        }
    }

    /// Returns an [`Enumerate`] adapter holding this stream and a counter
    /// starting at zero.
    ///
    /// NOTE(review): presumed to yield `(index, item)` pairs, like
    /// `Iterator::enumerate` — confirm against `Enumerate`'s `Stream` impl.
    fn enumerate(self) -> Enumerate<Self>
    where
        Self: Sized,
    {
        Enumerate { stream: self, i: 0 }
    }

    /// Returns an [`Inspect`] adapter holding this stream and `f`.
    ///
    /// NOTE(review): presumed to call `f` on a reference to each item before
    /// yielding it — confirm against `Inspect`'s `Stream` impl.
    fn inspect<F>(self, f: F) -> Inspect<Self, F>
    where
        Self: Sized,
        F: FnMut(&Self::Item),
    {
        Inspect { stream: self, f }
    }

    /// Returns an [`NthFuture`] borrowing this stream with index `n`.
    ///
    /// NOTE(review): presumed to resolve to the `n`th item (zero-based), like
    /// `Iterator::nth` — confirm against `NthFuture`'s `Future` impl.
    fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
    where
        Self: Unpin,
    {
        NthFuture { stream: self, n }
    }

    /// Consumes the stream and returns a [`LastFuture`] whose `last` slot
    /// starts out as `None`.
    ///
    /// NOTE(review): presumed to resolve to the final item — confirm against
    /// `LastFuture`'s `Future` impl.
    fn last(self) -> LastFuture<Self>
    where
        Self: Sized,
    {
        LastFuture {
            stream: self,
            last: None,
        }
    }

    /// Returns a [`FindFuture`] borrowing this stream with `predicate`.
    ///
    /// NOTE(review): presumed to resolve to the first item matching
    /// `predicate` — confirm against `FindFuture`'s `Future` impl.
    fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(&Self::Item) -> bool,
    {
        FindFuture {
            stream: self,
            predicate,
        }
    }

    /// Returns a [`FindMapFuture`] borrowing this stream with `f`.
    ///
    /// NOTE(review): presumed to resolve to the first `Some` returned by `f`
    /// — confirm against `FindMapFuture`'s `Future` impl.
    fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
    where
        Self: Unpin,
        F: FnMut(Self::Item) -> Option<B>,
    {
        FindMapFuture { stream: self, f }
    }

    /// Returns a [`PositionFuture`] borrowing this stream with `predicate`
    /// and an index starting at zero.
    ///
    /// NOTE(review): presumed to resolve to the index of the first matching
    /// item — confirm against `PositionFuture`'s `Future` impl.
    fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(Self::Item) -> bool,
    {
        PositionFuture {
            stream: self,
            predicate,
            index: 0,
        }
    }

    /// Returns an [`AllFuture`] borrowing this stream with `predicate`.
    ///
    /// NOTE(review): presumed to resolve to whether every item satisfies
    /// `predicate` — confirm against `AllFuture`'s `Future` impl.
    fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(Self::Item) -> bool,
    {
        AllFuture {
            stream: self,
            predicate,
        }
    }

    /// Returns an [`AnyFuture`] borrowing this stream with `predicate`.
    ///
    /// NOTE(review): presumed to resolve to whether some item satisfies
    /// `predicate` — confirm against `AnyFuture`'s `Future` impl.
    fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
    where
        Self: Unpin,
        P: FnMut(Self::Item) -> bool,
    {
        AnyFuture {
            stream: self,
            predicate,
        }
    }

    /// Consumes the stream and returns a [`ForEachFuture`] holding it and `f`.
    ///
    /// NOTE(review): presumed to call `f` on every item — confirm against
    /// `ForEachFuture`'s `Future` impl.
    fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item),
    {
        ForEachFuture { stream: self, f }
    }

    /// Returns a [`TryForEachFuture`] borrowing this stream with the fallible
    /// closure `f`.
    ///
    /// NOTE(review): presumed to stop at the first `Err` returned by `f` —
    /// confirm against `TryForEachFuture`'s `Future` impl.
    fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
    where
        Self: Unpin,
        F: FnMut(Self::Item) -> Result<(), E>,
    {
        TryForEachFuture { stream: self, f }
    }

    /// Returns a [`Zip`] adapter holding this stream as `first`, `other` as
    /// `second`, and an empty item slot.
    ///
    /// NOTE(review): presumed to yield pairs of items, like `Iterator::zip`
    /// — confirm against `Zip`'s `Stream` impl.
    fn zip<U>(self, other: U) -> Zip<Self, U>
    where
        Self: Sized,
        U: Stream,
    {
        Zip {
            item_slot: None,
            first: self,
            second: other,
        }
    }

    /// Consumes a stream of pairs and returns an [`UnzipFuture`] whose result
    /// slot starts out with default-constructed collections.
    ///
    /// NOTE(review): presumed to split the pairs into `(FromA, FromB)`, like
    /// `Iterator::unzip` — confirm against `UnzipFuture`'s `Future` impl.
    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
    where
        FromA: Default + Extend<A>,
        FromB: Default + Extend<B>,
        Self: Stream<Item = (A, B)> + Sized,
    {
        UnzipFuture {
            stream: self,
            res: Some(Default::default()),
        }
    }

    /// Returns an [`Or`] adapter combining this stream (`stream1`) with
    /// `other` (`stream2`); both must yield the same item type.
    ///
    /// NOTE(review): the merge policy (presumably preferring `stream1`) lives
    /// in `Or`'s `Stream` impl — confirm there.
    fn or<S>(self, other: S) -> Or<Self, S>
    where
        Self: Sized,
        S: Stream<Item = Self::Item>,
    {
        Or {
            stream1: self,
            stream2: other,
        }
    }

    /// Returns a [`Race`] adapter combining this stream (`stream1`) with
    /// `other` (`stream2`); both must yield the same item type.
    ///
    /// Only available with the `std` feature.
    ///
    /// NOTE(review): the merge policy lives in `Race`'s `Stream` impl —
    /// confirm there.
    #[cfg(feature = "std")]
    fn race<S>(self, other: S) -> Race<Self, S>
    where
        Self: Sized,
        S: Stream<Item = Self::Item>,
    {
        Race {
            stream1: self,
            stream2: other,
        }
    }

    /// Moves the stream into a pinned [`Box`] and erases its concrete type,
    /// keeping the `Send` bound.
    ///
    /// Only available with the `alloc` feature.
    #[cfg(feature = "alloc")]
    fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
    where
        Self: Send + Sized + 'a,
    {
        Box::pin(self)
    }

    /// Moves the stream into a pinned [`Box`] and erases its concrete type,
    /// without requiring `Send`.
    ///
    /// Only available with the `alloc` feature.
    #[cfg(feature = "alloc")]
    fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
    where
        Self: Sized + 'a,
    {
        Box::pin(self)
    }
}
1810
// Blanket implementation: every `Stream`, sized or not, gets the `StreamExt` methods.
impl<S: Stream + ?Sized> StreamExt for S {}
1812
/// Type alias for a heap-allocated, pinned, type-erased `'static` stream that is `Send`.
///
/// Only available with the `alloc` feature.
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
1826
/// Type alias for a heap-allocated, pinned, type-erased `'static` stream without a `Send` bound.
///
/// Only available with the `alloc` feature.
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
1840
1841#[derive(Debug)]
1843#[must_use = "futures do nothing unless you `.await` or poll them"]
1844pub struct NextFuture<'a, S: ?Sized> {
1845    stream: &'a mut S,
1846}
1847
1848impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
1849
1850impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
1851    type Output = Option<S::Item>;
1852
1853    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1854        self.stream.poll_next(cx)
1855    }
1856}
1857
1858#[derive(Debug)]
1860#[must_use = "futures do nothing unless you `.await` or poll them"]
1861pub struct TryNextFuture<'a, S: ?Sized> {
1862    stream: &'a mut S,
1863}
1864
1865impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
1866
1867impl<T, E, S> Future for TryNextFuture<'_, S>
1868where
1869    S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
1870{
1871    type Output = Result<Option<T>, E>;
1872
1873    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1874        let res = ready!(self.stream.poll_next(cx));
1875        Poll::Ready(res.transpose())
1876    }
1877}
1878
1879pin_project! {
1880    #[derive(Debug)]
1882    #[must_use = "futures do nothing unless you `.await` or poll them"]
1883    pub struct CountFuture<S: ?Sized> {
1884        count: usize,
1885        #[pin]
1886        stream: S,
1887    }
1888}
1889
1890impl<S: Stream + ?Sized> Future for CountFuture<S> {
1891    type Output = usize;
1892
1893    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1894        loop {
1895            match ready!(self.as_mut().project().stream.poll_next(cx)) {
1896                None => return Poll::Ready(self.count),
1897                Some(_) => *self.as_mut().project().count += 1,
1898            }
1899        }
1900    }
1901}
1902
1903pin_project! {
1904    #[derive(Debug)]
1906    #[must_use = "futures do nothing unless you `.await` or poll them"]
1907    pub struct CollectFuture<S, C> {
1908        #[pin]
1909        stream: S,
1910        collection: C,
1911    }
1912}
1913
1914impl<S, C> Future for CollectFuture<S, C>
1915where
1916    S: Stream,
1917    C: Default + Extend<S::Item>,
1918{
1919    type Output = C;
1920
1921    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
1922        let mut this = self.as_mut().project();
1923        loop {
1924            match ready!(this.stream.as_mut().poll_next(cx)) {
1925                Some(e) => this.collection.extend(Some(e)),
1926                None => return Poll::Ready(mem::take(self.project().collection)),
1927            }
1928        }
1929    }
1930}
1931
1932pin_project! {
1933    #[derive(Debug)]
1935    #[must_use = "futures do nothing unless you `.await` or poll them"]
1936    pub struct TryCollectFuture<S, C> {
1937        #[pin]
1938        stream: S,
1939        items: C,
1940    }
1941}
1942
1943impl<T, E, S, C> Future for TryCollectFuture<S, C>
1944where
1945    S: Stream<Item = Result<T, E>>,
1946    C: Default + Extend<T>,
1947{
1948    type Output = Result<C, E>;
1949
1950    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1951        let mut this = self.project();
1952        Poll::Ready(Ok(loop {
1953            match ready!(this.stream.as_mut().poll_next(cx)?) {
1954                Some(x) => this.items.extend(Some(x)),
1955                None => break mem::take(this.items),
1956            }
1957        }))
1958    }
1959}
1960
1961pin_project! {
1962    #[derive(Debug)]
1964    #[must_use = "futures do nothing unless you `.await` or poll them"]
1965    pub struct PartitionFuture<S, P, B> {
1966        #[pin]
1967        stream: S,
1968        predicate: P,
1969        res: Option<(B, B)>,
1970    }
1971}
1972
1973impl<S, P, B> Future for PartitionFuture<S, P, B>
1974where
1975    S: Stream + Sized,
1976    P: FnMut(&S::Item) -> bool,
1977    B: Default + Extend<S::Item>,
1978{
1979    type Output = (B, B);
1980
1981    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1982        let mut this = self.project();
1983        loop {
1984            match ready!(this.stream.as_mut().poll_next(cx)) {
1985                Some(v) => {
1986                    let res = this.res.as_mut().unwrap();
1987                    if (this.predicate)(&v) {
1988                        res.0.extend(Some(v))
1989                    } else {
1990                        res.1.extend(Some(v))
1991                    }
1992                }
1993                None => return Poll::Ready(this.res.take().unwrap()),
1994            }
1995        }
1996    }
1997}
1998
1999pin_project! {
2000    #[derive(Debug)]
2002    #[must_use = "futures do nothing unless you `.await` or poll them"]
2003    pub struct FoldFuture<S, F, T> {
2004        #[pin]
2005        stream: S,
2006        f: F,
2007        acc: Option<T>,
2008    }
2009}
2010
2011impl<S, F, T> Future for FoldFuture<S, F, T>
2012where
2013    S: Stream,
2014    F: FnMut(T, S::Item) -> T,
2015{
2016    type Output = T;
2017
2018    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2019        let mut this = self.project();
2020        loop {
2021            match ready!(this.stream.as_mut().poll_next(cx)) {
2022                Some(v) => {
2023                    let old = this.acc.take().unwrap();
2024                    let new = (this.f)(old, v);
2025                    *this.acc = Some(new);
2026                }
2027                None => return Poll::Ready(this.acc.take().unwrap()),
2028            }
2029        }
2030    }
2031}
2032
2033#[derive(Debug)]
2035#[must_use = "futures do nothing unless you `.await` or poll them"]
2036pub struct TryFoldFuture<'a, S, F, B> {
2037    stream: &'a mut S,
2038    f: F,
2039    acc: Option<B>,
2040}
2041
2042impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
2043
2044impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
2045where
2046    S: Stream<Item = Result<T, E>> + Unpin,
2047    F: FnMut(B, T) -> Result<B, E>,
2048{
2049    type Output = Result<B, E>;
2050
2051    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2052        loop {
2053            match ready!(self.stream.poll_next(cx)) {
2054                Some(Err(e)) => return Poll::Ready(Err(e)),
2055                Some(Ok(t)) => {
2056                    let old = self.acc.take().unwrap();
2057                    let new = (&mut self.f)(old, t);
2058
2059                    match new {
2060                        Ok(t) => self.acc = Some(t),
2061                        Err(e) => return Poll::Ready(Err(e)),
2062                    }
2063                }
2064                None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2065            }
2066        }
2067    }
2068}
2069
2070pin_project! {
2071    #[derive(Clone, Debug)]
2073    #[must_use = "streams do nothing unless polled"]
2074    pub struct Scan<S, St, F> {
2075        #[pin]
2076        stream: S,
2077        state_f: (St, F),
2078    }
2079}
2080
2081impl<S, St, F, B> Stream for Scan<S, St, F>
2082where
2083    S: Stream,
2084    F: FnMut(&mut St, S::Item) -> Option<B>,
2085{
2086    type Item = B;
2087
2088    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2089        let mut this = self.project();
2090        this.stream.as_mut().poll_next(cx).map(|item| {
2091            item.and_then(|item| {
2092                let (state, f) = this.state_f;
2093                f(state, item)
2094            })
2095        })
2096    }
2097}
2098
2099pin_project! {
2100    #[derive(Clone, Debug)]
2102    #[must_use = "streams do nothing unless polled"]
2103    pub struct Fuse<S> {
2104        #[pin]
2105        stream: S,
2106        done: bool,
2107    }
2108}
2109
2110impl<S: Stream> Stream for Fuse<S> {
2111    type Item = S::Item;
2112
2113    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2114        let this = self.project();
2115
2116        if *this.done {
2117            Poll::Ready(None)
2118        } else {
2119            let next = ready!(this.stream.poll_next(cx));
2120            if next.is_none() {
2121                *this.done = true;
2122            }
2123            Poll::Ready(next)
2124        }
2125    }
2126}
2127
2128pin_project! {
2129    #[derive(Clone, Debug)]
2131    #[must_use = "streams do nothing unless polled"]
2132    pub struct Map<S, F> {
2133        #[pin]
2134        stream: S,
2135        f: F,
2136    }
2137}
2138
2139impl<S, F, T> Stream for Map<S, F>
2140where
2141    S: Stream,
2142    F: FnMut(S::Item) -> T,
2143{
2144    type Item = T;
2145
2146    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2147        let this = self.project();
2148        let next = ready!(this.stream.poll_next(cx));
2149        Poll::Ready(next.map(this.f))
2150    }
2151
2152    fn size_hint(&self) -> (usize, Option<usize>) {
2153        self.stream.size_hint()
2154    }
2155}
2156
2157pin_project! {
2158    #[derive(Clone, Debug)]
2160    #[must_use = "streams do nothing unless polled"]
2161    pub struct FlatMap<S, U, F> {
2162        #[pin]
2163        stream: Map<S, F>,
2164        #[pin]
2165        inner_stream: Option<U>,
2166    }
2167}
2168
2169impl<S, U, F> Stream for FlatMap<S, U, F>
2170where
2171    S: Stream,
2172    U: Stream,
2173    F: FnMut(S::Item) -> U,
2174{
2175    type Item = U::Item;
2176
2177    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2178        let mut this = self.project();
2179        loop {
2180            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2181                match ready!(inner.poll_next(cx)) {
2182                    Some(item) => return Poll::Ready(Some(item)),
2183                    None => this.inner_stream.set(None),
2184                }
2185            }
2186
2187            match ready!(this.stream.as_mut().poll_next(cx)) {
2188                Some(stream) => this.inner_stream.set(Some(stream)),
2189                None => return Poll::Ready(None),
2190            }
2191        }
2192    }
2193}
2194
2195pin_project! {
2196    #[derive(Clone, Debug)]
2198    #[must_use = "streams do nothing unless polled"]
2199    pub struct Flatten<S: Stream> {
2200        #[pin]
2201        stream: S,
2202        #[pin]
2203        inner_stream: Option<S::Item>,
2204    }
2205}
2206
2207impl<S, U> Stream for Flatten<S>
2208where
2209    S: Stream<Item = U>,
2210    U: Stream,
2211{
2212    type Item = U::Item;
2213
2214    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2215        let mut this = self.project();
2216        loop {
2217            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2218                match ready!(inner.poll_next(cx)) {
2219                    Some(item) => return Poll::Ready(Some(item)),
2220                    None => this.inner_stream.set(None),
2221                }
2222            }
2223
2224            match ready!(this.stream.as_mut().poll_next(cx)) {
2225                Some(inner) => this.inner_stream.set(Some(inner)),
2226                None => return Poll::Ready(None),
2227            }
2228        }
2229    }
2230}
2231
2232pin_project! {
2233    #[derive(Clone, Debug)]
2235    #[must_use = "streams do nothing unless polled"]
2236    pub struct Then<S, F, Fut> {
2237        #[pin]
2238        stream: S,
2239        #[pin]
2240        future: Option<Fut>,
2241        f: F,
2242    }
2243}
2244
2245impl<S, F, Fut> Stream for Then<S, F, Fut>
2246where
2247    S: Stream,
2248    F: FnMut(S::Item) -> Fut,
2249    Fut: Future,
2250{
2251    type Item = Fut::Output;
2252
2253    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2254        let mut this = self.project();
2255
2256        loop {
2257            if let Some(fut) = this.future.as_mut().as_pin_mut() {
2258                let item = ready!(fut.poll(cx));
2259                this.future.set(None);
2260                return Poll::Ready(Some(item));
2261            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2262                this.future.set(Some((this.f)(item)));
2263            } else {
2264                return Poll::Ready(None);
2265            }
2266        }
2267    }
2268
2269    fn size_hint(&self) -> (usize, Option<usize>) {
2270        let future_len = self.future.is_some() as usize;
2271        let (lower, upper) = self.stream.size_hint();
2272        let lower = lower.saturating_add(future_len);
2273        let upper = upper.and_then(|u| u.checked_add(future_len));
2274        (lower, upper)
2275    }
2276}
2277
2278pin_project! {
2279    #[derive(Clone, Debug)]
2281    #[must_use = "streams do nothing unless polled"]
2282    pub struct Filter<S, P> {
2283        #[pin]
2284        stream: S,
2285        predicate: P,
2286    }
2287}
2288
2289impl<S, P> Stream for Filter<S, P>
2290where
2291    S: Stream,
2292    P: FnMut(&S::Item) -> bool,
2293{
2294    type Item = S::Item;
2295
2296    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2297        let mut this = self.project();
2298        loop {
2299            match ready!(this.stream.as_mut().poll_next(cx)) {
2300                None => return Poll::Ready(None),
2301                Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2302                Some(_) => {}
2303            }
2304        }
2305    }
2306}
2307
2308pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2324where
2325    S1: Stream<Item = T>,
2326    S2: Stream<Item = T>,
2327{
2328    Or { stream1, stream2 }
2329}
2330
2331pin_project! {
2332    #[derive(Clone, Debug)]
2334    #[must_use = "streams do nothing unless polled"]
2335    pub struct Or<S1, S2> {
2336        #[pin]
2337        stream1: S1,
2338        #[pin]
2339        stream2: S2,
2340    }
2341}
2342
2343impl<T, S1, S2> Stream for Or<S1, S2>
2344where
2345    S1: Stream<Item = T>,
2346    S2: Stream<Item = T>,
2347{
2348    type Item = T;
2349
2350    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2351        let mut this = self.project();
2352
2353        if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2354            return Poll::Ready(Some(t));
2355        }
2356        this.stream2.as_mut().poll_next(cx)
2357    }
2358}
2359
/// Combines two streams of the same item type without a fixed preference.
///
/// On every poll a random choice decides which stream is polled first; the
/// other one is polled only if the first has no item ready.
#[cfg(feature = "std")]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race { stream1, stream2 }
}

#[cfg(feature = "std")]
pin_project! {
    #[derive(Clone, Debug)]
    /// Stream returned by [`race()`].
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        #[pin]
        stream1: S1,
        #[pin]
        stream2: S2,
    }
}

#[cfg(feature = "std")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    /// Polls both streams in a random order and returns the first ready item.
    ///
    /// Returns `Poll::Pending` only if at least one stream is pending (and has
    /// therefore registered the waker). If both streams report the end, the
    /// combined stream ends too; returning `Pending` in that case would leave
    /// the task with no wake-up source and hang it forever.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // Randomize which stream gets the first chance on this poll.
        let first_leads = fastrand::bool();
        let mut any_pending = false;

        for turn in 0..2 {
            // Turn 0 goes to the leader, turn 1 to the other stream.
            let polled = if (turn == 0) == first_leads {
                this.stream1.as_mut().poll_next(cx)
            } else {
                this.stream2.as_mut().poll_next(cx)
            };

            match polled {
                Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
                Poll::Ready(None) => {}
                Poll::Pending => any_pending = true,
            }
        }

        if any_pending {
            Poll::Pending
        } else {
            // Neither stream can produce anything more.
            Poll::Ready(None)
        }
    }
}
2425
2426pin_project! {
2427    #[derive(Clone, Debug)]
2429    #[must_use = "streams do nothing unless polled"]
2430    pub struct FilterMap<S, F> {
2431        #[pin]
2432        stream: S,
2433        f: F,
2434    }
2435}
2436
2437impl<S, F, T> Stream for FilterMap<S, F>
2438where
2439    S: Stream,
2440    F: FnMut(S::Item) -> Option<T>,
2441{
2442    type Item = T;
2443
2444    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2445        let mut this = self.project();
2446        loop {
2447            match ready!(this.stream.as_mut().poll_next(cx)) {
2448                None => return Poll::Ready(None),
2449                Some(v) => {
2450                    if let Some(t) = (this.f)(v) {
2451                        return Poll::Ready(Some(t));
2452                    }
2453                }
2454            }
2455        }
2456    }
2457}
2458
2459pin_project! {
2460    #[derive(Clone, Debug)]
2462    #[must_use = "streams do nothing unless polled"]
2463    pub struct Take<S> {
2464        #[pin]
2465        stream: S,
2466        n: usize,
2467    }
2468}
2469
2470impl<S: Stream> Stream for Take<S> {
2471    type Item = S::Item;
2472
2473    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2474        let this = self.project();
2475
2476        if *this.n == 0 {
2477            Poll::Ready(None)
2478        } else {
2479            let next = ready!(this.stream.poll_next(cx));
2480            match next {
2481                Some(_) => *this.n -= 1,
2482                None => *this.n = 0,
2483            }
2484            Poll::Ready(next)
2485        }
2486    }
2487}
2488
2489pin_project! {
2490    #[derive(Clone, Debug)]
2492    #[must_use = "streams do nothing unless polled"]
2493    pub struct TakeWhile<S, P> {
2494        #[pin]
2495        stream: S,
2496        predicate: P,
2497    }
2498}
2499
2500impl<S, P> Stream for TakeWhile<S, P>
2501where
2502    S: Stream,
2503    P: FnMut(&S::Item) -> bool,
2504{
2505    type Item = S::Item;
2506
2507    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2508        let this = self.project();
2509
2510        match ready!(this.stream.poll_next(cx)) {
2511            Some(v) => {
2512                if (this.predicate)(&v) {
2513                    Poll::Ready(Some(v))
2514                } else {
2515                    Poll::Ready(None)
2516                }
2517            }
2518            None => Poll::Ready(None),
2519        }
2520    }
2521}
2522
2523pin_project! {
2524    #[derive(Clone, Debug)]
2526    #[must_use = "streams do nothing unless polled"]
2527    pub struct Skip<S> {
2528        #[pin]
2529        stream: S,
2530        n: usize,
2531    }
2532}
2533
2534impl<S: Stream> Stream for Skip<S> {
2535    type Item = S::Item;
2536
2537    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2538        let mut this = self.project();
2539        loop {
2540            match ready!(this.stream.as_mut().poll_next(cx)) {
2541                Some(v) => match *this.n {
2542                    0 => return Poll::Ready(Some(v)),
2543                    _ => *this.n -= 1,
2544                },
2545                None => return Poll::Ready(None),
2546            }
2547        }
2548    }
2549}
2550
2551pin_project! {
2552    #[derive(Clone, Debug)]
2554    #[must_use = "streams do nothing unless polled"]
2555    pub struct SkipWhile<S, P> {
2556        #[pin]
2557        stream: S,
2558        predicate: Option<P>,
2559    }
2560}
2561
2562impl<S, P> Stream for SkipWhile<S, P>
2563where
2564    S: Stream,
2565    P: FnMut(&S::Item) -> bool,
2566{
2567    type Item = S::Item;
2568
2569    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2570        let mut this = self.project();
2571        loop {
2572            match ready!(this.stream.as_mut().poll_next(cx)) {
2573                Some(v) => match this.predicate {
2574                    Some(p) => {
2575                        if !p(&v) {
2576                            *this.predicate = None;
2577                            return Poll::Ready(Some(v));
2578                        }
2579                    }
2580                    None => return Poll::Ready(Some(v)),
2581                },
2582                None => return Poll::Ready(None),
2583            }
2584        }
2585    }
2586}
2587
2588pin_project! {
2589    #[derive(Clone, Debug)]
2591    #[must_use = "streams do nothing unless polled"]
2592    pub struct StepBy<S> {
2593        #[pin]
2594        stream: S,
2595        step: usize,
2596        i: usize,
2597    }
2598}
2599
2600impl<S: Stream> Stream for StepBy<S> {
2601    type Item = S::Item;
2602
2603    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2604        let mut this = self.project();
2605        loop {
2606            match ready!(this.stream.as_mut().poll_next(cx)) {
2607                Some(v) => {
2608                    if *this.i == 0 {
2609                        *this.i = *this.step - 1;
2610                        return Poll::Ready(Some(v));
2611                    } else {
2612                        *this.i -= 1;
2613                    }
2614                }
2615                None => return Poll::Ready(None),
2616            }
2617        }
2618    }
2619}
2620
2621pin_project! {
2622    #[derive(Clone, Debug)]
2624    #[must_use = "streams do nothing unless polled"]
2625    pub struct Chain<S, U> {
2626        #[pin]
2627        first: Fuse<S>,
2628        #[pin]
2629        second: Fuse<U>,
2630    }
2631}
2632
2633impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2634    type Item = S::Item;
2635
2636    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2637        let mut this = self.project();
2638
2639        if !this.first.done {
2640            let next = ready!(this.first.as_mut().poll_next(cx));
2641            if let Some(next) = next {
2642                return Poll::Ready(Some(next));
2643            }
2644        }
2645
2646        if !this.second.done {
2647            let next = ready!(this.second.as_mut().poll_next(cx));
2648            if let Some(next) = next {
2649                return Poll::Ready(Some(next));
2650            }
2651        }
2652
2653        if this.first.done && this.second.done {
2654            Poll::Ready(None)
2655        } else {
2656            Poll::Pending
2657        }
2658    }
2659}
2660
2661pin_project! {
2662    #[derive(Clone, Debug)]
2664    #[must_use = "streams do nothing unless polled"]
2665    pub struct Cloned<S> {
2666        #[pin]
2667        stream: S,
2668    }
2669}
2670
2671impl<'a, S, T: 'a> Stream for Cloned<S>
2672where
2673    S: Stream<Item = &'a T>,
2674    T: Clone,
2675{
2676    type Item = T;
2677
2678    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2679        let this = self.project();
2680        let next = ready!(this.stream.poll_next(cx));
2681        Poll::Ready(next.cloned())
2682    }
2683}
2684
2685pin_project! {
2686    #[derive(Clone, Debug)]
2688    #[must_use = "streams do nothing unless polled"]
2689    pub struct Copied<S> {
2690        #[pin]
2691        stream: S,
2692    }
2693}
2694
2695impl<'a, S, T: 'a> Stream for Copied<S>
2696where
2697    S: Stream<Item = &'a T>,
2698    T: Copy,
2699{
2700    type Item = T;
2701
2702    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2703        let this = self.project();
2704        let next = ready!(this.stream.poll_next(cx));
2705        Poll::Ready(next.copied())
2706    }
2707}
2708
2709pin_project! {
2710    #[derive(Clone, Debug)]
2712    #[must_use = "streams do nothing unless polled"]
2713    pub struct Cycle<S> {
2714        orig: S,
2715        #[pin]
2716        stream: S,
2717    }
2718}
2719
2720impl<S> Stream for Cycle<S>
2721where
2722    S: Stream + Clone,
2723{
2724    type Item = S::Item;
2725
2726    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2727        match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2728            Some(item) => Poll::Ready(Some(item)),
2729            None => {
2730                let new = self.as_mut().orig.clone();
2731                self.as_mut().project().stream.set(new);
2732                self.project().stream.poll_next(cx)
2733            }
2734        }
2735    }
2736}
2737
2738pin_project! {
2739    #[derive(Clone, Debug)]
2741    #[must_use = "streams do nothing unless polled"]
2742    pub struct Enumerate<S> {
2743        #[pin]
2744        stream: S,
2745        i: usize,
2746    }
2747}
2748
2749impl<S> Stream for Enumerate<S>
2750where
2751    S: Stream,
2752{
2753    type Item = (usize, S::Item);
2754
2755    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2756        let this = self.project();
2757
2758        match ready!(this.stream.poll_next(cx)) {
2759            Some(v) => {
2760                let ret = (*this.i, v);
2761                *this.i += 1;
2762                Poll::Ready(Some(ret))
2763            }
2764            None => Poll::Ready(None),
2765        }
2766    }
2767}
2768
2769pin_project! {
2770    #[derive(Clone, Debug)]
2772    #[must_use = "streams do nothing unless polled"]
2773    pub struct Inspect<S, F> {
2774        #[pin]
2775        stream: S,
2776        f: F,
2777    }
2778}
2779
2780impl<S, F> Stream for Inspect<S, F>
2781where
2782    S: Stream,
2783    F: FnMut(&S::Item),
2784{
2785    type Item = S::Item;
2786
2787    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2788        let mut this = self.project();
2789        let next = ready!(this.stream.as_mut().poll_next(cx));
2790        if let Some(x) = &next {
2791            (this.f)(x);
2792        }
2793        Poll::Ready(next)
2794    }
2795}
2796
2797#[derive(Debug)]
2799#[must_use = "futures do nothing unless you `.await` or poll them"]
2800pub struct NthFuture<'a, S: ?Sized> {
2801    stream: &'a mut S,
2802    n: usize,
2803}
2804
2805impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
2806
2807impl<'a, S> Future for NthFuture<'a, S>
2808where
2809    S: Stream + Unpin + ?Sized,
2810{
2811    type Output = Option<S::Item>;
2812
2813    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2814        loop {
2815            match ready!(self.stream.poll_next(cx)) {
2816                Some(v) => match self.n {
2817                    0 => return Poll::Ready(Some(v)),
2818                    _ => self.n -= 1,
2819                },
2820                None => return Poll::Ready(None),
2821            }
2822        }
2823    }
2824}
2825
2826pin_project! {
2827    #[derive(Debug)]
2829    #[must_use = "futures do nothing unless you `.await` or poll them"]
2830    pub struct LastFuture<S: Stream> {
2831        #[pin]
2832        stream: S,
2833        last: Option<S::Item>,
2834    }
2835}
2836
2837impl<S: Stream> Future for LastFuture<S> {
2838    type Output = Option<S::Item>;
2839
2840    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2841        let mut this = self.project();
2842        loop {
2843            match ready!(this.stream.as_mut().poll_next(cx)) {
2844                Some(new) => *this.last = Some(new),
2845                None => return Poll::Ready(this.last.take()),
2846            }
2847        }
2848    }
2849}
2850
2851#[derive(Debug)]
2853#[must_use = "futures do nothing unless you `.await` or poll them"]
2854pub struct FindFuture<'a, S: ?Sized, P> {
2855    stream: &'a mut S,
2856    predicate: P,
2857}
2858
2859impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
2860
2861impl<'a, S, P> Future for FindFuture<'a, S, P>
2862where
2863    S: Stream + Unpin + ?Sized,
2864    P: FnMut(&S::Item) -> bool,
2865{
2866    type Output = Option<S::Item>;
2867
2868    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2869        loop {
2870            match ready!(self.stream.poll_next(cx)) {
2871                Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
2872                Some(_) => {}
2873                None => return Poll::Ready(None),
2874            }
2875        }
2876    }
2877}
2878
2879#[derive(Debug)]
2881#[must_use = "futures do nothing unless you `.await` or poll them"]
2882pub struct FindMapFuture<'a, S: ?Sized, F> {
2883    stream: &'a mut S,
2884    f: F,
2885}
2886
2887impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
2888
2889impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
2890where
2891    S: Stream + Unpin + ?Sized,
2892    F: FnMut(S::Item) -> Option<B>,
2893{
2894    type Output = Option<B>;
2895
2896    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2897        loop {
2898            match ready!(self.stream.poll_next(cx)) {
2899                Some(v) => {
2900                    if let Some(v) = (&mut self.f)(v) {
2901                        return Poll::Ready(Some(v));
2902                    }
2903                }
2904                None => return Poll::Ready(None),
2905            }
2906        }
2907    }
2908}
2909
2910#[derive(Debug)]
2912#[must_use = "futures do nothing unless you `.await` or poll them"]
2913pub struct PositionFuture<'a, S: ?Sized, P> {
2914    stream: &'a mut S,
2915    predicate: P,
2916    index: usize,
2917}
2918
2919impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
2920
2921impl<'a, S, P> Future for PositionFuture<'a, S, P>
2922where
2923    S: Stream + Unpin + ?Sized,
2924    P: FnMut(S::Item) -> bool,
2925{
2926    type Output = Option<usize>;
2927
2928    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2929        loop {
2930            match ready!(self.stream.poll_next(cx)) {
2931                Some(v) => {
2932                    if (&mut self.predicate)(v) {
2933                        return Poll::Ready(Some(self.index));
2934                    } else {
2935                        self.index += 1;
2936                    }
2937                }
2938                None => return Poll::Ready(None),
2939            }
2940        }
2941    }
2942}
2943
2944#[derive(Debug)]
2946#[must_use = "futures do nothing unless you `.await` or poll them"]
2947pub struct AllFuture<'a, S: ?Sized, P> {
2948    stream: &'a mut S,
2949    predicate: P,
2950}
2951
2952impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
2953
2954impl<S, P> Future for AllFuture<'_, S, P>
2955where
2956    S: Stream + Unpin + ?Sized,
2957    P: FnMut(S::Item) -> bool,
2958{
2959    type Output = bool;
2960
2961    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2962        loop {
2963            match ready!(self.stream.poll_next(cx)) {
2964                Some(v) => {
2965                    if !(&mut self.predicate)(v) {
2966                        return Poll::Ready(false);
2967                    }
2968                }
2969                None => return Poll::Ready(true),
2970            }
2971        }
2972    }
2973}
2974
2975#[derive(Debug)]
2977#[must_use = "futures do nothing unless you `.await` or poll them"]
2978pub struct AnyFuture<'a, S: ?Sized, P> {
2979    stream: &'a mut S,
2980    predicate: P,
2981}
2982
2983impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
2984
2985impl<S, P> Future for AnyFuture<'_, S, P>
2986where
2987    S: Stream + Unpin + ?Sized,
2988    P: FnMut(S::Item) -> bool,
2989{
2990    type Output = bool;
2991
2992    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2993        loop {
2994            match ready!(self.stream.poll_next(cx)) {
2995                Some(v) => {
2996                    if (&mut self.predicate)(v) {
2997                        return Poll::Ready(true);
2998                    }
2999                }
3000                None => return Poll::Ready(false),
3001            }
3002        }
3003    }
3004}
3005
3006pin_project! {
3007    #[derive(Debug)]
3009    #[must_use = "futures do nothing unless you `.await` or poll them"]
3010    pub struct ForEachFuture<S, F> {
3011        #[pin]
3012        stream: S,
3013        f: F,
3014    }
3015}
3016
3017impl<S, F> Future for ForEachFuture<S, F>
3018where
3019    S: Stream,
3020    F: FnMut(S::Item),
3021{
3022    type Output = ();
3023
3024    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3025        let mut this = self.project();
3026        loop {
3027            match ready!(this.stream.as_mut().poll_next(cx)) {
3028                Some(v) => (this.f)(v),
3029                None => return Poll::Ready(()),
3030            }
3031        }
3032    }
3033}
3034
3035#[derive(Debug)]
3037#[must_use = "futures do nothing unless you `.await` or poll them"]
3038pub struct TryForEachFuture<'a, S: ?Sized, F> {
3039    stream: &'a mut S,
3040    f: F,
3041}
3042
3043impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3044
3045impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
3046where
3047    S: Stream + Unpin + ?Sized,
3048    F: FnMut(S::Item) -> Result<(), E>,
3049{
3050    type Output = Result<(), E>;
3051
3052    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3053        loop {
3054            match ready!(self.stream.poll_next(cx)) {
3055                None => return Poll::Ready(Ok(())),
3056                Some(v) => (&mut self.f)(v)?,
3057            }
3058        }
3059    }
3060}
3061
3062pin_project! {
3063    #[derive(Clone, Debug)]
3065    #[must_use = "streams do nothing unless polled"]
3066    pub struct Zip<A: Stream, B> {
3067        item_slot: Option<A::Item>,
3068        #[pin]
3069        first: A,
3070        #[pin]
3071        second: B,
3072    }
3073}
3074
3075impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3076    type Item = (A::Item, B::Item);
3077
3078    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3079        let this = self.project();
3080
3081        if this.item_slot.is_none() {
3082            match this.first.poll_next(cx) {
3083                Poll::Pending => return Poll::Pending,
3084                Poll::Ready(None) => return Poll::Ready(None),
3085                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3086            }
3087        }
3088
3089        let second_item = ready!(this.second.poll_next(cx));
3090        let first_item = this.item_slot.take().unwrap();
3091        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3092    }
3093}
3094
3095pin_project! {
3096    #[derive(Debug)]
3098    #[must_use = "futures do nothing unless you `.await` or poll them"]
3099    pub struct UnzipFuture<S, FromA, FromB> {
3100        #[pin]
3101        stream: S,
3102        res: Option<(FromA, FromB)>,
3103    }
3104}
3105
3106impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3107where
3108    S: Stream<Item = (A, B)>,
3109    FromA: Default + Extend<A>,
3110    FromB: Default + Extend<B>,
3111{
3112    type Output = (FromA, FromB);
3113
3114    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3115        let mut this = self.project();
3116
3117        loop {
3118            match ready!(this.stream.as_mut().poll_next(cx)) {
3119                Some((a, b)) => {
3120                    let res = this.res.as_mut().unwrap();
3121                    res.0.extend(Some(a));
3122                    res.1.extend(Some(b));
3123                }
3124                None => return Poll::Ready(this.res.take().unwrap()),
3125            }
3126        }
3127    }
3128}