1use std::str::FromStr;
2use std::time::Duration;
3use std::time::Instant;
4
5use futures_core::future::BoxFuture;
6
7pub(crate) use sqlx_core::migrate::MigrateError;
8pub(crate) use sqlx_core::migrate::{AppliedMigration, Migration};
9pub(crate) use sqlx_core::migrate::{Migrate, MigrateDatabase};
10
11use crate::connection::{ConnectOptions, Connection};
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::query::query;
15use crate::query_as::query_as;
16use crate::query_scalar::query_scalar;
17use crate::{PgConnectOptions, PgConnection, Postgres};
18
19fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> {
20 let mut options = PgConnectOptions::from_str(url)?;
21
22 let database = options
24 .database
25 .as_deref()
26 .unwrap_or(&options.username)
27 .to_owned();
28
29 options.database = if database == "postgres" {
33 Some("template1".into())
34 } else {
35 Some("postgres".into())
36 };
37
38 Ok((options, database))
39}
40
41impl MigrateDatabase for Postgres {
42 fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
43 Box::pin(async move {
44 let (options, database) = parse_for_maintenance(url)?;
45 let mut conn = options.connect().await?;
46
47 let _ = conn
48 .execute(&*format!(
49 "CREATE DATABASE \"{}\"",
50 database.replace('"', "\"\"")
51 ))
52 .await?;
53
54 Ok(())
55 })
56 }
57
58 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
59 Box::pin(async move {
60 let (options, database) = parse_for_maintenance(url)?;
61 let mut conn = options.connect().await?;
62
63 let exists: bool =
64 query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
65 .bind(database)
66 .fetch_one(&mut conn)
67 .await?;
68
69 Ok(exists)
70 })
71 }
72
73 fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
74 Box::pin(async move {
75 let (options, database) = parse_for_maintenance(url)?;
76 let mut conn = options.connect().await?;
77
78 let _ = conn
79 .execute(&*format!(
80 "DROP DATABASE IF EXISTS \"{}\"",
81 database.replace('"', "\"\"")
82 ))
83 .await?;
84
85 Ok(())
86 })
87 }
88
89 fn force_drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
90 Box::pin(async move {
91 let (options, database) = parse_for_maintenance(url)?;
92 let mut conn = options.connect().await?;
93
94 let row: (String,) = query_as("SELECT current_setting('server_version_num')")
95 .fetch_one(&mut conn)
96 .await?;
97
98 let version = row.0.parse::<i32>().unwrap();
99
100 let pid_type = if version >= 90200 { "pid" } else { "procpid" };
101
102 conn.execute(&*format!(
103 "SELECT pg_terminate_backend(pg_stat_activity.{pid_type}) FROM pg_stat_activity \
104 WHERE pg_stat_activity.datname = '{database}' AND {pid_type} <> pg_backend_pid()"
105 ))
106 .await?;
107
108 Self::drop_database(url).await
109 })
110 }
111}
112
113impl Migrate for PgConnection {
114 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
115 Box::pin(async move {
116 self.execute(
118 r#"
119CREATE TABLE IF NOT EXISTS _sqlx_migrations (
120 version BIGINT PRIMARY KEY,
121 description TEXT NOT NULL,
122 installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
123 success BOOLEAN NOT NULL,
124 checksum BYTEA NOT NULL,
125 execution_time BIGINT NOT NULL
126);
127 "#,
128 )
129 .await?;
130
131 Ok(())
132 })
133 }
134
135 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
136 Box::pin(async move {
137 let row: Option<(i64,)> = query_as(
139 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
140 )
141 .fetch_optional(self)
142 .await?;
143
144 Ok(row.map(|r| r.0))
145 })
146 }
147
148 fn list_applied_migrations(
149 &mut self,
150 ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
151 Box::pin(async move {
152 let rows: Vec<(i64, Vec<u8>)> =
154 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
155 .fetch_all(self)
156 .await?;
157
158 let migrations = rows
159 .into_iter()
160 .map(|(version, checksum)| AppliedMigration {
161 version,
162 checksum: checksum.into(),
163 })
164 .collect();
165
166 Ok(migrations)
167 })
168 }
169
170 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
171 Box::pin(async move {
172 let database_name = current_database(self).await?;
173 let lock_id = generate_lock_id(&database_name);
174
175 let _ = query("SELECT pg_advisory_lock($1)")
183 .bind(lock_id)
184 .execute(self)
185 .await?;
186
187 Ok(())
188 })
189 }
190
191 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
192 Box::pin(async move {
193 let database_name = current_database(self).await?;
194 let lock_id = generate_lock_id(&database_name);
195
196 let _ = query("SELECT pg_advisory_unlock($1)")
198 .bind(lock_id)
199 .execute(self)
200 .await?;
201
202 Ok(())
203 })
204 }
205
206 fn apply<'e: 'm, 'm>(
207 &'e mut self,
208 migration: &'m Migration,
209 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
210 Box::pin(async move {
211 let start = Instant::now();
212
213 if migration.no_tx {
215 execute_migration(self, migration).await?;
216 } else {
217 let mut tx = self.begin().await?;
223 execute_migration(&mut tx, migration).await?;
224 tx.commit().await?;
225 }
226
227 let elapsed = start.elapsed();
231
232 #[allow(clippy::cast_possible_truncation)]
234 let _ = query(
235 r#"
236 UPDATE _sqlx_migrations
237 SET execution_time = $1
238 WHERE version = $2
239 "#,
240 )
241 .bind(elapsed.as_nanos() as i64)
242 .bind(migration.version)
243 .execute(self)
244 .await?;
245
246 Ok(elapsed)
247 })
248 }
249
250 fn revert<'e: 'm, 'm>(
251 &'e mut self,
252 migration: &'m Migration,
253 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
254 Box::pin(async move {
255 let start = Instant::now();
256
257 if migration.no_tx {
259 revert_migration(self, migration).await?;
260 } else {
261 let mut tx = self.begin().await?;
264 revert_migration(&mut tx, migration).await?;
265 tx.commit().await?;
266 }
267
268 let elapsed = start.elapsed();
269
270 Ok(elapsed)
271 })
272 }
273}
274
275async fn execute_migration(
276 conn: &mut PgConnection,
277 migration: &Migration,
278) -> Result<(), MigrateError> {
279 let _ = conn
280 .execute(&*migration.sql)
281 .await
282 .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
283
284 let _ = query(
286 r#"
287 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
288 VALUES ( $1, $2, TRUE, $3, -1 )
289 "#,
290 )
291 .bind(migration.version)
292 .bind(&*migration.description)
293 .bind(&*migration.checksum)
294 .execute(conn)
295 .await?;
296
297 Ok(())
298}
299
300async fn revert_migration(
301 conn: &mut PgConnection,
302 migration: &Migration,
303) -> Result<(), MigrateError> {
304 let _ = conn
305 .execute(&*migration.sql)
306 .await
307 .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
308
309 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
311 .bind(migration.version)
312 .execute(conn)
313 .await?;
314
315 Ok(())
316}
317
318async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
319 Ok(query_scalar("SELECT current_database()")
321 .fetch_one(conn)
322 .await?)
323}
324
325fn generate_lock_id(database_name: &str) -> i64 {
327 const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
328 0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
330}