actix/fut/stream/
fold.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_core::ready;
7use pin_project_lite::pin_project;
8
9use crate::{
10    actor::Actor,
11    fut::{ActorFuture, ActorStream},
12};
13
14pin_project! {
15    /// Stream for the [`fold`](super::ActorStreamExt::fold) method.
16    #[derive(Debug)]
17    #[must_use = "streams do nothing unless polled"]
18    pub struct Fold<S, F, Fut, T> {
19        #[pin]
20        stream: S,
21        f: F,
22        accum: Option<T>,
23        #[pin]
24        future: Option<Fut>,
25    }
26}
27
28pub(super) fn new<S, A, F, Fut>(stream: S, f: F, t: Fut::Output) -> Fold<S, F, Fut, Fut::Output>
29where
30    S: ActorStream<A>,
31    A: Actor,
32    F: FnMut(Fut::Output, S::Item, &mut A, &mut A::Context) -> Fut,
33    Fut: ActorFuture<A>,
34{
35    Fold {
36        stream,
37        f,
38        accum: Some(t),
39        future: None,
40    }
41}
42
43impl<S, A, F, Fut> ActorFuture<A> for Fold<S, F, Fut, Fut::Output>
44where
45    S: ActorStream<A>,
46    A: Actor,
47    F: FnMut(Fut::Output, S::Item, &mut A, &mut A::Context) -> Fut,
48    Fut: ActorFuture<A>,
49{
50    type Output = Fut::Output;
51
52    fn poll(
53        self: Pin<&mut Self>,
54        act: &mut A,
55        ctx: &mut A::Context,
56        task: &mut Context<'_>,
57    ) -> Poll<Self::Output> {
58        let mut this = self.project();
59        Poll::Ready(loop {
60            if let Some(fut) = this.future.as_mut().as_pin_mut() {
61                // we're currently processing a future to produce a new accum value
62                *this.accum = Some(ready!(fut.poll(act, ctx, task)));
63                this.future.set(None);
64            } else if this.accum.is_some() {
65                // we're waiting on a new item from the stream
66                let res = ready!(this.stream.as_mut().poll_next(act, ctx, task));
67                let a = this.accum.take().unwrap();
68                if let Some(item) = res {
69                    this.future.set(Some((this.f)(a, item, act, ctx)));
70                } else {
71                    break a;
72                }
73            } else {
74                panic!("Fold polled after completion")
75            }
76        })
77    }
78}