actix/
mailbox.rs

1use std::{fmt, pin::Pin, task, task::Poll};
2
3use futures_core::stream::Stream;
4
5use crate::{
6    actor::{Actor, AsyncContext},
7    address::{channel, Addr, AddressReceiver, AddressSenderProducer, EnvelopeProxy},
8};
9
10/// Default address channel capacity
11pub const DEFAULT_CAPACITY: usize = 16;
12
13pub struct Mailbox<A>
14where
15    A: Actor,
16    A::Context: AsyncContext<A>,
17{
18    msgs: AddressReceiver<A>,
19}
20
21impl<A> fmt::Debug for Mailbox<A>
22where
23    A: Actor,
24    A::Context: AsyncContext<A>,
25{
26    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
27        fmt.debug_struct("Mailbox")
28            .field("capacity", &self.capacity())
29            .finish()
30    }
31}
32
33impl<A> Default for Mailbox<A>
34where
35    A: Actor,
36    A::Context: AsyncContext<A>,
37{
38    #[inline]
39    fn default() -> Self {
40        let (_, rx) = channel::channel(DEFAULT_CAPACITY);
41        Mailbox { msgs: rx }
42    }
43}
44
45impl<A> Mailbox<A>
46where
47    A: Actor,
48    A::Context: AsyncContext<A>,
49{
50    #[inline]
51    pub fn new(msgs: AddressReceiver<A>) -> Self {
52        Self { msgs }
53    }
54
55    pub fn capacity(&self) -> usize {
56        self.msgs.capacity()
57    }
58
59    pub fn set_capacity(&mut self, cap: usize) {
60        self.msgs.set_capacity(cap);
61    }
62
63    #[inline]
64    pub fn connected(&self) -> bool {
65        self.msgs.connected()
66    }
67
68    pub fn address(&self) -> Addr<A> {
69        Addr::new(self.msgs.sender())
70    }
71
72    pub fn sender_producer(&self) -> AddressSenderProducer<A> {
73        self.msgs.sender_producer()
74    }
75
76    pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, task: &mut task::Context<'_>) {
77        #[cfg(feature = "mailbox_assert")]
78        let mut n_polls = 0u16;
79
80        while !ctx.waiting() {
81            match Pin::new(&mut self.msgs).poll_next(task) {
82                Poll::Ready(Some(mut msg)) => {
83                    msg.handle(act, ctx);
84                    #[cfg(feature = "mailbox_assert")]
85                    {
86                        n_polls += 1;
87                        // Maximum number of consecutive polls in a loop is 256.
88                        assert!(n_polls < 256u16, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address");
89                    }
90                }
91                Poll::Ready(None) | Poll::Pending => return,
92            }
93        }
94    }
95}