actix_server/
server.rs

1use std::{
2    future::Future,
3    io, mem,
4    pin::Pin,
5    task::{Context, Poll},
6    thread,
7    time::Duration,
8};
9
10use actix_rt::{time::sleep, System};
11use futures_core::{future::BoxFuture, Stream};
12use futures_util::stream::StreamExt as _;
13use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
14use tracing::{error, info};
15
16use crate::{
17    accept::Accept,
18    builder::ServerBuilder,
19    join_all::join_all,
20    service::InternalServiceFactory,
21    signals::{OsSignals, SignalKind, StopSignal},
22    waker_queue::{WakerInterest, WakerQueue},
23    worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
24    ServerHandle,
25};
26
27#[derive(Debug)]
28pub(crate) enum ServerCommand {
29    /// Worker failed to accept connection, indicating a probable panic.
30    ///
31    /// Contains index of faulted worker.
32    WorkerFaulted(usize),
33
34    /// Pause accepting connections.
35    ///
36    /// Contains return channel to notify caller of successful state change.
37    Pause(oneshot::Sender<()>),
38
39    /// Resume accepting connections.
40    ///
41    /// Contains return channel to notify caller of successful state change.
42    Resume(oneshot::Sender<()>),
43
44    /// Stop accepting connections and begin shutdown procedure.
45    Stop {
46        /// True if shut down should be graceful.
47        graceful: bool,
48
49        /// Return channel to notify caller that shutdown is complete.
50        completion: Option<oneshot::Sender<()>>,
51
52        /// Force System exit when true, overriding `ServerBuilder::system_exit()` if it is false.
53        force_system_stop: bool,
54    },
55}
56
57/// General purpose TCP server that runs services receiving Tokio `TcpStream`s.
58///
59/// Handles creating worker threads, restarting faulted workers, connection accepting, and
60/// back-pressure logic.
61///
62/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
63/// distributes connections with a round-robin strategy.
64///
65/// The [Server] must be awaited or polled in order to start running. It will resolve when the
66/// server has fully shut down.
67///
68/// # Shutdown Signals
69/// On UNIX systems, `SIGTERM` will start a graceful shutdown and `SIGQUIT` or `SIGINT` will start a
70/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown.
71///
72/// A graceful shutdown will wait for all workers to stop first.
73///
74/// # Examples
75/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`.
76///
77/// ```no_run
78/// use std::io;
79///
80/// use actix_rt::net::TcpStream;
81/// use actix_server::Server;
82/// use actix_service::{fn_service, ServiceFactoryExt as _};
83/// use bytes::BytesMut;
84/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
85///
86/// #[actix_rt::main]
87/// async fn main() -> io::Result<()> {
88///     let bind_addr = ("127.0.0.1", 8080);
89///
90///     Server::build()
91///         .bind("echo", bind_addr, move || {
92///             fn_service(move |mut stream: TcpStream| {
93///                 async move {
94///                     let mut size = 0;
95///                     let mut buf = BytesMut::new();
96///
97///                     loop {
98///                         match stream.read_buf(&mut buf).await {
99///                             // end of stream; bail from loop
100///                             Ok(0) => break,
101///
102///                             // write bytes back to stream
103///                             Ok(bytes_read) => {
104///                                 stream.write_all(&buf[size..]).await.unwrap();
105///                                 size += bytes_read;
106///                             }
107///
108///                             Err(err) => {
109///                                 eprintln!("Stream Error: {:?}", err);
110///                                 return Err(());
111///                             }
112///                         }
113///                     }
114///
115///                     Ok(())
116///                 }
117///             })
118///             .map_err(|err| eprintln!("Service Error: {:?}", err))
119///         })?
120///         .run()
121///         .await
122/// }
123/// ```
124#[must_use = "Server does nothing unless you `.await` or poll it"]
125pub struct Server {
126    handle: ServerHandle,
127    fut: BoxFuture<'static, io::Result<()>>,
128}
129
130impl Server {
131    /// Create server build.
132    pub fn build() -> ServerBuilder {
133        ServerBuilder::default()
134    }
135
136    pub(crate) fn new(builder: ServerBuilder) -> Self {
137        Server {
138            handle: ServerHandle::new(builder.cmd_tx.clone()),
139            fut: Box::pin(ServerInner::run(builder)),
140        }
141    }
142
143    /// Get a `Server` handle that can be used issue commands and change it's state.
144    ///
145    /// See [ServerHandle](ServerHandle) for usage.
146    pub fn handle(&self) -> ServerHandle {
147        self.handle.clone()
148    }
149}
150
151impl Future for Server {
152    type Output = io::Result<()>;
153
154    #[inline]
155    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
156        Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
157    }
158}
159
160pub struct ServerInner {
161    worker_handles: Vec<WorkerHandleServer>,
162    accept_handle: Option<thread::JoinHandle<()>>,
163    worker_config: ServerWorkerConfig,
164    services: Vec<Box<dyn InternalServiceFactory>>,
165    waker_queue: WakerQueue,
166    system_stop: bool,
167    stopping: bool,
168}
169
170impl ServerInner {
171    async fn run(builder: ServerBuilder) -> io::Result<()> {
172        let (mut this, mut mux) = Self::run_sync(builder)?;
173
174        while let Some(cmd) = mux.next().await {
175            this.handle_cmd(cmd).await;
176
177            if this.stopping {
178                break;
179            }
180        }
181
182        Ok(())
183    }
184
185    fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> {
186        // Give log information on what runtime will be used.
187        let is_actix = actix_rt::System::try_current().is_some();
188        let is_tokio = tokio::runtime::Handle::try_current().is_ok();
189
190        match (is_actix, is_tokio) {
191            (true, _) => info!("Actix runtime found; starting in Actix runtime"),
192            (_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
193            (_, false) => panic!("Actix or Tokio runtime not found; halting"),
194        }
195
196        for (_, name, lst) in &builder.sockets {
197            info!(
198                r#"starting service: "{}", workers: {}, listening on: {}"#,
199                name,
200                builder.threads,
201                lst.local_addr()
202            );
203        }
204
205        let sockets = mem::take(&mut builder.sockets)
206            .into_iter()
207            .map(|t| (t.0, t.2))
208            .collect();
209
210        let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
211
212        let mux = ServerEventMultiplexer {
213            signal_fut: builder.shutdown_signal.map(StopSignal::Cancel).or_else(|| {
214                builder
215                    .listen_os_signals
216                    .then(OsSignals::new)
217                    .map(StopSignal::Os)
218            }),
219            cmd_rx: builder.cmd_rx,
220        };
221
222        let server = ServerInner {
223            waker_queue,
224            accept_handle: Some(accept_handle),
225            worker_handles,
226            worker_config: builder.worker_config,
227            services: builder.factories,
228            system_stop: builder.exit,
229            stopping: false,
230        };
231
232        Ok((server, mux))
233    }
234
235    async fn handle_cmd(&mut self, item: ServerCommand) {
236        match item {
237            ServerCommand::Pause(tx) => {
238                self.waker_queue.wake(WakerInterest::Pause);
239                let _ = tx.send(());
240            }
241
242            ServerCommand::Resume(tx) => {
243                self.waker_queue.wake(WakerInterest::Resume);
244                let _ = tx.send(());
245            }
246
247            ServerCommand::Stop {
248                graceful,
249                completion,
250                force_system_stop,
251            } => {
252                self.stopping = true;
253
254                // Signal accept thread to stop.
255                // Signal is non-blocking; we wait for thread to stop later.
256                self.waker_queue.wake(WakerInterest::Stop);
257
258                // send stop signal to workers
259                let workers_stop = self
260                    .worker_handles
261                    .iter()
262                    .map(|worker| worker.stop(graceful))
263                    .collect::<Vec<_>>();
264
265                if graceful {
266                    // wait for all workers to shut down
267                    let _ = join_all(workers_stop).await;
268                }
269
270                // wait for accept thread stop
271                self.accept_handle
272                    .take()
273                    .unwrap()
274                    .join()
275                    .expect("Accept thread must not panic in any case");
276
277                if let Some(tx) = completion {
278                    let _ = tx.send(());
279                }
280
281                if self.system_stop || force_system_stop {
282                    sleep(Duration::from_millis(300)).await;
283                    System::try_current().as_ref().map(System::stop);
284                }
285            }
286
287            ServerCommand::WorkerFaulted(idx) => {
288                // TODO: maybe just return with warning log if not found ?
289                assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
290
291                error!("worker {} has died; restarting", idx);
292
293                let factories = self
294                    .services
295                    .iter()
296                    .map(|service| service.clone_factory())
297                    .collect();
298
299                match ServerWorker::start(
300                    idx,
301                    factories,
302                    self.waker_queue.clone(),
303                    self.worker_config,
304                ) {
305                    Ok((handle_accept, handle_server)) => {
306                        *self
307                            .worker_handles
308                            .iter_mut()
309                            .find(|wrk| wrk.idx == idx)
310                            .unwrap() = handle_server;
311
312                        self.waker_queue.wake(WakerInterest::Worker(handle_accept));
313                    }
314
315                    Err(err) => error!("can not restart worker {}: {}", idx, err),
316                };
317            }
318        }
319    }
320
321    fn map_signal(signal: SignalKind) -> ServerCommand {
322        match signal {
323            SignalKind::Cancel => {
324                info!("Cancellation token/channel received; starting graceful shutdown");
325                ServerCommand::Stop {
326                    graceful: true,
327                    completion: None,
328                    force_system_stop: true,
329                }
330            }
331
332            SignalKind::OsInt => {
333                info!("SIGINT received; starting forced shutdown");
334                ServerCommand::Stop {
335                    graceful: false,
336                    completion: None,
337                    force_system_stop: true,
338                }
339            }
340
341            SignalKind::OsTerm => {
342                info!("SIGTERM received; starting graceful shutdown");
343                ServerCommand::Stop {
344                    graceful: true,
345                    completion: None,
346                    force_system_stop: true,
347                }
348            }
349
350            SignalKind::OsQuit => {
351                info!("SIGQUIT received; starting forced shutdown");
352                ServerCommand::Stop {
353                    graceful: false,
354                    completion: None,
355                    force_system_stop: true,
356                }
357            }
358        }
359    }
360}
361
362struct ServerEventMultiplexer {
363    cmd_rx: UnboundedReceiver<ServerCommand>,
364    signal_fut: Option<StopSignal>,
365}
366
367impl Stream for ServerEventMultiplexer {
368    type Item = ServerCommand;
369
370    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371        let this = Pin::into_inner(self);
372
373        if let Some(signal_fut) = &mut this.signal_fut {
374            if let Poll::Ready(signal) = Pin::new(signal_fut).poll(cx) {
375                this.signal_fut = None;
376                return Poll::Ready(Some(ServerInner::map_signal(signal)));
377            }
378        }
379
380        this.cmd_rx.poll_recv(cx)
381    }
382}