sqlx_postgres/testing/
mod.rs1use std::fmt::Write;
2use std::ops::Deref;
3use std::str::FromStr;
4use std::time::Duration;
5
6use futures_core::future::BoxFuture;
7
8use once_cell::sync::OnceCell;
9use sqlx_core::connection::Connection;
10use sqlx_core::query_scalar::query_scalar;
11
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::pool::{Pool, PoolOptions};
15use crate::query::query;
16use crate::{PgConnectOptions, PgConnection, Postgres};
17
18pub(crate) use sqlx_core::testing::*;
19
20static MASTER_POOL: OnceCell<Pool<Postgres>> = OnceCell::new();
22impl TestSupport for Postgres {
25 fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
26 Box::pin(async move { test_context(args).await })
27 }
28
29 fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
30 Box::pin(async move {
31 let mut conn = MASTER_POOL
32 .get()
33 .expect("cleanup_test() invoked outside `#[sqlx::test]`")
34 .acquire()
35 .await?;
36
37 do_cleanup(&mut conn, db_name).await
38 })
39 }
40
41 fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
42 Box::pin(async move {
43 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
44
45 let mut conn = PgConnection::connect(&url).await?;
46
47 let delete_db_names: Vec<String> =
48 query_scalar("select db_name from _sqlx_test.databases")
49 .fetch_all(&mut conn)
50 .await?;
51
52 if delete_db_names.is_empty() {
53 return Ok(None);
54 }
55
56 let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
57
58 let mut command = String::new();
59
60 for db_name in &delete_db_names {
61 command.clear();
62 writeln!(command, "drop database if exists {db_name:?};").ok();
63 match conn.execute(&*command).await {
64 Ok(_deleted) => {
65 deleted_db_names.push(db_name);
66 }
67 Err(Error::Database(dbe)) => {
69 eprintln!("could not clean test database {db_name:?}: {dbe}")
70 }
71 Err(e) => return Err(e),
73 }
74 }
75
76 query("delete from _sqlx_test.databases where db_name = any($1::text[])")
77 .bind(&deleted_db_names)
78 .execute(&mut conn)
79 .await?;
80
81 let _ = conn.close().await;
82 Ok(Some(delete_db_names.len()))
83 })
84 }
85
86 fn snapshot(
87 _conn: &mut Self::Connection,
88 ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
89 todo!()
92 }
93}
94
95async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
96 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
97
98 let master_opts = PgConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
99
100 let pool = PoolOptions::new()
101 .max_connections(20)
105 .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
107 .connect_lazy_with(master_opts);
108
109 let master_pool = match MASTER_POOL.try_insert(pool) {
110 Ok(inserted) => inserted,
111 Err((existing, pool)) => {
112 assert_eq!(
114 existing.connect_options().host,
115 pool.connect_options().host,
116 "DATABASE_URL changed at runtime, host differs"
117 );
118
119 assert_eq!(
120 existing.connect_options().database,
121 pool.connect_options().database,
122 "DATABASE_URL changed at runtime, database differs"
123 );
124
125 existing
126 }
127 };
128
129 let mut conn = master_pool.acquire().await?;
130
131 conn.execute(
133 r#"
139 select pg_advisory_xact_lock(8318549251334697844);
140
141 create schema if not exists _sqlx_test;
142
143 create table if not exists _sqlx_test.databases (
144 db_name text primary key,
145 test_path text not null,
146 created_at timestamptz not null default now()
147 );
148
149 create index if not exists databases_created_at
150 on _sqlx_test.databases(created_at);
151
152 create sequence if not exists _sqlx_test.database_ids;
153 "#,
154 )
155 .await?;
156
157 let db_name = Postgres::db_name(args);
158 do_cleanup(&mut conn, &db_name).await?;
159
160 query(
161 r#"
162 insert into _sqlx_test.databases(db_name, test_path) values ($1, $2)
163 "#,
164 )
165 .bind(&db_name)
166 .bind(args.test_path)
167 .execute(&mut *conn)
168 .await?;
169
170 let create_command = format!("create database {db_name:?}");
171 debug_assert!(create_command.starts_with("create database \""));
172 conn.execute(&(create_command)[..]).await?;
173
174 Ok(TestContext {
175 pool_opts: PoolOptions::new()
176 .max_connections(5)
180 .idle_timeout(Some(Duration::from_secs(1)))
182 .parent(master_pool.clone()),
183 connect_opts: master_pool
184 .connect_options()
185 .deref()
186 .clone()
187 .database(&db_name),
188 db_name,
189 })
190}
191
192async fn do_cleanup(conn: &mut PgConnection, db_name: &str) -> Result<(), Error> {
193 let delete_db_command = format!("drop database if exists {db_name:?};");
194 conn.execute(&*delete_db_command).await?;
195 query("delete from _sqlx_test.databases where db_name = $1::text")
196 .bind(db_name)
197 .execute(&mut *conn)
198 .await?;
199
200 Ok(())
201}