actix/supervisor.rs
1use std::{
2 future::Future,
3 pin::Pin,
4 task::{self, Poll},
5};
6
7use actix_rt::ArbiterHandle;
8use pin_project_lite::pin_project;
9
10use crate::{
11 actor::{Actor, AsyncContext, Supervised},
12 address::{channel, Addr},
13 context::Context,
14 context_impl::ContextFut,
15 mailbox::DEFAULT_CAPACITY,
16};
17
18pin_project! {
19 /// Actor supervisor
20 ///
21 /// A Supervisor manages incoming messages for an actor. In case of actor failure,
22 /// the supervisor creates a new execution context and restarts the actor's lifecycle.
23 /// A Supervisor does not re-create their actor, it just calls the `restarting()`
24 /// method.
25 ///
26 /// Supervisors have the same lifecycle as actors. If all addresses to
27 /// a supervisor gets dropped and its actor does not execute anything, the supervisor
28 /// terminates.
29 ///
30 /// Supervisors can not guarantee that their actors successfully processes incoming
31 /// messages. If the actor fails during message processing, the message can not be
32 /// recovered. The sender would receive an `Err(Cancelled)` error in this situation.
33 ///
34 /// # Examples
35 ///
36 /// ```
37 /// # use actix::prelude::*;
38 /// #[derive(Message)]
39 /// #[rtype(result = "()")]
40 /// struct Die;
41 ///
42 /// struct MyActor;
43 ///
44 /// impl Actor for MyActor {
45 /// type Context = Context<Self>;
46 /// }
47 ///
48 /// // To use actor with supervisor actor has to implement `Supervised` trait
49 /// impl actix::Supervised for MyActor {
50 /// fn restarting(&mut self, ctx: &mut Context<MyActor>) {
51 /// println!("restarting");
52 /// }
53 /// }
54 ///
55 /// impl Handler<Die> for MyActor {
56 /// type Result = ();
57 ///
58 /// fn handle(&mut self, _: Die, ctx: &mut Context<MyActor>) {
59 /// ctx.stop();
60 /// # System::current().stop();
61 /// }
62 /// }
63 ///
64 /// fn main() {
65 /// let mut sys = System::new();
66 ///
67 /// let addr = sys.block_on(async { actix::Supervisor::start(|_| MyActor) });
68 /// addr.do_send(Die);
69 ///
70 /// sys.run();
71 /// }
72 /// ```
73 #[derive(Debug)]
74 pub struct Supervisor<A>
75 where
76 A: Supervised,
77 A: Actor<Context = Context<A>>
78 {
79 #[pin]
80 fut: ContextFut<A, Context<A>>,
81 }
82}
83
84impl<A> Supervisor<A>
85where
86 A: Supervised + Actor<Context = Context<A>>,
87{
88 /// Start new supervised actor in current tokio runtime.
89 ///
90 /// Type of returned address depends on variable type. For example to get
91 /// `Addr<Syn, _>` of newly created actor, use explicitly `Addr<Syn,
92 /// _>` type as type of a variable.
93 ///
94 /// ```
95 /// # use actix::prelude::*;
96 /// struct MyActor;
97 ///
98 /// impl Actor for MyActor {
99 /// type Context = Context<Self>;
100 /// }
101 ///
102 /// # impl actix::Supervised for MyActor {}
103 /// # fn main() {
104 /// # System::new().block_on(async {
105 /// // Get `Addr` of a MyActor actor
106 /// let addr = actix::Supervisor::start(|_| MyActor);
107 /// # System::current().stop();
108 /// # });}
109 /// ```
110 pub fn start<F>(f: F) -> Addr<A>
111 where
112 F: FnOnce(&mut A::Context) -> A + 'static,
113 A: Actor<Context = Context<A>>,
114 {
115 // create actor
116 let mut ctx = Context::new();
117 let act = f(&mut ctx);
118 let addr = ctx.address();
119 let fut = ctx.into_future(act);
120
121 // create supervisor
122 actix_rt::spawn(Self { fut });
123
124 addr
125 }
126
127 /// Start new supervised actor in arbiter's thread.
128 pub fn start_in_arbiter<F>(sys: &ArbiterHandle, f: F) -> Addr<A>
129 where
130 A: Actor<Context = Context<A>>,
131 F: FnOnce(&mut Context<A>) -> A + Send + 'static,
132 {
133 let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
134
135 sys.spawn_fn(move || {
136 let mut ctx = Context::with_receiver(rx);
137 let act = f(&mut ctx);
138 let fut = ctx.into_future(act);
139
140 actix_rt::spawn(Self { fut });
141 });
142
143 Addr::new(tx)
144 }
145}
146
147#[doc(hidden)]
148impl<A> Future for Supervisor<A>
149where
150 A: Supervised + Actor<Context = Context<A>>,
151{
152 type Output = ();
153
154 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
155 let mut this = self.project();
156 loop {
157 match this.fut.as_mut().poll(cx) {
158 Poll::Pending => return Poll::Pending,
159 Poll::Ready(_) => {
160 // stop if context's address is not connected
161 if !this.fut.restart() {
162 return Poll::Ready(());
163 }
164 }
165 }
166 }
167 }
168}