sqlx_postgres/testing/
mod.rs

1use std::fmt::Write;
2use std::ops::Deref;
3use std::str::FromStr;
4use std::time::Duration;
5
6use futures_core::future::BoxFuture;
7
8use once_cell::sync::OnceCell;
9use sqlx_core::connection::Connection;
10use sqlx_core::query_scalar::query_scalar;
11
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::pool::{Pool, PoolOptions};
15use crate::query::query;
16use crate::{PgConnectOptions, PgConnection, Postgres};
17
18pub(crate) use sqlx_core::testing::*;
19
20// Using a blocking `OnceCell` here because the critical sections are short.
21static MASTER_POOL: OnceCell<Pool<Postgres>> = OnceCell::new();
22// Automatically delete any databases created before the start of the test binary.
23
24impl TestSupport for Postgres {
25    fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
26        Box::pin(async move { test_context(args).await })
27    }
28
29    fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
30        Box::pin(async move {
31            let mut conn = MASTER_POOL
32                .get()
33                .expect("cleanup_test() invoked outside `#[sqlx::test]`")
34                .acquire()
35                .await?;
36
37            do_cleanup(&mut conn, db_name).await
38        })
39    }
40
41    fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
42        Box::pin(async move {
43            let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
44
45            let mut conn = PgConnection::connect(&url).await?;
46
47            let delete_db_names: Vec<String> =
48                query_scalar("select db_name from _sqlx_test.databases")
49                    .fetch_all(&mut conn)
50                    .await?;
51
52            if delete_db_names.is_empty() {
53                return Ok(None);
54            }
55
56            let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
57
58            let mut command = String::new();
59
60            for db_name in &delete_db_names {
61                command.clear();
62                writeln!(command, "drop database if exists {db_name:?};").ok();
63                match conn.execute(&*command).await {
64                    Ok(_deleted) => {
65                        deleted_db_names.push(db_name);
66                    }
67                    // Assume a database error just means the DB is still in use.
68                    Err(Error::Database(dbe)) => {
69                        eprintln!("could not clean test database {db_name:?}: {dbe}")
70                    }
71                    // Bubble up other errors
72                    Err(e) => return Err(e),
73                }
74            }
75
76            query("delete from _sqlx_test.databases where db_name = any($1::text[])")
77                .bind(&deleted_db_names)
78                .execute(&mut conn)
79                .await?;
80
81            let _ = conn.close().await;
82            Ok(Some(delete_db_names.len()))
83        })
84    }
85
86    fn snapshot(
87        _conn: &mut Self::Connection,
88    ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
89        // TODO: I want to get the testing feature out the door so this will have to wait,
90        // but I'm keeping the code around for now because I plan to come back to it.
91        todo!()
92    }
93}
94
95async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
96    let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
97
98    let master_opts = PgConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
99
100    let pool = PoolOptions::new()
101        // Postgres' normal connection limit is 100 plus 3 superuser connections
102        // We don't want to use the whole cap and there may be fuzziness here due to
103        // concurrently running tests anyway.
104        .max_connections(20)
105        // Immediately close master connections. Tokio's I/O streams don't like hopping runtimes.
106        .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
107        .connect_lazy_with(master_opts);
108
109    let master_pool = match MASTER_POOL.try_insert(pool) {
110        Ok(inserted) => inserted,
111        Err((existing, pool)) => {
112            // Sanity checks.
113            assert_eq!(
114                existing.connect_options().host,
115                pool.connect_options().host,
116                "DATABASE_URL changed at runtime, host differs"
117            );
118
119            assert_eq!(
120                existing.connect_options().database,
121                pool.connect_options().database,
122                "DATABASE_URL changed at runtime, database differs"
123            );
124
125            existing
126        }
127    };
128
129    let mut conn = master_pool.acquire().await?;
130
131    // language=PostgreSQL
132    conn.execute(
133        // Explicit lock avoids this latent bug: https://stackoverflow.com/a/29908840
134        // I couldn't find a bug on the mailing list for `CREATE SCHEMA` specifically,
135        // but a clearly related bug with `CREATE TABLE` has been known since 2007:
136        // https://www.postgresql.org/message-id/200710222037.l9MKbCJZ098744%40wwwmaster.postgresql.org
137        // magic constant 8318549251334697844 is just 8 ascii bytes 'sqlxtest'.
138        r#"
139        select pg_advisory_xact_lock(8318549251334697844);
140
141        create schema if not exists _sqlx_test;
142
143        create table if not exists _sqlx_test.databases (
144            db_name text primary key,
145            test_path text not null,
146            created_at timestamptz not null default now()
147        );
148
149        create index if not exists databases_created_at 
150            on _sqlx_test.databases(created_at);
151
152        create sequence if not exists _sqlx_test.database_ids;
153    "#,
154    )
155    .await?;
156
157    let db_name = Postgres::db_name(args);
158    do_cleanup(&mut conn, &db_name).await?;
159
160    query(
161        r#"
162            insert into _sqlx_test.databases(db_name, test_path) values ($1, $2)
163        "#,
164    )
165    .bind(&db_name)
166    .bind(args.test_path)
167    .execute(&mut *conn)
168    .await?;
169
170    let create_command = format!("create database {db_name:?}");
171    debug_assert!(create_command.starts_with("create database \""));
172    conn.execute(&(create_command)[..]).await?;
173
174    Ok(TestContext {
175        pool_opts: PoolOptions::new()
176            // Don't allow a single test to take all the connections.
177            // Most tests shouldn't require more than 5 connections concurrently,
178            // or else they're likely doing too much in one test.
179            .max_connections(5)
180            // Close connections ASAP if left in the idle queue.
181            .idle_timeout(Some(Duration::from_secs(1)))
182            .parent(master_pool.clone()),
183        connect_opts: master_pool
184            .connect_options()
185            .deref()
186            .clone()
187            .database(&db_name),
188        db_name,
189    })
190}
191
192async fn do_cleanup(conn: &mut PgConnection, db_name: &str) -> Result<(), Error> {
193    let delete_db_command = format!("drop database if exists {db_name:?};");
194    conn.execute(&*delete_db_command).await?;
195    query("delete from _sqlx_test.databases where db_name = $1::text")
196        .bind(db_name)
197        .execute(&mut *conn)
198        .await?;
199
200    Ok(())
201}