sqlx_core/transaction.rs
1use std::borrow::Cow;
2use std::fmt::{self, Debug, Formatter};
3use std::ops::{Deref, DerefMut};
4
5use futures_core::future::BoxFuture;
6
7use crate::database::Database;
8use crate::error::Error;
9use crate::pool::MaybePoolConnection;
10
11/// Generic management of database transactions.
12///
13/// This trait should not be used, except when implementing [`Connection`].
14#[doc(hidden)]
15pub trait TransactionManager {
16 type Database: Database;
17
18 /// Begin a new transaction or establish a savepoint within the active transaction.
19 ///
20 /// If this is a new transaction, `statement` may be used instead of the
21 /// default "BEGIN" statement.
22 ///
23 /// If we are already inside a transaction and `statement.is_some()`, then
24 /// `Error::InvalidSavePoint` is returned without running any statements.
25 fn begin<'conn>(
26 conn: &'conn mut <Self::Database as Database>::Connection,
27 statement: Option<Cow<'static, str>>,
28 ) -> BoxFuture<'conn, Result<(), Error>>;
29
30 /// Commit the active transaction or release the most recent savepoint.
31 fn commit(
32 conn: &mut <Self::Database as Database>::Connection,
33 ) -> BoxFuture<'_, Result<(), Error>>;
34
35 /// Abort the active transaction or restore from the most recent savepoint.
36 fn rollback(
37 conn: &mut <Self::Database as Database>::Connection,
38 ) -> BoxFuture<'_, Result<(), Error>>;
39
40 /// Starts to abort the active transaction or restore from the most recent snapshot.
41 fn start_rollback(conn: &mut <Self::Database as Database>::Connection);
42
43 /// Returns the current transaction depth.
44 ///
45 /// Transaction depth indicates the level of nested transactions:
46 /// - Level 0: No active transaction.
47 /// - Level 1: A transaction is active.
48 /// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
49 fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;
50}
51
52/// An in-progress database transaction or savepoint.
53///
54/// A transaction starts with a call to [`Pool::begin`] or [`Connection::begin`].
55///
56/// A transaction should end with a call to [`commit`] or [`rollback`]. If neither are called
57/// before the transaction goes out-of-scope, [`rollback`] is called. In other
58/// words, [`rollback`] is called on `drop` if the transaction is still in-progress.
59///
60/// A savepoint is a special mark inside a transaction that allows all commands that are
61/// executed after it was established to be rolled back, restoring the transaction state to
62/// what it was at the time of the savepoint.
63///
64/// A transaction can be used as an [`Executor`] when performing queries:
65/// ```rust,no_run
66/// # use sqlx_core::acquire::Acquire;
67/// # async fn example() -> sqlx::Result<()> {
68/// # let id = 1;
69/// # let mut conn: sqlx::PgConnection = unimplemented!();
70/// let mut tx = conn.begin().await?;
71///
72/// let result = sqlx::query("DELETE FROM \"testcases\" WHERE id = $1")
73/// .bind(id)
74/// .execute(&mut *tx)
75/// .await?
76/// .rows_affected();
77///
78/// tx.commit().await
79/// # }
80/// ```
81/// [`Executor`]: crate::executor::Executor
82/// [`Connection::begin`]: crate::connection::Connection::begin()
83/// [`Pool::begin`]: crate::pool::Pool::begin()
84/// [`commit`]: Self::commit()
85/// [`rollback`]: Self::rollback()
86pub struct Transaction<'c, DB>
87where
88 DB: Database,
89{
90 connection: MaybePoolConnection<'c, DB>,
91 open: bool,
92}
93
94impl<'c, DB> Transaction<'c, DB>
95where
96 DB: Database,
97{
98 #[doc(hidden)]
99 pub fn begin(
100 conn: impl Into<MaybePoolConnection<'c, DB>>,
101 statement: Option<Cow<'static, str>>,
102 ) -> BoxFuture<'c, Result<Self, Error>> {
103 let mut conn = conn.into();
104
105 Box::pin(async move {
106 DB::TransactionManager::begin(&mut conn, statement).await?;
107
108 Ok(Self {
109 connection: conn,
110 open: true,
111 })
112 })
113 }
114
115 /// Commits this transaction or savepoint.
116 pub async fn commit(mut self) -> Result<(), Error> {
117 DB::TransactionManager::commit(&mut self.connection).await?;
118 self.open = false;
119
120 Ok(())
121 }
122
123 /// Aborts this transaction or savepoint.
124 pub async fn rollback(mut self) -> Result<(), Error> {
125 DB::TransactionManager::rollback(&mut self.connection).await?;
126 self.open = false;
127
128 Ok(())
129 }
130}
131
132// NOTE: fails to compile due to lack of lazy normalization
133// impl<'c, 't, DB: Database> crate::executor::Executor<'t>
134// for &'t mut crate::transaction::Transaction<'c, DB>
135// where
136// &'c mut DB::Connection: Executor<'c, Database = DB>,
137// {
138// type Database = DB;
139//
140//
141//
142// fn fetch_many<'e, 'q: 'e, E: 'q>(
143// self,
144// query: E,
145// ) -> futures_core::stream::BoxStream<
146// 'e,
147// Result<
148// crate::Either<<DB as crate::database::Database>::QueryResult, DB::Row>,
149// crate::error::Error,
150// >,
151// >
152// where
153// 't: 'e,
154// E: crate::executor::Execute<'q, Self::Database>,
155// {
156// (&mut **self).fetch_many(query)
157// }
158//
159// fn fetch_optional<'e, 'q: 'e, E: 'q>(
160// self,
161// query: E,
162// ) -> futures_core::future::BoxFuture<'e, Result<Option<DB::Row>, crate::error::Error>>
163// where
164// 't: 'e,
165// E: crate::executor::Execute<'q, Self::Database>,
166// {
167// (&mut **self).fetch_optional(query)
168// }
169//
170// fn prepare_with<'e, 'q: 'e>(
171// self,
172// sql: &'q str,
173// parameters: &'e [<Self::Database as crate::database::Database>::TypeInfo],
174// ) -> futures_core::future::BoxFuture<
175// 'e,
176// Result<
177// <Self::Database as crate::database::Database>::Statement<'q>,
178// crate::error::Error,
179// >,
180// >
181// where
182// 't: 'e,
183// {
184// (&mut **self).prepare_with(sql, parameters)
185// }
186//
187// #[doc(hidden)]
188// fn describe<'e, 'q: 'e>(
189// self,
190// query: &'q str,
191// ) -> futures_core::future::BoxFuture<
192// 'e,
193// Result<crate::describe::Describe<Self::Database>, crate::error::Error>,
194// >
195// where
196// 't: 'e,
197// {
198// (&mut **self).describe(query)
199// }
200// }
201
202impl<'c, DB> Debug for Transaction<'c, DB>
203where
204 DB: Database,
205{
206 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
207 // TODO: Show the full type <..<..<..
208 f.debug_struct("Transaction").finish()
209 }
210}
211
212impl<'c, DB> Deref for Transaction<'c, DB>
213where
214 DB: Database,
215{
216 type Target = DB::Connection;
217
218 #[inline]
219 fn deref(&self) -> &Self::Target {
220 &self.connection
221 }
222}
223
224impl<'c, DB> DerefMut for Transaction<'c, DB>
225where
226 DB: Database,
227{
228 #[inline]
229 fn deref_mut(&mut self) -> &mut Self::Target {
230 &mut self.connection
231 }
232}
233
234// Implement `AsMut<DB::Connection>` so `Transaction` can be given to a
235// `PgAdvisoryLockGuard`.
236//
237// See: https://github.com/launchbadge/sqlx/issues/2520
238impl<'c, DB: Database> AsMut<DB::Connection> for Transaction<'c, DB> {
239 fn as_mut(&mut self) -> &mut DB::Connection {
240 &mut self.connection
241 }
242}
243
244impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<'c, DB> {
245 type Database = DB;
246
247 type Connection = &'t mut <DB as Database>::Connection;
248
249 #[inline]
250 fn acquire(self) -> BoxFuture<'t, Result<Self::Connection, Error>> {
251 Box::pin(futures_util::future::ok(&mut **self))
252 }
253
254 #[inline]
255 fn begin(self) -> BoxFuture<'t, Result<Transaction<'t, DB>, Error>> {
256 Transaction::begin(&mut **self, None)
257 }
258}
259
260impl<'c, DB> Drop for Transaction<'c, DB>
261where
262 DB: Database,
263{
264 fn drop(&mut self) {
265 if self.open {
266 // starts a rollback operation
267
268 // what this does depends on the database but generally this means we queue a rollback
269 // operation that will happen on the next asynchronous invocation of the underlying
270 // connection (including if the connection is returned to a pool)
271
272 DB::TransactionManager::start_rollback(&mut self.connection);
273 }
274 }
275}
276
/// Builds the SQL statement that opens a transaction level at the given `depth`.
///
/// Returns a borrowed `"BEGIN"` when `depth` is 0; for any other depth returns
/// an owned `SAVEPOINT _sqlx_savepoint_{depth}` statement.
pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    match depth {
        0 => Cow::Borrowed("BEGIN"),
        _ => Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}")),
    }
}
284
/// Builds the SQL statement that commits the transaction level at the given `depth`.
///
/// - `depth == 1`: returns a borrowed `"COMMIT"`, ending the outermost transaction.
/// - `depth >= 2`: returns `RELEASE SAVEPOINT _sqlx_savepoint_{depth - 1}`, releasing
///   the savepoint that was created when entering this level.
/// - `depth == 0`: there is no savepoint to name, so `"COMMIT"` is returned as well.
///   Previously this case evaluated `depth - 1` on zero, which panics in debug builds
///   and wraps around to a nonsensical savepoint name in release builds.
pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    match depth {
        // `0` is folded into the outermost case to avoid the `usize` underflow.
        0 | 1 => Cow::Borrowed("COMMIT"),
        nested => Cow::Owned(format!(
            "RELEASE SAVEPOINT _sqlx_savepoint_{}",
            nested - 1
        )),
    }
}
292
/// Builds the SQL statement that rolls back the transaction level at the given `depth`.
///
/// - `depth == 1`: returns a borrowed `"ROLLBACK"`, aborting the outermost transaction.
/// - `depth >= 2`: returns `ROLLBACK TO SAVEPOINT _sqlx_savepoint_{depth - 1}`, restoring
///   the savepoint that was created when entering this level.
/// - `depth == 0`: there is no savepoint to name, so `"ROLLBACK"` is returned as well.
///   Previously this case evaluated `depth - 1` on zero, which panics in debug builds
///   and wraps around to a nonsensical savepoint name in release builds.
pub fn rollback_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    match depth {
        // `0` is folded into the outermost case to avoid the `usize` underflow.
        0 | 1 => Cow::Borrowed("ROLLBACK"),
        nested => Cow::Owned(format!(
            "ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}",
            nested - 1
        )),
    }
}
302}