1use crate::{
2 Either, PgColumn, PgConnectOptions, PgConnection, PgQueryResult, PgRow, PgTransactionManager,
3 PgTypeInfo, Postgres,
4};
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
8use std::borrow::Cow;
9use std::{future, pin::pin};
10
11use sqlx_core::any::{
12 Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
13 AnyStatement, AnyTypeInfo, AnyTypeInfoKind,
14};
15
16use crate::type_info::PgType;
17use sqlx_core::connection::Connection;
18use sqlx_core::database::Database;
19use sqlx_core::describe::Describe;
20use sqlx_core::executor::Executor;
21use sqlx_core::ext::ustr::UStr;
22use sqlx_core::transaction::TransactionManager;
23
24sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Postgres);
25
26impl AnyConnectionBackend for PgConnection {
27 fn name(&self) -> &str {
28 <Postgres as Database>::NAME
29 }
30
31 fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
32 Connection::close(*self)
33 }
34
35 fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
36 Connection::close_hard(*self)
37 }
38
39 fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
40 Connection::ping(self)
41 }
42
43 fn begin(
44 &mut self,
45 statement: Option<Cow<'static, str>>,
46 ) -> BoxFuture<'_, sqlx_core::Result<()>> {
47 PgTransactionManager::begin(self, statement)
48 }
49
50 fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
51 PgTransactionManager::commit(self)
52 }
53
54 fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
55 PgTransactionManager::rollback(self)
56 }
57
58 fn start_rollback(&mut self) {
59 PgTransactionManager::start_rollback(self)
60 }
61
62 fn get_transaction_depth(&self) -> usize {
63 PgTransactionManager::get_transaction_depth(self)
64 }
65
66 fn shrink_buffers(&mut self) {
67 Connection::shrink_buffers(self);
68 }
69
70 fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
71 Connection::flush(self)
72 }
73
74 fn should_flush(&self) -> bool {
75 Connection::should_flush(self)
76 }
77
78 #[cfg(feature = "migrate")]
79 fn as_migrate(
80 &mut self,
81 ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
82 Ok(self)
83 }
84
85 fn fetch_many<'q>(
86 &'q mut self,
87 query: &'q str,
88 persistent: bool,
89 arguments: Option<AnyArguments<'q>>,
90 ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
91 let persistent = persistent && arguments.is_some();
92 let arguments = match arguments.as_ref().map(AnyArguments::convert_to).transpose() {
93 Ok(arguments) => arguments,
94 Err(error) => {
95 return stream::once(future::ready(Err(sqlx_core::Error::Encode(error)))).boxed()
96 }
97 };
98
99 Box::pin(
100 self.run(query, arguments, persistent, None)
101 .try_flatten_stream()
102 .map(
103 move |res: sqlx_core::Result<Either<PgQueryResult, PgRow>>| match res? {
104 Either::Left(result) => Ok(Either::Left(map_result(result))),
105 Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
106 },
107 ),
108 )
109 }
110
111 fn fetch_optional<'q>(
112 &'q mut self,
113 query: &'q str,
114 persistent: bool,
115 arguments: Option<AnyArguments<'q>>,
116 ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
117 let persistent = persistent && arguments.is_some();
118 let arguments = arguments
119 .as_ref()
120 .map(AnyArguments::convert_to)
121 .transpose()
122 .map_err(sqlx_core::Error::Encode);
123
124 Box::pin(async move {
125 let arguments = arguments?;
126 let mut stream = pin!(self.run(query, arguments, persistent, None).await?);
127
128 if let Some(Either::Right(row)) = stream.try_next().await? {
129 return Ok(Some(AnyRow::try_from(&row)?));
130 }
131
132 Ok(None)
133 })
134 }
135
136 fn prepare_with<'c, 'q: 'c>(
137 &'c mut self,
138 sql: &'q str,
139 _parameters: &[AnyTypeInfo],
140 ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
141 Box::pin(async move {
142 let statement = Executor::prepare_with(self, sql, &[]).await?;
143 AnyStatement::try_from_statement(
144 sql,
145 &statement,
146 statement.metadata.column_names.clone(),
147 )
148 })
149 }
150
151 fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
152 Box::pin(async move {
153 let describe = Executor::describe(self, sql).await?;
154
155 let columns = describe
156 .columns
157 .iter()
158 .map(AnyColumn::try_from)
159 .collect::<Result<Vec<_>, _>>()?;
160
161 let parameters = match describe.parameters {
162 Some(Either::Left(parameters)) => Some(Either::Left(
163 parameters
164 .iter()
165 .enumerate()
166 .map(|(i, type_info)| {
167 AnyTypeInfo::try_from(type_info).map_err(|_| {
168 sqlx_core::Error::AnyDriverError(
169 format!(
170 "Any driver does not support type {type_info} of parameter {i}"
171 )
172 .into(),
173 )
174 })
175 })
176 .collect::<Result<Vec<_>, _>>()?,
177 )),
178 Some(Either::Right(count)) => Some(Either::Right(count)),
179 None => None,
180 };
181
182 Ok(Describe {
183 columns,
184 parameters,
185 nullable: describe.nullable,
186 })
187 })
188 }
189}
190
191impl<'a> TryFrom<&'a PgTypeInfo> for AnyTypeInfo {
192 type Error = sqlx_core::Error;
193
194 fn try_from(pg_type: &'a PgTypeInfo) -> Result<Self, Self::Error> {
195 Ok(AnyTypeInfo {
196 kind: match &pg_type.0 {
197 PgType::Bool => AnyTypeInfoKind::Bool,
198 PgType::Void => AnyTypeInfoKind::Null,
199 PgType::Int2 => AnyTypeInfoKind::SmallInt,
200 PgType::Int4 => AnyTypeInfoKind::Integer,
201 PgType::Int8 => AnyTypeInfoKind::BigInt,
202 PgType::Float4 => AnyTypeInfoKind::Real,
203 PgType::Float8 => AnyTypeInfoKind::Double,
204 PgType::Bytea => AnyTypeInfoKind::Blob,
205 PgType::Text | PgType::Varchar => AnyTypeInfoKind::Text,
206 PgType::DeclareWithName(UStr::Static("citext")) => AnyTypeInfoKind::Text,
207 _ => {
208 return Err(sqlx_core::Error::AnyDriverError(
209 format!("Any driver does not support the Postgres type {pg_type:?}").into(),
210 ))
211 }
212 },
213 })
214 }
215}
216
217impl<'a> TryFrom<&'a PgColumn> for AnyColumn {
218 type Error = sqlx_core::Error;
219
220 fn try_from(col: &'a PgColumn) -> Result<Self, Self::Error> {
221 let type_info =
222 AnyTypeInfo::try_from(&col.type_info).map_err(|e| sqlx_core::Error::ColumnDecode {
223 index: col.name.to_string(),
224 source: e.into(),
225 })?;
226
227 Ok(AnyColumn {
228 ordinal: col.ordinal,
229 name: col.name.clone(),
230 type_info,
231 })
232 }
233}
234
235impl<'a> TryFrom<&'a PgRow> for AnyRow {
236 type Error = sqlx_core::Error;
237
238 fn try_from(row: &'a PgRow) -> Result<Self, Self::Error> {
239 AnyRow::map_from(row, row.metadata.column_names.clone())
240 }
241}
242
243impl<'a> TryFrom<&'a AnyConnectOptions> for PgConnectOptions {
244 type Error = sqlx_core::Error;
245
246 fn try_from(value: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
247 let mut opts = PgConnectOptions::parse_from_url(&value.database_url)?;
248 opts.log_settings = value.log_settings.clone();
249 Ok(opts)
250 }
251}
252
253fn map_result(res: PgQueryResult) -> AnyQueryResult {
254 AnyQueryResult {
255 rows_affected: res.rows_affected(),
256 last_insert_id: None,
257 }
258}