actix/
supervisor.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{self, Poll},
5};
6
7use actix_rt::ArbiterHandle;
8use pin_project_lite::pin_project;
9
10use crate::{
11    actor::{Actor, AsyncContext, Supervised},
12    address::{channel, Addr},
13    context::Context,
14    context_impl::ContextFut,
15    mailbox::DEFAULT_CAPACITY,
16};
17
18pin_project! {
19    /// Actor supervisor
20    ///
21    /// A Supervisor manages incoming messages for an actor. In case of actor failure,
22    /// the supervisor creates a new execution context and restarts the actor's lifecycle.
23    /// A Supervisor does not re-create their actor, it just calls the `restarting()`
24    /// method.
25    ///
26    /// Supervisors have the same lifecycle as actors. If all addresses to
27    /// a supervisor gets dropped and its actor does not execute anything, the supervisor
28    /// terminates.
29    ///
30    /// Supervisors can not guarantee that their actors successfully processes incoming
31    /// messages. If the actor fails during message processing, the message can not be
32    /// recovered. The sender would receive an `Err(Cancelled)` error in this situation.
33    ///
34    /// # Examples
35    ///
36    /// ```
37    /// # use actix::prelude::*;
38    /// #[derive(Message)]
39    /// #[rtype(result = "()")]
40    /// struct Die;
41    ///
42    /// struct MyActor;
43    ///
44    /// impl Actor for MyActor {
45    ///     type Context = Context<Self>;
46    /// }
47    ///
48    /// // To use actor with supervisor actor has to implement `Supervised` trait
49    /// impl actix::Supervised for MyActor {
50    ///     fn restarting(&mut self, ctx: &mut Context<MyActor>) {
51    ///         println!("restarting");
52    ///     }
53    /// }
54    ///
55    /// impl Handler<Die> for MyActor {
56    ///     type Result = ();
57    ///
58    ///     fn handle(&mut self, _: Die, ctx: &mut Context<MyActor>) {
59    ///         ctx.stop();
60    /// #       System::current().stop();
61    ///     }
62    /// }
63    ///
64    /// fn main() {
65    ///     let mut sys = System::new();
66    ///
67    ///     let addr = sys.block_on(async { actix::Supervisor::start(|_| MyActor) });
68    ///     addr.do_send(Die);
69    ///
70    ///     sys.run();
71    /// }
72    /// ```
73    #[derive(Debug)]
74    pub struct Supervisor<A>
75    where
76        A: Supervised,
77        A: Actor<Context = Context<A>>
78    {
79        #[pin]
80        fut: ContextFut<A, Context<A>>,
81    }
82}
83
84impl<A> Supervisor<A>
85where
86    A: Supervised + Actor<Context = Context<A>>,
87{
88    /// Start new supervised actor in current tokio runtime.
89    ///
90    /// Type of returned address depends on variable type. For example to get
91    /// `Addr<Syn, _>` of newly created actor, use explicitly `Addr<Syn,
92    /// _>` type as type of a variable.
93    ///
94    /// ```
95    /// # use actix::prelude::*;
96    /// struct MyActor;
97    ///
98    /// impl Actor for MyActor {
99    ///     type Context = Context<Self>;
100    /// }
101    ///
102    /// # impl actix::Supervised for MyActor {}
103    /// # fn main() {
104    /// #    System::new().block_on(async {
105    /// // Get `Addr` of a MyActor actor
106    /// let addr = actix::Supervisor::start(|_| MyActor);
107    /// #         System::current().stop();
108    /// # });}
109    /// ```
110    pub fn start<F>(f: F) -> Addr<A>
111    where
112        F: FnOnce(&mut A::Context) -> A + 'static,
113        A: Actor<Context = Context<A>>,
114    {
115        // create actor
116        let mut ctx = Context::new();
117        let act = f(&mut ctx);
118        let addr = ctx.address();
119        let fut = ctx.into_future(act);
120
121        // create supervisor
122        actix_rt::spawn(Self { fut });
123
124        addr
125    }
126
127    /// Start new supervised actor in arbiter's thread.
128    pub fn start_in_arbiter<F>(sys: &ArbiterHandle, f: F) -> Addr<A>
129    where
130        A: Actor<Context = Context<A>>,
131        F: FnOnce(&mut Context<A>) -> A + Send + 'static,
132    {
133        let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
134
135        sys.spawn_fn(move || {
136            let mut ctx = Context::with_receiver(rx);
137            let act = f(&mut ctx);
138            let fut = ctx.into_future(act);
139
140            actix_rt::spawn(Self { fut });
141        });
142
143        Addr::new(tx)
144    }
145}
146
147#[doc(hidden)]
148impl<A> Future for Supervisor<A>
149where
150    A: Supervised + Actor<Context = Context<A>>,
151{
152    type Output = ();
153
154    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
155        let mut this = self.project();
156        loop {
157            match this.fut.as_mut().poll(cx) {
158                Poll::Pending => return Poll::Pending,
159                Poll::Ready(_) => {
160                    // stop if context's address is not connected
161                    if !this.fut.restart() {
162                        return Poll::Ready(());
163                    }
164                }
165            }
166        }
167    }
168}