actix/fut/stream/
mod.rs

1use std::{
2    marker::PhantomData,
3    pin::Pin,
4    task::{Context, Poll},
5    time::Duration,
6};
7
8pub use collect::Collect;
9pub use finish::Finish;
10pub use fold::Fold;
11use futures_core::stream::Stream;
12pub use map::Map;
13use pin_project_lite::pin_project;
14pub use skip_while::SkipWhile;
15pub use take_while::TakeWhile;
16pub use then::Then;
17pub use timeout::Timeout;
18
19use super::future::ActorFuture;
20use crate::actor::Actor;
21
22mod collect;
23mod finish;
24mod fold;
25mod map;
26mod skip_while;
27mod take_while;
28mod then;
29mod timeout;
30
31/// A stream of values, not all of which may have been produced yet.
32///
33/// This is similar to `futures_util::stream::Stream` trait, except it works with `Actor`
34pub trait ActorStream<A: Actor> {
35    /// The type of item this stream will yield on success.
36    type Item;
37
38    fn poll_next(
39        self: Pin<&mut Self>,
40        srv: &mut A,
41        ctx: &mut A::Context,
42        task: &mut Context<'_>,
43    ) -> Poll<Option<Self::Item>>;
44}
45
46pub trait ActorStreamExt<A: Actor>: ActorStream<A> {
47    /// Maps this stream's items to a different type, returning a new stream of
48    /// the resulting type.
49    ///
50    /// The provided closure is executed over all elements of this stream as
51    /// they are made available. It is executed inline with calls to
52    /// [`poll_next`](ActorStream::poll_next).
53    ///
54    /// Note that this function consumes the stream passed into it and returns a
55    /// wrapped version of it, similar to the existing `map` methods in the
56    /// standard library.
57    fn map<F, U>(self, f: F) -> Map<Self, F>
58    where
59        F: FnMut(Self::Item, &mut A, &mut A::Context) -> U,
60        Self: Sized,
61    {
62        map::new(self, f)
63    }
64
65    /// Computes from this stream's items new items of a different type using
66    /// an asynchronous closure.
67    ///
68    /// The provided closure `f` will be called with an `Item` once a value is
69    /// ready, it returns a future which will then be run to completion
70    /// to produce the next value on this stream.
71    ///
72    /// Note that this function consumes the stream passed into it and returns a
73    /// wrapped version of it.
74    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
75    where
76        F: FnMut(Self::Item, &mut A, &mut A::Context) -> Fut,
77        Fut: ActorFuture<A>,
78        Self: Sized,
79    {
80        then::new(self, f)
81    }
82
83    /// Execute an accumulating asynchronous computation over a stream,
84    /// collecting all the values into one final result.
85    ///
86    /// This combinator will accumulate all values returned by this stream
87    /// according to the closure provided. The initial state is also provided to
88    /// this method and then is returned again by each execution of the closure.
89    /// Once the entire stream has been exhausted the returned future will
90    /// resolve to this value.
91    fn fold<F, Fut>(self, init: Fut::Output, f: F) -> Fold<Self, F, Fut, Fut::Output>
92    where
93        F: FnMut(Fut::Output, Self::Item, &mut A, &mut A::Context) -> Fut,
94        Fut: ActorFuture<A>,
95        Self: Sized,
96    {
97        fold::new(self, f, init)
98    }
99
100    /// Take elements from this stream while the provided asynchronous predicate
101    /// resolves to `true`.
102    ///
103    /// This function, like `Iterator::take_while`, will take elements from the
104    /// stream until the predicate `f` resolves to `false`. Once one element
105    /// returns `false`, it will always return that the stream is done.
106    fn take_while<F, Fut>(self, f: F) -> TakeWhile<Self, Self::Item, F, Fut>
107    where
108        F: FnMut(&Self::Item, &mut A, &mut A::Context) -> Fut,
109        Fut: ActorFuture<A, Output = bool>,
110        Self: Sized,
111    {
112        take_while::new(self, f)
113    }
114
115    /// Skip elements on this stream while the provided asynchronous predicate
116    /// resolves to `true`.
117    ///
118    /// This function, like `Iterator::skip_while`, will skip elements on the
119    /// stream until the predicate `f` resolves to `false`. Once one element
120    /// returns `false`, all future elements will be returned from the underlying
121    /// stream.
122    fn skip_while<F, Fut>(self, f: F) -> SkipWhile<Self, Self::Item, F, Fut>
123    where
124        F: FnMut(&Self::Item, &mut A, &mut A::Context) -> Fut,
125        Fut: ActorFuture<A, Output = bool>,
126        Self: Sized,
127    {
128        skip_while::new(self, f)
129    }
130
131    /// Add timeout to stream.
132    ///
133    /// `Err(())` returned as a timeout error.
134    fn timeout(self, timeout: Duration) -> Timeout<Self>
135    where
136        Self: Sized,
137    {
138        Timeout::new(self, timeout)
139    }
140
141    /// Transforms a stream into a collection, returning a
142    /// future representing the result of that computation.
143    ///
144    /// The returned future will be resolved when the stream terminates.
145    fn collect<C>(self) -> Collect<Self, C>
146    where
147        C: Default + Extend<Self::Item>,
148        Self: Sized,
149    {
150        Collect::new(self)
151    }
152
153    /// Transforms a stream to a future that resolves when stream finishes.
154    fn finish(self) -> Finish<Self>
155    where
156        Self: Sized,
157    {
158        Finish::new(self)
159    }
160}
161
162impl<A, S> ActorStreamExt<A> for S
163where
164    S: ActorStream<A>,
165    A: Actor,
166{
167}
168
169/// Helper trait that allows conversion of normal stream into `ActorStream`
170pub trait WrapStream<A>
171where
172    A: Actor,
173{
174    /// The stream that this type can be converted into.
175    type Stream: ActorStream<A>;
176
177    #[deprecated(since = "0.11.0", note = "Please use WrapStream::into_actor")]
178    #[doc(hidden)]
179    fn actstream(self) -> Self::Stream;
180
181    /// Convert normal stream to a [`ActorStream`]
182    fn into_actor(self, a: &A) -> Self::Stream;
183}
184
185impl<S, A> WrapStream<A> for S
186where
187    S: Stream,
188    A: Actor,
189{
190    type Stream = StreamWrap<S, A>;
191
192    #[doc(hidden)]
193    fn actstream(self) -> Self::Stream {
194        wrap_stream(self)
195    }
196
197    fn into_actor(self, _: &A) -> Self::Stream {
198        wrap_stream(self)
199    }
200}
201
202pin_project! {
203    pub struct StreamWrap<S, A>
204    where
205        S: Stream,
206        A: Actor
207    {
208        #[pin]
209        stream: S,
210        _act: PhantomData<A>
211    }
212}
213
214/// Converts normal stream into `ActorStream`
215pub fn wrap_stream<S, A>(stream: S) -> StreamWrap<S, A>
216where
217    S: Stream,
218    A: Actor,
219{
220    StreamWrap {
221        stream,
222        _act: PhantomData,
223    }
224}
225
226impl<S, A> ActorStream<A> for StreamWrap<S, A>
227where
228    S: Stream,
229    A: Actor,
230{
231    type Item = S::Item;
232
233    fn poll_next(
234        self: Pin<&mut Self>,
235        _: &mut A,
236        _: &mut A::Context,
237        task: &mut Context<'_>,
238    ) -> Poll<Option<Self::Item>> {
239        self.project().stream.poll_next(task)
240    }
241}