actix/
handler.rs

1use std::{fmt, future::Future, pin::Pin, sync::Arc};
2
3pub use tokio::sync::oneshot::Sender as OneshotSender;
4
5use crate::{
6    actor::{Actor, AsyncContext},
7    address::Addr,
8    fut::{ActorFuture, ActorFutureExt, LocalBoxActorFuture},
9};
10
11/// Describes how to handle messages of a specific type.
12///
13/// Implementing `Handler` is a general way to handle incoming
14/// messages, streams, and futures.
15///
16/// The type `M` is a message which can be handled by the actor.
17#[allow(unused_variables)]
18pub trait Handler<M>
19where
20    Self: Actor,
21    M: Message,
22{
23    /// The type of value that this handler will return.
24    ///
25    /// Check the [`MessageResponse`] trait for some details
26    /// on how a message can be responded to.
27    type Result: MessageResponse<Self, M>;
28
29    /// This method is called for every message received by this actor.
30    fn handle(&mut self, msg: M, ctx: &mut Self::Context) -> Self::Result;
31}
32
/// Represents a message that can be handled by an actor.
pub trait Message {
    /// The type of value that this message will be resolved with if it is
    /// successful.
    type Result: 'static;
}

/// Lets `Arc<M>` be used as a message without re-implementing [`Message`];
/// the result type is the one declared by `M`.
impl<M: Message> Message for Arc<M> {
    type Result = M::Result;
}

/// Lets `Box<M>` be used as a message without re-implementing [`Message`];
/// the result type is the one declared by `M`.
impl<M: Message> Message for Box<M> {
    type Result = M::Result;
}
55
56/// A helper type that implements the [`MessageResponse`] trait.
57///
58/// # Examples
59/// ```no_run
60/// use actix::prelude::*;
61///
62/// #[derive(Message)]
63/// #[rtype(Response)]
64/// struct Msg;
65///
66/// struct Response;
67///
68/// struct MyActor;
69///
70/// impl Actor for MyActor {
71///     type Context = Context<Self>;
72/// }
73///
74/// impl Handler<Msg> for MyActor {
75///     type Result = MessageResult<Msg>;
76///
77///     fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
78///         MessageResult(Response {})
79///     }
80/// }
81/// ```
82pub struct MessageResult<M: Message>(pub M::Result);
83
84/// A specialized actor future holder for atomic asynchronous message handling.
85///
86/// Intended be used when the future returned will need exclusive access Actor's
87/// internal state or context, e.g., it can yield at critical sessions.
88/// When the actor starts to process this future, it will not pull any other
89/// spawned futures until this one as been completed.
90/// Check [`ActorFuture`] for available methods for accessing Actor's
91/// internal state.
92///
93/// ## Note
94/// The runtime itself is not blocked in the process, only the Actor,
95/// other futures, and therefore, other actors are still allowed to make
96/// progress when this [`AtomicResponse`] is used.
97///
98/// # Examples
99/// On the following example, the response to `Msg` would always be 29
100/// even if there are multiple `Msg` sent to `MyActor`.
101/// ```no_run
102/// # use actix::prelude::*;
103/// # use actix::clock::sleep;
104/// # use std::time::Duration;
105/// #
106/// # #[derive(Message)]
107/// # #[rtype(usize)]
108/// # struct Msg;
109/// #
110/// # struct MyActor(usize);
111/// #
112/// # impl Actor for MyActor {
113/// #    type Context = Context<Self>;
114/// # }
115/// #
116/// impl Handler<Msg> for MyActor {
117///     type Result = AtomicResponse<Self, usize>;
118///
119///     fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
120///         AtomicResponse::new(Box::pin(
121///             async {}
122///                 .into_actor(self)
123///                 .map(|_, this, _| {
124///                     this.0 = 30;
125///                 })
126///                 .then(|_, this, _| {
127///                     sleep(Duration::from_secs(3)).into_actor(this)
128///                 })
129///                 .map(|_, this, _| {
130///                     this.0 -= 1;
131///                     this.0
132///                 }),
133///         ))
134///     }
135/// }
136/// ```
137pub struct AtomicResponse<A, T>(ResponseActFuture<A, T>);
138
139impl<A, T> AtomicResponse<A, T> {
140    pub fn new(fut: ResponseActFuture<A, T>) -> Self {
141        AtomicResponse(fut)
142    }
143}
144
145impl<A, M> MessageResponse<A, M> for AtomicResponse<A, M::Result>
146where
147    A: Actor,
148    M: Message,
149    A::Context: AsyncContext<A>,
150{
151    fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
152        ctx.wait(self.0.map(|res, _, _| tx.send(res)));
153    }
154}
155
/// A specialized actor future for asynchronous message handling.
///
/// This is an alias for [`LocalBoxActorFuture`].
///
/// Intended to be used when the future returned will, at some point, need to access Actor's
/// internal state or context in order to finish. Check [`ActorFuture`] for available methods for
/// accessing Actor's internal state.
///
/// # Note
/// It's important to keep in mind that the provided [`AsyncContext`] does not enforce the poll of
/// any [`ActorFuture`] to be exclusive. Therefore, if other instances of [`ActorFuture`] are spawned
/// into this Context **their execution won't necessarily be atomic**. Check [`AtomicResponse`] if you
/// need exclusive access over the actor.
///
/// # Examples
/// ```no_run
/// use actix::prelude::*;
///
/// #[derive(Message)]
/// #[rtype(result = "Result<usize, ()>")]
/// struct Msg;
///
/// struct MyActor;
///
/// impl Actor for MyActor {
///     type Context = Context<Self>;
/// }
///
/// impl Handler<Msg> for MyActor {
///     type Result = ResponseActFuture<Self, Result<usize, ()>>;
///
///     fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
///         Box::pin(
///             async {
///                 // Some async computation
///                 42
///             }
///             .into_actor(self) // converts future to ActorFuture
///             .map(|res, _act, _ctx| {
///                 // Do some computation with actor's state or context
///                 Ok(res)
///             }),
///         )
///     }
/// }
/// ```
pub type ResponseActFuture<A, I> = LocalBoxActorFuture<A, I>;
201
/// A specialized future for asynchronous message handling.
///
/// This is an alias for a pinned, boxed [`Future`] trait object with output `I`
/// (no `Send` bound is required).
///
/// Intended to be used when the future returned doesn't
/// need to access Actor's internal state or context to progress, either
/// because it's completely agnostic to it, or because the required data has
/// already been moved inside the future and it won't need Actor state to continue.
///
/// # Examples
/// ```no_run
/// use actix::prelude::*;
///
/// #[derive(Message)]
/// #[rtype(result = "Result<(), ()>")]
/// struct Msg;
///
/// struct MyActor;
///
/// impl Actor for MyActor {
///     type Context = Context<Self>;
/// }
///
/// impl Handler<Msg> for MyActor {
///     type Result = ResponseFuture<Result<(), ()>>;
///
///     fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
///         Box::pin(async move {
///             // Some async computation
///             Ok(())
///         })
///     }
/// }
/// ```
pub type ResponseFuture<I> = Pin<Box<dyn Future<Output = I>>>;
235
236/// A trait which defines message responses.
237///
238/// We offer implementation for some common language types, if you need
239/// to respond with a new type you can use [`MessageResult`].
240///
241/// If `Actor::Context` implements [`AsyncContext`] it's possible to handle
242/// the message asynchronously.
243/// For asynchronous message handling we offer the following possible response types:
244/// - [`ResponseFuture`] should be used for when the future returned doesn't
245///   need to access Actor's internal state or context to progress, either
246///   because it's completely agnostic to it or because the required data has
247///   already been moved to it and it won't need Actor state to continue.
248/// - [`ResponseActFuture`] should be used when the future returned
249///   will, at some point, need to access Actor's internal state or context
250///   in order to finish.
251/// - [`AtomicResponse`] should be used when the future returned needs exclusive
252///   access to  Actor's internal state or context.
253pub trait MessageResponse<A: Actor, M: Message> {
254    fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>);
255}
256
257impl<A, M> MessageResponse<A, M> for MessageResult<M>
258where
259    A: Actor,
260    M: Message,
261{
262    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
263        tx.send(self.0)
264    }
265}
266
267impl<A, M, I, E> MessageResponse<A, M> for Result<I, E>
268where
269    A: Actor,
270    M: Message<Result = Self>,
271    I: 'static,
272    E: 'static,
273{
274    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
275        tx.send(self)
276    }
277}
278
279impl<A, M, I> MessageResponse<A, M> for Arc<I>
280where
281    A: Actor,
282    M: Message<Result = Self>,
283    I: 'static,
284{
285    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
286        tx.send(self)
287    }
288}
289
290impl<A, M, I> MessageResponse<A, M> for Option<I>
291where
292    A: Actor,
293    M: Message<Result = Self>,
294    I: 'static,
295{
296    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
297        tx.send(self)
298    }
299}
300
301impl<A, M, I> MessageResponse<A, M> for Vec<I>
302where
303    A: Actor,
304    M: Message<Result = Self>,
305    I: 'static,
306{
307    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
308        tx.send(self)
309    }
310}
311
312impl<A, M, B> MessageResponse<A, M> for Addr<B>
313where
314    A: Actor,
315    M: Message<Result = Self>,
316    B: Actor,
317{
318    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<Self>>) {
319        tx.send(self)
320    }
321}
322
323impl<A, M> MessageResponse<A, M> for ResponseActFuture<A, M::Result>
324where
325    A: Actor,
326    M: Message,
327    A::Context: AsyncContext<A>,
328{
329    fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
330        ctx.spawn(self.map(|res, _, _| tx.send(res)));
331    }
332}
333
334/// [`MessageResponse`] trait impl to enable the use of any `I: 'static` with asynchronous
335/// message handling
336///
337/// # Examples
338/// Usage with `Result<I,E>`:
339/// ```
340/// # pub struct MyActorAsync {}
341/// # impl Actor for MyActorAsync { type Context = actix::Context<Self>; }
342/// # use actix::prelude::*;
343/// # use core::pin::Pin;
344/// #
345/// pub struct MyQuestion{}
346/// impl Message for MyQuestion {
347///     type Result = Result<u8,u8>;
348/// }
349/// impl Handler<MyQuestion> for MyActorAsync {
350///     type Result = ResponseFuture<Result<u8,u8>>;
351///     fn handle(&mut self, question: MyQuestion, _ctx: &mut Context<Self>) -> Self::Result {
352///         Box::pin(async {Ok(5)})
353///     }
354/// }
355/// ```
356/// Usage with `Option<I>`:
357/// ```
358/// # pub struct MyActorAsync {}
359/// # impl Actor for MyActorAsync { type Context = actix::Context<Self>; }
360/// # use actix::prelude::*;
361/// # use core::pin::Pin;
362/// pub struct MyQuestion{}
363/// impl Message for MyQuestion {
364///     type Result = Option<u8>;
365/// }
366/// impl Handler<MyQuestion> for MyActorAsync {
367///     type Result = ResponseFuture<Option<u8>>;
368///     fn handle(&mut self, question: MyQuestion, _ctx: &mut Context<Self>) -> Self::Result {
369///         Box::pin(async {Some(5)})
370///     }
371/// }
372/// ```
373/// Usage with any `I: 'static`:
374/// ```
375/// # pub struct MyActorAsync {}
376/// # impl Actor for MyActorAsync { type Context = actix::Context<Self>; }
377/// # use actix::prelude::*;
378/// # use core::pin::Pin;
379/// pub struct MyQuestion{}
380/// impl Message for MyQuestion {
381///     type Result = u8;
382/// }
383/// impl Handler<MyQuestion> for MyActorAsync {
384///     type Result = ResponseFuture<u8>;
385///     fn handle(&mut self, question: MyQuestion, _ctx: &mut Context<Self>) -> Self::Result {
386///         Box::pin(async {5})
387///     }
388/// }
389/// ```
390impl<A, M> MessageResponse<A, M> for ResponseFuture<M::Result>
391where
392    A: Actor,
393    M: Message,
394{
395    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
396        actix_rt::spawn(async { tx.send(self.await) });
397    }
398}
399
/// The two forms a [`Response`] can take.
enum ResponseTypeItem<I> {
    /// The value is already available.
    Result(I),
    /// The value will be produced by a boxed future.
    Fut(Pin<Box<dyn Future<Output = I>>>),
}

/// Helper type for representing different types of message responses:
/// either a ready value or a future that resolves to one.
pub struct Response<I> {
    item: ResponseTypeItem<I>,
}

impl<I> fmt::Debug for Response<I> {
    /// Formats as `Response { item: "Result(_)" }` or
    /// `Response { item: "Fut(_)" }`.
    ///
    /// The payload itself is never printed, so `I` does not need to
    /// implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // A static label formats exactly like the equivalent `String`,
        // without allocating on every call.
        let label = match &self.item {
            ResponseTypeItem::Result(_) => "Result(_)",
            ResponseTypeItem::Fut(_) => "Fut(_)",
        };
        f.debug_struct("Response").field("item", &label).finish()
    }
}

impl<I> Response<I> {
    /// Creates an asynchronous response from `fut`.
    ///
    /// The future is boxed and pinned; it must be `'static`.
    pub fn fut<T>(fut: T) -> Self
    where
        T: Future<Output = I> + 'static,
    {
        Self {
            item: ResponseTypeItem::Fut(Box::pin(fut)),
        }
    }

    /// Creates a response from the already available value `val`.
    pub fn reply(val: I) -> Self {
        Self {
            item: ResponseTypeItem::Result(val),
        }
    }
}
439
440impl<A, M> MessageResponse<A, M> for Response<M::Result>
441where
442    A: Actor,
443    M: Message,
444{
445    fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
446        match self.item {
447            ResponseTypeItem::Fut(fut) => {
448                actix_rt::spawn(async { tx.send(fut.await) });
449            }
450            ResponseTypeItem::Result(res) => tx.send(res),
451        }
452    }
453}
454
455enum ActorResponseTypeItem<A, I> {
456    Result(I),
457    Fut(Pin<Box<dyn ActorFuture<A, Output = I>>>),
458}
459
460/// A helper type for representing different types of message responses.
461pub struct ActorResponse<A, I> {
462    item: ActorResponseTypeItem<A, I>,
463}
464
465impl<A, I> fmt::Debug for ActorResponse<A, I> {
466    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
467        let mut fmt = fmt.debug_struct("ActorResponse");
468        match self.item {
469            ActorResponseTypeItem::Result(_) => fmt.field("item", &"Result(_)".to_string()),
470            ActorResponseTypeItem::Fut(_) => fmt.field("item", &"Fut(_)".to_string()),
471        }
472        .finish()
473    }
474}
475
476impl<A: Actor, I> ActorResponse<A, I> {
477    /// Creates a response.
478    pub fn reply(val: I) -> Self {
479        Self {
480            item: ActorResponseTypeItem::Result(val),
481        }
482    }
483
484    /// Creates an asynchronous response.
485    pub fn r#async<T>(fut: T) -> Self
486    where
487        T: ActorFuture<A, Output = I> + 'static,
488    {
489        Self {
490            item: ActorResponseTypeItem::Fut(Box::pin(fut)),
491        }
492    }
493}
494
495impl<A, M> MessageResponse<A, M> for ActorResponse<A, M::Result>
496where
497    A: Actor,
498    M: Message,
499    A::Context: AsyncContext<A>,
500{
501    fn handle(self, ctx: &mut A::Context, tx: Option<OneshotSender<M::Result>>) {
502        match self.item {
503            ActorResponseTypeItem::Fut(fut) => {
504                let fut = fut.map(|res, _, _| tx.send(res));
505                ctx.spawn(fut);
506            }
507            ActorResponseTypeItem::Result(res) => tx.send(res),
508        }
509    }
510}
511
512impl<A: Actor, I> From<Pin<Box<dyn ActorFuture<A, Output = I>>>> for ActorResponse<A, I> {
513    fn from(fut: Pin<Box<dyn ActorFuture<A, Output = I>>>) -> Self {
514        Self {
515            item: ActorResponseTypeItem::Fut(fut),
516        }
517    }
518}
519
520macro_rules! SIMPLE_RESULT {
521    ($type:ty) => {
522        impl<A, M> MessageResponse<A, M> for $type
523        where
524            A: Actor,
525            M: Message<Result = $type>,
526        {
527            fn handle(self, _: &mut A::Context, tx: Option<OneshotSender<$type>>) {
528                tx.send(self)
529            }
530        }
531    };
532}
533
534SIMPLE_RESULT!(());
535SIMPLE_RESULT!(u8);
536SIMPLE_RESULT!(u16);
537SIMPLE_RESULT!(u32);
538SIMPLE_RESULT!(u64);
539SIMPLE_RESULT!(usize);
540SIMPLE_RESULT!(i8);
541SIMPLE_RESULT!(i16);
542SIMPLE_RESULT!(i32);
543SIMPLE_RESULT!(i64);
544SIMPLE_RESULT!(isize);
545SIMPLE_RESULT!(f32);
546SIMPLE_RESULT!(f64);
547SIMPLE_RESULT!(String);
548SIMPLE_RESULT!(bool);
549
550// Helper trait for send one shot message from Option<Sender> type.
551// None and error are ignored.
552trait OneshotSend<M> {
553    fn send(self, msg: M);
554}
555
556impl<M> OneshotSend<M> for Option<OneshotSender<M>> {
557    fn send(self, msg: M) {
558        if let Some(tx) = self {
559            let _ = tx.send(msg);
560        }
561    }
562}