1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_core::ready;
7use pin_project_lite::pin_project;
8
9use crate::{
10 actor::Actor,
11 fut::{ActorFuture, ActorStream},
12};
13
14pin_project! {
15 #[derive(Debug)]
17 #[must_use = "streams do nothing unless polled"]
18 pub struct Fold<S, F, Fut, T> {
19 #[pin]
20 stream: S,
21 f: F,
22 accum: Option<T>,
23 #[pin]
24 future: Option<Fut>,
25 }
26}
27
28pub(super) fn new<S, A, F, Fut>(stream: S, f: F, t: Fut::Output) -> Fold<S, F, Fut, Fut::Output>
29where
30 S: ActorStream<A>,
31 A: Actor,
32 F: FnMut(Fut::Output, S::Item, &mut A, &mut A::Context) -> Fut,
33 Fut: ActorFuture<A>,
34{
35 Fold {
36 stream,
37 f,
38 accum: Some(t),
39 future: None,
40 }
41}
42
43impl<S, A, F, Fut> ActorFuture<A> for Fold<S, F, Fut, Fut::Output>
44where
45 S: ActorStream<A>,
46 A: Actor,
47 F: FnMut(Fut::Output, S::Item, &mut A, &mut A::Context) -> Fut,
48 Fut: ActorFuture<A>,
49{
50 type Output = Fut::Output;
51
52 fn poll(
53 self: Pin<&mut Self>,
54 act: &mut A,
55 ctx: &mut A::Context,
56 task: &mut Context<'_>,
57 ) -> Poll<Self::Output> {
58 let mut this = self.project();
59 Poll::Ready(loop {
60 if let Some(fut) = this.future.as_mut().as_pin_mut() {
61 *this.accum = Some(ready!(fut.poll(act, ctx, task)));
63 this.future.set(None);
64 } else if this.accum.is_some() {
65 let res = ready!(this.stream.as_mut().poll_next(act, ctx, task));
67 let a = this.accum.take().unwrap();
68 if let Some(item) = res {
69 this.future.set(Some((this.f)(a, item, act, ctx)));
70 } else {
71 break a;
72 }
73 } else {
74 panic!("Fold polled after completion")
75 }
76 })
77 }
78}