headless_lms_server/programs/email_deliver.rs

1use std::{env, error::Error as StdError, time::Duration};
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Duration as ChronoDuration, Utc};
5use futures::{FutureExt, StreamExt};
6use headless_lms_models::email_deliveries::{
7    Email, EmailDeliveryErrorInsert, FETCH_LIMIT, fetch_emails,
8    increment_retry_and_mark_non_retryable, increment_retry_and_schedule,
9    insert_email_delivery_error, mark_as_sent,
10};
11use headless_lms_models::email_templates::EmailTemplateType;
12use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
13use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
14use lettre::transport::smtp::Error as SmtpError;
15use lettre::transport::smtp::authentication::Credentials;
16use lettre::{
17    Message, SmtpTransport, Transport,
18    message::{MultiPart, SinglePart, header},
19};
20use once_cell::sync::Lazy;
21use rand::Rng;
22use sqlx::{Connection, PgConnection, PgPool};
23use std::collections::HashMap;
24use uuid::Uuid;
25const BATCH_SIZE: usize = FETCH_LIMIT as usize;
26
27const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
28const BASE_BACKOFF_SECS: i64 = 60;
29const MAX_BACKOFF_SECS: i64 = 24 * 60 * 60;
30const MAX_RETRY_AGE_SECS: i64 = 3 * 24 * 60 * 60;
31const JITTER_SECS: i64 = 30;
32
33static SMTP_FROM: Lazy<String> =
34    Lazy::new(|| env::var("SMTP_FROM").expect("No moocfi email found in the env variables."));
35static SMTP_HOST: Lazy<String> =
36    Lazy::new(|| env::var("SMTP_HOST").expect("No email relay found in the env variables."));
37static DB_URL: Lazy<String> =
38    Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
39static SMTP_MESSAGE_ID_DOMAIN: Lazy<String> = Lazy::new(|| {
40    env::var("SMTP_MESSAGE_ID_DOMAIN")
41        .ok()
42        .and_then(|value| {
43            let trimmed = value.trim();
44            if trimmed.is_empty() {
45                None
46            } else {
47                Some(trimmed.to_string())
48            }
49        })
50        .or_else(|| infer_email_domain(SMTP_FROM.as_str()))
51        .unwrap_or_else(|| "courses.mooc.fi".to_string())
52});
53static SMTP_USER: Lazy<String> =
54    Lazy::new(|| env::var("SMTP_USER").expect("No smtp user found in env variables."));
55static SMTP_PASS: Lazy<String> =
56    Lazy::new(|| env::var("SMTP_PASS").expect("No smtp password found in env variables."));
57
58pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
59    let mut conn = pool.acquire().await?;
60
61    let emails = fetch_emails(&mut conn).await?;
62
63    let mut futures = tokio_stream::iter(emails)
64        .map(|email| {
65            let email_id = email.id;
66            send_message(email, mailer, pool.clone()).inspect(move |r| {
67                if let Err(err) = r {
68                    tracing::error!("Failed to send email {}: {}", email_id, err)
69                }
70            })
71        })
72        .buffer_unordered(BATCH_SIZE);
73
74    while futures.next().await.is_some() {}
75
76    Ok(())
77}
78
79pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
80    let mut conn = pool.acquire().await?;
81    tracing::info!("Email send messages...");
82
83    let now = Utc::now();
84    let attempt = email.retry_count + 1;
85    if retry_window_expired(email.first_failed_at, now) {
86        tracing::warn!(
87            "Retry window expired for email {} (first_failed_at={:?})",
88            email.id,
89            email.first_failed_at
90        );
91        record_non_retryable_failure(
92            &mut conn,
93            email.id,
94            attempt,
95            "retry_window_expired",
96            format!(
97                "Retry window expired before send attempt (email_id={}, user_id={}, template={:?}, first_failed_at={:?})",
98                email.id, email.user_id, email.template_type, email.first_failed_at
99            ),
100        )
101        .await?;
102        return Ok(());
103    }
104
105    let mut email_block: Vec<EmailGutenbergBlock> =
106        match email.body.as_ref().context("No body").and_then(|value| {
107            serde_json::from_value(value.clone()).context("Failed to parse email body JSON")
108        }) {
109            Ok(blocks) => blocks,
110            Err(err) => {
111                record_message_build_failure(&mut conn, &email, attempt, &err).await?;
112                return Ok(());
113            }
114        };
115
116    if let Some(template_type) = email.template_type {
117        let template_result = apply_email_template_replacements(
118            &mut conn,
119            template_type,
120            email.id,
121            email.user_id,
122            email_block,
123            attempt,
124        )
125        .await?;
126        match template_result {
127            TemplateApplyResult::Ready(blocks) => email_block = blocks,
128            TemplateApplyResult::Abandoned => return Ok(()),
129        }
130    }
131
132    let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
133    let msg_as_html = email_processor::process_content_to_html(&email_block);
134
135    let msg = match build_email_message(&email, attempt, msg_as_plaintext, msg_as_html) {
136        Ok(msg) => msg,
137        Err(err) => {
138            record_message_build_failure(&mut conn, &email, attempt, &err).await?;
139            return Ok(());
140        }
141    };
142
143    match mailer.send(&msg) {
144        Ok(_) => {
145            tracing::info!("Email sent successfully {}", email.id);
146            mark_as_sent(&mut conn, email.id)
147                .await
148                .context("Couldn't mark as sent")?;
149        }
150        Err(err) => {
151            let is_transient = is_transient_smtp_error(&err);
152            let (error_code, smtp_response, smtp_response_code) = extract_smtp_error_details(&err);
153
154            tracing::error!(
155                "SMTP send failed for {} (attempt {}, transient={}): {:?}",
156                email.id,
157                attempt,
158                is_transient,
159                err
160            );
161
162            let mut tx = (*conn)
163                .begin()
164                .await
165                .context("Couldn't start email failure transaction")?;
166
167            insert_email_delivery_error(
168                &mut tx,
169                EmailDeliveryErrorInsert {
170                    email_delivery_id: email.id,
171                    attempt,
172                    error_message: err.to_string(),
173                    error_code,
174                    smtp_response,
175                    smtp_response_code,
176                    is_transient,
177                },
178            )
179            .await
180            .context("Couldn't insert email delivery error history")?;
181
182            if is_transient {
183                if retry_window_expired(Some(email.first_failed_at.unwrap_or(now)), now) {
184                    increment_retry_and_mark_non_retryable(&mut tx, email.id)
185                        .await
186                        .context("Couldn't close expired retryable email")?;
187                } else {
188                    // `retry_count` is pre-increment from the claimed row; using it here keeps
189                    // backoff aligned with the next failed-attempt number.
190                    let next_retry_at = compute_next_retry_at(now, email.retry_count);
191                    increment_retry_and_schedule(&mut tx, email.id, Some(next_retry_at))
192                        .await
193                        .context("Couldn't schedule retry")?;
194                }
195            } else {
196                increment_retry_and_mark_non_retryable(&mut tx, email.id)
197                    .await
198                    .context("Couldn't close non-retryable email")?;
199            }
200
201            tx.commit()
202                .await
203                .context("Couldn't commit email failure transaction")?;
204        }
205    };
206
207    Ok(())
208}
209
210enum TemplateApplyResult {
211    Ready(Vec<EmailGutenbergBlock>),
212    Abandoned,
213}
214
215async fn apply_email_template_replacements(
216    conn: &mut PgConnection,
217    template_type: EmailTemplateType,
218    email_id: Uuid,
219    user_id: Uuid,
220    blocks: Vec<EmailGutenbergBlock>,
221    attempt: i32,
222) -> anyhow::Result<TemplateApplyResult> {
223    let mut replacements = HashMap::new();
224
225    match template_type {
226        EmailTemplateType::ResetPasswordEmail => {
227            if let Some(token_str) =
228                get_unused_reset_password_token_with_user_id(conn, user_id).await?
229            {
230                let base =
231                    std::env::var("FRONTEND_BASE_URL").unwrap_or(FRONTEND_BASE_URL.to_string());
232
233                let reset_url = format!(
234                    "{}/reset-user-password/{}",
235                    base.trim_end_matches('/'),
236                    token_str.token
237                );
238
239                replacements.insert("RESET_LINK".to_string(), reset_url);
240            } else {
241                let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
242                record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
243                    .await?;
244                return Ok(TemplateApplyResult::Abandoned);
245            }
246        }
247        EmailTemplateType::DeleteUserEmail => {
248            if let Some(code) =
249                headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
250                    conn, user_id,
251                )
252                .await?
253            {
254                replacements.insert("CODE".to_string(), code.code);
255            } else {
256                let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
257                record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
258                    .await?;
259                return Ok(TemplateApplyResult::Abandoned);
260            }
261        }
262        EmailTemplateType::ConfirmEmailCode => {
263            if let Some(code) =
264                headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
265                    conn, user_id,
266                )
267                .await?
268            {
269                replacements.insert("CODE".to_string(), code.code);
270            } else {
271                let msg = anyhow::anyhow!("No verification code found for user {}", user_id);
272                record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
273                    .await?;
274                return Ok(TemplateApplyResult::Abandoned);
275            }
276        }
277        EmailTemplateType::Generic => {
278            return Ok(TemplateApplyResult::Ready(blocks));
279        }
280    }
281
282    Ok(TemplateApplyResult::Ready(insert_placeholders(
283        blocks,
284        &replacements,
285    )))
286}
287
288fn insert_placeholders(
289    blocks: Vec<EmailGutenbergBlock>,
290    replacements: &HashMap<String, String>,
291) -> Vec<EmailGutenbergBlock> {
292    blocks
293        .into_iter()
294        .map(|mut block| {
295            if let BlockAttributes::Paragraph {
296                content,
297                drop_cap,
298                rest,
299            } = block.attributes
300            {
301                let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
302                    acc.replace(&format!("{{{{{}}}}}", key), value)
303                });
304
305                block.attributes = BlockAttributes::Paragraph {
306                    content: replaced_content,
307                    drop_cap,
308                    rest,
309                };
310            }
311            block
312        })
313        .collect()
314}
315
316fn build_email_message(
317    email: &Email,
318    attempt: i32,
319    msg_as_plaintext: String,
320    msg_as_html: String,
321) -> Result<Message> {
322    Message::builder()
323        .from(SMTP_FROM.parse()?)
324        .to(email
325            .to
326            .parse()
327            .with_context(|| format!("Invalid recipient address for email_id {}", email.id))?)
328        .subject(email.subject.clone().context("No subject")?)
329        .message_id(Some(format!(
330            "<{}-{}@{}>",
331            email.id,
332            attempt,
333            SMTP_MESSAGE_ID_DOMAIN.as_str()
334        )))
335        .multipart(
336            MultiPart::alternative()
337                .singlepart(
338                    SinglePart::builder()
339                        .header(header::ContentType::TEXT_PLAIN)
340                        .body(msg_as_plaintext),
341                )
342                .singlepart(
343                    SinglePart::builder()
344                        .header(header::ContentType::TEXT_HTML)
345                        .body(msg_as_html),
346                ),
347        )
348        .context("Failed to build email message")
349}
350
/// Extracts the domain of an email address, for use as a Message-ID domain.
///
/// Accepts a bare address (`user@example.com`) or a display-name form
/// (`Name <user@example.com>`). Returns `None` in these cases:
/// - There is no `@`.
/// - The text after the last `@` is empty.
/// - That text contains whitespace of any kind or a `>`, so it is not a usable domain.
fn infer_email_domain(value: &str) -> Option<String> {
    // Keep only what follows the last '<' (the address inside `Name <addr>`), minus the closing '>'.
    let address_part = match value.rsplit_once('<') {
        Some((_, tail)) => tail,
        None => value,
    };
    let candidate = address_part.trim().trim_end_matches('>').trim();
    let (_, domain) = candidate.rsplit_once('@')?;
    let domain = domain.trim();
    // Checking only for ' ' let tabs and other whitespace through into the Message-ID header.
    let unusable =
        domain.is_empty() || domain.chars().any(|c| c.is_whitespace() || c == '>');
    (!unusable).then(|| domain.to_string())
}
367
368async fn record_message_build_failure(
369    conn: &mut PgConnection,
370    email: &Email,
371    attempt: i32,
372    err: &anyhow::Error,
373) -> Result<()> {
374    tracing::error!(
375        "Message construction failed for email {} (attempt {}): {:#}",
376        email.id,
377        attempt,
378        err
379    );
380    record_non_retryable_failure(
381        conn,
382        email.id,
383        attempt,
384        "message_build",
385        format!("Message construction failed: {err:#}"),
386    )
387    .await
388}
389
390pub async fn main() -> anyhow::Result<()> {
391    tracing_subscriber::fmt().init();
392    dotenv::dotenv().ok();
393    tracing::info!("Email sender starting up...");
394
395    if std::env::var("SMTP_USER").is_err() || std::env::var("SMTP_PASS").is_err() {
396        tracing::warn!("SMTP user or password is missing or incorrect");
397    }
398
399    let pool = PgPool::connect(&DB_URL.to_string()).await?;
400    let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
401
402    let mailer = match SmtpTransport::relay(&SMTP_HOST) {
403        Ok(builder) => builder.credentials(creds).build(),
404        Err(e) => {
405            tracing::error!("Could not configure SMTP transport: {}", e);
406            return Err(e.into());
407        }
408    };
409
410    let mut interval = tokio::time::interval(Duration::from_secs(10));
411    loop {
412        interval.tick().await;
413        mail_sender(&pool, &mailer).await?;
414    }
415}
416
417fn retry_window_expired(first_failed_at: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
418    match first_failed_at {
419        Some(ts) => (now - ts).num_seconds() > MAX_RETRY_AGE_SECS,
420        None => false,
421    }
422}
423
424fn compute_next_retry_at(now: DateTime<Utc>, retry_count: i32) -> DateTime<Utc> {
425    // Saturating math + MAX_BACKOFF_SECS clamp intentionally handles outlier values safely.
426    let exponent = retry_count.max(0) as u32;
427    let multiplier = 2_i64.checked_pow(exponent).unwrap_or(i64::MAX);
428    let backoff = BASE_BACKOFF_SECS.saturating_mul(multiplier);
429    let capped = backoff.min(MAX_BACKOFF_SECS);
430    let jitter = rand::rng().random_range(0..=JITTER_SECS);
431    now + ChronoDuration::seconds(capped + jitter)
432}
433
434fn is_transient_smtp_error(err: &SmtpError) -> bool {
435    if err.is_transient() {
436        return true;
437    }
438    if err.is_timeout() || err.is_transport_shutdown() {
439        return true;
440    }
441    has_io_error(err)
442}
443
444fn has_io_error(err: &SmtpError) -> bool {
445    let mut source = err.source();
446    while let Some(inner) = source {
447        if inner.is::<std::io::Error>() {
448            return true;
449        }
450        source = inner.source();
451    }
452    false
453}
454
455fn extract_smtp_error_details(err: &SmtpError) -> (Option<String>, Option<String>, Option<i32>) {
456    let smtp_response_code = err.status().map(|code| i32::from(u16::from(code)));
457
458    let error_code = if err.is_transient() {
459        Some("transient".to_string())
460    } else if err.is_permanent() {
461        Some("permanent".to_string())
462    } else if err.is_timeout() {
463        Some("timeout".to_string())
464    } else if has_io_error(err) {
465        Some("network_io".to_string())
466    } else if err.is_transport_shutdown() {
467        Some("transport_shutdown".to_string())
468    } else if err.is_response() {
469        Some("response".to_string())
470    } else if err.is_client() {
471        Some("client".to_string())
472    } else {
473        None
474    };
475
476    let smtp_response = err.source().map(|source| source.to_string());
477
478    (error_code, smtp_response, smtp_response_code)
479}
480
481async fn record_non_retryable_failure(
482    conn: &mut PgConnection,
483    email_id: Uuid,
484    attempt: i32,
485    error_code: &'static str,
486    message: String,
487) -> Result<()> {
488    let mut tx = (*conn)
489        .begin()
490        .await
491        .context("Couldn't start template failure transaction")?;
492
493    insert_email_delivery_error(
494        &mut tx,
495        EmailDeliveryErrorInsert {
496            email_delivery_id: email_id,
497            attempt,
498            error_message: message,
499            error_code: Some(error_code.to_string()),
500            smtp_response: None,
501            smtp_response_code: None,
502            is_transient: false,
503        },
504    )
505    .await
506    .context("Couldn't insert email delivery error history")?;
507
508    increment_retry_and_mark_non_retryable(&mut tx, email_id)
509        .await
510        .context("Couldn't mark email as non-retryable for template error")?;
511
512    tx.commit()
513        .await
514        .context("Couldn't commit template failure transaction")?;
515
516    Ok(())
517}