sqlx_postgres/
migrate.rs

1use std::str::FromStr;
2use std::time::Duration;
3use std::time::Instant;
4
5use futures_core::future::BoxFuture;
6
7pub(crate) use sqlx_core::migrate::MigrateError;
8pub(crate) use sqlx_core::migrate::{AppliedMigration, Migration};
9pub(crate) use sqlx_core::migrate::{Migrate, MigrateDatabase};
10
11use crate::connection::{ConnectOptions, Connection};
12use crate::error::Error;
13use crate::executor::Executor;
14use crate::query::query;
15use crate::query_as::query_as;
16use crate::query_scalar::query_scalar;
17use crate::{PgConnectOptions, PgConnection, Postgres};
18
19fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> {
20    let mut options = PgConnectOptions::from_str(url)?;
21
22    // pull out the name of the database to create
23    let database = options
24        .database
25        .as_deref()
26        .unwrap_or(&options.username)
27        .to_owned();
28
29    // switch us to the maintenance database
30    // use `postgres` _unless_ the database is postgres, in which case, use `template1`
31    // this matches the behavior of the `createdb` util
32    options.database = if database == "postgres" {
33        Some("template1".into())
34    } else {
35        Some("postgres".into())
36    };
37
38    Ok((options, database))
39}
40
41impl MigrateDatabase for Postgres {
42    fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
43        Box::pin(async move {
44            let (options, database) = parse_for_maintenance(url)?;
45            let mut conn = options.connect().await?;
46
47            let _ = conn
48                .execute(&*format!(
49                    "CREATE DATABASE \"{}\"",
50                    database.replace('"', "\"\"")
51                ))
52                .await?;
53
54            Ok(())
55        })
56    }
57
58    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
59        Box::pin(async move {
60            let (options, database) = parse_for_maintenance(url)?;
61            let mut conn = options.connect().await?;
62
63            let exists: bool =
64                query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
65                    .bind(database)
66                    .fetch_one(&mut conn)
67                    .await?;
68
69            Ok(exists)
70        })
71    }
72
73    fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
74        Box::pin(async move {
75            let (options, database) = parse_for_maintenance(url)?;
76            let mut conn = options.connect().await?;
77
78            let _ = conn
79                .execute(&*format!(
80                    "DROP DATABASE IF EXISTS \"{}\"",
81                    database.replace('"', "\"\"")
82                ))
83                .await?;
84
85            Ok(())
86        })
87    }
88
89    fn force_drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
90        Box::pin(async move {
91            let (options, database) = parse_for_maintenance(url)?;
92            let mut conn = options.connect().await?;
93
94            let row: (String,) = query_as("SELECT current_setting('server_version_num')")
95                .fetch_one(&mut conn)
96                .await?;
97
98            let version = row.0.parse::<i32>().unwrap();
99
100            let pid_type = if version >= 90200 { "pid" } else { "procpid" };
101
102            conn.execute(&*format!(
103                "SELECT pg_terminate_backend(pg_stat_activity.{pid_type}) FROM pg_stat_activity \
104                 WHERE pg_stat_activity.datname = '{database}' AND {pid_type} <> pg_backend_pid()"
105            ))
106            .await?;
107
108            Self::drop_database(url).await
109        })
110    }
111}
112
113impl Migrate for PgConnection {
114    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
115        Box::pin(async move {
116            // language=SQL
117            self.execute(
118                r#"
119CREATE TABLE IF NOT EXISTS _sqlx_migrations (
120    version BIGINT PRIMARY KEY,
121    description TEXT NOT NULL,
122    installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
123    success BOOLEAN NOT NULL,
124    checksum BYTEA NOT NULL,
125    execution_time BIGINT NOT NULL
126);
127                "#,
128            )
129            .await?;
130
131            Ok(())
132        })
133    }
134
135    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
136        Box::pin(async move {
137            // language=SQL
138            let row: Option<(i64,)> = query_as(
139                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
140            )
141            .fetch_optional(self)
142            .await?;
143
144            Ok(row.map(|r| r.0))
145        })
146    }
147
148    fn list_applied_migrations(
149        &mut self,
150    ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
151        Box::pin(async move {
152            // language=SQL
153            let rows: Vec<(i64, Vec<u8>)> =
154                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
155                    .fetch_all(self)
156                    .await?;
157
158            let migrations = rows
159                .into_iter()
160                .map(|(version, checksum)| AppliedMigration {
161                    version,
162                    checksum: checksum.into(),
163                })
164                .collect();
165
166            Ok(migrations)
167        })
168    }
169
170    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
171        Box::pin(async move {
172            let database_name = current_database(self).await?;
173            let lock_id = generate_lock_id(&database_name);
174
175            // create an application lock over the database
176            // this function will not return until the lock is acquired
177
178            // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
179            // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
180
181            // language=SQL
182            let _ = query("SELECT pg_advisory_lock($1)")
183                .bind(lock_id)
184                .execute(self)
185                .await?;
186
187            Ok(())
188        })
189    }
190
191    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
192        Box::pin(async move {
193            let database_name = current_database(self).await?;
194            let lock_id = generate_lock_id(&database_name);
195
196            // language=SQL
197            let _ = query("SELECT pg_advisory_unlock($1)")
198                .bind(lock_id)
199                .execute(self)
200                .await?;
201
202            Ok(())
203        })
204    }
205
206    fn apply<'e: 'm, 'm>(
207        &'e mut self,
208        migration: &'m Migration,
209    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
210        Box::pin(async move {
211            let start = Instant::now();
212
213            // execute migration queries
214            if migration.no_tx {
215                execute_migration(self, migration).await?;
216            } else {
217                // Use a single transaction for the actual migration script and the essential bookeeping so we never
218                // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
219                // The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
220                // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
221                // and update it once the actual transaction completed.
222                let mut tx = self.begin().await?;
223                execute_migration(&mut tx, migration).await?;
224                tx.commit().await?;
225            }
226
227            // Update `elapsed_time`.
228            // NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
229            //       this small risk since this value is not super important.
230            let elapsed = start.elapsed();
231
232            // language=SQL
233            #[allow(clippy::cast_possible_truncation)]
234            let _ = query(
235                r#"
236    UPDATE _sqlx_migrations
237    SET execution_time = $1
238    WHERE version = $2
239                "#,
240            )
241            .bind(elapsed.as_nanos() as i64)
242            .bind(migration.version)
243            .execute(self)
244            .await?;
245
246            Ok(elapsed)
247        })
248    }
249
250    fn revert<'e: 'm, 'm>(
251        &'e mut self,
252        migration: &'m Migration,
253    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
254        Box::pin(async move {
255            let start = Instant::now();
256
257            // execute migration queries
258            if migration.no_tx {
259                revert_migration(self, migration).await?;
260            } else {
261                // Use a single transaction for the actual migration script and the essential bookeeping so we never
262                // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
263                let mut tx = self.begin().await?;
264                revert_migration(&mut tx, migration).await?;
265                tx.commit().await?;
266            }
267
268            let elapsed = start.elapsed();
269
270            Ok(elapsed)
271        })
272    }
273}
274
275async fn execute_migration(
276    conn: &mut PgConnection,
277    migration: &Migration,
278) -> Result<(), MigrateError> {
279    let _ = conn
280        .execute(&*migration.sql)
281        .await
282        .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
283
284    // language=SQL
285    let _ = query(
286        r#"
287    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
288    VALUES ( $1, $2, TRUE, $3, -1 )
289                "#,
290    )
291    .bind(migration.version)
292    .bind(&*migration.description)
293    .bind(&*migration.checksum)
294    .execute(conn)
295    .await?;
296
297    Ok(())
298}
299
300async fn revert_migration(
301    conn: &mut PgConnection,
302    migration: &Migration,
303) -> Result<(), MigrateError> {
304    let _ = conn
305        .execute(&*migration.sql)
306        .await
307        .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
308
309    // language=SQL
310    let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
311        .bind(migration.version)
312        .execute(conn)
313        .await?;
314
315    Ok(())
316}
317
318async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
319    // language=SQL
320    Ok(query_scalar("SELECT current_database()")
321        .fetch_one(conn)
322        .await?)
323}
324
325// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
326fn generate_lock_id(database_name: &str) -> i64 {
327    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
328    // 0x3d32ad9e chosen by fair dice roll
329    0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
330}