1use crate::email_templates::EmailTemplateType;
2use crate::prelude::*;
3
4pub const FETCH_LIMIT: i64 = 20;
5
6#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
7pub struct EmailDelivery {
8 pub id: Uuid,
9 pub created_at: DateTime<Utc>,
10 pub updated_at: DateTime<Utc>,
11 pub deleted_at: Option<DateTime<Utc>>,
12 pub email_template_id: Uuid,
13 pub sent: bool,
14 pub user_id: Uuid,
15 pub retry_count: i32,
17 pub next_retry_at: Option<DateTime<Utc>>,
18 pub retryable: bool,
19 pub first_failed_at: Option<DateTime<Utc>>,
20 pub last_attempt_at: Option<DateTime<Utc>>,
21}
22
23pub struct Email {
24 pub id: Uuid,
25 pub user_id: Uuid,
26 pub to: String,
27 pub subject: Option<String>,
28 pub body: Option<serde_json::Value>,
29 pub template_type: Option<EmailTemplateType>,
30 pub retry_count: i32,
32 pub next_retry_at: Option<DateTime<Utc>>,
33 pub retryable: bool,
34 pub first_failed_at: Option<DateTime<Utc>>,
35 pub last_attempt_at: Option<DateTime<Utc>>,
36}
37
38pub async fn insert_email_delivery(
40 conn: &mut PgConnection,
41 user_id: Uuid,
42 email_template_id: Uuid,
43) -> ModelResult<Uuid> {
44 let check = sqlx::query_as!(
45 CheckUserAndTemplateRow,
46 r#"
47SELECT
48 EXISTS(SELECT 1 FROM users WHERE id = $1 AND deleted_at IS NULL) AS "user_ok!",
49 EXISTS(SELECT 1 FROM email_templates WHERE id = $2 AND deleted_at IS NULL) AS "template_ok!"
50 "#,
51 user_id,
52 email_template_id
53 )
54 .fetch_one(&mut *conn)
55 .await?;
56 if !check.user_ok {
57 return Err(ModelError::new(
58 ModelErrorType::PreconditionFailed,
59 "User not found or deleted".to_string(),
60 None,
61 ));
62 }
63 if !check.template_ok {
64 return Err(ModelError::new(
65 ModelErrorType::PreconditionFailed,
66 "Email template not found or deleted".to_string(),
67 None,
68 ));
69 }
70
71 let id = Uuid::new_v4();
72 sqlx::query!(
73 r#"
74INSERT INTO email_deliveries (
75 id,
76 user_id,
77 email_template_id
78)
79VALUES ($1, $2, $3)
80 "#,
81 id,
82 user_id,
83 email_template_id
84 )
85 .execute(conn)
86 .await?;
87
88 Ok(id)
89}
90
91struct CheckUserAndTemplateRow {
92 user_ok: bool,
93 template_ok: bool,
94}
95
96pub async fn fetch_emails(conn: &mut PgConnection) -> ModelResult<Vec<Email>> {
97 let emails = sqlx::query_as!(
98 Email,
99 r#"
100WITH due AS (
101 SELECT
102 ed.id
103 FROM email_deliveries ed
104 JOIN users u ON u.id = ed.user_id
105 JOIN user_details ud ON ud.user_id = ed.user_id
106 JOIN email_templates et ON et.id = ed.email_template_id
107 WHERE ed.deleted_at IS NULL
108 AND ed.sent = FALSE
109 AND ed.retryable = TRUE
110 AND u.deleted_at IS NULL
111 AND et.deleted_at IS NULL
112 AND (ed.next_retry_at IS NULL OR ed.next_retry_at <= now())
113 ORDER BY coalesce(ed.next_retry_at, '-infinity'::timestamptz), ed.created_at
114 FOR UPDATE SKIP LOCKED
115 LIMIT $1
116),
117claimed AS (
118 UPDATE email_deliveries ed
119 SET last_attempt_at = now(),
120 -- Crash-recovery lease for claimed rows; this is not retry backoff.
121 next_retry_at = now() + interval '5 minutes'
122 FROM due
123 WHERE ed.id = due.id
124 RETURNING
125 ed.id,
126 ed.user_id,
127 ed.email_template_id,
128 ed.retry_count,
129 ed.next_retry_at,
130 ed.retryable,
131 ed.first_failed_at,
132 ed.last_attempt_at
133)
134SELECT
135 c.id AS id,
136 c.user_id AS user_id,
137 ud.email AS to,
138 et.subject AS subject,
139 et.content AS body,
140 et.email_template_type AS "template_type: EmailTemplateType",
141 c.retry_count AS retry_count,
142 c.next_retry_at AS next_retry_at,
143 c.retryable AS retryable,
144 c.first_failed_at AS first_failed_at,
145 c.last_attempt_at AS last_attempt_at
146FROM claimed c
147JOIN email_templates et ON et.id = c.email_template_id
148JOIN users u ON u.id = c.user_id
149JOIN user_details ud ON ud.user_id = u.id
150ORDER BY c.last_attempt_at ASC;
151 "#,
152 FETCH_LIMIT
153 )
154 .fetch_all(conn)
155 .await?;
156
157 Ok(emails)
158}
159
160pub async fn mark_as_sent(conn: &mut PgConnection, email_id: Uuid) -> ModelResult<()> {
161 sqlx::query!(
162 "
163update email_deliveries
164set sent = TRUE,
165 next_retry_at = NULL
166where id = $1;
167 ",
168 email_id
169 )
170 .execute(conn)
171 .await?;
172
173 Ok(())
174}
175
176pub async fn insert_email_delivery_error(
177 conn: &mut PgConnection,
178 error: EmailDeliveryErrorInsert,
179) -> ModelResult<Uuid> {
180 let id = Uuid::new_v4();
181 sqlx::query!(
182 r#"
183INSERT INTO email_delivery_errors (
184 id,
185 email_delivery_id,
186 attempt,
187 error_message,
188 error_code,
189 smtp_response,
190 smtp_response_code,
191 is_transient
192)
193VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
194 "#,
195 id,
196 error.email_delivery_id,
197 error.attempt,
198 error.error_message,
199 error.error_code,
200 error.smtp_response,
201 error.smtp_response_code,
202 error.is_transient
203 )
204 .execute(conn)
205 .await?;
206
207 Ok(id)
208}
209
210pub struct EmailDeliveryErrorInsert {
211 pub email_delivery_id: Uuid,
212 pub attempt: i32,
213 pub error_message: String,
214 pub error_code: Option<String>,
215 pub smtp_response: Option<String>,
216 pub smtp_response_code: Option<i32>,
217 pub is_transient: bool,
218}
219
220#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
221pub struct EmailDeliveryError {
222 pub id: Uuid,
223 pub email_delivery_id: Uuid,
224 pub attempt: i32,
225 pub error_message: String,
226 pub error_code: Option<String>,
227 pub smtp_response: Option<String>,
228 pub smtp_response_code: Option<i32>,
229 pub is_transient: bool,
230 pub created_at: DateTime<Utc>,
231 pub updated_at: DateTime<Utc>,
232 pub deleted_at: Option<DateTime<Utc>>,
233}
234
235pub async fn increment_retry_and_schedule(
236 conn: &mut PgConnection,
237 email_id: Uuid,
238 next_retry_at: Option<DateTime<Utc>>,
239) -> ModelResult<()> {
240 sqlx::query!(
241 "
242UPDATE email_deliveries
243SET retry_count = retry_count + 1,
244 next_retry_at = $2,
245 first_failed_at = COALESCE(first_failed_at, NOW())
246where id = $1;
247 ",
248 email_id,
249 next_retry_at
250 )
251 .execute(conn)
252 .await?;
253
254 Ok(())
255}
256
257pub async fn increment_retry_and_mark_non_retryable(
258 conn: &mut PgConnection,
259 email_id: Uuid,
260) -> ModelResult<()> {
261 sqlx::query!(
262 "
263UPDATE email_deliveries
264SET retry_count = retry_count + 1,
265 first_failed_at = COALESCE(first_failed_at, NOW()),
266 retryable = FALSE,
267 next_retry_at = NULL
268WHERE id = $1;
269 ",
270 email_id
271 )
272 .execute(conn)
273 .await?;
274
275 Ok(())
276}
277
278pub async fn soft_delete_unsent_retryable_deliveries_for_user(
280 conn: &mut PgConnection,
281 user_id: Uuid,
282) -> ModelResult<()> {
283 sqlx::query!(
284 "
285UPDATE email_deliveries
286SET deleted_at = NOW()
287WHERE user_id = $1
288 AND deleted_at IS NULL
289 AND sent = FALSE
290 AND retryable = TRUE",
291 user_id
292 )
293 .execute(conn)
294 .await?;
295 Ok(())
296}