actix/address/
mod.rs

1use std::{
2    error, fmt,
3    hash::{Hash, Hasher},
4};
5
6pub(crate) mod channel;
7mod envelope;
8mod message;
9mod queue;
10
11pub(crate) use self::channel::{AddressReceiver, AddressSenderProducer};
12use self::channel::{AddressSender, Sender, WeakAddressSender, WeakSender};
13pub use self::{
14    envelope::{Envelope, EnvelopeProxy, ToEnvelope},
15    message::{RecipientRequest, Request},
16};
17use crate::{
18    actor::Actor,
19    handler::{Handler, Message},
20};
21
/// The error returned when a message could not be handed to a receiver.
///
/// Both variants carry the message that was not sent, so the caller can take
/// it back with `into_inner`.
pub enum SendError<T> {
    /// The send failed because the receiver is full.
    Full(T),
    /// The send failed because the receiver is gone.
    Closed(T),
}
26
/// The errors that can occur during the message delivery process.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum MailboxError {
    /// The mailbox has closed.
    Closed,
    /// Message delivery timed out.
    Timeout,
}

impl fmt::Debug for MailboxError {
    /// Wraps the `Display` text as `MailboxError(<message>)`.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "MailboxError({})", self)
    }
}

impl fmt::Display for MailboxError {
    /// Writes a fixed, human-readable message for each variant.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.write_str(match self {
            MailboxError::Closed => "Mailbox has closed",
            MailboxError::Timeout => "Message delivery timed out",
        })
    }
}

impl error::Error for MailboxError {}
50
51impl<T> SendError<T> {
52    pub fn into_inner(self) -> T {
53        match self {
54            SendError::Full(msg) | SendError::Closed(msg) => msg,
55        }
56    }
57}
58
59impl<T> error::Error for SendError<T> {}
60
61impl<T> fmt::Debug for SendError<T> {
62    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match *self {
64            SendError::Full(_) => write!(fmt, "SendError::Full(..)"),
65            SendError::Closed(_) => write!(fmt, "SendError::Closed(..)"),
66        }
67    }
68}
69
70impl<T> fmt::Display for SendError<T> {
71    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
72        match *self {
73            SendError::Full(_) => write!(fmt, "send failed because receiver is full"),
74            SendError::Closed(_) => write!(fmt, "send failed because receiver is gone"),
75        }
76    }
77}
78
79/// The address of an actor.
80pub struct Addr<A: Actor> {
81    tx: AddressSender<A>,
82}
83
84impl<A: Actor> Addr<A> {
85    pub fn new(tx: AddressSender<A>) -> Addr<A> {
86        Addr { tx }
87    }
88
89    /// Returns whether the actor is still alive.
90    #[inline]
91    pub fn connected(&self) -> bool {
92        self.tx.connected()
93    }
94
95    /// Sends a message unconditionally, ignoring any potential errors.
96    ///
97    /// The message is always queued, even if the mailbox for the receiver is full. If the mailbox
98    /// is closed, the message is silently dropped.
99    #[inline]
100    pub fn do_send<M>(&self, msg: M)
101    where
102        M: Message + Send,
103        M::Result: Send,
104        A: Handler<M>,
105        A::Context: ToEnvelope<A, M>,
106    {
107        let _ = self.tx.do_send(msg);
108    }
109
110    /// Tries to send a message.
111    ///
112    /// This method fails if actor's mailbox is full or closed. This
113    /// method registers the current task in the receiver's queue.
114    pub fn try_send<M>(&self, msg: M) -> Result<(), SendError<M>>
115    where
116        M: Message + Send + 'static,
117        M::Result: Send,
118        A: Handler<M>,
119        A::Context: ToEnvelope<A, M>,
120    {
121        self.tx.try_send(msg, true)
122    }
123
124    /// Sends an asynchronous message and waits for a response.
125    ///
126    /// The communication channel to the actor is bounded. If the returned request future gets
127    /// dropped, the message is cancelled.
128    #[inline]
129    pub fn send<M>(&self, msg: M) -> Request<A, M>
130    where
131        M: Message + Send + 'static,
132        M::Result: Send,
133        A: Handler<M>,
134        A::Context: ToEnvelope<A, M>,
135    {
136        match self.tx.send(msg) {
137            Ok(rx) => Request::new(Some(rx), None),
138            Err(SendError::Full(msg)) => Request::new(None, Some((self.tx.clone(), msg))),
139            Err(SendError::Closed(_)) => Request::new(None, None),
140        }
141    }
142
143    /// Returns the [`Recipient`] for a specific message type.
144    pub fn recipient<M>(self) -> Recipient<M>
145    where
146        A: Handler<M>,
147        A::Context: ToEnvelope<A, M>,
148        M: Message + Send + 'static,
149        M::Result: Send,
150    {
151        self.into()
152    }
153
154    /// Returns a downgraded [`WeakAddr`].
155    pub fn downgrade(&self) -> WeakAddr<A> {
156        WeakAddr {
157            wtx: self.tx.downgrade(),
158        }
159    }
160}
161
162impl<A: Actor> Clone for Addr<A> {
163    fn clone(&self) -> Addr<A> {
164        Addr {
165            tx: self.tx.clone(),
166        }
167    }
168}
169
170impl<A: Actor> PartialEq for Addr<A> {
171    fn eq(&self, other: &Self) -> bool {
172        self.tx == other.tx
173    }
174}
175
176impl<A: Actor> Eq for Addr<A> {}
177
178impl<A: Actor> Hash for Addr<A> {
179    fn hash<H: Hasher>(&self, state: &mut H) {
180        self.tx.hash(state)
181    }
182}
183
184impl<A: Actor> fmt::Debug for Addr<A> {
185    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
186        fmt.debug_struct("Addr").field("tx", &self.tx).finish()
187    }
188}
189
190/// A weakly referenced counterpart to `Addr<A>`.
191pub struct WeakAddr<A: Actor> {
192    wtx: WeakAddressSender<A>,
193}
194
195impl<A: Actor> WeakAddr<A> {
196    /// Attempts to upgrade the [`WeakAddr<A>`] pointer to an [`Addr<A>`].
197    ///
198    /// Returns `None` if the actor has since been dropped or the
199    /// underlying address is disconnected.
200    pub fn upgrade(&self) -> Option<Addr<A>> {
201        match self.wtx.upgrade() {
202            Some(tx) => {
203                if tx.connected() {
204                    Some(Addr::new(tx))
205                } else {
206                    None
207                }
208            }
209            None => None,
210        }
211    }
212
213    pub fn recipient<M>(self) -> WeakRecipient<M>
214    where
215        A: Handler<M>,
216        A::Context: ToEnvelope<A, M>,
217        M: Message + Send + 'static,
218        M::Result: Send,
219    {
220        self.into()
221    }
222}
223
224impl<A: Actor> Clone for WeakAddr<A> {
225    fn clone(&self) -> WeakAddr<A> {
226        WeakAddr {
227            wtx: self.wtx.clone(),
228        }
229    }
230}
231
232impl<A: Actor> fmt::Debug for WeakAddr<A> {
233    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
234        fmt.debug_struct("WeakAddr")
235            .field("wtx", &self.wtx)
236            .finish()
237    }
238}
239
240impl<A: Actor> PartialEq for WeakAddr<A> {
241    fn eq(&self, other: &Self) -> bool {
242        self.wtx == other.wtx
243    }
244}
245
246impl<A: Actor> std::cmp::Eq for WeakAddr<A> {}
247
248/// The [`Recipient`] type allows to send one specific message to an actor.
249///
250/// You can get a recipient using the `Addr::recipient()` method. It is possible
251/// to use the `Clone::clone()` method to get a cloned recipient.
252pub struct Recipient<M: Message>
253where
254    M: Message + Send,
255    M::Result: Send,
256{
257    tx: Box<dyn Sender<M> + Sync>,
258}
259
260impl<M> Recipient<M>
261where
262    M: Message + Send,
263    M::Result: Send,
264{
265    /// Creates a new recipient.
266    pub(crate) fn new(tx: Box<dyn Sender<M> + Sync>) -> Recipient<M> {
267        Recipient { tx }
268    }
269
270    /// Sends a message.
271    ///
272    /// The message is always queued, even if the mailbox for the receiver is full. If the mailbox
273    /// is closed, the message is silently dropped.
274    pub fn do_send(&self, msg: M) {
275        let _ = self.tx.do_send(msg);
276    }
277
278    /// Attempts to send a message.
279    ///
280    /// This method fails if the actor's mailbox is full or closed. This method registers the
281    /// current task in the receivers queue.
282    pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
283        self.tx.try_send(msg)
284    }
285
286    /// Sends a message and asynchronously wait for a response.
287    ///
288    /// The communication channel to the actor is bounded. If the returned `RecipientRequest` object
289    /// gets dropped, the message is cancelled.
290    pub fn send(&self, msg: M) -> RecipientRequest<M> {
291        match self.tx.send(msg) {
292            Ok(rx) => RecipientRequest::new(Some(rx), None),
293            Err(SendError::Full(msg)) => RecipientRequest::new(None, Some((self.tx.boxed(), msg))),
294            Err(SendError::Closed(_)) => RecipientRequest::new(None, None),
295        }
296    }
297
298    pub fn connected(&self) -> bool {
299        self.tx.connected()
300    }
301
302    /// Returns a downgraded `WeakRecipient`
303    pub fn downgrade(&self) -> WeakRecipient<M> {
304        WeakRecipient {
305            wtx: self.tx.downgrade(),
306        }
307    }
308}
309
310impl<A: Actor, M: Message + Send + 'static> From<Addr<A>> for Recipient<M>
311where
312    A: Handler<M>,
313    M::Result: Send,
314    A::Context: ToEnvelope<A, M>,
315{
316    fn from(addr: Addr<A>) -> Self {
317        Recipient::new(Box::new(addr.tx))
318    }
319}
320
321impl<M> Clone for Recipient<M>
322where
323    M: Message + Send,
324    M::Result: Send,
325{
326    fn clone(&self) -> Recipient<M> {
327        Recipient {
328            tx: self.tx.boxed(),
329        }
330    }
331}
332
333impl<M> PartialEq for Recipient<M>
334where
335    M: Message + Send,
336    M::Result: Send,
337{
338    fn eq(&self, other: &Self) -> bool {
339        self.tx.hash() == other.tx.hash()
340    }
341}
342
343impl<M> Eq for Recipient<M>
344where
345    M: Message + Send,
346    M::Result: Send,
347{
348}
349
350impl<M> Hash for Recipient<M>
351where
352    M: Message + Send,
353    M::Result: Send,
354{
355    fn hash<H: Hasher>(&self, state: &mut H) {
356        self.tx.hash().hash(state)
357    }
358}
359
360impl<M> fmt::Debug for Recipient<M>
361where
362    M: Message + Send,
363    M::Result: Send,
364{
365    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
366        write!(fmt, "Recipient {{ /* omitted */ }}")
367    }
368}
369
370/// A weakly referenced counterpart to `Recipient<M>`
371pub struct WeakRecipient<M: Message>
372where
373    M: Message + Send,
374    M::Result: Send,
375{
376    wtx: Box<dyn WeakSender<M> + Sync>,
377}
378
379impl<M> fmt::Debug for WeakRecipient<M>
380where
381    M: Message + Send,
382    M::Result: Send,
383{
384    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
385        write!(fmt, "WeakRecipient {{ /* omitted */ }}")
386    }
387}
388
389impl<M> Clone for WeakRecipient<M>
390where
391    M: Message + Send,
392    M::Result: Send,
393{
394    fn clone(&self) -> Self {
395        Self {
396            wtx: self.wtx.boxed(),
397        }
398    }
399}
400
401impl<M> From<Recipient<M>> for WeakRecipient<M>
402where
403    M: Message + Send,
404    M::Result: Send,
405{
406    fn from(recipient: Recipient<M>) -> Self {
407        recipient.downgrade()
408    }
409}
410
411impl<M> WeakRecipient<M>
412where
413    M: Message + Send,
414    M::Result: Send,
415{
416    pub(crate) fn new(wtx: Box<dyn WeakSender<M> + Sync>) -> WeakRecipient<M> {
417        WeakRecipient { wtx }
418    }
419
420    /// Attempts to upgrade the `WeakRecipient<M>` pointer to an `Recipient<M>`, similar to `WeakAddr<A>`
421    pub fn upgrade(&self) -> Option<Recipient<M>> {
422        self.wtx.upgrade().map(Recipient::new)
423    }
424}
425
426impl<A: Actor, M: Message + Send + 'static> From<Addr<A>> for WeakRecipient<M>
427where
428    A: Handler<M>,
429    M::Result: Send,
430    A::Context: ToEnvelope<A, M>,
431{
432    fn from(addr: Addr<A>) -> WeakRecipient<M> {
433        addr.downgrade().recipient()
434    }
435}
436
437impl<A: Actor, M: Message + Send + 'static> From<WeakAddr<A>> for WeakRecipient<M>
438where
439    A: Handler<M>,
440    M::Result: Send,
441    A::Context: ToEnvelope<A, M>,
442{
443    fn from(addr: WeakAddr<A>) -> WeakRecipient<M> {
444        WeakRecipient::new(Box::new(addr.wtx))
445    }
446}
447
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use crate::prelude::*;

    /// Actor with a mailbox capacity of 1 that records the last `SetCounter` value it handled.
    struct ActorWithSmallMailBox(Arc<AtomicUsize>);

    impl Actor for ActorWithSmallMailBox {
        type Context = Context<Self>;

        fn started(&mut self, ctx: &mut Self::Context) {
            ctx.set_mailbox_capacity(1);
        }
    }

    /// Message asking the actor to store the wrapped value in its counter.
    pub struct SetCounter(usize);

    impl Message for SetCounter {
        type Result = ();
    }

    impl Handler<SetCounter> for ActorWithSmallMailBox {
        type Result = <SetCounter as Message>::Result;

        fn handle(&mut self, ping: SetCounter, _: &mut Context<Self>) -> Self::Result {
            self.0.store(ping.0, Ordering::Relaxed)
        }
    }

    #[test]
    fn test_send_over_limit() {
        let count = Arc::new(AtomicUsize::new(0));
        let count2 = Arc::clone(&count);

        let sys = System::new();

        sys.block_on(async move {
            // `Actor::started` only gets called after we relinquish control to the
            // event loop, so the capacity is set here ourselves as well.
            let addr = ActorWithSmallMailBox::create(|ctx| {
                ctx.set_mailbox_capacity(1);
                ActorWithSmallMailBox(count2)
            });

            let fut = async move {
                // The first two requests get a receiver right away; the third does not.
                let send = addr.clone().send(SetCounter(1));
                assert!(send.rx_is_some());
                let addr2 = addr.clone();
                let send2 = addr2.send(SetCounter(2));
                assert!(send2.rx_is_some());
                let send3 = addr2.send(SetCounter(3));
                assert!(!send3.rx_is_some());

                let _ = send.await;
                let _ = send2.await;
                let _ = send3.await;

                System::current().stop();
            };
            actix_rt::spawn(fut);
        });

        // Run until the system is stopped.
        sys.run().unwrap();

        // The counter must end up holding the value from the third message.
        assert_eq!(count.load(Ordering::Relaxed), 3);
    }
}
519}