headless_lms_models/
email_deliveries.rs

1use crate::email_templates::EmailTemplateType;
2use crate::prelude::*;
3
4pub const FETCH_LIMIT: i64 = 20;
5
6#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
7pub struct EmailDelivery {
8    pub id: Uuid,
9    pub created_at: DateTime<Utc>,
10    pub updated_at: DateTime<Utc>,
11    pub deleted_at: Option<DateTime<Utc>>,
12    pub email_template_id: Uuid,
13    pub sent: bool,
14    pub user_id: Uuid,
15    /// Number of failed send attempts recorded so far.
16    pub retry_count: i32,
17    pub next_retry_at: Option<DateTime<Utc>>,
18    pub retryable: bool,
19    pub first_failed_at: Option<DateTime<Utc>>,
20    pub last_attempt_at: Option<DateTime<Utc>>,
21}
22
23pub struct Email {
24    pub id: Uuid,
25    pub user_id: Uuid,
26    pub to: String,
27    pub subject: Option<String>,
28    pub body: Option<serde_json::Value>,
29    pub template_type: Option<EmailTemplateType>,
30    /// Number of failed send attempts recorded so far.
31    pub retry_count: i32,
32    pub next_retry_at: Option<DateTime<Utc>>,
33    pub retryable: bool,
34    pub first_failed_at: Option<DateTime<Utc>>,
35    pub last_attempt_at: Option<DateTime<Utc>>,
36}
37
38/// Inserts an email delivery; fails if the user or email template is soft-deleted.
39pub async fn insert_email_delivery(
40    conn: &mut PgConnection,
41    user_id: Uuid,
42    email_template_id: Uuid,
43) -> ModelResult<Uuid> {
44    let check = sqlx::query_as!(
45        CheckUserAndTemplateRow,
46        r#"
47SELECT
48    EXISTS(SELECT 1 FROM users WHERE id = $1 AND deleted_at IS NULL) AS "user_ok!",
49    EXISTS(SELECT 1 FROM email_templates WHERE id = $2 AND deleted_at IS NULL) AS "template_ok!"
50        "#,
51        user_id,
52        email_template_id
53    )
54    .fetch_one(&mut *conn)
55    .await?;
56    if !check.user_ok {
57        return Err(ModelError::new(
58            ModelErrorType::PreconditionFailed,
59            "User not found or deleted".to_string(),
60            None,
61        ));
62    }
63    if !check.template_ok {
64        return Err(ModelError::new(
65            ModelErrorType::PreconditionFailed,
66            "Email template not found or deleted".to_string(),
67            None,
68        ));
69    }
70
71    let id = Uuid::new_v4();
72    sqlx::query!(
73        r#"
74INSERT INTO email_deliveries (
75    id,
76    user_id,
77    email_template_id
78)
79VALUES ($1, $2, $3)
80        "#,
81        id,
82        user_id,
83        email_template_id
84    )
85    .execute(conn)
86    .await?;
87
88    Ok(id)
89}
90
91struct CheckUserAndTemplateRow {
92    user_ok: bool,
93    template_ok: bool,
94}
95
96pub async fn fetch_emails(conn: &mut PgConnection) -> ModelResult<Vec<Email>> {
97    let emails = sqlx::query_as!(
98        Email,
99        r#"
100WITH due AS (
101    SELECT
102        ed.id
103    FROM email_deliveries ed
104    JOIN users u ON u.id = ed.user_id
105    JOIN user_details ud ON ud.user_id = ed.user_id
106    JOIN email_templates et ON et.id = ed.email_template_id
107    WHERE ed.deleted_at IS NULL
108      AND ed.sent = FALSE
109      AND ed.retryable = TRUE
110      AND u.deleted_at IS NULL
111      AND et.deleted_at IS NULL
112      AND (ed.next_retry_at IS NULL OR ed.next_retry_at <= now())
113    ORDER BY coalesce(ed.next_retry_at, '-infinity'::timestamptz), ed.created_at
114    FOR UPDATE SKIP LOCKED
115    LIMIT $1
116),
117claimed AS (
118    UPDATE email_deliveries ed
119    SET last_attempt_at = now(),
120        -- Crash-recovery lease for claimed rows; this is not retry backoff.
121        next_retry_at = now() + interval '5 minutes'
122    FROM due
123    WHERE ed.id = due.id
124    RETURNING
125        ed.id,
126        ed.user_id,
127        ed.email_template_id,
128        ed.retry_count,
129        ed.next_retry_at,
130        ed.retryable,
131        ed.first_failed_at,
132        ed.last_attempt_at
133)
134SELECT
135    c.id AS id,
136    c.user_id AS user_id,
137    ud.email AS to,
138    et.subject AS subject,
139    et.content AS body,
140    et.email_template_type AS "template_type: EmailTemplateType",
141    c.retry_count AS retry_count,
142    c.next_retry_at AS next_retry_at,
143    c.retryable AS retryable,
144    c.first_failed_at AS first_failed_at,
145    c.last_attempt_at AS last_attempt_at
146FROM claimed c
147JOIN email_templates et ON et.id = c.email_template_id
148JOIN users u ON u.id = c.user_id
149JOIN user_details ud ON ud.user_id = u.id
150ORDER BY c.last_attempt_at ASC;
151        "#,
152        FETCH_LIMIT
153    )
154    .fetch_all(conn)
155    .await?;
156
157    Ok(emails)
158}
159
160pub async fn mark_as_sent(conn: &mut PgConnection, email_id: Uuid) -> ModelResult<()> {
161    sqlx::query!(
162        "
163update email_deliveries
164set sent = TRUE,
165    next_retry_at = NULL
166where id = $1;
167    ",
168        email_id
169    )
170    .execute(conn)
171    .await?;
172
173    Ok(())
174}
175
176pub async fn insert_email_delivery_error(
177    conn: &mut PgConnection,
178    error: EmailDeliveryErrorInsert,
179) -> ModelResult<Uuid> {
180    let id = Uuid::new_v4();
181    sqlx::query!(
182        r#"
183INSERT INTO email_delivery_errors (
184    id,
185    email_delivery_id,
186    attempt,
187    error_message,
188    error_code,
189    smtp_response,
190    smtp_response_code,
191    is_transient
192)
193VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
194        "#,
195        id,
196        error.email_delivery_id,
197        error.attempt,
198        error.error_message,
199        error.error_code,
200        error.smtp_response,
201        error.smtp_response_code,
202        error.is_transient
203    )
204    .execute(conn)
205    .await?;
206
207    Ok(id)
208}
209
210pub struct EmailDeliveryErrorInsert {
211    pub email_delivery_id: Uuid,
212    pub attempt: i32,
213    pub error_message: String,
214    pub error_code: Option<String>,
215    pub smtp_response: Option<String>,
216    pub smtp_response_code: Option<i32>,
217    pub is_transient: bool,
218}
219
220#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
221pub struct EmailDeliveryError {
222    pub id: Uuid,
223    pub email_delivery_id: Uuid,
224    pub attempt: i32,
225    pub error_message: String,
226    pub error_code: Option<String>,
227    pub smtp_response: Option<String>,
228    pub smtp_response_code: Option<i32>,
229    pub is_transient: bool,
230    pub created_at: DateTime<Utc>,
231    pub updated_at: DateTime<Utc>,
232    pub deleted_at: Option<DateTime<Utc>>,
233}
234
235pub async fn increment_retry_and_schedule(
236    conn: &mut PgConnection,
237    email_id: Uuid,
238    next_retry_at: Option<DateTime<Utc>>,
239) -> ModelResult<()> {
240    sqlx::query!(
241        "
242UPDATE email_deliveries
243SET retry_count = retry_count + 1,
244    next_retry_at = $2,
245    first_failed_at = COALESCE(first_failed_at, NOW())
246where id = $1;
247    ",
248        email_id,
249        next_retry_at
250    )
251    .execute(conn)
252    .await?;
253
254    Ok(())
255}
256
257pub async fn increment_retry_and_mark_non_retryable(
258    conn: &mut PgConnection,
259    email_id: Uuid,
260) -> ModelResult<()> {
261    sqlx::query!(
262        "
263UPDATE email_deliveries
264SET retry_count = retry_count + 1,
265    first_failed_at = COALESCE(first_failed_at, NOW()),
266    retryable = FALSE,
267    next_retry_at = NULL
268WHERE id = $1;
269    ",
270        email_id
271    )
272    .execute(conn)
273    .await?;
274
275    Ok(())
276}
277
278/// Soft-deletes unsent, still-retryable email deliveries for a user. Call when soft-deleting the user so pending deliveries are not retried.
279pub async fn soft_delete_unsent_retryable_deliveries_for_user(
280    conn: &mut PgConnection,
281    user_id: Uuid,
282) -> ModelResult<()> {
283    sqlx::query!(
284        "
285UPDATE email_deliveries
286SET deleted_at = NOW()
287WHERE user_id = $1
288  AND deleted_at IS NULL
289  AND sent = FALSE
290  AND retryable = TRUE",
291        user_id
292    )
293    .execute(conn)
294    .await?;
295    Ok(())
296}