actix_server/
builder.rs

1use std::{future::Future, io, num::NonZeroUsize, time::Duration};
2
3use actix_rt::net::TcpStream;
4use futures_core::future::BoxFuture;
5use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
6
7use crate::{
8    server::ServerCommand,
9    service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
10    socket::{create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs},
11    worker::ServerWorkerConfig,
12    Server,
13};
14
15/// Multipath TCP (MPTCP) preference.
16///
17/// Currently only useful on Linux.
18///
19#[cfg_attr(target_os = "linux", doc = "Also see [`ServerBuilder::mptcp()`].")]
20#[derive(Debug, Clone)]
21pub enum MpTcp {
22    /// MPTCP will not be used when binding sockets.
23    Disabled,
24
25    /// MPTCP will be attempted when binding sockets. If errors occur, regular TCP will be
26    /// attempted, too.
27    TcpFallback,
28
29    /// MPTCP will be used when binding sockets (with no fallback).
30    NoFallback,
31}
32
33/// [Server] builder.
34pub struct ServerBuilder {
35    pub(crate) threads: usize,
36    pub(crate) token: usize,
37    pub(crate) backlog: u32,
38    pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
39    pub(crate) sockets: Vec<(usize, String, MioListener)>,
40    pub(crate) mptcp: MpTcp,
41    pub(crate) exit: bool,
42    pub(crate) listen_os_signals: bool,
43    pub(crate) shutdown_signal: Option<BoxFuture<'static, ()>>,
44    pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
45    pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
46    pub(crate) worker_config: ServerWorkerConfig,
47}
48
49impl Default for ServerBuilder {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl ServerBuilder {
56    /// Create new Server builder instance
57    pub fn new() -> ServerBuilder {
58        let (cmd_tx, cmd_rx) = unbounded_channel();
59
60        ServerBuilder {
61            threads: std::thread::available_parallelism().map_or(2, NonZeroUsize::get),
62            token: 0,
63            factories: Vec::new(),
64            sockets: Vec::new(),
65            backlog: 2048,
66            mptcp: MpTcp::Disabled,
67            exit: false,
68            listen_os_signals: true,
69            shutdown_signal: None,
70            cmd_tx,
71            cmd_rx,
72            worker_config: ServerWorkerConfig::default(),
73        }
74    }
75
76    /// Sets number of workers to start.
77    ///
78    /// See [`bind()`](Self::bind()) for more details on how worker count affects the number of
79    /// server factory instantiations.
80    ///
81    /// The default worker count is the determined by [`std::thread::available_parallelism()`]. See
82    /// its documentation to determine what behavior you should expect when server is run.
83    ///
84    /// `num` must be greater than 0.
85    ///
86    /// # Panics
87    ///
88    /// Panics if `num` is 0.
89    pub fn workers(mut self, num: usize) -> Self {
90        assert_ne!(num, 0, "workers must be greater than 0");
91        self.threads = num;
92        self
93    }
94
95    /// Set max number of threads for each worker's blocking task thread pool.
96    ///
97    /// One thread pool is set up **per worker**; not shared across workers.
98    ///
99    /// # Examples:
100    /// ```
101    /// # use actix_server::ServerBuilder;
102    /// let builder = ServerBuilder::new()
103    ///     .workers(4) // server has 4 worker thread.
104    ///     .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
105    /// ```
106    ///
107    /// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
108    pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
109        self.worker_config.max_blocking_threads(num);
110        self
111    }
112
113    /// Set the maximum number of pending connections.
114    ///
115    /// This refers to the number of clients that can be waiting to be served. Exceeding this number
116    /// results in the client getting an error when attempting to connect. It should only affect
117    /// servers under significant load.
118    ///
119    /// Generally set in the 64-2048 range. Default value is 2048.
120    ///
121    /// This method should be called before `bind()` method call.
122    pub fn backlog(mut self, num: u32) -> Self {
123        self.backlog = num;
124        self
125    }
126
127    /// Sets MultiPath TCP (MPTCP) preference on bound sockets.
128    ///
129    /// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance
130    /// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev]
131    /// for more info about MPTCP itself.
132    ///
133    /// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to
134    /// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`.
135    ///
136    /// This method will have no effect if called after a `bind()`.
137    ///
138    /// [mptcp.dev]: https://www.mptcp.dev
139    #[cfg(target_os = "linux")]
140    pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self {
141        self.mptcp = mptcp_enabled;
142        self
143    }
144
145    /// Sets the maximum per-worker number of concurrent connections.
146    ///
147    /// All socket listeners will stop accepting connections when this limit is reached for
148    /// each worker.
149    ///
150    /// By default max connections is set to a 25k per worker.
151    pub fn max_concurrent_connections(mut self, num: usize) -> Self {
152        self.worker_config.max_concurrent_connections(num);
153        self
154    }
155
156    #[doc(hidden)]
157    #[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
158    pub fn maxconn(self, num: usize) -> Self {
159        self.max_concurrent_connections(num)
160    }
161
162    /// Sets flag to stop Actix `System` after server shutdown.
163    ///
164    /// This has no effect when server is running in a Tokio-only runtime.
165    pub fn system_exit(mut self) -> Self {
166        self.exit = true;
167        self
168    }
169
170    /// Disables OS signal handling.
171    pub fn disable_signals(mut self) -> Self {
172        self.listen_os_signals = false;
173        self
174    }
175
176    /// Specify shutdown signal from a future.
177    ///
178    /// Using this method will prevent OS signal handlers being set up.
179    ///
180    /// Typically, a `CancellationToken` will be used, but any future _can_ be.
181    ///
182    /// # Examples
183    ///
184    /// ```
185    /// # use std::io;
186    /// # use tokio::net::TcpStream;
187    /// # use actix_server::Server;
188    /// # async fn run() -> io::Result<()> {
189    /// use actix_service::fn_service;
190    /// use tokio_util::sync::CancellationToken;
191    ///
192    /// let stop_signal = CancellationToken::new();
193    ///
194    /// Server::build()
195    ///     .bind("shutdown-signal", "127.0.0.1:12345", || {
196    ///         fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
197    ///     })?
198    ///     .shutdown_signal(stop_signal.cancelled_owned())
199    ///     .run()
200    ///     .await
201    /// # }
202    /// ```
203    pub fn shutdown_signal<Fut>(mut self, shutdown_signal: Fut) -> Self
204    where
205        Fut: Future<Output = ()> + Send + 'static,
206    {
207        self.shutdown_signal = Some(Box::pin(shutdown_signal));
208        self
209    }
210
211    /// Timeout for graceful workers shutdown in seconds.
212    ///
213    /// After receiving a stop signal, workers have this much time to finish serving requests.
214    /// Workers still alive after the timeout are force dropped.
215    ///
216    /// By default shutdown timeout sets to 30 seconds.
217    pub fn shutdown_timeout(mut self, sec: u64) -> Self {
218        self.worker_config
219            .shutdown_timeout(Duration::from_secs(sec));
220        self
221    }
222
223    /// Adds new service to the server.
224    ///
225    /// Note that, if a DNS lookup is required, resolving hostnames is a blocking operation.
226    ///
227    /// # Worker Count
228    ///
229    /// The `factory` will be instantiated multiple times in most scenarios. The number of
230    /// instantiations is number of [`workers`](Self::workers()) × number of sockets resolved by
231    /// `addrs`.
232    ///
233    /// For example, if you've manually set [`workers`](Self::workers()) to 2, and use `127.0.0.1`
234    /// as the bind `addrs`, then `factory` will be instantiated twice. However, using `localhost`
235    /// as the bind `addrs` can often resolve to both `127.0.0.1` (IPv4) _and_ `::1` (IPv6), causing
236    /// the `factory` to be instantiated 4 times (2 workers × 2 bind addresses).
237    ///
238    /// Using a bind address of `0.0.0.0`, which signals to use all interfaces, may also multiple
239    /// the number of instantiations in a similar way.
240    ///
241    /// # Errors
242    ///
243    /// Returns an `io::Error` if:
244    /// - `addrs` cannot be resolved into one or more socket addresses;
245    /// - all the resolved socket addresses are already bound.
246    pub fn bind<F, U, N>(mut self, name: N, addrs: U, factory: F) -> io::Result<Self>
247    where
248        F: ServerServiceFactory<TcpStream>,
249        U: ToSocketAddrs,
250        N: AsRef<str>,
251    {
252        let sockets = bind_addr(addrs, self.backlog, &self.mptcp)?;
253
254        tracing::trace!("binding server to: {sockets:?}");
255
256        for lst in sockets {
257            let token = self.next_token();
258
259            self.factories.push(StreamNewService::create(
260                name.as_ref().to_string(),
261                token,
262                factory.clone(),
263                lst.local_addr()?,
264            ));
265
266            self.sockets
267                .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
268        }
269
270        Ok(self)
271    }
272
273    /// Adds service to the server using a socket listener already bound.
274    ///
275    /// # Worker Count
276    ///
277    /// The `factory` will be instantiated multiple times in most scenarios. The number of
278    /// instantiations is: number of [`workers`](Self::workers()).
279    pub fn listen<F, N: AsRef<str>>(
280        mut self,
281        name: N,
282        lst: StdTcpListener,
283        factory: F,
284    ) -> io::Result<Self>
285    where
286        F: ServerServiceFactory<TcpStream>,
287    {
288        lst.set_nonblocking(true)?;
289        let addr = lst.local_addr()?;
290
291        let token = self.next_token();
292        self.factories.push(StreamNewService::create(
293            name.as_ref().to_string(),
294            token,
295            factory,
296            addr,
297        ));
298
299        self.sockets
300            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
301
302        Ok(self)
303    }
304
305    /// Starts processing incoming connections and return server controller.
306    pub fn run(self) -> Server {
307        if self.sockets.is_empty() {
308            panic!("Server should have at least one bound socket");
309        } else {
310            tracing::info!("starting {} workers", self.threads);
311            Server::new(self)
312        }
313    }
314
315    fn next_token(&mut self) -> usize {
316        let token = self.token;
317        self.token += 1;
318        token
319    }
320}
321
322#[cfg(unix)]
323impl ServerBuilder {
324    /// Adds new service to the server using a UDS (unix domain socket) address.
325    ///
326    /// # Worker Count
327    ///
328    /// The `factory` will be instantiated multiple times in most scenarios. The number of
329    /// instantiations is: number of [`workers`](Self::workers()).
330    pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
331    where
332        F: ServerServiceFactory<actix_rt::net::UnixStream>,
333        N: AsRef<str>,
334        U: AsRef<std::path::Path>,
335    {
336        // The path must not exist when we try to bind.
337        // Try to remove it to avoid bind error.
338        if let Err(err) = std::fs::remove_file(addr.as_ref()) {
339            // NotFound is expected and not an issue. Anything else is.
340            if err.kind() != std::io::ErrorKind::NotFound {
341                return Err(err);
342            }
343        }
344
345        let lst = crate::socket::StdUnixListener::bind(addr)?;
346        self.listen_uds(name, lst, factory)
347    }
348
349    /// Adds new service to the server using a UDS (unix domain socket) listener already bound.
350    ///
351    /// Useful when running as a systemd service and a socket FD is acquired externally.
352    ///
353    /// # Worker Count
354    ///
355    /// The `factory` will be instantiated multiple times in most scenarios. The number of
356    /// instantiations is: number of [`workers`](Self::workers()).
357    pub fn listen_uds<F, N: AsRef<str>>(
358        mut self,
359        name: N,
360        lst: crate::socket::StdUnixListener,
361        factory: F,
362    ) -> io::Result<Self>
363    where
364        F: ServerServiceFactory<actix_rt::net::UnixStream>,
365    {
366        use std::net::{IpAddr, Ipv4Addr};
367
368        lst.set_nonblocking(true)?;
369
370        let token = self.next_token();
371        let addr = crate::socket::StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
372
373        self.factories.push(StreamNewService::create(
374            name.as_ref().to_string(),
375            token,
376            factory,
377            addr,
378        ));
379
380        self.sockets
381            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
382
383        Ok(self)
384    }
385}
386
387pub(super) fn bind_addr<S: ToSocketAddrs>(
388    addr: S,
389    backlog: u32,
390    mptcp: &MpTcp,
391) -> io::Result<Vec<MioTcpListener>> {
392    let mut opt_err = None;
393    let mut success = false;
394    let mut sockets = Vec::new();
395
396    for addr in addr.to_socket_addrs()? {
397        match create_mio_tcp_listener(addr, backlog, mptcp) {
398            Ok(lst) => {
399                success = true;
400                sockets.push(lst);
401            }
402            Err(err) => opt_err = Some(err),
403        }
404    }
405
406    if success {
407        Ok(sockets)
408    } else if let Some(err) = opt_err.take() {
409        Err(err)
410    } else {
411        Err(io::Error::other("Can not bind to address."))
412    }
413}