1use std::{
2 fmt,
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use bitflags::bitflags;
9use futures_core::ready;
10use smallvec::SmallVec;
11
12use crate::{
13 actor::{Actor, ActorContext, ActorState, AsyncContext, Running, SpawnHandle, Supervised},
14 address::{Addr, AddressSenderProducer},
15 context_items::ActorWaitItem,
16 fut::ActorFuture,
17 mailbox::Mailbox,
18};
19
20bitflags! {
21 #[derive(Debug)]
23 struct ContextFlags: u8 {
24 const STARTED = 0b0000_0001;
25 const RUNNING = 0b0000_0010;
26 const STOPPING = 0b0000_0100;
27 const STOPPED = 0b0001_0000;
28 const MB_CAP_CHANGED = 0b0010_0000;
29 }
30}
31
32type Item<A> = (SpawnHandle, Pin<Box<dyn ActorFuture<A, Output = ()>>>);
33
34pub trait AsyncContextParts<A>: ActorContext + AsyncContext<A>
35where
36 A: Actor<Context = Self>,
37{
38 fn parts(&mut self) -> &mut ContextParts<A>;
39}
40
41pub struct ContextParts<A>
42where
43 A: Actor,
44 A::Context: AsyncContext<A>,
45{
46 addr: AddressSenderProducer<A>,
47 flags: ContextFlags,
48 wait: SmallVec<[ActorWaitItem<A>; 2]>,
49 items: SmallVec<[Item<A>; 3]>,
50 handles: SmallVec<[SpawnHandle; 2]>,
51}
52
53impl<A> fmt::Debug for ContextParts<A>
54where
55 A: Actor,
56 A::Context: AsyncContext<A>,
57{
58 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59 fmt.debug_struct("ContextParts")
60 .field("flags", &self.flags)
61 .finish()
62 }
63}
64
65impl<A> ContextParts<A>
66where
67 A: Actor,
68 A::Context: AsyncContext<A>,
69{
70 #[inline]
71 pub fn new(addr: AddressSenderProducer<A>) -> Self {
73 ContextParts {
74 addr,
75 flags: ContextFlags::RUNNING,
76 wait: SmallVec::new(),
77 items: SmallVec::new(),
78 handles: SmallVec::from_slice(&[SpawnHandle::default(), SpawnHandle::default()]),
79 }
80 }
81
82 #[inline]
83 pub fn stop(&mut self) {
88 if self.flags.contains(ContextFlags::RUNNING) {
89 self.flags.remove(ContextFlags::RUNNING);
90 self.flags.insert(ContextFlags::STOPPING);
91 }
92 }
93
94 #[inline]
95 pub fn terminate(&mut self) {
97 self.flags = ContextFlags::STOPPED;
98 }
99
100 #[inline]
101 pub fn state(&self) -> ActorState {
103 if self.flags.contains(ContextFlags::RUNNING) {
104 ActorState::Running
105 } else if self.flags.contains(ContextFlags::STOPPED) {
106 ActorState::Stopped
107 } else if self.flags.contains(ContextFlags::STOPPING) {
108 ActorState::Stopping
109 } else {
110 ActorState::Started
111 }
112 }
113
114 #[inline]
115 pub fn waiting(&self) -> bool {
117 !self.wait.is_empty()
118 || self
119 .flags
120 .intersects(ContextFlags::STOPPING | ContextFlags::STOPPED)
121 }
122
123 #[inline]
124 pub fn curr_handle(&self) -> SpawnHandle {
126 self.handles[1]
127 }
128
129 #[inline]
130 pub fn spawn<F>(&mut self, fut: F) -> SpawnHandle
132 where
133 F: ActorFuture<A, Output = ()> + 'static,
134 {
135 let handle = self.handles[0].next();
136 self.handles[0] = handle;
137 let fut: Box<dyn ActorFuture<A, Output = ()>> = Box::new(fut);
138 self.items.push((handle, Pin::from(fut)));
139 handle
140 }
141
142 #[inline]
143 pub fn wait<F>(&mut self, f: F)
147 where
148 F: ActorFuture<A, Output = ()> + 'static,
149 {
150 self.wait.push(ActorWaitItem::new(f));
151 }
152
153 #[inline]
154 pub fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
156 self.handles.push(handle);
157 true
158 }
159
160 #[inline]
161 pub fn capacity(&mut self) -> usize {
162 self.addr.capacity()
163 }
164
165 #[inline]
166 pub fn set_mailbox_capacity(&mut self, cap: usize) {
167 self.flags.insert(ContextFlags::MB_CAP_CHANGED);
168 self.addr.set_capacity(cap);
169 }
170
171 #[inline]
172 pub fn address(&self) -> Addr<A> {
173 Addr::new(self.addr.sender())
174 }
175
176 #[inline]
181 pub(crate) fn restart(&mut self) {
182 self.flags = ContextFlags::RUNNING;
183 self.wait = SmallVec::new();
184 self.items = SmallVec::new();
185 self.handles[0] = SpawnHandle::default();
186 }
187
188 #[inline]
189 pub fn started(&mut self) -> bool {
190 self.flags.contains(ContextFlags::STARTED)
191 }
192
193 #[inline]
195 pub fn connected(&self) -> bool {
196 self.addr.connected()
197 }
198}
199
200pub struct ContextFut<A, C>
201where
202 C: AsyncContextParts<A> + Unpin,
203 A: Actor<Context = C>,
204{
205 ctx: C,
206 act: A,
207 mailbox: Mailbox<A>,
208 wait: SmallVec<[ActorWaitItem<A>; 2]>,
209 items: SmallVec<[Item<A>; 3]>,
210}
211
212impl<A, C> fmt::Debug for ContextFut<A, C>
213where
214 C: AsyncContextParts<A> + Unpin,
215 A: Actor<Context = C>,
216{
217 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
218 write!(fmt, "ContextFut {{ /* omitted */ }}")
219 }
220}
221
222impl<A, C> Drop for ContextFut<A, C>
223where
224 C: AsyncContextParts<A> + Unpin,
225 A: Actor<Context = C>,
226{
227 fn drop(&mut self) {
228 if self.alive() {
229 self.ctx.parts().stop();
230 let waker = futures_task::noop_waker();
231 let mut cx = std::task::Context::from_waker(&waker);
232 let _ = Pin::new(self).poll(&mut cx);
233 }
234 }
235}
236
237impl<A, C> ContextFut<A, C>
238where
239 C: AsyncContextParts<A> + Unpin,
240 A: Actor<Context = C>,
241{
242 pub fn new(ctx: C, act: A, mailbox: Mailbox<A>) -> Self {
243 ContextFut {
244 ctx,
245 act,
246 mailbox,
247 wait: SmallVec::new(),
248 items: SmallVec::new(),
249 }
250 }
251
252 #[inline]
253 pub fn ctx(&mut self) -> &mut C {
254 &mut self.ctx
255 }
256
257 #[inline]
258 pub fn address(&self) -> Addr<A> {
259 self.mailbox.address()
260 }
261
262 #[inline]
263 fn stopping(&mut self) -> bool {
264 self.ctx
265 .parts()
266 .flags
267 .intersects(ContextFlags::STOPPING | ContextFlags::STOPPED)
268 }
269
270 #[inline]
271 pub fn alive(&mut self) -> bool {
272 if self.ctx.parts().flags.contains(ContextFlags::STOPPED) {
273 false
274 } else {
275 !self.ctx.parts().flags.contains(ContextFlags::STARTED)
276 || self.mailbox.connected()
277 || !self.items.is_empty()
278 || !self.wait.is_empty()
279 }
280 }
281
282 #[inline]
295 pub fn restart(&mut self) -> bool
296 where
297 A: Supervised,
298 {
299 if self.mailbox.connected() {
300 self.wait = SmallVec::new();
301 self.items = SmallVec::new();
302 self.ctx.parts().restart();
303 self.act.restarting(&mut self.ctx);
304 true
305 } else {
306 false
307 }
308 }
309
310 fn merge(&mut self) -> bool {
311 let mut modified = false;
312
313 let parts = self.ctx.parts();
314 if !parts.wait.is_empty() {
315 modified = true;
316 self.wait.extend(parts.wait.drain(0..));
317 }
318 if !parts.items.is_empty() {
319 modified = true;
320 self.items.extend(parts.items.drain(0..));
321 }
322 if parts.flags.contains(ContextFlags::MB_CAP_CHANGED) {
324 modified = true;
325 parts.flags.remove(ContextFlags::MB_CAP_CHANGED);
326 }
327 if parts.handles.len() > 2 {
328 modified = true;
329 }
330
331 modified
332 }
333
334 fn clean_canceled_handle(&mut self) {
335 fn remove_item_by_handle<C>(
336 items: &mut SmallVec<[Item<C>; 3]>,
337 handle: &SpawnHandle,
338 ) -> bool {
339 let mut idx = 0;
340 let mut removed = false;
341 while idx < items.len() {
342 if &items[idx].0 == handle {
343 items.swap_remove(idx);
344 removed = true;
345 } else {
346 idx += 1;
347 }
348 }
349 removed
350 }
351
352 while self.ctx.parts().handles.len() > 2 {
353 let handle = self.ctx.parts().handles.pop().unwrap();
354 if !remove_item_by_handle(&mut self.items, &handle) {
356 remove_item_by_handle(&mut self.ctx.parts().items, &handle);
359 }
360 }
361 }
362}
363
364#[doc(hidden)]
365impl<A, C> Future for ContextFut<A, C>
366where
367 C: AsyncContextParts<A> + Unpin,
368 A: Actor<Context = C>,
369{
370 type Output = ();
371
372 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
373 let this = self.get_mut();
374
375 if !this.ctx.parts().flags.contains(ContextFlags::STARTED) {
376 this.ctx.parts().flags.insert(ContextFlags::STARTED);
377 Actor::started(&mut this.act, &mut this.ctx);
378
379 if this.merge() {
381 this.clean_canceled_handle();
382 }
383 }
384
385 'outer: loop {
386 while !this.wait.is_empty() && !this.stopping() {
390 let idx = this.wait.len() - 1;
391 let item = this.wait.last_mut().unwrap();
392 ready!(Pin::new(item).poll(&mut this.act, &mut this.ctx, cx));
393 this.wait.remove(idx);
394 this.merge();
395 }
396
397 this.mailbox.poll(&mut this.act, &mut this.ctx, cx);
399 if !this.wait.is_empty() && !this.stopping() {
400 continue;
401 }
402
403 let mut idx = 0;
405 while idx < this.items.len() && !this.stopping() {
406 this.ctx.parts().handles[1] = this.items[idx].0;
407 match Pin::new(&mut this.items[idx].1).poll(&mut this.act, &mut this.ctx, cx) {
408 Poll::Pending => {
409 if this.ctx.waiting() {
411 this.merge();
412 }
413
414 if this.ctx.parts().handles.len() > 2 {
416 this.clean_canceled_handle();
420
421 continue 'outer;
422 }
423
424 if !this.wait.is_empty() && !this.stopping() {
426 let next = this.items.len() - 1;
431 if idx != next {
432 this.items.swap(idx, next);
433 }
434 continue 'outer;
435 } else {
436 idx += 1;
437 }
438 }
439 Poll::Ready(()) => {
440 this.items.swap_remove(idx);
441
442 if this.ctx.waiting() {
444 this.merge();
445 }
446
447 if !this.wait.is_empty() && !this.stopping() {
449 continue 'outer;
450 }
451 }
452 }
453 }
454 this.ctx.parts().handles[1] = SpawnHandle::default();
455
456 if this.merge() && !this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
458 if this.items.is_empty() {
463 this.ctx.parts().handles.truncate(2);
464 }
465 continue;
466 }
467
468 if this.ctx.parts().flags.contains(ContextFlags::RUNNING) {
470 if !this.alive() && Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
472 this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
473 Actor::stopped(&mut this.act, &mut this.ctx);
474 return Poll::Ready(());
475 }
476 } else if this.ctx.parts().flags.contains(ContextFlags::STOPPING) {
477 if Actor::stopping(&mut this.act, &mut this.ctx) == Running::Stop {
478 this.ctx.parts().flags = ContextFlags::STOPPED | ContextFlags::STARTED;
479 Actor::stopped(&mut this.act, &mut this.ctx);
480 return Poll::Ready(());
481 } else {
482 this.ctx.parts().flags.remove(ContextFlags::STOPPING);
483 this.ctx.parts().flags.insert(ContextFlags::RUNNING);
484 continue;
485 }
486 } else if this.ctx.parts().flags.contains(ContextFlags::STOPPED) {
487 Actor::stopped(&mut this.act, &mut this.ctx);
488 return Poll::Ready(());
489 }
490
491 return Poll::Pending;
492 }
493 }
494}