actix/
actor.rs

1use std::time::Duration;
2
3use actix_rt::ArbiterHandle;
4use futures_core::stream::Stream;
5use log::error;
6
7use crate::{
8    address::{channel, Addr},
9    context::Context,
10    context_items::{ActorDelayedMessageItem, ActorMessageItem, ActorMessageStreamItem},
11    fut::{ActorFuture, ActorStreamExt},
12    handler::{Handler, Message},
13    mailbox::DEFAULT_CAPACITY,
14    stream::StreamHandler,
15    utils::{IntervalFunc, TimerFunc},
16};
17
18/// Actors are objects which encapsulate state and behavior.
19///
20/// Actors run within a specific execution context
21/// [`Context<A>`](struct.Context.html). The context object is available
22/// only during execution. Each actor has a separate execution
23/// context. The execution context also controls the lifecycle of an
24/// actor.
25///
26/// Actors communicate exclusively by exchanging messages. The sender
27/// actor can wait for a response. Actors are not referenced directly,
28/// but by address [`Addr`](struct.Addr.html) To be able to handle a
29/// specific message actor has to provide a
30/// [`Handler<M>`](trait.Handler.html) implementation for this
31/// message. All messages are statically typed. A message can be
32/// handled in asynchronous fashion. An actor can spawn other actors
33/// or add futures or streams to the execution context. The actor
34/// trait provides several methods that allow controlling the actor
35/// lifecycle.
36///
37/// # Actor lifecycle
38///
39/// ## Started
40///
41/// An actor starts in the `Started` state, during this state the
42/// `started` method gets called.
43///
44/// ## Running
45///
46/// After an actor's `started` method got called, the actor
47/// transitions to the `Running` state. An actor can stay in the
48/// `running` state for an indefinite amount of time.
49///
50/// ## Stopping
51///
52/// The actor's execution state changes to `stopping` in the following
53/// situations:
54///
55/// * `Context::stop` gets called by actor itself
56/// * all addresses to the actor get dropped
57/// * no evented objects are registered in its context.
58///
59/// An actor can return from the `stopping` state to the `running`
60/// state by creating a new address or adding an evented object, like
61/// a future or stream, in its `Actor::stopping` method.
62///
63/// If an actor changed to a `stopping` state because
64/// `Context::stop()` got called, the context then immediately stops
65/// processing incoming messages and calls the `Actor::stopping()`
66/// method. If an actor does not return back to a `running` state,
67/// all unprocessed messages get dropped.
68///
69/// ## Stopped
70///
71/// If an actor does not modify execution context while in stopping
72/// state, the actor state changes to `Stopped`. This state is
73/// considered final and at this point the actor gets dropped.
74#[allow(unused_variables)]
75pub trait Actor: Sized + Unpin + 'static {
76    /// Actor execution context type
77    type Context: ActorContext;
78
79    /// Called when an actor gets polled the first time.
80    fn started(&mut self, ctx: &mut Self::Context) {}
81
82    /// Called after an actor is in `Actor::Stopping` state.
83    ///
84    /// There can be several reasons for stopping:
85    ///
86    /// - `Context::stop` gets called by the actor itself.
87    /// - All addresses to the current actor get dropped and no more
88    ///   evented objects are left in the context.
89    ///
90    /// An actor can return from the stopping state to the running
91    /// state by returning `Running::Continue`.
92    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
93        Running::Stop
94    }
95
96    /// Called after an actor is stopped.
97    ///
98    /// This method can be used to perform any needed cleanup work or
99    /// to spawn more actors. This is the final state, after this
100    /// method got called, the actor will be dropped.
101    fn stopped(&mut self, ctx: &mut Self::Context) {}
102
103    /// Start a new asynchronous actor, returning its address.
104    ///
105    /// # Examples
106    ///
107    /// ```
108    /// use actix::prelude::*;
109    ///
110    /// struct MyActor;
111    /// impl Actor for MyActor {
112    ///     type Context = Context<Self>;
113    /// }
114    ///
115    /// #[actix::main]
116    /// async fn main() {
117    ///     // start actor and get its address
118    ///     let addr = MyActor.start();
119    ///     # System::current().stop();
120    /// }
121    /// ```
122    fn start(self) -> Addr<Self>
123    where
124        Self: Actor<Context = Context<Self>>,
125    {
126        Context::new().run(self)
127    }
128
129    /// Construct and start a new asynchronous actor, returning its
130    /// address.
131    ///
132    /// This is constructs a new actor using the `Default` trait, and
133    /// invokes its `start` method.
134    fn start_default() -> Addr<Self>
135    where
136        Self: Actor<Context = Context<Self>> + Default,
137    {
138        Self::default().start()
139    }
140
141    /// Start new actor in arbiter's thread.
142    fn start_in_arbiter<F>(wrk: &ArbiterHandle, f: F) -> Addr<Self>
143    where
144        Self: Actor<Context = Context<Self>>,
145        F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,
146    {
147        let (tx, rx) = channel::channel(DEFAULT_CAPACITY);
148
149        // create actor
150        wrk.spawn_fn(move || {
151            let mut ctx = Context::with_receiver(rx);
152            let act = f(&mut ctx);
153            let fut = ctx.into_future(act);
154
155            actix_rt::spawn(fut);
156        });
157
158        Addr::new(tx)
159    }
160
161    /// Start a new asynchronous actor given a `Context`.
162    ///
163    /// Use this method if you need the `Context` object during actor
164    /// initialization.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// use actix::prelude::*;
170    ///
171    /// struct MyActor {
172    ///     val: usize,
173    /// }
174    /// impl Actor for MyActor {
175    ///     type Context = Context<Self>;
176    /// }
177    ///
178    /// #[actix::main]
179    /// async fn main() {
180    ///     let addr = MyActor::create(|ctx: &mut Context<MyActor>| MyActor { val: 10 });
181    ///     # System::current().stop();
182    /// }
183    /// ```
184    fn create<F>(f: F) -> Addr<Self>
185    where
186        Self: Actor<Context = Context<Self>>,
187        F: FnOnce(&mut Context<Self>) -> Self,
188    {
189        let mut ctx = Context::new();
190        let act = f(&mut ctx);
191        ctx.run(act)
192    }
193}
194
195#[allow(unused_variables)]
196/// Actors with the ability to restart after failure.
197///
198/// Supervised actors can be managed by a
199/// [`Supervisor`](struct.Supervisor.html). As an additional lifecycle
200/// event, the `restarting` method can be implemented.
201///
202/// If a supervised actor fails, its supervisor creates new execution
203/// context and restarts the actor, invoking its `restarting` method.
204/// After a call to this method, the actor's execution state changes
205/// to `Started` and the regular lifecycle process starts.
206///
207/// The `restarting` method gets called with the newly constructed
208/// `Context` object.
209pub trait Supervised: Actor {
210    /// Called when the supervisor restarts a failed actor.
211    fn restarting(&mut self, ctx: &mut <Self as Actor>::Context) {}
212}
213
/// Actor execution state.
///
/// The variants are listed in lifecycle order: an actor begins as
/// `Started`, becomes `Running`, then `Stopping`, and finally `Stopped`.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub enum ActorState {
    /// Actor is started.
    Started,
    /// Actor is running.
    Running,
    /// Actor is stopping.
    Stopping,
    /// Actor is stopped.
    Stopped,
}
226
/// Decision returned by `Actor::stopping`.
///
/// Tells the execution context whether the actor should go ahead and stop
/// or return to the running state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Running {
    /// Proceed with stopping the actor. This is what the default
    /// `Actor::stopping` implementation returns.
    Stop,
    /// Return from the stopping state to the running state.
    Continue,
}
232
233impl ActorState {
234    /// Indicates whether the actor is alive.
235    pub fn alive(self) -> bool {
236        self == ActorState::Started || self == ActorState::Running
237    }
238    /// Indicates whether the actor is stopped or stopping.
239    pub fn stopping(self) -> bool {
240        self == ActorState::Stopping || self == ActorState::Stopped
241    }
242}
243
244/// Actor execution context.
245///
246/// Each actor runs within a specific execution context. The actor's
247/// associated type `Actor::Context` defines the context to use for
248/// the actor, and must implement the `ActorContext` trait.
249///
250/// The execution context defines the type of execution, and the
251/// actor's communication channels (message handling).
252pub trait ActorContext: Sized {
253    /// Immediately stop processing incoming messages and switch to a
254    /// `stopping` state. This only affects actors that are currently
255    /// `running`. Future attempts to queue messages will fail.
256    fn stop(&mut self);
257
258    /// Terminate actor execution unconditionally. This sets the actor
259    /// into the `stopped` state. This causes future attempts to queue
260    /// messages to fail.
261    fn terminate(&mut self);
262
263    /// Retrieve the current Actor execution state.
264    fn state(&self) -> ActorState;
265}
266
267/// Asynchronous execution context.
268pub trait AsyncContext<A>: ActorContext
269where
270    A: Actor<Context = Self>,
271{
272    /// Returns the address of the context.
273    fn address(&self) -> Addr<A>;
274
275    /// Spawns a future into the context.
276    ///
277    /// Returns a handle of the spawned future, which can be used for
278    /// cancelling its execution.
279    ///
280    /// All futures spawned into an actor's context are cancelled
281    /// during the actor's stopping stage.
282    fn spawn<F>(&mut self, fut: F) -> SpawnHandle
283    where
284        F: ActorFuture<A, Output = ()> + 'static;
285
286    /// Spawns a future into the context, waiting for it to resolve.
287    ///
288    /// This stops processing any incoming events until the future
289    /// resolves.
290    fn wait<F>(&mut self, fut: F)
291    where
292        F: ActorFuture<A, Output = ()> + 'static;
293
294    /// Checks if the context is paused (waiting for future completion or stopping).
295    fn waiting(&self) -> bool;
296
297    /// Cancels a spawned future.
298    ///
299    /// The `handle` is a value returned by the `spawn` method.
300    fn cancel_future(&mut self, handle: SpawnHandle) -> bool;
301
302    /// Registers a stream with the context.
303    ///
304    /// This allows handling a `Stream` in a way similar to normal
305    /// actor messages.
306    ///
307    /// ```
308    /// # use std::io;
309    /// use actix::prelude::*;
310    /// use futures_util::stream::once;
311    ///
312    /// #[derive(Message)]
313    /// #[rtype(result = "()")]
314    /// struct Ping;
315    ///
316    /// struct MyActor;
317    ///
318    /// impl StreamHandler<Ping> for MyActor {
319    ///
320    ///     fn handle(&mut self, item: Ping, ctx: &mut Context<MyActor>) {
321    ///         println!("PING");
322    ///         System::current().stop();
323    ///     }
324    ///
325    ///     fn finished(&mut self, ctx: &mut Self::Context) {
326    ///         println!("finished");
327    ///     }
328    /// }
329    ///
330    /// impl Actor for MyActor {
331    ///    type Context = Context<Self>;
332    ///
333    ///    fn started(&mut self, ctx: &mut Context<Self>) {
334    ///        // add stream
335    ///        ctx.add_stream(once(async { Ping }));
336    ///    }
337    /// }
338    ///
339    /// fn main() {
340    ///     let mut sys = System::new();
341    ///     let addr = sys.block_on(async { MyActor.start() });
342    ///     sys.run();
343    ///  }
344    /// ```
345    fn add_stream<S>(&mut self, fut: S) -> SpawnHandle
346    where
347        S: Stream + 'static,
348        A: StreamHandler<S::Item>,
349    {
350        <A as StreamHandler<S::Item>>::add_stream(fut, self)
351    }
352
353    /// Registers a stream with the context, ignoring errors.
354    ///
355    /// This method is similar to `add_stream` but it skips stream
356    /// errors.
357    ///
358    /// ```
359    /// use actix::prelude::*;
360    /// use futures_util::stream::once;
361    ///
362    /// #[derive(Message)]
363    /// #[rtype(result = "()")]
364    /// struct Ping;
365    ///
366    /// struct MyActor;
367    ///
368    /// impl Handler<Ping> for MyActor {
369    ///     type Result = ();
370    ///
371    ///     fn handle(&mut self, msg: Ping, ctx: &mut Context<MyActor>) {
372    ///         println!("PING");
373    /// #       System::current().stop();
374    ///     }
375    /// }
376    ///
377    /// impl Actor for MyActor {
378    ///     type Context = Context<Self>;
379    ///
380    ///     fn started(&mut self, ctx: &mut Context<Self>) {
381    ///         // add messages stream
382    ///         ctx.add_message_stream(once(async { Ping }));
383    ///     }
384    /// }
385    ///
386    /// # #[actix::main] async fn main () {
387    /// let addr = MyActor.start();
388    /// # System::current().stop(); }
389    /// ```
390    fn add_message_stream<S>(&mut self, fut: S)
391    where
392        S: Stream + 'static,
393        S::Item: Message,
394        A: Handler<S::Item>,
395    {
396        if self.state() == ActorState::Stopped {
397            error!("Context::add_message_stream called for stopped actor.");
398        } else {
399            self.spawn(ActorMessageStreamItem::new(fut));
400        }
401    }
402
403    /// Sends the message `msg` to self. This bypasses the mailbox capacity, and
404    /// will always queue the message. If the actor is in the `stopped` state, an
405    /// error will be raised.
406    fn notify<M>(&mut self, msg: M)
407    where
408        A: Handler<M>,
409        M: Message + 'static,
410    {
411        if self.state() == ActorState::Stopped {
412            error!("Context::notify called for stopped actor.");
413        } else {
414            self.spawn(ActorMessageItem::new(msg));
415        }
416    }
417
418    /// Sends the message `msg` to self after a specified period of time.
419    ///
420    /// Returns a spawn handle which can be used for cancellation. The
421    /// notification gets cancelled if the context's stop method gets
422    /// called. This bypasses the mailbox capacity, and
423    /// will always queue the message. If the actor is in the `stopped` state, an
424    /// error will be raised.
425    fn notify_later<M>(&mut self, msg: M, after: Duration) -> SpawnHandle
426    where
427        A: Handler<M>,
428        M: Message + 'static,
429    {
430        if self.state() == ActorState::Stopped {
431            error!("Context::notify_later called for stopped actor.");
432            SpawnHandle::default()
433        } else {
434            self.spawn(ActorDelayedMessageItem::new(msg, after))
435        }
436    }
437
438    /// Executes a closure after a specified period of time.
439    ///
440    /// The closure gets passed the same actor and its
441    /// context. Execution gets cancelled if the context's stop method
442    /// gets called.
443    fn run_later<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
444    where
445        F: FnOnce(&mut A, &mut A::Context) + 'static,
446    {
447        self.spawn(TimerFunc::new(dur, f))
448    }
449
450    /// Spawns a job to execute the given closure periodically, at a
451    /// specified fixed interval.
452    fn run_interval<F>(&mut self, dur: Duration, f: F) -> SpawnHandle
453    where
454        F: FnMut(&mut A, &mut A::Context) + 'static,
455    {
456        self.spawn(IntervalFunc::new(dur, f).finish())
457    }
458
459    /// Spawns a periodic `task` function to begin executing at the given `start` time, and with the
460    /// given `interval` duration.
461    fn run_interval_at<F>(
462        &mut self,
463        start: tokio::time::Instant,
464        interval: Duration,
465        task: F,
466    ) -> SpawnHandle
467    where
468        F: FnMut(&mut A, &mut A::Context) + 'static,
469    {
470        self.spawn(IntervalFunc::new_at(start, interval, task).finish())
471    }
472}
473
/// Identifies a future that was spawned into an actor context.
///
/// The handle can be used to cancel that future.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub struct SpawnHandle(usize);
479
480impl SpawnHandle {
481    /// Gets the next handle.
482    pub fn next(self) -> SpawnHandle {
483        SpawnHandle(self.0 + 1)
484    }
485    #[doc(hidden)]
486    pub fn into_usize(self) -> usize {
487        self.0
488    }
489}