actix/
registry.rs

1//! Actors registry
2//!
3//! An Actor can register itself as a service. A Service can be defined as an
4//! `ArbiterService`, which is unique per arbiter, or a `SystemService`, which
5//! is unique per system.
6use std::{
7    any::{Any, TypeId},
8    cell::RefCell,
9    collections::HashMap,
10    rc::Rc,
11};
12
13use actix_rt::{ArbiterHandle, System};
14use once_cell::sync::Lazy;
15use parking_lot::Mutex;
16
17use crate::{
18    actor::{Actor, Supervised},
19    address::Addr,
20    context::Context,
21    supervisor::Supervisor,
22};
23
/// Type-erased storage used by the arbiter-local [`Registry`].
///
/// Keys are the [`TypeId`] of an actor type; values are boxed, type-erased
/// entries that are recovered again with `downcast_ref`.
type AnyMap = HashMap<TypeId, Box<dyn Any>>;
25
26/// Actors registry
27///
28/// An Actor can register itself as a service. A Service can be defined as an
29/// `ArbiterService`, which is unique per arbiter, or a `SystemService`, which
30/// is unique per system.
31///
32/// If an arbiter service is used outside of a running arbiter, it panics.
33///
34/// # Examples
35///
36/// ```
37/// use actix::prelude::*;
38///
39/// #[derive(Message)]
40/// #[rtype(result = "()")]
41/// struct Ping;
42///
43/// #[derive(Default)]
44/// struct MyActor1;
45///
46/// impl Actor for MyActor1 {
47///     type Context = Context<Self>;
48/// }
49/// impl actix::Supervised for MyActor1 {}
50///
51/// impl ArbiterService for MyActor1 {
52///    fn service_started(&mut self, ctx: &mut Context<Self>) {
53///       println!("Service started");
54///    }
55/// }
56///
57/// impl Handler<Ping> for MyActor1 {
58///    type Result = ();
59///
60///    fn handle(&mut self, _: Ping, ctx: &mut Context<Self>) {
61///       println!("ping");
62/// #     System::current().stop();
63///    }
64/// }
65///
66/// struct MyActor2;
67///
68/// impl Actor for MyActor2 {
69///    type Context = Context<Self>;
70///
71///    fn started(&mut self, _: &mut Context<Self>) {
72///       // get MyActor1 address from the registry
73///       let act = MyActor1::from_registry();
74///       act.do_send(Ping);
75///    }
76/// }
77///
78/// #[actix::main]
79/// async fn main() {
80///     // Start MyActor2 in new Arbiter
81///     Arbiter::new().spawn_fn(|| {
82///         MyActor2.start();
83///     });
84///     # System::current().stop();
85/// }
86/// ```
87#[derive(Clone)]
88pub struct Registry {
89    registry: Rc<RefCell<AnyMap>>,
90}
91
92thread_local! {
93    static AREG: Registry = {
94        Registry {
95            registry: Rc::new(RefCell::new(AnyMap::new()))
96        }
97    };
98}
99
100/// Trait defines arbiter's service.
101#[allow(unused_variables)]
102pub trait ArbiterService: Actor<Context = Context<Self>> + Supervised + Default {
103    /// Construct and start arbiter service
104    fn start_service() -> Addr<Self> {
105        Supervisor::start(|ctx| {
106            let mut act = Self::default();
107            act.service_started(ctx);
108            act
109        })
110    }
111
112    /// Method is called during service initialization.
113    fn service_started(&mut self, ctx: &mut Context<Self>) {}
114
115    /// Get actor's address from arbiter registry
116    fn from_registry() -> Addr<Self> {
117        AREG.with(|reg| reg.get_or_start_default())
118    }
119}
120
121impl Registry {
122    /// Queries registry for a type of actor (`A`), returning its address.
123    ///
124    /// If actor is not registered, a new actor is started and its address is returned.
125    #[deprecated(since = "0.13.5", note = "Renamed to `get_or_start_default()`.")]
126    #[inline]
127    pub fn get<A: ArbiterService + Actor<Context = Context<A>>>(&self) -> Addr<A> {
128        self.get_or_start_default()
129    }
130
131    /// Queries registry for an actor, returning its address.
132    ///
133    /// If actor of type `A` is not registered, a new actor is started and its address is returned.
134    pub fn get_or_start_default<A: ArbiterService + Actor<Context = Context<A>>>(&self) -> Addr<A> {
135        let addr = self.try_get::<A>().unwrap_or_else(A::start_service);
136
137        self.registry
138            .borrow_mut()
139            .insert(TypeId::of::<A>(), Box::new(addr.clone()));
140
141        addr
142    }
143
144    /// Queries registry for specific actor, returning its address.
145    ///
146    /// Returns `None` if an actor of type `A` is not registered.
147    pub fn try_get<A: Actor<Context = Context<A>>>(&self) -> Option<Addr<A>> {
148        let id = TypeId::of::<A>();
149
150        if let Some(addr) = self.registry.borrow().get(&id) {
151            if let Some(addr) = addr.downcast_ref::<Addr<A>>() {
152                return Some(addr.clone());
153            }
154        }
155
156        None
157    }
158
159    /// Returns actor's address if it is in the registry.
160    pub fn query<A: Actor<Context = Context<A>>>(&self) -> Option<Addr<A>> {
161        let id = TypeId::of::<A>();
162
163        if let Some(addr) = self.registry.borrow().get(&id) {
164            if let Some(addr) = addr.downcast_ref::<Addr<A>>() {
165                return Some(addr.clone());
166            }
167        }
168
169        None
170    }
171
172    /// Adds an unregistered actor type to the registry by address.
173    ///
174    /// # Panics
175    ///
176    /// Panics if actor is already running
177    pub fn set<A: Actor<Context = Context<A>>>(addr: Addr<A>) {
178        AREG.with(|reg| {
179            let id = TypeId::of::<A>();
180            if let Some(addr) = reg.registry.borrow().get(&id) {
181                if addr.downcast_ref::<Addr<A>>().is_some() {
182                    panic!("Actor already started");
183                }
184            }
185
186            reg.registry.borrow_mut().insert(id, Box::new(addr));
187        })
188    }
189}
190
191/// System wide actors registry
192///
193/// System registry serves same purpose as [Registry](struct.Registry.html),
194/// except it is shared across all arbiters.
195///
196/// # Examples
197///
198/// ```
199/// use actix::prelude::*;
200///
201/// #[derive(Message)]
202/// #[rtype(result = "()")]
203/// struct Ping;
204///
205/// #[derive(Default)]
206/// struct MyActor1;
207///
208/// impl Actor for MyActor1 {
209///     type Context = Context<Self>;
210/// }
211/// impl actix::Supervised for MyActor1 {}
212///
213/// impl SystemService for MyActor1 {
214///     fn service_started(&mut self, ctx: &mut Context<Self>) {
215///         println!("Service started");
216///     }
217/// }
218///
219/// impl Handler<Ping> for MyActor1 {
220///     type Result = ();
221///
222///     fn handle(&mut self, _: Ping, ctx: &mut Context<Self>) {
223///         println!("ping");
224/// #       System::current().stop();
225///     }
226/// }
227///
228/// struct MyActor2;
229///
230/// impl Actor for MyActor2 {
231///     type Context = Context<Self>;
232///
233///     fn started(&mut self, _: &mut Context<Self>) {
234///         let act = MyActor1::from_registry();
235///         act.do_send(Ping);
236///     }
237/// }
238///
239/// #[actix::main]
240/// async fn main() {
241///     // Start MyActor2
242///     let addr = MyActor2.start();
243/// }
244/// ```
245#[derive(Debug)]
246pub struct SystemRegistry {
247    system: ArbiterHandle,
248    registry: HashMap<TypeId, Box<dyn Any + Send>>,
249}
250
251static SREG: Lazy<Mutex<HashMap<usize, SystemRegistry>>> = Lazy::new(|| Mutex::new(HashMap::new()));
252
253/// Trait defines system's service.
254#[allow(unused_variables)]
255pub trait SystemService: Actor<Context = Context<Self>> + Supervised + Default {
256    /// Construct and start system service
257    fn start_service(wrk: &ArbiterHandle) -> Addr<Self> {
258        Supervisor::start_in_arbiter(wrk, |ctx| {
259            let mut act = Self::default();
260            act.service_started(ctx);
261            act
262        })
263    }
264
265    /// Method is called during service initialization.
266    fn service_started(&mut self, ctx: &mut Context<Self>) {}
267
268    /// Get actor's address from system registry
269    fn from_registry() -> Addr<Self> {
270        let sys = System::current();
271
272        let mut sreg = SREG.lock();
273        let reg = sreg
274            .entry(sys.id())
275            .or_insert_with(|| SystemRegistry::new(sys.arbiter().clone()));
276
277        if let Some(addr) = reg.registry.get(&TypeId::of::<Self>()) {
278            if let Some(addr) = addr.downcast_ref::<Addr<Self>>() {
279                return addr.clone();
280            }
281        }
282
283        let addr = Self::start_service(System::current().arbiter());
284        reg.registry
285            .insert(TypeId::of::<Self>(), Box::new(addr.clone()));
286        addr
287    }
288}
289
290impl SystemRegistry {
291    pub(crate) fn new(system: ArbiterHandle) -> Self {
292        Self {
293            system,
294            registry: HashMap::default(),
295        }
296    }
297
298    /// Return address of the service. If service actor is not running
299    /// it get started in the system.
300    pub fn get<A: SystemService + Actor<Context = Context<A>>>(&mut self) -> Addr<A> {
301        if let Some(addr) = self.registry.get(&TypeId::of::<A>()) {
302            match addr.downcast_ref::<Addr<A>>() {
303                Some(addr) => return addr.clone(),
304                None => panic!("Got unknown value: {:?}", addr),
305            }
306        }
307
308        let addr = A::start_service(&self.system);
309        self.registry
310            .insert(TypeId::of::<A>(), Box::new(addr.clone()));
311        addr
312    }
313
314    /// Check if actor is in registry, if so, return its address
315    pub fn query<A: SystemService + Actor<Context = Context<A>>>(&self) -> Option<Addr<A>> {
316        if let Some(addr) = self.registry.get(&TypeId::of::<A>()) {
317            match addr.downcast_ref::<Addr<A>>() {
318                Some(addr) => return Some(addr.clone()),
319                None => return None,
320            }
321        }
322
323        None
324    }
325
326    /// Add new actor to the registry by address, panic if actor is already running
327    pub fn set<A: SystemService + Actor<Context = Context<A>>>(addr: Addr<A>) {
328        let sys = System::current();
329
330        let mut sreg = SREG.lock();
331        let reg = sreg
332            .entry(sys.id())
333            .or_insert_with(|| SystemRegistry::new(sys.arbiter().clone()));
334
335        if let Some(addr) = reg.registry.get(&TypeId::of::<A>()) {
336            if addr.downcast_ref::<Addr<A>>().is_some() {
337                panic!("Actor already started");
338            }
339        }
340
341        reg.registry.insert(TypeId::of::<A>(), Box::new(addr));
342    }
343}