1use std::{fmt, pin::Pin, task, task::Poll};
2
3use futures_core::stream::Stream;
4
5use crate::{
6 actor::{Actor, AsyncContext},
7 address::{channel, Addr, AddressReceiver, AddressSenderProducer, EnvelopeProxy},
8};
9
/// Channel capacity used when a `Mailbox` is built through its `Default`
/// implementation (it is the value handed to `channel::channel` there).
pub const DEFAULT_CAPACITY: usize = 16;
12
13pub struct Mailbox<A>
14where
15 A: Actor,
16 A::Context: AsyncContext<A>,
17{
18 msgs: AddressReceiver<A>,
19}
20
21impl<A> fmt::Debug for Mailbox<A>
22where
23 A: Actor,
24 A::Context: AsyncContext<A>,
25{
26 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
27 fmt.debug_struct("Mailbox")
28 .field("capacity", &self.capacity())
29 .finish()
30 }
31}
32
33impl<A> Default for Mailbox<A>
34where
35 A: Actor,
36 A::Context: AsyncContext<A>,
37{
38 #[inline]
39 fn default() -> Self {
40 let (_, rx) = channel::channel(DEFAULT_CAPACITY);
41 Mailbox { msgs: rx }
42 }
43}
44
45impl<A> Mailbox<A>
46where
47 A: Actor,
48 A::Context: AsyncContext<A>,
49{
50 #[inline]
51 pub fn new(msgs: AddressReceiver<A>) -> Self {
52 Self { msgs }
53 }
54
55 pub fn capacity(&self) -> usize {
56 self.msgs.capacity()
57 }
58
59 pub fn set_capacity(&mut self, cap: usize) {
60 self.msgs.set_capacity(cap);
61 }
62
63 #[inline]
64 pub fn connected(&self) -> bool {
65 self.msgs.connected()
66 }
67
68 pub fn address(&self) -> Addr<A> {
69 Addr::new(self.msgs.sender())
70 }
71
72 pub fn sender_producer(&self) -> AddressSenderProducer<A> {
73 self.msgs.sender_producer()
74 }
75
76 pub fn poll(&mut self, act: &mut A, ctx: &mut A::Context, task: &mut task::Context<'_>) {
77 #[cfg(feature = "mailbox_assert")]
78 let mut n_polls = 0u16;
79
80 while !ctx.waiting() {
81 match Pin::new(&mut self.msgs).poll_next(task) {
82 Poll::Ready(Some(mut msg)) => {
83 msg.handle(act, ctx);
84 #[cfg(feature = "mailbox_assert")]
85 {
86 n_polls += 1;
87 assert!(n_polls < 256u16, "Too many messages are being processed. Use Self::Context::notify() instead of direct use of address");
89 }
90 }
91 Poll::Ready(None) | Poll::Pending => return,
92 }
93 }
94 }
95}