actix/
sync.rs

1//! Sync Actors support
2//!
3//! Sync Actors are actors that run multiple instances on a thread pool.
4//! This is useful for CPU bound, or concurrent workloads. There can only be
5//! a single Sync Actor type on a `SyncArbiter`. This means you can't have
6//! Actor type A and B, sharing the same thread pool. You need to create two
7//! [`SyncArbiter`]s and have A and B spawn on unique `SyncArbiter`s respectively.
8//! For more information and examples, see `SyncArbiter`
9use std::{future::Future, pin::Pin, sync::Arc, task, task::Poll, thread};
10
11use actix_rt::System;
12use crossbeam_channel as cb_channel;
13use futures_core::stream::Stream;
14use log::warn;
15use tokio::sync::oneshot::Sender as SyncSender;
16
17use crate::{
18    actor::{Actor, ActorContext, ActorState, Running},
19    address::{
20        channel, Addr, AddressReceiver, AddressSenderProducer, Envelope, EnvelopeProxy, ToEnvelope,
21    },
22    context::Context,
23    handler::{Handler, Message, MessageResponse},
24};
25
26/// [`SyncArbiter`] provides the resources for a single Sync Actor to run on a dedicated
27/// thread or threads. This is generally used for CPU bound concurrent workloads. It's
28/// important to note, that the [`SyncArbiter`] generates a single address for the pool
29/// of hosted Sync Actors. Any message sent to this Address, will be operated on by
30/// a single Sync Actor from the pool.
31///
32/// Sync Actors have a different lifecycle compared to Actors on the System
33/// Arbiter. For more, see `SyncContext`.
34///
35/// # Examples
36///
37/// ```
38/// use actix::prelude::*;
39///
40/// struct Fibonacci(pub u32);
41///
42/// # impl Message for Fibonacci {
43/// #     type Result = Result<u64, ()>;
44/// # }
45///
46/// struct SyncActor;
47///
48/// impl Actor for SyncActor {
49///     // It's important to note that you use "SyncContext" here instead of "Context".
50///     type Context = SyncContext<Self>;
51/// }
52///
53/// impl Handler<Fibonacci> for SyncActor {
54///     type Result = Result<u64, ()>;
55///
56///     fn handle(&mut self, msg: Fibonacci, _: &mut Self::Context) -> Self::Result {
57///         if msg.0 == 0 {
58///             Err(())
59///         } else if msg.0 == 1 {
60///             Ok(1)
61///         } else {
62///             let mut i = 0;
63///             let mut sum = 0;
64///             let mut last = 0;
65///             let mut curr = 1;
66///             while i < msg.0 - 1 {
67///                 sum = last + curr;
68///                 last = curr;
69///                 curr = sum;
70///                 i += 1;
71///             }
72///             Ok(sum)
73///         }
74///     }
75/// }
76///
77/// fn main() {
78///     System::new().block_on(async {
79///         // Start the SyncArbiter with 2 threads, and receive the address of the Actor pool.
80///         let addr = SyncArbiter::start(2, || SyncActor);
81///
82///         // send 5 messages
83///         for n in 5..10 {
84///             // As there are 2 threads, there are at least 2 messages always being processed
85///             // concurrently by the SyncActor.
86///             addr.do_send(Fibonacci(n));
87///         }
88///
89/// #       System::current().stop();
90///     });
91/// }
92/// ```
93pub struct SyncArbiter<A>
94where
95    A: Actor<Context = SyncContext<A>>,
96{
97    queue: Option<cb_channel::Sender<Envelope<A>>>,
98    msgs: AddressReceiver<A>,
99}
100
101impl<A> SyncArbiter<A>
102where
103    A: Actor<Context = SyncContext<A>>,
104{
105    /// Start a new `SyncArbiter` with specified number of worker threads.
106    /// Returns a single address of the started actor. A single address is
107    /// used to communicate to the actor(s), and messages are handled by
108    /// the next available Actor in the `SyncArbiter`.
109    pub fn start<F>(threads: usize, factory: F) -> Addr<A>
110    where
111        F: Fn() -> A + Send + Sync + 'static,
112    {
113        Self::start_with_thread_builder(threads, thread::Builder::new, factory)
114    }
115
116    /// Start a new `SyncArbiter` with specified number of worker threads.
117    /// Each worker thread is spawned from the [`std::thread::Builder`]
118    /// returned by a new call to `thread_builder_factory`.
119    /// Returns a single address of the started actor. A single address is
120    /// used to communicate to the actor(s), and messages are handled by
121    /// the next available Actor in the `SyncArbiter`.
122    pub fn start_with_thread_builder<F, BF>(
123        threads: usize,
124        mut thread_builder_factory: BF,
125        factory: F,
126    ) -> Addr<A>
127    where
128        F: Fn() -> A + Send + Sync + 'static,
129        BF: FnMut() -> thread::Builder,
130    {
131        let factory = Arc::new(factory);
132        let (sender, receiver) = cb_channel::unbounded();
133        let (tx, rx) = channel::channel(0);
134
135        for _ in 0..threads {
136            let f = Arc::clone(&factory);
137            let sys = System::current();
138            let actor_queue = receiver.clone();
139            let inner_rx = rx.sender_producer();
140
141            thread_builder_factory()
142                .spawn(move || {
143                    System::set_current(sys);
144                    SyncContext::new(f, actor_queue, inner_rx).run();
145                })
146                .expect("failed to spawn thread");
147        }
148
149        System::current().arbiter().spawn(Self {
150            queue: Some(sender),
151            msgs: rx,
152        });
153
154        Addr::new(tx)
155    }
156}
157
158impl<A> Actor for SyncArbiter<A>
159where
160    A: Actor<Context = SyncContext<A>>,
161{
162    type Context = Context<Self>;
163}
164
165#[doc(hidden)]
166impl<A> Future for SyncArbiter<A>
167where
168    A: Actor<Context = SyncContext<A>>,
169{
170    type Output = ();
171
172    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
173        let this = self.get_mut();
174        loop {
175            match Pin::new(&mut this.msgs).poll_next(cx) {
176                Poll::Ready(Some(msg)) => {
177                    if let Some(ref queue) = this.queue {
178                        assert!(queue.send(msg).is_ok());
179                    }
180                }
181                Poll::Pending => break,
182                Poll::Ready(None) => unreachable!(),
183            }
184        }
185
186        // stop condition
187        if this.msgs.connected() {
188            Poll::Pending
189        } else {
190            // stop sync arbiters
191            this.queue = None;
192            Poll::Ready(())
193        }
194    }
195}
196
197impl<A, M> ToEnvelope<A, M> for SyncContext<A>
198where
199    A: Actor<Context = Self> + Handler<M>,
200    M: Message + Send + 'static,
201    M::Result: Send,
202{
203    fn pack(msg: M, tx: Option<SyncSender<M::Result>>) -> Envelope<A> {
204        Envelope::with_proxy(Box::new(SyncContextEnvelope::new(msg, tx)))
205    }
206}
207
208/// Sync actor execution context. This is used instead of impl Actor for your Actor
209/// instead of Context, if you intend this actor to run in a [`SyncArbiter`].
210///
211/// Unlike Context, an Actor that uses a [`SyncContext`] can not be stopped
212/// by calling `stop` or `terminate`: Instead, these trigger a restart of
213/// the Actor. Similar, returning `false` from `fn stopping` can not prevent
214/// the restart or termination of the Actor.
215///
216/// # Examples
217///
218/// ```
219/// use actix::prelude::*;
220///
221/// # struct Fibonacci(pub u32);
222///
223/// # impl Message for Fibonacci {
224/// #     type Result = Result<u64, ()>;
225/// # }
226///
227/// struct SyncActor;
228///
229/// impl Actor for SyncActor {
230///     // It's important to note that you use "SyncContext" here instead of "Context".
231///     type Context = SyncContext<Self>;
232/// }
233///
234/// # fn main() {
235/// # }
236/// ```
237pub struct SyncContext<A>
238where
239    A: Actor<Context = SyncContext<A>>,
240{
241    act: Option<A>,
242    queue: cb_channel::Receiver<Envelope<A>>,
243    stopping: bool,
244    state: ActorState,
245    factory: Arc<dyn Fn() -> A>,
246    address: AddressSenderProducer<A>,
247}
248
249impl<A> SyncContext<A>
250where
251    A: Actor<Context = Self>,
252{
253    fn new(
254        factory: Arc<dyn Fn() -> A>,
255        queue: cb_channel::Receiver<Envelope<A>>,
256        address: AddressSenderProducer<A>,
257    ) -> Self {
258        let act = factory();
259        Self {
260            queue,
261            factory,
262            act: Some(act),
263            stopping: false,
264            state: ActorState::Started,
265            address,
266        }
267    }
268
269    fn run(&mut self) {
270        let mut act = self.act.take().unwrap();
271
272        // started
273        A::started(&mut act, self);
274        self.state = ActorState::Running;
275
276        loop {
277            match self.queue.recv() {
278                Ok(mut env) => {
279                    env.handle(&mut act, self);
280                }
281                Err(_) => {
282                    self.state = ActorState::Stopping;
283                    if A::stopping(&mut act, self) != Running::Stop {
284                        warn!("stopping method is not supported for sync actors");
285                    }
286                    self.state = ActorState::Stopped;
287                    A::stopped(&mut act, self);
288                    return;
289                }
290            }
291
292            if self.stopping {
293                self.stopping = false;
294
295                // stop old actor
296                A::stopping(&mut act, self);
297                self.state = ActorState::Stopped;
298                A::stopped(&mut act, self);
299
300                // start new actor
301                self.state = ActorState::Started;
302                act = (*self.factory)();
303                A::started(&mut act, self);
304                self.state = ActorState::Running;
305            }
306        }
307    }
308
309    pub fn address(&self) -> Addr<A> {
310        Addr::new(self.address.sender())
311    }
312}
313
314impl<A> ActorContext for SyncContext<A>
315where
316    A: Actor<Context = Self>,
317{
318    /// Stop the current Actor. [`SyncContext`] will stop the existing Actor, and restart
319    /// a new Actor of the same type to replace it.
320    fn stop(&mut self) {
321        self.stopping = true;
322        self.state = ActorState::Stopping;
323    }
324
325    /// Terminate the current Actor. [`SyncContext`] will terminate the existing Actor, and restart
326    /// a new Actor of the same type to replace it.
327    fn terminate(&mut self) {
328        self.stopping = true;
329        self.state = ActorState::Stopping;
330    }
331
332    /// Get the Actor execution state.
333    fn state(&self) -> ActorState {
334        self.state
335    }
336}
337
338pub(crate) struct SyncContextEnvelope<M>
339where
340    M: Message + Send,
341{
342    msg: Option<M>,
343    tx: Option<SyncSender<M::Result>>,
344}
345
346impl<M> SyncContextEnvelope<M>
347where
348    M: Message + Send,
349    M::Result: Send,
350{
351    pub fn new(msg: M, tx: Option<SyncSender<M::Result>>) -> Self {
352        Self { tx, msg: Some(msg) }
353    }
354}
355
356impl<A, M> EnvelopeProxy<A> for SyncContextEnvelope<M>
357where
358    M: Message + Send + 'static,
359    M::Result: Send,
360    A: Actor<Context = SyncContext<A>> + Handler<M>,
361{
362    fn handle(&mut self, act: &mut A, ctx: &mut A::Context) {
363        let tx = self.tx.take();
364        if tx.is_some() && tx.as_ref().unwrap().is_closed() {
365            return;
366        }
367
368        if let Some(msg) = self.msg.take() {
369            <A as Handler<M>>::handle(act, msg, ctx).handle(ctx, tx)
370        }
371    }
372}
373
374#[cfg(test)]
375mod tests {
376    use tokio::sync::oneshot;
377
378    use crate::prelude::*;
379
380    struct SyncActor2;
381
382    impl Actor for SyncActor2 {
383        type Context = SyncContext<Self>;
384    }
385
386    struct SyncActor1(Addr<SyncActor2>);
387
388    impl Actor for SyncActor1 {
389        type Context = SyncContext<Self>;
390    }
391
392    impl SyncActor1 {
393        fn run() -> SyncActor1 {
394            SyncActor1(SyncArbiter::start(1, || SyncActor2))
395        }
396    }
397
398    struct Msg(oneshot::Sender<u8>);
399
400    impl Message for Msg {
401        type Result = ();
402    }
403
404    impl Handler<Msg> for SyncActor1 {
405        type Result = ();
406
407        fn handle(&mut self, msg: Msg, _: &mut Self::Context) -> Self::Result {
408            self.0.do_send(msg);
409        }
410    }
411
412    impl Handler<Msg> for SyncActor2 {
413        type Result = ();
414
415        fn handle(&mut self, msg: Msg, _: &mut Self::Context) -> Self::Result {
416            msg.0.send(233u8).unwrap();
417        }
418    }
419
420    #[test]
421    fn nested_sync_arbiters() {
422        System::new().block_on(async {
423            let addr = SyncArbiter::start(1, SyncActor1::run);
424            let (tx, rx) = oneshot::channel();
425            addr.send(Msg(tx)).await.unwrap();
426            assert_eq!(233u8, rx.await.unwrap());
427            System::current().stop();
428        })
429    }
430}