1use std::{
2 error, fmt,
3 hash::{Hash, Hasher},
4};
5
6pub(crate) mod channel;
7mod envelope;
8mod message;
9mod queue;
10
11pub(crate) use self::channel::{AddressReceiver, AddressSenderProducer};
12use self::channel::{AddressSender, Sender, WeakAddressSender, WeakSender};
13pub use self::{
14 envelope::{Envelope, EnvelopeProxy, ToEnvelope},
15 message::{RecipientRequest, Request},
16};
17use crate::{
18 actor::Actor,
19 handler::{Handler, Message},
20};
21
22pub enum SendError<T> {
23 Full(T),
24 Closed(T),
25}
26
27#[derive(Clone, Copy, PartialEq, Eq)]
28pub enum MailboxError {
30 Closed,
31 Timeout,
32}
33
34impl fmt::Debug for MailboxError {
35 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
36 write!(fmt, "MailboxError({})", self)
37 }
38}
39
40impl fmt::Display for MailboxError {
41 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 MailboxError::Closed => write!(fmt, "Mailbox has closed"),
44 MailboxError::Timeout => write!(fmt, "Message delivery timed out"),
45 }
46 }
47}
48
49impl error::Error for MailboxError {}
50
51impl<T> SendError<T> {
52 pub fn into_inner(self) -> T {
53 match self {
54 SendError::Full(msg) | SendError::Closed(msg) => msg,
55 }
56 }
57}
58
59impl<T> error::Error for SendError<T> {}
60
61impl<T> fmt::Debug for SendError<T> {
62 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match *self {
64 SendError::Full(_) => write!(fmt, "SendError::Full(..)"),
65 SendError::Closed(_) => write!(fmt, "SendError::Closed(..)"),
66 }
67 }
68}
69
70impl<T> fmt::Display for SendError<T> {
71 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
72 match *self {
73 SendError::Full(_) => write!(fmt, "send failed because receiver is full"),
74 SendError::Closed(_) => write!(fmt, "send failed because receiver is gone"),
75 }
76 }
77}
78
79pub struct Addr<A: Actor> {
81 tx: AddressSender<A>,
82}
83
84impl<A: Actor> Addr<A> {
85 pub fn new(tx: AddressSender<A>) -> Addr<A> {
86 Addr { tx }
87 }
88
89 #[inline]
91 pub fn connected(&self) -> bool {
92 self.tx.connected()
93 }
94
95 #[inline]
100 pub fn do_send<M>(&self, msg: M)
101 where
102 M: Message + Send,
103 M::Result: Send,
104 A: Handler<M>,
105 A::Context: ToEnvelope<A, M>,
106 {
107 let _ = self.tx.do_send(msg);
108 }
109
110 pub fn try_send<M>(&self, msg: M) -> Result<(), SendError<M>>
115 where
116 M: Message + Send + 'static,
117 M::Result: Send,
118 A: Handler<M>,
119 A::Context: ToEnvelope<A, M>,
120 {
121 self.tx.try_send(msg, true)
122 }
123
124 #[inline]
129 pub fn send<M>(&self, msg: M) -> Request<A, M>
130 where
131 M: Message + Send + 'static,
132 M::Result: Send,
133 A: Handler<M>,
134 A::Context: ToEnvelope<A, M>,
135 {
136 match self.tx.send(msg) {
137 Ok(rx) => Request::new(Some(rx), None),
138 Err(SendError::Full(msg)) => Request::new(None, Some((self.tx.clone(), msg))),
139 Err(SendError::Closed(_)) => Request::new(None, None),
140 }
141 }
142
143 pub fn recipient<M>(self) -> Recipient<M>
145 where
146 A: Handler<M>,
147 A::Context: ToEnvelope<A, M>,
148 M: Message + Send + 'static,
149 M::Result: Send,
150 {
151 self.into()
152 }
153
154 pub fn downgrade(&self) -> WeakAddr<A> {
156 WeakAddr {
157 wtx: self.tx.downgrade(),
158 }
159 }
160}
161
162impl<A: Actor> Clone for Addr<A> {
163 fn clone(&self) -> Addr<A> {
164 Addr {
165 tx: self.tx.clone(),
166 }
167 }
168}
169
170impl<A: Actor> PartialEq for Addr<A> {
171 fn eq(&self, other: &Self) -> bool {
172 self.tx == other.tx
173 }
174}
175
176impl<A: Actor> Eq for Addr<A> {}
177
178impl<A: Actor> Hash for Addr<A> {
179 fn hash<H: Hasher>(&self, state: &mut H) {
180 self.tx.hash(state)
181 }
182}
183
184impl<A: Actor> fmt::Debug for Addr<A> {
185 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
186 fmt.debug_struct("Addr").field("tx", &self.tx).finish()
187 }
188}
189
190pub struct WeakAddr<A: Actor> {
192 wtx: WeakAddressSender<A>,
193}
194
195impl<A: Actor> WeakAddr<A> {
196 pub fn upgrade(&self) -> Option<Addr<A>> {
201 match self.wtx.upgrade() {
202 Some(tx) => {
203 if tx.connected() {
204 Some(Addr::new(tx))
205 } else {
206 None
207 }
208 }
209 None => None,
210 }
211 }
212
213 pub fn recipient<M>(self) -> WeakRecipient<M>
214 where
215 A: Handler<M>,
216 A::Context: ToEnvelope<A, M>,
217 M: Message + Send + 'static,
218 M::Result: Send,
219 {
220 self.into()
221 }
222}
223
224impl<A: Actor> Clone for WeakAddr<A> {
225 fn clone(&self) -> WeakAddr<A> {
226 WeakAddr {
227 wtx: self.wtx.clone(),
228 }
229 }
230}
231
232impl<A: Actor> fmt::Debug for WeakAddr<A> {
233 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
234 fmt.debug_struct("WeakAddr")
235 .field("wtx", &self.wtx)
236 .finish()
237 }
238}
239
240impl<A: Actor> PartialEq for WeakAddr<A> {
241 fn eq(&self, other: &Self) -> bool {
242 self.wtx == other.wtx
243 }
244}
245
246impl<A: Actor> std::cmp::Eq for WeakAddr<A> {}
247
248pub struct Recipient<M: Message>
253where
254 M: Message + Send,
255 M::Result: Send,
256{
257 tx: Box<dyn Sender<M> + Sync>,
258}
259
260impl<M> Recipient<M>
261where
262 M: Message + Send,
263 M::Result: Send,
264{
265 pub(crate) fn new(tx: Box<dyn Sender<M> + Sync>) -> Recipient<M> {
267 Recipient { tx }
268 }
269
270 pub fn do_send(&self, msg: M) {
275 let _ = self.tx.do_send(msg);
276 }
277
278 pub fn try_send(&self, msg: M) -> Result<(), SendError<M>> {
283 self.tx.try_send(msg)
284 }
285
286 pub fn send(&self, msg: M) -> RecipientRequest<M> {
291 match self.tx.send(msg) {
292 Ok(rx) => RecipientRequest::new(Some(rx), None),
293 Err(SendError::Full(msg)) => RecipientRequest::new(None, Some((self.tx.boxed(), msg))),
294 Err(SendError::Closed(_)) => RecipientRequest::new(None, None),
295 }
296 }
297
298 pub fn connected(&self) -> bool {
299 self.tx.connected()
300 }
301
302 pub fn downgrade(&self) -> WeakRecipient<M> {
304 WeakRecipient {
305 wtx: self.tx.downgrade(),
306 }
307 }
308}
309
310impl<A: Actor, M: Message + Send + 'static> From<Addr<A>> for Recipient<M>
311where
312 A: Handler<M>,
313 M::Result: Send,
314 A::Context: ToEnvelope<A, M>,
315{
316 fn from(addr: Addr<A>) -> Self {
317 Recipient::new(Box::new(addr.tx))
318 }
319}
320
321impl<M> Clone for Recipient<M>
322where
323 M: Message + Send,
324 M::Result: Send,
325{
326 fn clone(&self) -> Recipient<M> {
327 Recipient {
328 tx: self.tx.boxed(),
329 }
330 }
331}
332
333impl<M> PartialEq for Recipient<M>
334where
335 M: Message + Send,
336 M::Result: Send,
337{
338 fn eq(&self, other: &Self) -> bool {
339 self.tx.hash() == other.tx.hash()
340 }
341}
342
343impl<M> Eq for Recipient<M>
344where
345 M: Message + Send,
346 M::Result: Send,
347{
348}
349
350impl<M> Hash for Recipient<M>
351where
352 M: Message + Send,
353 M::Result: Send,
354{
355 fn hash<H: Hasher>(&self, state: &mut H) {
356 self.tx.hash().hash(state)
357 }
358}
359
360impl<M> fmt::Debug for Recipient<M>
361where
362 M: Message + Send,
363 M::Result: Send,
364{
365 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
366 write!(fmt, "Recipient {{ /* omitted */ }}")
367 }
368}
369
370pub struct WeakRecipient<M: Message>
372where
373 M: Message + Send,
374 M::Result: Send,
375{
376 wtx: Box<dyn WeakSender<M> + Sync>,
377}
378
379impl<M> fmt::Debug for WeakRecipient<M>
380where
381 M: Message + Send,
382 M::Result: Send,
383{
384 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
385 write!(fmt, "WeakRecipient {{ /* omitted */ }}")
386 }
387}
388
389impl<M> Clone for WeakRecipient<M>
390where
391 M: Message + Send,
392 M::Result: Send,
393{
394 fn clone(&self) -> Self {
395 Self {
396 wtx: self.wtx.boxed(),
397 }
398 }
399}
400
401impl<M> From<Recipient<M>> for WeakRecipient<M>
402where
403 M: Message + Send,
404 M::Result: Send,
405{
406 fn from(recipient: Recipient<M>) -> Self {
407 recipient.downgrade()
408 }
409}
410
411impl<M> WeakRecipient<M>
412where
413 M: Message + Send,
414 M::Result: Send,
415{
416 pub(crate) fn new(wtx: Box<dyn WeakSender<M> + Sync>) -> WeakRecipient<M> {
417 WeakRecipient { wtx }
418 }
419
420 pub fn upgrade(&self) -> Option<Recipient<M>> {
422 self.wtx.upgrade().map(Recipient::new)
423 }
424}
425
426impl<A: Actor, M: Message + Send + 'static> From<Addr<A>> for WeakRecipient<M>
427where
428 A: Handler<M>,
429 M::Result: Send,
430 A::Context: ToEnvelope<A, M>,
431{
432 fn from(addr: Addr<A>) -> WeakRecipient<M> {
433 addr.downgrade().recipient()
434 }
435}
436
437impl<A: Actor, M: Message + Send + 'static> From<WeakAddr<A>> for WeakRecipient<M>
438where
439 A: Handler<M>,
440 M::Result: Send,
441 A::Context: ToEnvelope<A, M>,
442{
443 fn from(addr: WeakAddr<A>) -> WeakRecipient<M> {
444 WeakRecipient::new(Box::new(addr.wtx))
445 }
446}
447
448#[cfg(test)]
449mod tests {
450 use std::sync::{
451 atomic::{AtomicUsize, Ordering},
452 Arc,
453 };
454
455 use crate::prelude::*;
456
457 struct ActorWithSmallMailBox(Arc<AtomicUsize>);
458
459 impl Actor for ActorWithSmallMailBox {
460 type Context = Context<Self>;
461
462 fn started(&mut self, ctx: &mut Self::Context) {
463 ctx.set_mailbox_capacity(1);
464 }
465 }
466
467 pub struct SetCounter(usize);
468
469 impl Message for SetCounter {
470 type Result = ();
471 }
472
473 impl Handler<SetCounter> for ActorWithSmallMailBox {
474 type Result = <SetCounter as Message>::Result;
475
476 fn handle(&mut self, ping: SetCounter, _: &mut Context<Self>) -> Self::Result {
477 self.0.store(ping.0, Ordering::Relaxed)
478 }
479 }
480
481 #[test]
482 fn test_send_over_limit() {
483 let count = Arc::new(AtomicUsize::new(0));
484 let count2 = Arc::clone(&count);
485
486 let sys = System::new();
487
488 sys.block_on(async move {
489 let addr = ActorWithSmallMailBox::create(|ctx| {
492 ctx.set_mailbox_capacity(1);
493 ActorWithSmallMailBox(count2)
494 });
495
496 let fut = async move {
497 let send = addr.clone().send(SetCounter(1));
498 assert!(send.rx_is_some());
499 let addr2 = addr.clone();
500 let send2 = addr2.send(SetCounter(2));
501 assert!(send2.rx_is_some());
502 let send3 = addr2.send(SetCounter(3));
503 assert!(!send3.rx_is_some());
504
505 let _ = send.await;
506 let _ = send2.await;
507 let _ = send3.await;
508
509 System::current().stop();
510 };
511 actix_rt::spawn(fut);
512 });
513
514 sys.run().unwrap();
516
517 assert_eq!(count.load(Ordering::Relaxed), 3);
518 }
519}