sqlx_core/
transaction.rs

1use std::borrow::Cow;
2use std::fmt::{self, Debug, Formatter};
3use std::ops::{Deref, DerefMut};
4
5use futures_core::future::BoxFuture;
6
7use crate::database::Database;
8use crate::error::Error;
9use crate::pool::MaybePoolConnection;
10
11/// Generic management of database transactions.
12///
13/// This trait should not be used, except when implementing [`Connection`].
14#[doc(hidden)]
15pub trait TransactionManager {
16    type Database: Database;
17
18    /// Begin a new transaction or establish a savepoint within the active transaction.
19    ///
20    /// If this is a new transaction, `statement` may be used instead of the
21    /// default "BEGIN" statement.
22    ///
23    /// If we are already inside a transaction and `statement.is_some()`, then
24    /// `Error::InvalidSavePoint` is returned without running any statements.
25    fn begin<'conn>(
26        conn: &'conn mut <Self::Database as Database>::Connection,
27        statement: Option<Cow<'static, str>>,
28    ) -> BoxFuture<'conn, Result<(), Error>>;
29
30    /// Commit the active transaction or release the most recent savepoint.
31    fn commit(
32        conn: &mut <Self::Database as Database>::Connection,
33    ) -> BoxFuture<'_, Result<(), Error>>;
34
35    /// Abort the active transaction or restore from the most recent savepoint.
36    fn rollback(
37        conn: &mut <Self::Database as Database>::Connection,
38    ) -> BoxFuture<'_, Result<(), Error>>;
39
40    /// Starts to abort the active transaction or restore from the most recent snapshot.
41    fn start_rollback(conn: &mut <Self::Database as Database>::Connection);
42
43    /// Returns the current transaction depth.
44    ///
45    /// Transaction depth indicates the level of nested transactions:
46    /// - Level 0: No active transaction.
47    /// - Level 1: A transaction is active.
48    /// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
49    fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;
50}
51
52/// An in-progress database transaction or savepoint.
53///
54/// A transaction starts with a call to [`Pool::begin`] or [`Connection::begin`].
55///
56/// A transaction should end with a call to [`commit`] or [`rollback`]. If neither are called
57/// before the transaction goes out-of-scope, [`rollback`] is called. In other
58/// words, [`rollback`] is called on `drop` if the transaction is still in-progress.
59///
60/// A savepoint is a special mark inside a transaction that allows all commands that are
61/// executed after it was established to be rolled back, restoring the transaction state to
62/// what it was at the time of the savepoint.
63///
64/// A transaction can be used as an [`Executor`] when performing queries:
65/// ```rust,no_run
66/// # use sqlx_core::acquire::Acquire;
67/// # async fn example() -> sqlx::Result<()> {
68/// # let id = 1;
69/// # let mut conn: sqlx::PgConnection = unimplemented!();
70/// let mut tx = conn.begin().await?;
71///
72/// let result = sqlx::query("DELETE FROM \"testcases\" WHERE id = $1")
73///     .bind(id)
74///     .execute(&mut *tx)
75///     .await?
76///     .rows_affected();
77///
78/// tx.commit().await
79/// # }
80/// ```
81/// [`Executor`]: crate::executor::Executor
82/// [`Connection::begin`]: crate::connection::Connection::begin()
83/// [`Pool::begin`]: crate::pool::Pool::begin()
84/// [`commit`]: Self::commit()
85/// [`rollback`]: Self::rollback()
86pub struct Transaction<'c, DB>
87where
88    DB: Database,
89{
90    connection: MaybePoolConnection<'c, DB>,
91    open: bool,
92}
93
94impl<'c, DB> Transaction<'c, DB>
95where
96    DB: Database,
97{
98    #[doc(hidden)]
99    pub fn begin(
100        conn: impl Into<MaybePoolConnection<'c, DB>>,
101        statement: Option<Cow<'static, str>>,
102    ) -> BoxFuture<'c, Result<Self, Error>> {
103        let mut conn = conn.into();
104
105        Box::pin(async move {
106            DB::TransactionManager::begin(&mut conn, statement).await?;
107
108            Ok(Self {
109                connection: conn,
110                open: true,
111            })
112        })
113    }
114
115    /// Commits this transaction or savepoint.
116    pub async fn commit(mut self) -> Result<(), Error> {
117        DB::TransactionManager::commit(&mut self.connection).await?;
118        self.open = false;
119
120        Ok(())
121    }
122
123    /// Aborts this transaction or savepoint.
124    pub async fn rollback(mut self) -> Result<(), Error> {
125        DB::TransactionManager::rollback(&mut self.connection).await?;
126        self.open = false;
127
128        Ok(())
129    }
130}
131
132// NOTE: fails to compile due to lack of lazy normalization
133// impl<'c, 't, DB: Database> crate::executor::Executor<'t>
134//     for &'t mut crate::transaction::Transaction<'c, DB>
135// where
136//     &'c mut DB::Connection: Executor<'c, Database = DB>,
137// {
138//     type Database = DB;
139//
140//
141//
142//     fn fetch_many<'e, 'q: 'e, E: 'q>(
143//         self,
144//         query: E,
145//     ) -> futures_core::stream::BoxStream<
146//         'e,
147//         Result<
148//             crate::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
149//             crate::error::Error,
150//         >,
151//     >
152//     where
153//         't: 'e,
154//         E: crate::executor::Execute<'q, Self::Database>,
155//     {
156//         (&mut **self).fetch_many(query)
157//     }
158//
159//     fn fetch_optional<'e, 'q: 'e, E: 'q>(
160//         self,
161//         query: E,
162//     ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
163//     where
164//         't: 'e,
165//         E: crate::executor::Execute<'q, Self::Database>,
166//     {
167//         (&mut **self).fetch_optional(query)
168//     }
169//
170//     fn prepare_with<'e, 'q: 'e>(
171//         self,
172//         sql: &'q str,
173//         parameters: &'e [<Self::Database as crate::database::Database>::TypeInfo],
174//     ) -> futures_core::future::BoxFuture<
175//         'e,
176//         Result<
177//             <Self::Database as crate::database::Database>::Statement<'q>,
178//             crate::error::Error,
179//         >,
180//     >
181//     where
182//         't: 'e,
183//     {
184//         (&mut **self).prepare_with(sql, parameters)
185//     }
186//
187//     #[doc(hidden)]
188//     fn describe<'e, 'q: 'e>(
189//         self,
190//         query: &'q str,
191//     ) -> futures_core::future::BoxFuture<
192//         'e,
193//         Result<crate::describe::Describe<Self::Database>, crate::error::Error>,
194//     >
195//     where
196//         't: 'e,
197//     {
198//         (&mut **self).describe(query)
199//     }
200// }
201
202impl<'c, DB> Debug for Transaction<'c, DB>
203where
204    DB: Database,
205{
206    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
207        // TODO: Show the full type <..<..<..
208        f.debug_struct("Transaction").finish()
209    }
210}
211
212impl<'c, DB> Deref for Transaction<'c, DB>
213where
214    DB: Database,
215{
216    type Target = DB::Connection;
217
218    #[inline]
219    fn deref(&self) -> &Self::Target {
220        &self.connection
221    }
222}
223
224impl<'c, DB> DerefMut for Transaction<'c, DB>
225where
226    DB: Database,
227{
228    #[inline]
229    fn deref_mut(&mut self) -> &mut Self::Target {
230        &mut self.connection
231    }
232}
233
234// Implement `AsMut<DB::Connection>` so `Transaction` can be given to a
235// `PgAdvisoryLockGuard`.
236//
237// See: https://github.com/launchbadge/sqlx/issues/2520
238impl<'c, DB: Database> AsMut<DB::Connection> for Transaction<'c, DB> {
239    fn as_mut(&mut self) -> &mut DB::Connection {
240        &mut self.connection
241    }
242}
243
244impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<'c, DB> {
245    type Database = DB;
246
247    type Connection = &'t mut <DB as Database>::Connection;
248
249    #[inline]
250    fn acquire(self) -> BoxFuture<'t, Result<Self::Connection, Error>> {
251        Box::pin(futures_util::future::ok(&mut **self))
252    }
253
254    #[inline]
255    fn begin(self) -> BoxFuture<'t, Result<Transaction<'t, DB>, Error>> {
256        Transaction::begin(&mut **self, None)
257    }
258}
259
260impl<'c, DB> Drop for Transaction<'c, DB>
261where
262    DB: Database,
263{
264    fn drop(&mut self) {
265        if self.open {
266            // starts a rollback operation
267
268            // what this does depends on the database but generally this means we queue a rollback
269            // operation that will happen on the next asynchronous invocation of the underlying
270            // connection (including if the connection is returned to a pool)
271
272            DB::TransactionManager::start_rollback(&mut self.connection);
273        }
274    }
275}
276
277pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
278    if depth == 0 {
279        Cow::Borrowed("BEGIN")
280    } else {
281        Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}"))
282    }
283}
284
285pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
286    if depth == 1 {
287        Cow::Borrowed("COMMIT")
288    } else {
289        Cow::Owned(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1))
290    }
291}
292
293pub fn rollback_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
294    if depth == 1 {
295        Cow::Borrowed("ROLLBACK")
296    } else {
297        Cow::Owned(format!(
298            "ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}",
299            depth - 1
300        ))
301    }
302}