1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_core::ready;
7use pin_project_lite::pin_project;
8
9use crate::{
10 actor::Actor,
11 fut::{ActorFuture, ActorStream},
12};
13
14pin_project! {
15 #[derive(Debug)]
17 #[must_use = "streams do nothing unless polled"]
18 pub struct Then<S, F, Fut> {
19 #[pin]
20 stream: S,
21 #[pin]
22 future: Option<Fut>,
23 f: F,
24 }
25}
26
27pub(super) fn new<S, A, F, Fut>(stream: S, f: F) -> Then<S, F, Fut>
28where
29 S: ActorStream<A>,
30 A: Actor,
31 F: FnMut(S::Item, &mut A, &mut A::Context) -> Fut,
32 Fut: ActorFuture<A>,
33{
34 Then {
35 stream,
36 f,
37 future: None,
38 }
39}
40
41impl<S, A, F, Fut> ActorStream<A> for Then<S, F, Fut>
42where
43 S: ActorStream<A>,
44 A: Actor,
45 F: FnMut(S::Item, &mut A, &mut A::Context) -> Fut,
46 Fut: ActorFuture<A>,
47{
48 type Item = Fut::Output;
49
50 fn poll_next(
51 self: Pin<&mut Self>,
52 act: &mut A,
53 ctx: &mut A::Context,
54 task: &mut Context<'_>,
55 ) -> Poll<Option<Self::Item>> {
56 let mut this = self.project();
57
58 Poll::Ready(loop {
59 if let Some(fut) = this.future.as_mut().as_pin_mut() {
60 let item = ready!(fut.poll(act, ctx, task));
61 this.future.set(None);
62 break Some(item);
63 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(act, ctx, task)) {
64 this.future.set(Some((this.f)(item, act, ctx)));
65 } else {
66 break None;
67 }
68 })
69 }
70}