actix_server/builder.rs
1use std::{future::Future, io, num::NonZeroUsize, time::Duration};
2
3use actix_rt::net::TcpStream;
4use futures_core::future::BoxFuture;
5use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
6
7use crate::{
8 server::ServerCommand,
9 service::{InternalServiceFactory, ServerServiceFactory, StreamNewService},
10 socket::{create_mio_tcp_listener, MioListener, MioTcpListener, StdTcpListener, ToSocketAddrs},
11 worker::ServerWorkerConfig,
12 Server,
13};
14
15/// Multipath TCP (MPTCP) preference.
16///
17/// Currently only useful on Linux.
18///
19#[cfg_attr(target_os = "linux", doc = "Also see [`ServerBuilder::mptcp()`].")]
20#[derive(Debug, Clone)]
21pub enum MpTcp {
22 /// MPTCP will not be used when binding sockets.
23 Disabled,
24
25 /// MPTCP will be attempted when binding sockets. If errors occur, regular TCP will be
26 /// attempted, too.
27 TcpFallback,
28
29 /// MPTCP will be used when binding sockets (with no fallback).
30 NoFallback,
31}
32
33/// [Server] builder.
34pub struct ServerBuilder {
35 pub(crate) threads: usize,
36 pub(crate) token: usize,
37 pub(crate) backlog: u32,
38 pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
39 pub(crate) sockets: Vec<(usize, String, MioListener)>,
40 pub(crate) mptcp: MpTcp,
41 pub(crate) exit: bool,
42 pub(crate) listen_os_signals: bool,
43 pub(crate) shutdown_signal: Option<BoxFuture<'static, ()>>,
44 pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
45 pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
46 pub(crate) worker_config: ServerWorkerConfig,
47}
48
49impl Default for ServerBuilder {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl ServerBuilder {
56 /// Create new Server builder instance
57 pub fn new() -> ServerBuilder {
58 let (cmd_tx, cmd_rx) = unbounded_channel();
59
60 ServerBuilder {
61 threads: std::thread::available_parallelism().map_or(2, NonZeroUsize::get),
62 token: 0,
63 factories: Vec::new(),
64 sockets: Vec::new(),
65 backlog: 2048,
66 mptcp: MpTcp::Disabled,
67 exit: false,
68 listen_os_signals: true,
69 shutdown_signal: None,
70 cmd_tx,
71 cmd_rx,
72 worker_config: ServerWorkerConfig::default(),
73 }
74 }
75
76 /// Sets number of workers to start.
77 ///
78 /// See [`bind()`](Self::bind()) for more details on how worker count affects the number of
79 /// server factory instantiations.
80 ///
81 /// The default worker count is the determined by [`std::thread::available_parallelism()`]. See
82 /// its documentation to determine what behavior you should expect when server is run.
83 ///
84 /// `num` must be greater than 0.
85 ///
86 /// # Panics
87 ///
88 /// Panics if `num` is 0.
89 pub fn workers(mut self, num: usize) -> Self {
90 assert_ne!(num, 0, "workers must be greater than 0");
91 self.threads = num;
92 self
93 }
94
95 /// Set max number of threads for each worker's blocking task thread pool.
96 ///
97 /// One thread pool is set up **per worker**; not shared across workers.
98 ///
99 /// # Examples:
100 /// ```
101 /// # use actix_server::ServerBuilder;
102 /// let builder = ServerBuilder::new()
103 /// .workers(4) // server has 4 worker thread.
104 /// .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
105 /// ```
106 ///
107 /// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
108 pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
109 self.worker_config.max_blocking_threads(num);
110 self
111 }
112
113 /// Set the maximum number of pending connections.
114 ///
115 /// This refers to the number of clients that can be waiting to be served. Exceeding this number
116 /// results in the client getting an error when attempting to connect. It should only affect
117 /// servers under significant load.
118 ///
119 /// Generally set in the 64-2048 range. Default value is 2048.
120 ///
121 /// This method should be called before `bind()` method call.
122 pub fn backlog(mut self, num: u32) -> Self {
123 self.backlog = num;
124 self
125 }
126
127 /// Sets MultiPath TCP (MPTCP) preference on bound sockets.
128 ///
129 /// Multipath TCP (MPTCP) builds on top of TCP to improve connection redundancy and performance
130 /// by sharing a network data stream across multiple underlying TCP sessions. See [mptcp.dev]
131 /// for more info about MPTCP itself.
132 ///
133 /// MPTCP is available on Linux kernel version 5.6 and higher. In addition, you'll also need to
134 /// ensure the kernel option is enabled using `sysctl net.mptcp.enabled=1`.
135 ///
136 /// This method will have no effect if called after a `bind()`.
137 ///
138 /// [mptcp.dev]: https://www.mptcp.dev
139 #[cfg(target_os = "linux")]
140 pub fn mptcp(mut self, mptcp_enabled: MpTcp) -> Self {
141 self.mptcp = mptcp_enabled;
142 self
143 }
144
145 /// Sets the maximum per-worker number of concurrent connections.
146 ///
147 /// All socket listeners will stop accepting connections when this limit is reached for
148 /// each worker.
149 ///
150 /// By default max connections is set to a 25k per worker.
151 pub fn max_concurrent_connections(mut self, num: usize) -> Self {
152 self.worker_config.max_concurrent_connections(num);
153 self
154 }
155
156 #[doc(hidden)]
157 #[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
158 pub fn maxconn(self, num: usize) -> Self {
159 self.max_concurrent_connections(num)
160 }
161
162 /// Sets flag to stop Actix `System` after server shutdown.
163 ///
164 /// This has no effect when server is running in a Tokio-only runtime.
165 pub fn system_exit(mut self) -> Self {
166 self.exit = true;
167 self
168 }
169
170 /// Disables OS signal handling.
171 pub fn disable_signals(mut self) -> Self {
172 self.listen_os_signals = false;
173 self
174 }
175
176 /// Specify shutdown signal from a future.
177 ///
178 /// Using this method will prevent OS signal handlers being set up.
179 ///
180 /// Typically, a `CancellationToken` will be used, but any future _can_ be.
181 ///
182 /// # Examples
183 ///
184 /// ```
185 /// # use std::io;
186 /// # use tokio::net::TcpStream;
187 /// # use actix_server::Server;
188 /// # async fn run() -> io::Result<()> {
189 /// use actix_service::fn_service;
190 /// use tokio_util::sync::CancellationToken;
191 ///
192 /// let stop_signal = CancellationToken::new();
193 ///
194 /// Server::build()
195 /// .bind("shutdown-signal", "127.0.0.1:12345", || {
196 /// fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
197 /// })?
198 /// .shutdown_signal(stop_signal.cancelled_owned())
199 /// .run()
200 /// .await
201 /// # }
202 /// ```
203 pub fn shutdown_signal<Fut>(mut self, shutdown_signal: Fut) -> Self
204 where
205 Fut: Future<Output = ()> + Send + 'static,
206 {
207 self.shutdown_signal = Some(Box::pin(shutdown_signal));
208 self
209 }
210
211 /// Timeout for graceful workers shutdown in seconds.
212 ///
213 /// After receiving a stop signal, workers have this much time to finish serving requests.
214 /// Workers still alive after the timeout are force dropped.
215 ///
216 /// By default shutdown timeout sets to 30 seconds.
217 pub fn shutdown_timeout(mut self, sec: u64) -> Self {
218 self.worker_config
219 .shutdown_timeout(Duration::from_secs(sec));
220 self
221 }
222
223 /// Adds new service to the server.
224 ///
225 /// Note that, if a DNS lookup is required, resolving hostnames is a blocking operation.
226 ///
227 /// # Worker Count
228 ///
229 /// The `factory` will be instantiated multiple times in most scenarios. The number of
230 /// instantiations is number of [`workers`](Self::workers()) × number of sockets resolved by
231 /// `addrs`.
232 ///
233 /// For example, if you've manually set [`workers`](Self::workers()) to 2, and use `127.0.0.1`
234 /// as the bind `addrs`, then `factory` will be instantiated twice. However, using `localhost`
235 /// as the bind `addrs` can often resolve to both `127.0.0.1` (IPv4) _and_ `::1` (IPv6), causing
236 /// the `factory` to be instantiated 4 times (2 workers × 2 bind addresses).
237 ///
238 /// Using a bind address of `0.0.0.0`, which signals to use all interfaces, may also multiple
239 /// the number of instantiations in a similar way.
240 ///
241 /// # Errors
242 ///
243 /// Returns an `io::Error` if:
244 /// - `addrs` cannot be resolved into one or more socket addresses;
245 /// - all the resolved socket addresses are already bound.
246 pub fn bind<F, U, N>(mut self, name: N, addrs: U, factory: F) -> io::Result<Self>
247 where
248 F: ServerServiceFactory<TcpStream>,
249 U: ToSocketAddrs,
250 N: AsRef<str>,
251 {
252 let sockets = bind_addr(addrs, self.backlog, &self.mptcp)?;
253
254 tracing::trace!("binding server to: {sockets:?}");
255
256 for lst in sockets {
257 let token = self.next_token();
258
259 self.factories.push(StreamNewService::create(
260 name.as_ref().to_string(),
261 token,
262 factory.clone(),
263 lst.local_addr()?,
264 ));
265
266 self.sockets
267 .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
268 }
269
270 Ok(self)
271 }
272
273 /// Adds service to the server using a socket listener already bound.
274 ///
275 /// # Worker Count
276 ///
277 /// The `factory` will be instantiated multiple times in most scenarios. The number of
278 /// instantiations is: number of [`workers`](Self::workers()).
279 pub fn listen<F, N: AsRef<str>>(
280 mut self,
281 name: N,
282 lst: StdTcpListener,
283 factory: F,
284 ) -> io::Result<Self>
285 where
286 F: ServerServiceFactory<TcpStream>,
287 {
288 lst.set_nonblocking(true)?;
289 let addr = lst.local_addr()?;
290
291 let token = self.next_token();
292 self.factories.push(StreamNewService::create(
293 name.as_ref().to_string(),
294 token,
295 factory,
296 addr,
297 ));
298
299 self.sockets
300 .push((token, name.as_ref().to_string(), MioListener::from(lst)));
301
302 Ok(self)
303 }
304
305 /// Starts processing incoming connections and return server controller.
306 pub fn run(self) -> Server {
307 if self.sockets.is_empty() {
308 panic!("Server should have at least one bound socket");
309 } else {
310 tracing::info!("starting {} workers", self.threads);
311 Server::new(self)
312 }
313 }
314
315 fn next_token(&mut self) -> usize {
316 let token = self.token;
317 self.token += 1;
318 token
319 }
320}
321
322#[cfg(unix)]
323impl ServerBuilder {
324 /// Adds new service to the server using a UDS (unix domain socket) address.
325 ///
326 /// # Worker Count
327 ///
328 /// The `factory` will be instantiated multiple times in most scenarios. The number of
329 /// instantiations is: number of [`workers`](Self::workers()).
330 pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
331 where
332 F: ServerServiceFactory<actix_rt::net::UnixStream>,
333 N: AsRef<str>,
334 U: AsRef<std::path::Path>,
335 {
336 // The path must not exist when we try to bind.
337 // Try to remove it to avoid bind error.
338 if let Err(err) = std::fs::remove_file(addr.as_ref()) {
339 // NotFound is expected and not an issue. Anything else is.
340 if err.kind() != std::io::ErrorKind::NotFound {
341 return Err(err);
342 }
343 }
344
345 let lst = crate::socket::StdUnixListener::bind(addr)?;
346 self.listen_uds(name, lst, factory)
347 }
348
349 /// Adds new service to the server using a UDS (unix domain socket) listener already bound.
350 ///
351 /// Useful when running as a systemd service and a socket FD is acquired externally.
352 ///
353 /// # Worker Count
354 ///
355 /// The `factory` will be instantiated multiple times in most scenarios. The number of
356 /// instantiations is: number of [`workers`](Self::workers()).
357 pub fn listen_uds<F, N: AsRef<str>>(
358 mut self,
359 name: N,
360 lst: crate::socket::StdUnixListener,
361 factory: F,
362 ) -> io::Result<Self>
363 where
364 F: ServerServiceFactory<actix_rt::net::UnixStream>,
365 {
366 use std::net::{IpAddr, Ipv4Addr};
367
368 lst.set_nonblocking(true)?;
369
370 let token = self.next_token();
371 let addr = crate::socket::StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
372
373 self.factories.push(StreamNewService::create(
374 name.as_ref().to_string(),
375 token,
376 factory,
377 addr,
378 ));
379
380 self.sockets
381 .push((token, name.as_ref().to_string(), MioListener::from(lst)));
382
383 Ok(self)
384 }
385}
386
387pub(super) fn bind_addr<S: ToSocketAddrs>(
388 addr: S,
389 backlog: u32,
390 mptcp: &MpTcp,
391) -> io::Result<Vec<MioTcpListener>> {
392 let mut opt_err = None;
393 let mut success = false;
394 let mut sockets = Vec::new();
395
396 for addr in addr.to_socket_addrs()? {
397 match create_mio_tcp_listener(addr, backlog, mptcp) {
398 Ok(lst) => {
399 success = true;
400 sockets.push(lst);
401 }
402 Err(err) => opt_err = Some(err),
403 }
404 }
405
406 if success {
407 Ok(sockets)
408 } else if let Some(err) = opt_err.take() {
409 Err(err)
410 } else {
411 Err(io::Error::other("Can not bind to address."))
412 }
413}