sqlx_postgres/
any.rs

1use crate::{
2    Either, PgColumn, PgConnectOptions, PgConnection, PgQueryResult, PgRow, PgTransactionManager,
3    PgTypeInfo, Postgres,
4};
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
8use std::borrow::Cow;
9use std::{future, pin::pin};
10
11use sqlx_core::any::{
12    Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
13    AnyStatement, AnyTypeInfo, AnyTypeInfoKind,
14};
15
16use crate::type_info::PgType;
17use sqlx_core::connection::Connection;
18use sqlx_core::database::Database;
19use sqlx_core::describe::Describe;
20use sqlx_core::executor::Executor;
21use sqlx_core::ext::ustr::UStr;
22use sqlx_core::transaction::TransactionManager;
23
24sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Postgres);
25
26impl AnyConnectionBackend for PgConnection {
27    fn name(&self) -> &str {
28        <Postgres as Database>::NAME
29    }
30
31    fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
32        Connection::close(*self)
33    }
34
35    fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
36        Connection::close_hard(*self)
37    }
38
39    fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
40        Connection::ping(self)
41    }
42
43    fn begin(
44        &mut self,
45        statement: Option<Cow<'static, str>>,
46    ) -> BoxFuture<'_, sqlx_core::Result<()>> {
47        PgTransactionManager::begin(self, statement)
48    }
49
50    fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
51        PgTransactionManager::commit(self)
52    }
53
54    fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
55        PgTransactionManager::rollback(self)
56    }
57
58    fn start_rollback(&mut self) {
59        PgTransactionManager::start_rollback(self)
60    }
61
62    fn get_transaction_depth(&self) -> usize {
63        PgTransactionManager::get_transaction_depth(self)
64    }
65
66    fn shrink_buffers(&mut self) {
67        Connection::shrink_buffers(self);
68    }
69
70    fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
71        Connection::flush(self)
72    }
73
74    fn should_flush(&self) -> bool {
75        Connection::should_flush(self)
76    }
77
78    #[cfg(feature = "migrate")]
79    fn as_migrate(
80        &mut self,
81    ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
82        Ok(self)
83    }
84
85    fn fetch_many<'q>(
86        &'q mut self,
87        query: &'q str,
88        persistent: bool,
89        arguments: Option<AnyArguments<'q>>,
90    ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
91        let persistent = persistent && arguments.is_some();
92        let arguments = match arguments.as_ref().map(AnyArguments::convert_to).transpose() {
93            Ok(arguments) => arguments,
94            Err(error) => {
95                return stream::once(future::ready(Err(sqlx_core::Error::Encode(error)))).boxed()
96            }
97        };
98
99        Box::pin(
100            self.run(query, arguments, persistent, None)
101                .try_flatten_stream()
102                .map(
103                    move |res: sqlx_core::Result<Either<PgQueryResult, PgRow>>| match res? {
104                        Either::Left(result) => Ok(Either::Left(map_result(result))),
105                        Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
106                    },
107                ),
108        )
109    }
110
111    fn fetch_optional<'q>(
112        &'q mut self,
113        query: &'q str,
114        persistent: bool,
115        arguments: Option<AnyArguments<'q>>,
116    ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
117        let persistent = persistent && arguments.is_some();
118        let arguments = arguments
119            .as_ref()
120            .map(AnyArguments::convert_to)
121            .transpose()
122            .map_err(sqlx_core::Error::Encode);
123
124        Box::pin(async move {
125            let arguments = arguments?;
126            let mut stream = pin!(self.run(query, arguments, persistent, None).await?);
127
128            if let Some(Either::Right(row)) = stream.try_next().await? {
129                return Ok(Some(AnyRow::try_from(&row)?));
130            }
131
132            Ok(None)
133        })
134    }
135
136    fn prepare_with<'c, 'q: 'c>(
137        &'c mut self,
138        sql: &'q str,
139        _parameters: &[AnyTypeInfo],
140    ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
141        Box::pin(async move {
142            let statement = Executor::prepare_with(self, sql, &[]).await?;
143            AnyStatement::try_from_statement(
144                sql,
145                &statement,
146                statement.metadata.column_names.clone(),
147            )
148        })
149    }
150
151    fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
152        Box::pin(async move {
153            let describe = Executor::describe(self, sql).await?;
154
155            let columns = describe
156                .columns
157                .iter()
158                .map(AnyColumn::try_from)
159                .collect::<Result<Vec<_>, _>>()?;
160
161            let parameters = match describe.parameters {
162                Some(Either::Left(parameters)) => Some(Either::Left(
163                    parameters
164                        .iter()
165                        .enumerate()
166                        .map(|(i, type_info)| {
167                            AnyTypeInfo::try_from(type_info).map_err(|_| {
168                                sqlx_core::Error::AnyDriverError(
169                                    format!(
170                                        "Any driver does not support type {type_info} of parameter {i}"
171                                    )
172                                    .into(),
173                                )
174                            })
175                        })
176                        .collect::<Result<Vec<_>, _>>()?,
177                )),
178                Some(Either::Right(count)) => Some(Either::Right(count)),
179                None => None,
180            };
181
182            Ok(Describe {
183                columns,
184                parameters,
185                nullable: describe.nullable,
186            })
187        })
188    }
189}
190
191impl<'a> TryFrom<&'a PgTypeInfo> for AnyTypeInfo {
192    type Error = sqlx_core::Error;
193
194    fn try_from(pg_type: &'a PgTypeInfo) -> Result<Self, Self::Error> {
195        Ok(AnyTypeInfo {
196            kind: match &pg_type.0 {
197                PgType::Bool => AnyTypeInfoKind::Bool,
198                PgType::Void => AnyTypeInfoKind::Null,
199                PgType::Int2 => AnyTypeInfoKind::SmallInt,
200                PgType::Int4 => AnyTypeInfoKind::Integer,
201                PgType::Int8 => AnyTypeInfoKind::BigInt,
202                PgType::Float4 => AnyTypeInfoKind::Real,
203                PgType::Float8 => AnyTypeInfoKind::Double,
204                PgType::Bytea => AnyTypeInfoKind::Blob,
205                PgType::Text | PgType::Varchar => AnyTypeInfoKind::Text,
206                PgType::DeclareWithName(UStr::Static("citext")) => AnyTypeInfoKind::Text,
207                _ => {
208                    return Err(sqlx_core::Error::AnyDriverError(
209                        format!("Any driver does not support the Postgres type {pg_type:?}").into(),
210                    ))
211                }
212            },
213        })
214    }
215}
216
217impl<'a> TryFrom<&'a PgColumn> for AnyColumn {
218    type Error = sqlx_core::Error;
219
220    fn try_from(col: &'a PgColumn) -> Result<Self, Self::Error> {
221        let type_info =
222            AnyTypeInfo::try_from(&col.type_info).map_err(|e| sqlx_core::Error::ColumnDecode {
223                index: col.name.to_string(),
224                source: e.into(),
225            })?;
226
227        Ok(AnyColumn {
228            ordinal: col.ordinal,
229            name: col.name.clone(),
230            type_info,
231        })
232    }
233}
234
235impl<'a> TryFrom<&'a PgRow> for AnyRow {
236    type Error = sqlx_core::Error;
237
238    fn try_from(row: &'a PgRow) -> Result<Self, Self::Error> {
239        AnyRow::map_from(row, row.metadata.column_names.clone())
240    }
241}
242
243impl<'a> TryFrom<&'a AnyConnectOptions> for PgConnectOptions {
244    type Error = sqlx_core::Error;
245
246    fn try_from(value: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
247        let mut opts = PgConnectOptions::parse_from_url(&value.database_url)?;
248        opts.log_settings = value.log_settings.clone();
249        Ok(opts)
250    }
251}
252
253fn map_result(res: PgQueryResult) -> AnyQueryResult {
254    AnyQueryResult {
255        rows_affected: res.rows_affected(),
256        last_insert_id: None,
257    }
258}