1use std::{
2 future::Future,
3 io, mem,
4 pin::Pin,
5 task::{Context, Poll},
6 thread,
7 time::Duration,
8};
9
10use actix_rt::{time::sleep, System};
11use futures_core::{future::BoxFuture, Stream};
12use futures_util::stream::StreamExt as _;
13use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
14use tracing::{error, info};
15
16use crate::{
17 accept::Accept,
18 builder::ServerBuilder,
19 join_all::join_all,
20 service::InternalServiceFactory,
21 signals::{OsSignals, SignalKind, StopSignal},
22 waker_queue::{WakerInterest, WakerQueue},
23 worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
24 ServerHandle,
25};
26
27#[derive(Debug)]
28pub(crate) enum ServerCommand {
29 WorkerFaulted(usize),
33
34 Pause(oneshot::Sender<()>),
38
39 Resume(oneshot::Sender<()>),
43
44 Stop {
46 graceful: bool,
48
49 completion: Option<oneshot::Sender<()>>,
51
52 force_system_stop: bool,
54 },
55}
56
57#[must_use = "Server does nothing unless you `.await` or poll it"]
125pub struct Server {
126 handle: ServerHandle,
127 fut: BoxFuture<'static, io::Result<()>>,
128}
129
130impl Server {
131 pub fn build() -> ServerBuilder {
133 ServerBuilder::default()
134 }
135
136 pub(crate) fn new(builder: ServerBuilder) -> Self {
137 Server {
138 handle: ServerHandle::new(builder.cmd_tx.clone()),
139 fut: Box::pin(ServerInner::run(builder)),
140 }
141 }
142
143 pub fn handle(&self) -> ServerHandle {
147 self.handle.clone()
148 }
149}
150
151impl Future for Server {
152 type Output = io::Result<()>;
153
154 #[inline]
155 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156 Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
157 }
158}
159
160pub struct ServerInner {
161 worker_handles: Vec<WorkerHandleServer>,
162 accept_handle: Option<thread::JoinHandle<()>>,
163 worker_config: ServerWorkerConfig,
164 services: Vec<Box<dyn InternalServiceFactory>>,
165 waker_queue: WakerQueue,
166 system_stop: bool,
167 stopping: bool,
168}
169
170impl ServerInner {
171 async fn run(builder: ServerBuilder) -> io::Result<()> {
172 let (mut this, mut mux) = Self::run_sync(builder)?;
173
174 while let Some(cmd) = mux.next().await {
175 this.handle_cmd(cmd).await;
176
177 if this.stopping {
178 break;
179 }
180 }
181
182 Ok(())
183 }
184
185 fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> {
186 let is_actix = actix_rt::System::try_current().is_some();
188 let is_tokio = tokio::runtime::Handle::try_current().is_ok();
189
190 match (is_actix, is_tokio) {
191 (true, _) => info!("Actix runtime found; starting in Actix runtime"),
192 (_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
193 (_, false) => panic!("Actix or Tokio runtime not found; halting"),
194 }
195
196 for (_, name, lst) in &builder.sockets {
197 info!(
198 r#"starting service: "{}", workers: {}, listening on: {}"#,
199 name,
200 builder.threads,
201 lst.local_addr()
202 );
203 }
204
205 let sockets = mem::take(&mut builder.sockets)
206 .into_iter()
207 .map(|t| (t.0, t.2))
208 .collect();
209
210 let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
211
212 let mux = ServerEventMultiplexer {
213 signal_fut: builder.shutdown_signal.map(StopSignal::Cancel).or_else(|| {
214 builder
215 .listen_os_signals
216 .then(OsSignals::new)
217 .map(StopSignal::Os)
218 }),
219 cmd_rx: builder.cmd_rx,
220 };
221
222 let server = ServerInner {
223 waker_queue,
224 accept_handle: Some(accept_handle),
225 worker_handles,
226 worker_config: builder.worker_config,
227 services: builder.factories,
228 system_stop: builder.exit,
229 stopping: false,
230 };
231
232 Ok((server, mux))
233 }
234
235 async fn handle_cmd(&mut self, item: ServerCommand) {
236 match item {
237 ServerCommand::Pause(tx) => {
238 self.waker_queue.wake(WakerInterest::Pause);
239 let _ = tx.send(());
240 }
241
242 ServerCommand::Resume(tx) => {
243 self.waker_queue.wake(WakerInterest::Resume);
244 let _ = tx.send(());
245 }
246
247 ServerCommand::Stop {
248 graceful,
249 completion,
250 force_system_stop,
251 } => {
252 self.stopping = true;
253
254 self.waker_queue.wake(WakerInterest::Stop);
257
258 let workers_stop = self
260 .worker_handles
261 .iter()
262 .map(|worker| worker.stop(graceful))
263 .collect::<Vec<_>>();
264
265 if graceful {
266 let _ = join_all(workers_stop).await;
268 }
269
270 self.accept_handle
272 .take()
273 .unwrap()
274 .join()
275 .expect("Accept thread must not panic in any case");
276
277 if let Some(tx) = completion {
278 let _ = tx.send(());
279 }
280
281 if self.system_stop || force_system_stop {
282 sleep(Duration::from_millis(300)).await;
283 System::try_current().as_ref().map(System::stop);
284 }
285 }
286
287 ServerCommand::WorkerFaulted(idx) => {
288 assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
290
291 error!("worker {} has died; restarting", idx);
292
293 let factories = self
294 .services
295 .iter()
296 .map(|service| service.clone_factory())
297 .collect();
298
299 match ServerWorker::start(
300 idx,
301 factories,
302 self.waker_queue.clone(),
303 self.worker_config,
304 ) {
305 Ok((handle_accept, handle_server)) => {
306 *self
307 .worker_handles
308 .iter_mut()
309 .find(|wrk| wrk.idx == idx)
310 .unwrap() = handle_server;
311
312 self.waker_queue.wake(WakerInterest::Worker(handle_accept));
313 }
314
315 Err(err) => error!("can not restart worker {}: {}", idx, err),
316 };
317 }
318 }
319 }
320
321 fn map_signal(signal: SignalKind) -> ServerCommand {
322 match signal {
323 SignalKind::Cancel => {
324 info!("Cancellation token/channel received; starting graceful shutdown");
325 ServerCommand::Stop {
326 graceful: true,
327 completion: None,
328 force_system_stop: true,
329 }
330 }
331
332 SignalKind::OsInt => {
333 info!("SIGINT received; starting forced shutdown");
334 ServerCommand::Stop {
335 graceful: false,
336 completion: None,
337 force_system_stop: true,
338 }
339 }
340
341 SignalKind::OsTerm => {
342 info!("SIGTERM received; starting graceful shutdown");
343 ServerCommand::Stop {
344 graceful: true,
345 completion: None,
346 force_system_stop: true,
347 }
348 }
349
350 SignalKind::OsQuit => {
351 info!("SIGQUIT received; starting forced shutdown");
352 ServerCommand::Stop {
353 graceful: false,
354 completion: None,
355 force_system_stop: true,
356 }
357 }
358 }
359 }
360}
361
362struct ServerEventMultiplexer {
363 cmd_rx: UnboundedReceiver<ServerCommand>,
364 signal_fut: Option<StopSignal>,
365}
366
367impl Stream for ServerEventMultiplexer {
368 type Item = ServerCommand;
369
370 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371 let this = Pin::into_inner(self);
372
373 if let Some(signal_fut) = &mut this.signal_fut {
374 if let Poll::Ready(signal) = Pin::new(signal_fut).poll(cx) {
375 this.signal_fut = None;
376 return Poll::Ready(Some(ServerInner::map_signal(signal)));
377 }
378 }
379
380 this.cmd_rx.poll_recv(cx)
381 }
382}