actix/registry.rs
1//! Actors registry
2//!
3//! An Actor can register itself as a service. A Service can be defined as an
4//! `ArbiterService`, which is unique per arbiter, or a `SystemService`, which
5//! is unique per system.
6use std::{
7 any::{Any, TypeId},
8 cell::RefCell,
9 collections::HashMap,
10 rc::Rc,
11};
12
13use actix_rt::{ArbiterHandle, System};
14use once_cell::sync::Lazy;
15use parking_lot::Mutex;
16
17use crate::{
18 actor::{Actor, Supervised},
19 address::Addr,
20 context::Context,
21 supervisor::Supervisor,
22};
23
24type AnyMap = HashMap<TypeId, Box<dyn Any>>;
25
26/// Actors registry
27///
28/// An Actor can register itself as a service. A Service can be defined as an
29/// `ArbiterService`, which is unique per arbiter, or a `SystemService`, which
30/// is unique per system.
31///
32/// If an arbiter service is used outside of a running arbiter, it panics.
33///
34/// # Examples
35///
36/// ```
37/// use actix::prelude::*;
38///
39/// #[derive(Message)]
40/// #[rtype(result = "()")]
41/// struct Ping;
42///
43/// #[derive(Default)]
44/// struct MyActor1;
45///
46/// impl Actor for MyActor1 {
47/// type Context = Context<Self>;
48/// }
49/// impl actix::Supervised for MyActor1 {}
50///
51/// impl ArbiterService for MyActor1 {
52/// fn service_started(&mut self, ctx: &mut Context<Self>) {
53/// println!("Service started");
54/// }
55/// }
56///
57/// impl Handler<Ping> for MyActor1 {
58/// type Result = ();
59///
60/// fn handle(&mut self, _: Ping, ctx: &mut Context<Self>) {
61/// println!("ping");
62/// # System::current().stop();
63/// }
64/// }
65///
66/// struct MyActor2;
67///
68/// impl Actor for MyActor2 {
69/// type Context = Context<Self>;
70///
71/// fn started(&mut self, _: &mut Context<Self>) {
72/// // get MyActor1 address from the registry
73/// let act = MyActor1::from_registry();
74/// act.do_send(Ping);
75/// }
76/// }
77///
78/// #[actix::main]
79/// async fn main() {
80/// // Start MyActor2 in new Arbiter
81/// Arbiter::new().spawn_fn(|| {
82/// MyActor2.start();
83/// });
84/// # System::current().stop();
85/// }
86/// ```
87#[derive(Clone)]
88pub struct Registry {
89 registry: Rc<RefCell<AnyMap>>,
90}
91
92thread_local! {
93 static AREG: Registry = {
94 Registry {
95 registry: Rc::new(RefCell::new(AnyMap::new()))
96 }
97 };
98}
99
100/// Trait defines arbiter's service.
101#[allow(unused_variables)]
102pub trait ArbiterService: Actor<Context = Context<Self>> + Supervised + Default {
103 /// Construct and start arbiter service
104 fn start_service() -> Addr<Self> {
105 Supervisor::start(|ctx| {
106 let mut act = Self::default();
107 act.service_started(ctx);
108 act
109 })
110 }
111
112 /// Method is called during service initialization.
113 fn service_started(&mut self, ctx: &mut Context<Self>) {}
114
115 /// Get actor's address from arbiter registry
116 fn from_registry() -> Addr<Self> {
117 AREG.with(|reg| reg.get_or_start_default())
118 }
119}
120
121impl Registry {
122 /// Queries registry for a type of actor (`A`), returning its address.
123 ///
124 /// If actor is not registered, a new actor is started and its address is returned.
125 #[deprecated(since = "0.13.5", note = "Renamed to `get_or_start_default()`.")]
126 #[inline]
127 pub fn get<A: ArbiterService + Actor<Context = Context<A>>>(&self) -> Addr<A> {
128 self.get_or_start_default()
129 }
130
131 /// Queries registry for an actor, returning its address.
132 ///
133 /// If actor of type `A` is not registered, a new actor is started and its address is returned.
134 pub fn get_or_start_default<A: ArbiterService + Actor<Context = Context<A>>>(&self) -> Addr<A> {
135 let addr = self.try_get::<A>().unwrap_or_else(A::start_service);
136
137 self.registry
138 .borrow_mut()
139 .insert(TypeId::of::<A>(), Box::new(addr.clone()));
140
141 addr
142 }
143
144 /// Queries registry for specific actor, returning its address.
145 ///
146 /// Returns `None` if an actor of type `A` is not registered.
147 pub fn try_get<A: Actor<Context = Context<A>>>(&self) -> Option<Addr<A>> {
148 let id = TypeId::of::<A>();
149
150 if let Some(addr) = self.registry.borrow().get(&id) {
151 if let Some(addr) = addr.downcast_ref::<Addr<A>>() {
152 return Some(addr.clone());
153 }
154 }
155
156 None
157 }
158
159 /// Returns actor's address if it is in the registry.
160 pub fn query<A: Actor<Context = Context<A>>>(&self) -> Option<Addr<A>> {
161 let id = TypeId::of::<A>();
162
163 if let Some(addr) = self.registry.borrow().get(&id) {
164 if let Some(addr) = addr.downcast_ref::<Addr<A>>() {
165 return Some(addr.clone());
166 }
167 }
168
169 None
170 }
171
172 /// Adds an unregistered actor type to the registry by address.
173 ///
174 /// # Panics
175 ///
176 /// Panics if actor is already running
177 pub fn set<A: Actor<Context = Context<A>>>(addr: Addr<A>) {
178 AREG.with(|reg| {
179 let id = TypeId::of::<A>();
180 if let Some(addr) = reg.registry.borrow().get(&id) {
181 if addr.downcast_ref::<Addr<A>>().is_some() {
182 panic!("Actor already started");
183 }
184 }
185
186 reg.registry.borrow_mut().insert(id, Box::new(addr));
187 })
188 }
189}
190
191/// System wide actors registry
192///
193/// System registry serves same purpose as [Registry](struct.Registry.html),
194/// except it is shared across all arbiters.
195///
196/// # Examples
197///
198/// ```
199/// use actix::prelude::*;
200///
201/// #[derive(Message)]
202/// #[rtype(result = "()")]
203/// struct Ping;
204///
205/// #[derive(Default)]
206/// struct MyActor1;
207///
208/// impl Actor for MyActor1 {
209/// type Context = Context<Self>;
210/// }
211/// impl actix::Supervised for MyActor1 {}
212///
213/// impl SystemService for MyActor1 {
214/// fn service_started(&mut self, ctx: &mut Context<Self>) {
215/// println!("Service started");
216/// }
217/// }
218///
219/// impl Handler<Ping> for MyActor1 {
220/// type Result = ();
221///
222/// fn handle(&mut self, _: Ping, ctx: &mut Context<Self>) {
223/// println!("ping");
224/// # System::current().stop();
225/// }
226/// }
227///
228/// struct MyActor2;
229///
230/// impl Actor for MyActor2 {
231/// type Context = Context<Self>;
232///
233/// fn started(&mut self, _: &mut Context<Self>) {
234/// let act = MyActor1::from_registry();
235/// act.do_send(Ping);
236/// }
237/// }
238///
239/// #[actix::main]
240/// async fn main() {
241/// // Start MyActor2
242/// let addr = MyActor2.start();
243/// }
244/// ```
245#[derive(Debug)]
246pub struct SystemRegistry {
247 system: ArbiterHandle,
248 registry: HashMap<TypeId, Box<dyn Any + Send>>,
249}
250
251static SREG: Lazy<Mutex<HashMap<usize, SystemRegistry>>> = Lazy::new(|| Mutex::new(HashMap::new()));
252
253/// Trait defines system's service.
254#[allow(unused_variables)]
255pub trait SystemService: Actor<Context = Context<Self>> + Supervised + Default {
256 /// Construct and start system service
257 fn start_service(wrk: &ArbiterHandle) -> Addr<Self> {
258 Supervisor::start_in_arbiter(wrk, |ctx| {
259 let mut act = Self::default();
260 act.service_started(ctx);
261 act
262 })
263 }
264
265 /// Method is called during service initialization.
266 fn service_started(&mut self, ctx: &mut Context<Self>) {}
267
268 /// Get actor's address from system registry
269 fn from_registry() -> Addr<Self> {
270 let sys = System::current();
271
272 let mut sreg = SREG.lock();
273 let reg = sreg
274 .entry(sys.id())
275 .or_insert_with(|| SystemRegistry::new(sys.arbiter().clone()));
276
277 if let Some(addr) = reg.registry.get(&TypeId::of::<Self>()) {
278 if let Some(addr) = addr.downcast_ref::<Addr<Self>>() {
279 return addr.clone();
280 }
281 }
282
283 let addr = Self::start_service(System::current().arbiter());
284 reg.registry
285 .insert(TypeId::of::<Self>(), Box::new(addr.clone()));
286 addr
287 }
288}
289
290impl SystemRegistry {
291 pub(crate) fn new(system: ArbiterHandle) -> Self {
292 Self {
293 system,
294 registry: HashMap::default(),
295 }
296 }
297
298 /// Return address of the service. If service actor is not running
299 /// it get started in the system.
300 pub fn get<A: SystemService + Actor<Context = Context<A>>>(&mut self) -> Addr<A> {
301 if let Some(addr) = self.registry.get(&TypeId::of::<A>()) {
302 match addr.downcast_ref::<Addr<A>>() {
303 Some(addr) => return addr.clone(),
304 None => panic!("Got unknown value: {:?}", addr),
305 }
306 }
307
308 let addr = A::start_service(&self.system);
309 self.registry
310 .insert(TypeId::of::<A>(), Box::new(addr.clone()));
311 addr
312 }
313
314 /// Check if actor is in registry, if so, return its address
315 pub fn query<A: SystemService + Actor<Context = Context<A>>>(&self) -> Option<Addr<A>> {
316 if let Some(addr) = self.registry.get(&TypeId::of::<A>()) {
317 match addr.downcast_ref::<Addr<A>>() {
318 Some(addr) => return Some(addr.clone()),
319 None => return None,
320 }
321 }
322
323 None
324 }
325
326 /// Add new actor to the registry by address, panic if actor is already running
327 pub fn set<A: SystemService + Actor<Context = Context<A>>>(addr: Addr<A>) {
328 let sys = System::current();
329
330 let mut sreg = SREG.lock();
331 let reg = sreg
332 .entry(sys.id())
333 .or_insert_with(|| SystemRegistry::new(sys.arbiter().clone()));
334
335 if let Some(addr) = reg.registry.get(&TypeId::of::<A>()) {
336 if addr.downcast_ref::<Addr<A>>().is_some() {
337 panic!("Actor already started");
338 }
339 }
340
341 reg.registry.insert(TypeId::of::<A>(), Box::new(addr));
342 }
343}