// actix/context_impl.rs

1use std::{
2    fmt,
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use bitflags::bitflags;
9use futures_core::ready;
10use smallvec::SmallVec;
11
12use crate::{
13    actor::{Actor, ActorContext, ActorState, AsyncContext, Running, SpawnHandle, Supervised},
14    address::{Addr, AddressSenderProducer},
15    context_items::ActorWaitItem,
16    fut::ActorFuture,
17    mailbox::Mailbox,
18};
19
20bitflags! {
21    /// Internal context state.
22    #[derive(Debug)]
23    struct ContextFlags: u8 {
24        const STARTED =  0b0000_0001;
25        const RUNNING =  0b0000_0010;
26        const STOPPING = 0b0000_0100;
27        const STOPPED =  0b0001_0000;
28        const MB_CAP_CHANGED = 0b0010_0000;
29    }
30}
31
32type Item<A> = (SpawnHandle, Pin<Box<dyn ActorFuture<A, Output = ()>>>);
33
34pub trait AsyncContextParts<A>: ActorContext + AsyncContext<A>
35where
36    A: Actor<Context = Self>,
37{
38    fn parts(&mut self) -> &mut ContextParts<A>;
39}
40
41pub struct ContextParts<A>
42where
43    A: Actor,
44    A::Context: AsyncContext<A>,
45{
46    addr: AddressSenderProducer<A>,
47    flags: ContextFlags,
48    wait: SmallVec<[ActorWaitItem<A>; 2]>,
49    items: SmallVec<[Item<A>; 3]>,
50    handles: SmallVec<[SpawnHandle; 2]>,
51}
52
53impl<A> fmt::Debug for ContextParts<A>
54where
55    A: Actor,
56    A::Context: AsyncContext<A>,
57{
58    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59        fmt.debug_struct("ContextParts")
60            .field("flags", &self.flags)
61            .finish()
62    }
63}
64
65impl<A> ContextParts<A>
66where
67    A: Actor,
68    A::Context: AsyncContext<A>,
69{
70    #[inline]
71    /// Create new [`ContextParts`] instance
72    pub fn new(addr: AddressSenderProducer<A>) -> Self {
73        ContextParts {
74            addr,
75            flags: ContextFlags::RUNNING,
76            wait: SmallVec::new(),
77            items: SmallVec::new(),
78            handles: SmallVec::from_slice(&[SpawnHandle::default(), SpawnHandle::default()]),
79        }
80    }
81
82    #[inline]
83    /// Initiate stop process for actor execution
84    ///
85    /// Actor could prevent stopping by returning `false` from
86    /// `Actor::stopping()` method.
87    pub fn stop(&mut self) {
88        if self.flags.contains(ContextFlags::RUNNING) {
89            self.flags.remove(ContextFlags::RUNNING);
90            self.flags.insert(ContextFlags::STOPPING);
91        }
92    }
93
94    #[inline]
95    /// Terminate actor execution
96    pub fn terminate(&mut self) {
97        self.flags = ContextFlags::STOPPED;
98    }
99
100    #[inline]
101    /// Actor execution state
102    pub fn state(&self) -> ActorState {
103        if self.flags.contains(ContextFlags::RUNNING) {
104            ActorState::Running
105        } else if self.flags.contains(ContextFlags::STOPPED) {
106            ActorState::Stopped
107        } else if self.flags.contains(ContextFlags::STOPPING) {
108            ActorState::Stopping
109        } else {
110            ActorState::Started
111        }
112    }
113
114    #[inline]
115    /// Is context waiting for future completion
116    pub fn waiting(&self) -> bool {
117        !self.wait.is_empty()
118            || self
119                .flags
120                .intersects(ContextFlags::STOPPING | ContextFlags::STOPPED)
121    }
122
123    #[inline]
124    /// Handle of the running future
125    pub fn curr_handle(&self) -> SpawnHandle {
126        self.handles[1]
127    }
128
129    #[inline]
130    /// Spawn new future to this context.
131    pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
132    where
133        F: ActorFuture<A, Output = ()> + 'static,
134    {
135        let handle = self.handles[0].next();
136        self.handles[0] = handle;
137        let fut: Box<dyn ActorFuture<A, Output = ()>> = Box::new(fut);
138        self.items.push((handle, Pin::from(fut)));
139        handle
140    }
141
142    #[inline]
143    /// Spawn new future to this context and wait future completion.
144    ///
145    /// During wait period actor does not receive any messages.
146    pub fn wait<F>(&mut self, f: F)
147    where
148        F: ActorFuture<A, Output = ()> + 'static,
149    {
150        self.wait.push(ActorWaitItem::new(f));
151    }
152
153    #[inline]
154    /// Cancel previously scheduled future.
155    pub fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
156        self.handles.push(handle);
157        true
158    }
159
160    #[inline]
161    pub fn capacity(&mut self) -> usize {
162        self.addr.capacity()
163    }
164
165    #[inline]
166    pub fn set_mailbox_capacity(&mut self, cap: usize) {
167        self.flags.insert(ContextFlags::MB_CAP_CHANGED);
168        self.addr.set_capacity(cap);
169    }
170
171    #[inline]
172    pub fn address(&self) -> Addr<A> {
173        Addr::new(self.addr.sender())
174    }
175
176    /// Restarts this [`ContextParts`] by:
177    /// - canceling all the [`ActorFuture`]s spawned via [`ContextParts::spawn`];
178    /// - clearing the [`ContextParts::wait`] queue;
179    /// - changing the [`Actor`] state to [`ActorState::Running`].
180    #[inline]
181    pub(crate) fn restart(&mut self) {
182        self.flags = ContextFlags::RUNNING;
183        self.wait = SmallVec::new();
184        self.items = SmallVec::new();
185        self.handles[0] = SpawnHandle::default();
186    }
187
188    #[inline]
189    pub fn started(&mut self) -> bool {
190        self.flags.contains(ContextFlags::STARTED)
191    }
192
193    /// Are any senders connected
194    #[inline]
195    pub fn connected(&self) -> bool {
196        self.addr.connected()
197    }
198}
199
200pub struct ContextFut<A, C>
201where
202    C: AsyncContextParts<A> + Unpin,
203    A: Actor<Context = C>,
204{
205    ctx: C,
206    act: A,
207    mailbox: Mailbox<A>,
208    wait: SmallVec<[ActorWaitItem<A>; 2]>,
209    items: SmallVec<[Item<A>; 3]>,
210}
211
212impl<A, C> fmt::Debug for ContextFut<A, C>
213where
214    C: AsyncContextParts<A> + Unpin,
215    A: Actor<Context = C>,
216{
217    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
218        write!(fmt, "ContextFut {{ /* omitted */ }}")
219    }
220}
221
222impl<A, C> Drop for ContextFut<A, C>
223where
224    C: AsyncContextParts<A> + Unpin,
225    A: Actor<Context = C>,
226{
227    fn drop(&mut self) {
228        if self.alive() {
229            self.ctx.parts().stop();
230            let waker = futures_task::noop_waker();
231            let mut cx = std::task::Context::from_waker(&waker);
232            let _ = Pin::new(self).poll(&mut cx);
233        }
234    }
235}
236
237impl<A, C> ContextFut<A, C>
238where
239    C: AsyncContextParts<A> + Unpin,
240    A: Actor<Context = C>,
241{
242    pub fn new(ctx: C, act: A, mailbox: Mailbox<A>) -> Self {
243        ContextFut {
244            ctx,
245            act,
246            mailbox,
247            wait: SmallVec::new(),
248            items: SmallVec::new(),
249        }
250    }
251
252    #[inline]
253    pub fn ctx(&mut self) -> &mut C {
254        &mut self.ctx
255    }
256
257    #[inline]
258    pub fn address(&self) -> Addr<A> {
259        self.mailbox.address()
260    }
261
262    #[inline]
263    fn stopping(&mut self) -> bool {
264        self.ctx
265            .parts()
266            .flags
267            .intersects(ContextFlags::STOPPING | ContextFlags::STOPPED)
268    }
269
270    #[inline]
271    pub fn alive(&mut self) -> bool {
272        if self.ctx.parts().flags.contains(ContextFlags::STOPPED) {
273            false
274        } else {
275            !self.ctx.parts().flags.contains(ContextFlags::STARTED)
276                || self.mailbox.connected()
277                || !self.items.is_empty()
278                || !self.wait.is_empty()
279        }
280    }
281
282    /// Restarts the [`AsyncContext`] of this [`ContextFut`] returning whether the [`Context`] was
283    /// restarted.
284    ///
285    /// Restarting the [`Context`] means:
286    /// - canceling all the [`ActorFuture`]s spawned by the [`AsyncContext`];
287    /// - clearing the [`ActorFuture`] await queue of the [`AsyncContext`];
288    /// - changing the [`Actor`] state to [`ActorState::Running`];
289    /// - calling [`Supervised::restarting`] on the [`Actor`].
290    ///
291    /// Restart may fail only if the [`Mailbox`] is not [`connected`].
292    ///
293    /// [`connected`]: Mailbox::connected
294    #[inline]
295    pub fn restart(&mut self) -> bool
296    where
297        A: Supervised,
298    {
299        if self.mailbox.connected() {
300            self.wait = SmallVec::new();
301            self.items = SmallVec::new();
302            self.ctx.parts().restart();
303            self.act.restarting(&mut self.ctx);
304            true
305        } else {
306            false
307        }
308    }
309
310    fn merge(&mut self) -> bool {
311        let mut modified = false;
312
313        let parts = self.ctx.parts();
314        if !parts.wait.is_empty() {
315            modified = true;
316            self.wait.extend(parts.wait.drain(0..));
317        }
318        if !parts.items.is_empty() {
319            modified = true;
320            self.items.extend(parts.items.drain(0..));
321        }
322        //
323        if parts.flags.contains(ContextFlags::MB_CAP_CHANGED) {
324            modified = true;
325            parts.flags.remove(ContextFlags::MB_CAP_CHANGED);
326        }
327        if parts.handles.len() > 2 {
328            modified = true;
329        }
330
331        modified
332    }
333
334    fn clean_canceled_handle(&mut self) {
335        fn remove_item_by_handle<C>(
336            items: &mut SmallVec<[Item<C>; 3]>,
337            handle: &SpawnHandle,
338        ) -> bool {
339            let mut idx = 0;
340            let mut removed = false;
341            while idx < items.len() {
342                if &items[idx].0 == handle {
343                    items.swap_remove(idx);
344                    removed = true;
345                } else {
346                    idx += 1;
347                }
348            }
349            removed
350        }
351
352        while self.ctx.parts().handles.len() > 2 {
353            let handle = self.ctx.parts().handles.pop().unwrap();
354            // remove item from ContextFut.items in case associated item is already merged
355            if !remove_item_by_handle(&mut self.items, &handle) {
356                // item is not merged into ContextFut.items yet,
357                // so it should be in ContextParts.items
358                remove_item_by_handle(&mut self.ctx.parts().items, &handle);
359            }
360        }
361    }
362}
363
364#[doc(hidden)]
365impl<A, C> Future for ContextFut<A, C>
366where
367    C: AsyncContextParts<A> + Unpin,
368    A: Actor<Context = C>,
369{
370    type Output = ();
371
372    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
373        let this = self.get_mut();
374
375        if !this.ctx.parts().flags.contains(ContextFlags::STARTED) {
376            this.ctx.parts().flags.insert(ContextFlags::STARTED);
377            Actor::started(&mut this.act, &mut this.ctx);
378
379            // check cancelled handles, just in case
380            if this.merge() {
381                this.clean_canceled_handle();
382            }
383        }
384
385        'outer: loop {
386            // check wait futures. order does matter
387            // ctx.wait() always add to the back of the list
388            // and we always have to check most recent future
389            while !this.wait.is_empty() && !this.stopping() {
390                let idx = this.wait.len() - 1;
391                let item = this.wait.last_mut().unwrap();
392                ready!(Pin::new(item).poll(&mut this.act, &mut this.ctx, cx));
393                this.wait.remove(idx);
394                this.merge();
395            }
396
397            // process mailbox
398            this.mailbox.poll(&mut this.act, &mut this.ctx, cx);
399            if !this.wait.is_empty() && !this.stopping() {
400                continue;
401            }
402
403            // process items
404            let mut idx = 0;
405            while idx < this.items.len() && !this.stopping() {
406                this.ctx.parts().handles[1] = this.items[idx].0;
407                match Pin::new(&mut this.items[idx].1).poll(&mut this.act, &mut this.ctx, cx) {
408                    Poll::Pending => {
409                        // got new waiting item. merge
410                        if this.ctx.waiting() {
411                            this.merge();
412                        }
413
414                        // check cancelled handles
415                        if this.ctx.parts().handles.len() > 2 {
416                            // this code is not very efficient, relaying on fact that
417                            // cancellation should be rear also number of futures
418                            // in actor context should be small
419                            this.clean_canceled_handle();
420
421                            continue 'outer;
422                        }
423
424                        // item scheduled wait future
425                        if !this.wait.is_empty() && !this.stopping() {
426                            // move current item to end of poll queue
427                            // otherwise it is possible that same item generate wait
428                            // future and prevents polling
429                            // of other items
430                            let next = this.items.len() - 1;
431                            if idx != next {
432                                this.items.swap(idx, next);
433                            }
434                            continue 'outer;
435                        } else {
436                            idx += 1;
437                        }
438                    }
439                    Poll::Ready(()) => {
440                        this.items.swap_remove(idx);
441
442                        // got new waiting item. merge
443                        if this.ctx.waiting() {
444                            this.merge();
445                        }
446
447                        // one of the items scheduled wait future
448                        if !this.wait.is_empty() && !this.stopping() {
449                            continue 'outer;
450                        }
451                    }
452                }
453            }
454            this.ctx.parts().handles[1] = SpawnHandle::default();
455
456            // merge returns true if context contains new items or handles to be cancelled
457            if this.merge() && !this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
458                // if we have no item to process, cancelled handles wouldn't be
459                // reaped in the above loop. this means this.merge() will never
460                // be false and the poll() never ends. so, discard the handles
461                // as we're sure there are no more items to be cancelled.
462                if this.items.is_empty() {
463                    this.ctx.parts().handles.truncate(2);
464                }
465                continue;
466            }
467
468            // check state
469            if this.ctx.parts().flags.contains(ContextFlags::RUNNING) {
470                // possible stop condition
471                if !this.alive() && Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
472                    this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
473                    Actor::stopped(&mut this.act, &mut this.ctx);
474                    return Poll::Ready(());
475                }
476            } else if this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
477                if Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
478                    this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
479                    Actor::stopped(&mut this.act, &mut this.ctx);
480                    return Poll::Ready(());
481                } else {
482                    this.ctx.parts().flags.remove(ContextFlags::STOPPING);
483                    this.ctx.parts().flags.insert(ContextFlags::RUNNING);
484                    continue;
485                }
486            } else if this.ctx.parts().flags.contains(ContextFlags::STOPPED) {
487                Actor::stopped(&mut this.act, &mut this.ctx);
488                return Poll::Ready(());
489            }
490
491            return Poll::Pending;
492        }
493    }
494}