sqlx_core/pool/
mod.rs

1//! Provides the connection pool for asynchronous SQLx connections.
2//!
3//! Opening a database connection for each and every operation to the database can quickly
4//! become expensive. Furthermore, sharing a database connection between threads and functions
5//! can be difficult to express in Rust.
6//!
7//! A connection pool is a standard technique that can manage opening and re-using connections.
8//! Normally it also enforces a maximum number of connections as these are an expensive resource
9//! on the database server.
10//!
11//! SQLx provides a canonical connection pool implementation intended to satisfy the majority
12//! of use cases.
13//!
14//! See [Pool] for details.
15//!
16//! Type aliases are provided for each database to make it easier to sprinkle `Pool` through
17//! your codebase:
18//!
19//! * [MssqlPool][crate::mssql::MssqlPool] (MSSQL)
20//! * [MySqlPool][crate::mysql::MySqlPool] (MySQL)
21//! * [PgPool][crate::postgres::PgPool] (PostgreSQL)
22//! * [SqlitePool][crate::sqlite::SqlitePool] (SQLite)
23//!
24//! # Opening a connection pool
25//!
26//! A new connection pool with a default configuration can be created by supplying `Pool`
27//! with the database driver and a connection string.
28//!
29//! ```rust,ignore
30//! use sqlx::Pool;
31//! use sqlx::postgres::Postgres;
32//!
33//! let pool = Pool::<Postgres>::connect("postgres://").await?;
34//! ```
35//!
36//! For convenience, database-specific type aliases are provided:
37//!
38//! ```rust,ignore
39//! use sqlx::mssql::MssqlPool;
40//!
41//! let pool = MssqlPool::connect("mssql://").await?;
42//! ```
43//!
44//! # Using a connection pool
45//!
46//! A connection pool implements [`Executor`][crate::executor::Executor] and can be used directly
47//! when executing a query. Notice that only an immutable reference (`&Pool`) is needed.
48//!
49//! ```rust,ignore
50//! sqlx::query("DELETE FROM articles").execute(&pool).await?;
51//! ```
52//!
53//! A connection or transaction may also be manually acquired with
54//! [`Pool::acquire`] or
55//! [`Pool::begin`].
56
57use std::borrow::Cow;
58use std::fmt;
59use std::future::Future;
60use std::pin::{pin, Pin};
61use std::sync::Arc;
62use std::task::{Context, Poll};
63use std::time::{Duration, Instant};
64
65use event_listener::EventListener;
66use futures_core::FusedFuture;
67use futures_util::FutureExt;
68
69use crate::connection::Connection;
70use crate::database::Database;
71use crate::error::Error;
72use crate::transaction::Transaction;
73
74pub use self::connection::PoolConnection;
75use self::inner::PoolInner;
76#[doc(hidden)]
77pub use self::maybe::MaybePoolConnection;
78pub use self::options::{PoolConnectionMetadata, PoolOptions};
79
80#[macro_use]
81mod executor;
82
83#[macro_use]
84pub mod maybe;
85
86mod connection;
87mod inner;
88mod options;
89
90/// An asynchronous pool of SQLx database connections.
91///
92/// Create a pool with [Pool::connect] or [Pool::connect_with] and then call [Pool::acquire]
93/// to get a connection from the pool; when the connection is dropped it will return to the pool
94/// so it can be reused.
95///
96/// You can also pass `&Pool` directly anywhere an `Executor` is required; this will automatically
97/// checkout a connection for you.
98///
99/// See [the module documentation](crate::pool) for examples.
100///
101/// The pool has a maximum connection limit that it will not exceed; if `acquire()` is called
102/// when at this limit and all connections are checked out, the task will be made to wait until
103/// a connection becomes available.
104///
105/// You can configure the connection limit, and other parameters, using [PoolOptions].
106///
107/// Calls to `acquire()` are fair, i.e. fulfilled on a first-come, first-serve basis.
108///
109/// `Pool` is `Send`, `Sync` and `Clone`. It is intended to be created once at the start of your
110/// application/daemon/web server/etc. and then shared with all tasks throughout the process'
111/// lifetime. How best to accomplish this depends on your program architecture.
112///
113/// In Actix-Web, for example, you can efficiently share a single pool with all request handlers
114/// using [web::ThinData].
115///
116/// Cloning `Pool` is cheap as it is simply a reference-counted handle to the inner pool state.
117/// When the last remaining handle to the pool is dropped, the connections owned by the pool are
118/// immediately closed (also by dropping). `PoolConnection` returned by [Pool::acquire] and
119/// `Transaction` returned by [Pool::begin] both implicitly hold a reference to the pool for
120/// their lifetimes.
121///
122/// If you prefer to explicitly shutdown the pool and gracefully close its connections (which
123/// depending on the database type, may include sending a message to the database server that the
124/// connection is being closed), you can call [Pool::close] which causes all waiting and subsequent
125/// calls to [Pool::acquire] to return [Error::PoolClosed], and waits until all connections have
126/// been returned to the pool and gracefully closed.
127///
128/// Type aliases are provided for each database to make it easier to sprinkle `Pool` through
129/// your codebase:
130///
131/// * [MssqlPool][crate::mssql::MssqlPool] (MSSQL)
132/// * [MySqlPool][crate::mysql::MySqlPool] (MySQL)
133/// * [PgPool][crate::postgres::PgPool] (PostgreSQL)
134/// * [SqlitePool][crate::sqlite::SqlitePool] (SQLite)
135///
136/// [web::ThinData]: https://docs.rs/actix-web/4.9.0/actix_web/web/struct.ThinData.html
137///
138/// ### Note: Drop Behavior
139/// Due to a lack of async `Drop`, dropping the last `Pool` handle may not immediately clean
140/// up connections by itself. The connections will be dropped locally, which is sufficient for
141/// SQLite, but for client/server databases like MySQL and Postgres, that only closes the
142/// client side of the connection. The server will not know the connection is closed until
143/// potentially much later: this is usually dictated by the TCP keepalive timeout in the server
144/// settings.
145///
146/// Because the connection may not be cleaned up immediately on the server side, you may run
147/// into errors regarding connection limits if you are creating and dropping many pools in short
148/// order.
149///
150/// We recommend calling [`.close().await`] to gracefully close the pool and its connections
151/// when you are done using it. This will also wake any tasks that are waiting on an `.acquire()`
152/// call, so for long-lived applications it's a good idea to call `.close()` during shutdown.
153///
154/// If you're writing tests, consider using `#[sqlx::test]` which handles the lifetime of
155/// the pool for you.
156///
157/// [`.close().await`]: Pool::close
158///
159/// ### Why Use a Pool?
160///
161/// A single database connection (in general) cannot be used by multiple threads simultaneously
162/// for various reasons, but an application or web server will typically need to execute numerous
163/// queries or commands concurrently (think of concurrent requests against a web server; many or all
164/// of them will probably need to hit the database).
165///
166/// You could place the connection in a `Mutex` but this will make it a huge bottleneck.
167///
168/// Naively, you might also think to just open a new connection per request, but this
169/// has a number of other caveats, generally due to the high overhead involved in working with
170/// a fresh connection. Examples to follow.
171///
172/// Connection pools facilitate reuse of connections to _amortize_ these costs, helping to ensure
173/// that you're not paying for them each time you need a connection.
174///
175/// ##### 1. Overhead of Opening a Connection
176/// Opening a database connection is not exactly a cheap operation.
177///
178/// For SQLite, it means numerous requests to the filesystem and memory allocations, while for
179/// server-based databases it involves performing DNS resolution, opening a new TCP connection and
180/// allocating buffers.
181///
182/// Each connection involves a nontrivial allocation of resources for the database server, usually
183/// including spawning a new thread or process specifically to handle the connection, both for
184/// concurrency and isolation of faults.
185///
186/// Additionally, database connections typically involve a complex handshake including
187/// authentication, negotiation regarding connection parameters (default character sets, timezones,
188/// locales, supported features) and upgrades to encrypted tunnels.
189///
190/// If `acquire()` is called on a pool with all connections checked out but it is not yet at its
191/// connection limit (see next section), then a new connection is immediately opened, so this pool
192/// does not _automatically_ save you from the overhead of creating a new connection.
193///
194/// However, because this pool by design enforces _reuse_ of connections, this overhead cost
195/// is not paid each and every time you need a connection. In fact, if you set
196/// [the `min_connections` option in PoolOptions][PoolOptions::min_connections], the pool will
197/// create that many connections up-front so that they are ready to go when a request comes in,
198/// and maintain that number on a best-effort basis for consistent performance.
199///
200/// ##### 2. Connection Limits (MySQL, MSSQL, Postgres)
201/// Database servers usually place hard limits on the number of connections that are allowed open at
202/// any given time, to maintain performance targets and prevent excessive allocation of resources,
203/// such as RAM, journal files, disk caches, etc.
204///
205/// These limits have different defaults per database flavor, and may vary between different
206/// distributions of the same database, but are typically configurable on server start;
207/// if you're paying for managed database hosting then the connection limit will typically vary with
208/// your pricing tier.
209///
210/// In MySQL, the default limit is typically 150, plus 1 which is reserved for a user with the
211/// `CONNECTION_ADMIN` privilege so you can still access the server to diagnose problems even
212/// with all connections being used.
213///
214/// In MSSQL the only documentation for the default maximum limit is that it depends on the version
215/// and server configuration.
216///
217/// In Postgres, the default limit is typically 100, minus 3 which are reserved for superusers
218/// (putting the default limit for unprivileged users at 97 connections).
219///
220/// In any case, exceeding these limits results in an error when opening a new connection, which
221/// in a web server context will turn into a `500 Internal Server Error` if not handled, but should
222/// be turned into either `403 Forbidden` or `429 Too Many Requests` depending on your rate-limiting
223/// scheme. However, in a web context, telling a client "go away, maybe try again later" results in
224/// a sub-optimal user experience.
225///
226/// Instead, with a connection pool, clients are made to wait in a fair queue for a connection to
227/// become available; by using a single connection pool for your whole application, you can ensure
228/// that you don't exceed the connection limit of your database server while allowing response
229/// time to degrade gracefully at high load.
230///
231/// Of course, if multiple applications are connecting to the same database server, then you
232/// should ensure that the connection limits for all applications add up to your server's maximum
233/// connections or less.
234///
235/// ##### 3. Resource Reuse
236/// The first time you execute a query against your database, the database engine must first turn
237/// the SQL into an actionable _query plan_ which it may then execute against the database. This
238/// involves parsing the SQL query, validating and analyzing it, and in the case of Postgres 12+ and
239/// SQLite, generating code to execute the query plan (native or bytecode, respectively).
240///
241/// These database servers provide a way to amortize this overhead by _preparing_ the query,
242/// associating it with an object ID and placing its query plan in a cache to be referenced when
243/// it is later executed.
244///
245/// Prepared statements have other features, like bind parameters, which make them safer and more
246/// ergonomic to use as well. By design, SQLx pushes you towards using prepared queries/statements
247/// via the [Query][crate::query::Query] API _et al._ and the `query!()` macro _et al._, for
248/// reasons of safety, ergonomics, and efficiency.
249///
250/// However, because database connections are typically isolated from each other in the database
251/// server (either by threads or separate processes entirely), they don't typically share prepared
252/// statements between connections so this work must be redone _for each connection_.
253///
254/// As with section 1, by facilitating reuse of connections, `Pool` helps to ensure their prepared
255/// statements (and thus cached query plans) can be reused as much as possible, thus amortizing
256/// the overhead involved.
257///
258/// Depending on the database server, a connection will have caches for all kinds of other data as
259/// well and queries will generally benefit from these caches being "warm" (populated with data).
260pub struct Pool<DB: Database>(pub(crate) Arc<PoolInner<DB>>);
261
262/// A future that resolves when the pool is closed.
263///
264/// See [`Pool::close_event()`] for details.
265pub struct CloseEvent {
266    listener: Option<EventListener>,
267}
268
269impl<DB: Database> Pool<DB> {
270    /// Create a new connection pool with a default pool configuration and
271    /// the given connection URL, and immediately establish one connection.
272    ///
273    /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
274    ///
275    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
276    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
277    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
278    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
279    ///
280    /// The default configuration is mainly suited for testing and light-duty applications.
281    /// For production applications, you'll likely want to make at least few tweaks.
282    ///
283    /// See [`PoolOptions::new()`] for details.
284    pub async fn connect(url: &str) -> Result<Self, Error> {
285        PoolOptions::<DB>::new().connect(url).await
286    }
287
288    /// Create a new connection pool with a default pool configuration and
289    /// the given `ConnectOptions`, and immediately establish one connection.
290    ///
291    /// The default configuration is mainly suited for testing and light-duty applications.
292    /// For production applications, you'll likely want to make at least few tweaks.
293    ///
294    /// See [`PoolOptions::new()`] for details.
295    pub async fn connect_with(
296        options: <DB::Connection as Connection>::Options,
297    ) -> Result<Self, Error> {
298        PoolOptions::<DB>::new().connect_with(options).await
299    }
300
301    /// Create a new connection pool with a default pool configuration and
302    /// the given connection URL.
303    ///
304    /// The pool will establish connections only as needed.
305    ///
306    /// Refer to the relevant [`ConnectOptions`][crate::connection::ConnectOptions] impl for your database for the expected URL format:
307    ///
308    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
309    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
310    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
311    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
312    ///
313    /// The default configuration is mainly suited for testing and light-duty applications.
314    /// For production applications, you'll likely want to make at least few tweaks.
315    ///
316    /// See [`PoolOptions::new()`] for details.
317    pub fn connect_lazy(url: &str) -> Result<Self, Error> {
318        PoolOptions::<DB>::new().connect_lazy(url)
319    }
320
321    /// Create a new connection pool with a default pool configuration and
322    /// the given `ConnectOptions`.
323    ///
324    /// The pool will establish connections only as needed.
325    ///
326    /// The default configuration is mainly suited for testing and light-duty applications.
327    /// For production applications, you'll likely want to make at least few tweaks.
328    ///
329    /// See [`PoolOptions::new()`] for details.
330    pub fn connect_lazy_with(options: <DB::Connection as Connection>::Options) -> Self {
331        PoolOptions::<DB>::new().connect_lazy_with(options)
332    }
333
334    /// Retrieves a connection from the pool.
335    ///
336    /// The total time this method is allowed to execute is capped by
337    /// [`PoolOptions::acquire_timeout`].
338    /// If that timeout elapses, this will return [`Error::PoolClosed`].
339    ///
340    /// ### Note: Cancellation/Timeout May Drop Connections
341    /// If `acquire` is cancelled or times out after it acquires a connection from the idle queue or
342    /// opens a new one, it will drop that connection because we don't want to assume it
343    /// is safe to return to the pool, and testing it to see if it's safe to release could introduce
344    /// subtle bugs if not implemented correctly. To avoid that entirely, we've decided to not
345    /// gracefully handle cancellation here.
346    ///
347    /// However, if your workload is sensitive to dropped connections such as using an in-memory
348    /// SQLite database with a pool size of 1, you can pretty easily ensure that a cancelled
349    /// `acquire()` call will never drop connections by tweaking your [`PoolOptions`]:
350    ///
351    /// * Set [`test_before_acquire(false)`][PoolOptions::test_before_acquire]
352    /// * Never set [`before_acquire`][PoolOptions::before_acquire] or
353    ///   [`after_connect`][PoolOptions::after_connect].
354    ///
355    /// This should eliminate any potential `.await` points between acquiring a connection and
356    /// returning it.
357    pub fn acquire(&self) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static {
358        let shared = self.0.clone();
359        async move { shared.acquire().await.map(|conn| conn.reattach()) }
360    }
361
362    /// Attempts to retrieve a connection from the pool if there is one available.
363    ///
364    /// Returns `None` immediately if there are no idle connections available in the pool
365    /// or there are tasks waiting for a connection which have yet to wake.
366    pub fn try_acquire(&self) -> Option<PoolConnection<DB>> {
367        self.0.try_acquire().map(|conn| conn.into_live().reattach())
368    }
369
370    /// Retrieves a connection and immediately begins a new transaction.
371    pub async fn begin(&self) -> Result<Transaction<'static, DB>, Error> {
372        Transaction::begin(
373            MaybePoolConnection::PoolConnection(self.acquire().await?),
374            None,
375        )
376        .await
377    }
378
379    /// Attempts to retrieve a connection and immediately begins a new transaction if successful.
380    pub async fn try_begin(&self) -> Result<Option<Transaction<'static, DB>>, Error> {
381        match self.try_acquire() {
382            Some(conn) => Transaction::begin(MaybePoolConnection::PoolConnection(conn), None)
383                .await
384                .map(Some),
385
386            None => Ok(None),
387        }
388    }
389
390    /// Retrieves a connection and immediately begins a new transaction using `statement`.
391    pub async fn begin_with(
392        &self,
393        statement: impl Into<Cow<'static, str>>,
394    ) -> Result<Transaction<'static, DB>, Error> {
395        Transaction::begin(
396            MaybePoolConnection::PoolConnection(self.acquire().await?),
397            Some(statement.into()),
398        )
399        .await
400    }
401
402    /// Attempts to retrieve a connection and, if successful, immediately begins a new
403    /// transaction using `statement`.
404    pub async fn try_begin_with(
405        &self,
406        statement: impl Into<Cow<'static, str>>,
407    ) -> Result<Option<Transaction<'static, DB>>, Error> {
408        match self.try_acquire() {
409            Some(conn) => Transaction::begin(
410                MaybePoolConnection::PoolConnection(conn),
411                Some(statement.into()),
412            )
413            .await
414            .map(Some),
415
416            None => Ok(None),
417        }
418    }
419
420    /// Shut down the connection pool, immediately waking all tasks waiting for a connection.
421    ///
422    /// Upon calling this method, any currently waiting or subsequent calls to [`Pool::acquire`] and
423    /// the like will immediately return [`Error::PoolClosed`] and no new connections will be opened.
424    /// Checked-out connections are unaffected, but will be gracefully closed on-drop
425    /// rather than being returned to the pool.
426    ///
427    /// Returns a `Future` which can be `.await`ed to ensure all connections are
428    /// gracefully closed. It will first close any idle connections currently waiting in the pool,
429    /// then wait for all checked-out connections to be returned or closed.
430    ///
431    /// Waiting for connections to be gracefully closed is optional, but will allow the database
432    /// server to clean up the resources sooner rather than later. This is especially important
433    /// for tests that create a new pool every time, otherwise you may see errors about connection
434    /// limits being exhausted even when running tests in a single thread.
435    ///
436    /// If the returned `Future` is not run to completion, any remaining connections will be dropped
437    /// when the last handle for the given pool instance is dropped, which could happen in a task
438    /// spawned by `Pool` internally and so may be unpredictable otherwise.
439    ///
440    /// `.close()` may be safely called and `.await`ed on multiple handles concurrently.
441    pub fn close(&self) -> impl Future<Output = ()> + '_ {
442        self.0.close()
443    }
444
445    /// Returns `true` if [`.close()`][Pool::close] has been called on the pool, `false` otherwise.
446    pub fn is_closed(&self) -> bool {
447        self.0.is_closed()
448    }
449
450    /// Get a future that resolves when [`Pool::close()`] is called.
451    ///
452    /// If the pool is already closed, the future resolves immediately.
453    ///
454    /// This can be used to cancel long-running operations that hold onto a [`PoolConnection`]
455    /// so they don't prevent the pool from closing (which would otherwise wait until all
456    /// connections are returned).
457    ///
458    /// Examples
459    /// ========
460    /// These examples use Postgres and Tokio, but should suffice to demonstrate the concept.
461    ///
462    /// Do something when the pool is closed:
463    /// ```rust,no_run
464    /// # async fn bleh() -> sqlx::Result<()> {
465    /// use sqlx::PgPool;
466    ///
467    /// let pool = PgPool::connect("postgresql://...").await?;
468    ///
469    /// let pool2 = pool.clone();
470    ///
471    /// tokio::spawn(async move {
472    ///     // Demonstrates that `CloseEvent` is itself a `Future` you can wait on.
473    ///     // This lets you implement any kind of on-close event that you like.
474    ///     pool2.close_event().await;
475    ///
476    ///     println!("Pool is closing!");
477    ///
478    ///     // Imagine maybe recording application statistics or logging a report, etc.
479    /// });
480    ///
481    /// // The rest of the application executes normally...
482    ///
483    /// // Close the pool before the application exits...
484    /// pool.close().await;
485    ///
486    /// # Ok(())
487    /// # }
488    /// ```
489    ///
490    /// Cancel a long-running operation:
491    /// ```rust,no_run
492    /// # async fn bleh() -> sqlx::Result<()> {
493    /// use sqlx::{Executor, PgPool};
494    ///
495    /// let pool = PgPool::connect("postgresql://...").await?;
496    ///
497    /// let pool2 = pool.clone();
498    ///
499    /// tokio::spawn(async move {
500    ///     // `do_until` yields the inner future's output wrapped in `sqlx::Result`,
501    ///     // in this case giving a double-wrapped result.
502    ///     let res: sqlx::Result<sqlx::Result<()>> = pool2.close_event().do_until(async {
503    ///         // This statement normally won't return for 30 days!
504    ///         // (Assuming the connection doesn't time out first, of course.)
505    ///         pool2.execute("SELECT pg_sleep('30 days')").await?;
506    ///
507    ///         // If the pool is closed before the statement completes, this won't be printed.
508    ///         // This is because `.do_until()` cancels the future it's given if the
509    ///         // pool is closed first.
510    ///         println!("Waited!");
511    ///
512    ///         Ok(())
513    ///     }).await;
514    ///
515    ///     match res {
516    ///         Ok(Ok(())) => println!("Wait succeeded"),
517    ///         Ok(Err(e)) => println!("Error from inside do_until: {e:?}"),
518    ///         Err(e) => println!("Error from do_until: {e:?}"),
519    ///     }
520    /// });
521    ///
522    /// // This normally wouldn't return until the above statement completed and the connection
523    /// // was returned to the pool. However, thanks to `.do_until()`, the operation was
524    /// // cancelled as soon as we called `.close().await`.
525    /// pool.close().await;
526    ///
527    /// # Ok(())
528    /// # }
529    /// ```
530    pub fn close_event(&self) -> CloseEvent {
531        self.0.close_event()
532    }
533
534    /// Returns the number of connections currently active. This includes idle connections.
535    pub fn size(&self) -> u32 {
536        self.0.size()
537    }
538
539    /// Returns the number of connections active and idle (not in use).
540    pub fn num_idle(&self) -> usize {
541        self.0.num_idle()
542    }
543
544    /// Gets a clone of the connection options for this pool
545    pub fn connect_options(&self) -> Arc<<DB::Connection as Connection>::Options> {
546        self.0
547            .connect_options
548            .read()
549            .expect("write-lock holder panicked")
550            .clone()
551    }
552
553    /// Updates the connection options this pool will use when opening any future connections.  Any
554    /// existing open connection in the pool will be left as-is.
555    pub fn set_connect_options(&self, connect_options: <DB::Connection as Connection>::Options) {
556        // technically write() could also panic if the current thread already holds the lock,
557        // but because this method can't be re-entered by the same thread that shouldn't be a problem
558        let mut guard = self
559            .0
560            .connect_options
561            .write()
562            .expect("write-lock holder panicked");
563        *guard = Arc::new(connect_options);
564    }
565
566    /// Get the options for this pool
567    pub fn options(&self) -> &PoolOptions<DB> {
568        &self.0.options
569    }
570}
571
572/// Returns a new [Pool] tied to the same shared connection pool.
573impl<DB: Database> Clone for Pool<DB> {
574    fn clone(&self) -> Self {
575        Self(Arc::clone(&self.0))
576    }
577}
578
579impl<DB: Database> fmt::Debug for Pool<DB> {
580    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
581        fmt.debug_struct("Pool")
582            .field("size", &self.0.size())
583            .field("num_idle", &self.0.num_idle())
584            .field("is_closed", &self.0.is_closed())
585            .field("options", &self.0.options)
586            .finish()
587    }
588}
589
590impl CloseEvent {
591    /// Execute the given future until it returns or the pool is closed.
592    ///
593    /// Cancels the future and returns `Err(PoolClosed)` if/when the pool is closed.
594    /// If the pool was already closed, the future is never run.
595    pub async fn do_until<Fut: Future>(&mut self, fut: Fut) -> Result<Fut::Output, Error> {
596        // Check that the pool wasn't closed already.
597        //
598        // We use `poll_immediate()` as it will use the correct waker instead of
599        // a no-op one like `.now_or_never()`, but it won't actually suspend execution here.
600        futures_util::future::poll_immediate(&mut *self)
601            .await
602            .map_or(Ok(()), |_| Err(Error::PoolClosed))?;
603
604        let mut fut = pin!(fut);
605
606        // I find that this is clearer in intent than `futures_util::future::select()`
607        // or `futures_util::select_biased!{}` (which isn't enabled anyway).
608        std::future::poll_fn(|cx| {
609            // Poll `fut` first as the wakeup event is more likely for it than `self`.
610            if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
611                return Poll::Ready(Ok(ret));
612            }
613
614            // Can't really factor out mapping to `Err(Error::PoolClosed)` though it seems like
615            // we should because that results in a different `Ok` type each time.
616            //
617            // Ideally we'd map to something like `Result<!, Error>` but using `!` as a type
618            // is not allowed on stable Rust yet.
619            self.poll_unpin(cx).map(|_| Err(Error::PoolClosed))
620        })
621        .await
622    }
623}
624
625impl Future for CloseEvent {
626    type Output = ();
627
628    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
629        if let Some(listener) = &mut self.listener {
630            futures_core::ready!(listener.poll_unpin(cx));
631        }
632
633        // `EventListener` doesn't like being polled after it yields, and even if it did it
634        // would probably just wait for the next event, neither of which we want.
635        //
636        // So this way, once we get our close event, we fuse this future to immediately return.
637        self.listener = None;
638
639        Poll::Ready(())
640    }
641}
642
643impl FusedFuture for CloseEvent {
644    fn is_terminated(&self) -> bool {
645        self.listener.is_none()
646    }
647}
648
649/// get the time between the deadline and now and use that as our timeout
650///
651/// returns `Error::PoolTimedOut` if the deadline is in the past
652fn deadline_as_timeout(deadline: Instant) -> Result<Duration, Error> {
653    deadline
654        .checked_duration_since(Instant::now())
655        .ok_or(Error::PoolTimedOut)
656}
657
658#[test]
659#[allow(dead_code)]
660fn assert_pool_traits() {
661    fn assert_send_sync<T: Send + Sync>() {}
662    fn assert_clone<T: Clone>() {}
663
664    fn assert_pool<DB: Database>() {
665        assert_send_sync::<Pool<DB>>();
666        assert_clone::<Pool<DB>>();
667    }
668}