actix/
stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_core::{ready, stream::Stream};
7use log::error;
8use pin_project_lite::pin_project;
9
10use crate::{
11    actor::{Actor, ActorContext, ActorState, AsyncContext, SpawnHandle},
12    fut::ActorFuture,
13};
14
15/// Stream handling for Actors.
16///
17/// This is helper trait that allows handling [`Stream`]s in a similar way to normal actor messages.
18/// When stream resolves its next item, `handle()` is called with that item.
19///
20/// When the stream completes, `finished()` is called. By default, it stops Actor execution.
21///
22/// # Examples
23/// ```
24/// use actix::prelude::*;
25/// use futures_util::stream::once;
26///
27/// #[derive(Message)]
28/// #[rtype(result = "()")]
29/// struct Ping;
30///
31/// struct MyActor;
32///
33/// impl StreamHandler<Ping> for MyActor {
34///     fn handle(&mut self, item: Ping, ctx: &mut Context<MyActor>) {
35///         println!("PING");
36///         System::current().stop()
37///     }
38///
39///     fn finished(&mut self, ctx: &mut Self::Context) {
40///         println!("finished");
41///     }
42/// }
43///
44/// impl Actor for MyActor {
45///    type Context = Context<Self>;
46///
47///    fn started(&mut self, ctx: &mut Context<Self>) {
48///        Self::add_stream(once(async { Ping }), ctx);
49///    }
50/// }
51///
52/// #[actix::main]
53/// async fn main() {
54///     MyActor.start();
55///     # System::current().stop();
56/// }
57/// ```
58#[allow(unused_variables)]
59pub trait StreamHandler<I>
60where
61    Self: Actor,
62{
63    /// Called for every message emitted by the stream.
64    fn handle(&mut self, item: I, ctx: &mut Self::Context);
65
66    /// Called when stream emits first item.
67    ///
68    /// Default implementation does nothing.
69    fn started(&mut self, ctx: &mut Self::Context) {}
70
71    /// Called when stream finishes.
72    ///
73    /// Default implementation stops Actor execution.
74    fn finished(&mut self, ctx: &mut Self::Context) {
75        ctx.stop()
76    }
77
78    /// Register a Stream to the actor context.
79    fn add_stream<S>(stream: S, ctx: &mut Self::Context) -> SpawnHandle
80    where
81        S: Stream + 'static,
82        Self: StreamHandler<S::Item>,
83        Self::Context: AsyncContext<Self>,
84    {
85        if ctx.state() == ActorState::Stopped {
86            error!("Context::add_stream called for stopped actor.");
87            SpawnHandle::default()
88        } else {
89            ctx.spawn(ActorStream::new(stream))
90        }
91    }
92}
93
94pin_project! {
95    pub(crate) struct ActorStream<S> {
96        #[pin]
97        stream: S,
98        started: bool,
99    }
100}
101
102impl<S> ActorStream<S> {
103    pub fn new(fut: S) -> Self {
104        Self {
105            stream: fut,
106            started: false,
107        }
108    }
109}
110
111impl<A, S> ActorFuture<A> for ActorStream<S>
112where
113    S: Stream,
114    A: Actor + StreamHandler<S::Item>,
115    A::Context: AsyncContext<A>,
116{
117    type Output = ();
118
119    fn poll(
120        self: Pin<&mut Self>,
121        act: &mut A,
122        ctx: &mut A::Context,
123        task: &mut Context<'_>,
124    ) -> Poll<Self::Output> {
125        let mut this = self.project();
126
127        if !*this.started {
128            *this.started = true;
129            <A as StreamHandler<S::Item>>::started(act, ctx);
130        }
131
132        let mut polled = 0;
133
134        while let Some(msg) = ready!(this.stream.as_mut().poll_next(task)) {
135            A::handle(act, msg, ctx);
136
137            polled += 1;
138
139            if ctx.waiting() {
140                return Poll::Pending;
141            } else if polled == 16 {
142                // Yield after 16 consecutive polls on this stream and self wake up.
143                // This is to prevent starvation of other actor futures when this stream yield
144                // too many item in short period of time.
145                task.waker().wake_by_ref();
146                return Poll::Pending;
147            }
148        }
149
150        A::finished(act, ctx);
151        Poll::Ready(())
152    }
153}