Skip to main content

headless_lms_server/programs/
email_deliver.rs

1use std::{error::Error as StdError, time::Duration};
2
3use crate::config::program_config::ProgramConfig;
4use crate::prelude::*;
5use anyhow::{Context, Result};
6use chrono::{DateTime, Duration as ChronoDuration, Utc};
7use futures::{FutureExt, StreamExt};
8use headless_lms_models::email_deliveries::{
9    Email, EmailDeliveryErrorInsert, FETCH_LIMIT, fetch_emails,
10    increment_retry_and_mark_non_retryable, increment_retry_and_schedule,
11    insert_email_delivery_error, mark_as_sent,
12};
13use headless_lms_models::email_templates::EmailTemplateType;
14use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
15use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
16use lettre::transport::smtp::Error as SmtpError;
17use lettre::transport::smtp::authentication::Credentials;
18use lettre::{
19    Message, SmtpTransport, Transport,
20    message::{MultiPart, SinglePart, header},
21};
22use once_cell::sync::Lazy;
23use sqlx::{Connection, PgConnection, PgPool};
24use std::collections::HashMap;
25use uuid::Uuid;
26const BATCH_SIZE: usize = FETCH_LIMIT as usize;
27
28const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
29const BASE_BACKOFF_SECS: i64 = 60;
30const MAX_BACKOFF_SECS: i64 = 24 * 60 * 60;
31const MAX_RETRY_AGE_SECS: i64 = 3 * 24 * 60 * 60;
32const JITTER_SECS: i64 = 30;
33
34static SMTP_FROM: Lazy<String> = Lazy::new(|| {
35    ProgramConfig::required("SMTP_FROM").expect("No moocfi email found in the env variables.")
36});
37static SMTP_HOST: Lazy<String> = Lazy::new(|| {
38    ProgramConfig::required("SMTP_HOST").expect("No email relay found in the env variables.")
39});
40static DB_URL: Lazy<String> = Lazy::new(|| {
41    ProgramConfig::required("DATABASE_URL").expect("No db url found in the env variables.")
42});
43static SMTP_MESSAGE_ID_DOMAIN: Lazy<String> = Lazy::new(|| {
44    ProgramConfig::optional("SMTP_MESSAGE_ID_DOMAIN")
45        .and_then(|value| {
46            let trimmed = value.trim();
47            if trimmed.is_empty() {
48                None
49            } else {
50                Some(trimmed.to_string())
51            }
52        })
53        .or_else(|| infer_email_domain(SMTP_FROM.as_str()))
54        .unwrap_or_else(|| "courses.mooc.fi".to_string())
55});
56static SMTP_USER: Lazy<String> = Lazy::new(|| {
57    ProgramConfig::required("SMTP_USER").expect("No smtp user found in env variables.")
58});
59static SMTP_PASS: Lazy<String> = Lazy::new(|| {
60    ProgramConfig::required("SMTP_PASS").expect("No smtp password found in env variables.")
61});
62
63pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
64    let mut conn = pool.acquire().await?;
65
66    let emails = fetch_emails(&mut conn).await?;
67
68    let mut futures = tokio_stream::iter(emails)
69        .map(|email| {
70            let email_id = email.id;
71            send_message(email, mailer, pool.clone()).inspect(move |r| {
72                if let Err(err) = r {
73                    tracing::error!("Failed to send email {}: {}", email_id, err)
74                }
75            })
76        })
77        .buffer_unordered(BATCH_SIZE);
78
79    while futures.next().await.is_some() {}
80
81    Ok(())
82}
83
84pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
85    let mut conn = pool.acquire().await?;
86    tracing::info!("Email send messages...");
87
88    let now = Utc::now();
89    let attempt = email.retry_count + 1;
90    if retry_window_expired(email.first_failed_at, now) {
91        tracing::warn!(
92            "Retry window expired for email {} (first_failed_at={:?})",
93            email.id,
94            email.first_failed_at
95        );
96        record_non_retryable_failure(
97            &mut conn,
98            email.id,
99            attempt,
100            "retry_window_expired",
101            format!(
102                "Retry window expired before send attempt (email_id={}, user_id={}, template={:?}, first_failed_at={:?})",
103                email.id, email.user_id, email.template_type, email.first_failed_at
104            ),
105        )
106        .await?;
107        return Ok(());
108    }
109
110    let mut email_block: Vec<EmailGutenbergBlock> =
111        match email.body.as_ref().context("No body").and_then(|value| {
112            serde_json::from_value(value.clone()).context("Failed to parse email body JSON")
113        }) {
114            Ok(blocks) => blocks,
115            Err(err) => {
116                record_message_build_failure(&mut conn, &email, attempt, &err).await?;
117                return Ok(());
118            }
119        };
120
121    if let Some(template_type) = email.template_type {
122        let template_result = apply_email_template_replacements(
123            &mut conn,
124            template_type,
125            email.id,
126            email.user_id,
127            email_block,
128            attempt,
129        )
130        .await?;
131        match template_result {
132            TemplateApplyResult::Ready(blocks) => email_block = blocks,
133            TemplateApplyResult::Abandoned => return Ok(()),
134        }
135    }
136
137    let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
138    let msg_as_html = email_processor::process_content_to_html(&email_block);
139
140    let msg = match build_email_message(&email, attempt, msg_as_plaintext, msg_as_html) {
141        Ok(msg) => msg,
142        Err(err) => {
143            record_message_build_failure(&mut conn, &email, attempt, &err).await?;
144            return Ok(());
145        }
146    };
147
148    match mailer.send(&msg) {
149        Ok(_) => {
150            tracing::info!("Email sent successfully {}", email.id);
151            mark_as_sent(&mut conn, email.id)
152                .await
153                .context("Couldn't mark as sent")?;
154        }
155        Err(err) => {
156            let is_transient = is_transient_smtp_error(&err);
157            let (error_code, smtp_response, smtp_response_code) = extract_smtp_error_details(&err);
158
159            tracing::error!(
160                "SMTP send failed for {} (attempt {}, transient={}): {:?}",
161                email.id,
162                attempt,
163                is_transient,
164                err
165            );
166
167            let mut tx = (*conn)
168                .begin()
169                .await
170                .context("Couldn't start email failure transaction")?;
171
172            insert_email_delivery_error(
173                &mut tx,
174                EmailDeliveryErrorInsert {
175                    email_delivery_id: email.id,
176                    attempt,
177                    error_message: err.to_string(),
178                    error_code,
179                    smtp_response,
180                    smtp_response_code,
181                    is_transient,
182                },
183            )
184            .await
185            .context("Couldn't insert email delivery error history")?;
186
187            if is_transient {
188                if retry_window_expired(Some(email.first_failed_at.unwrap_or(now)), now) {
189                    increment_retry_and_mark_non_retryable(&mut tx, email.id)
190                        .await
191                        .context("Couldn't close expired retryable email")?;
192                } else {
193                    // `retry_count` is pre-increment from the claimed row; using it here keeps
194                    // backoff aligned with the next failed-attempt number.
195                    let next_retry_at = compute_next_retry_at(now, email.retry_count);
196                    increment_retry_and_schedule(&mut tx, email.id, Some(next_retry_at))
197                        .await
198                        .context("Couldn't schedule retry")?;
199                }
200            } else {
201                increment_retry_and_mark_non_retryable(&mut tx, email.id)
202                    .await
203                    .context("Couldn't close non-retryable email")?;
204            }
205
206            tx.commit()
207                .await
208                .context("Couldn't commit email failure transaction")?;
209        }
210    };
211
212    Ok(())
213}
214
215enum TemplateApplyResult {
216    Ready(Vec<EmailGutenbergBlock>),
217    Abandoned,
218}
219
220async fn apply_email_template_replacements(
221    conn: &mut PgConnection,
222    template_type: EmailTemplateType,
223    email_id: Uuid,
224    user_id: Uuid,
225    blocks: Vec<EmailGutenbergBlock>,
226    attempt: i32,
227) -> anyhow::Result<TemplateApplyResult> {
228    let mut replacements = HashMap::new();
229
230    match template_type {
231        EmailTemplateType::ResetPasswordEmail => {
232            if let Some(token_str) =
233                get_unused_reset_password_token_with_user_id(conn, user_id).await?
234            {
235                let base = ProgramConfig::optional("FRONTEND_BASE_URL")
236                    .and_then(|value| {
237                        let trimmed = value.trim();
238                        if trimmed.is_empty() {
239                            None
240                        } else {
241                            Some(trimmed.to_string())
242                        }
243                    })
244                    .unwrap_or_else(|| FRONTEND_BASE_URL.to_string());
245
246                let reset_url = format!(
247                    "{}/reset-user-password/{}",
248                    base.trim_end_matches('/'),
249                    token_str.token
250                );
251
252                replacements.insert("RESET_LINK".to_string(), reset_url);
253            } else {
254                let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
255                record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
256                    .await?;
257                return Ok(TemplateApplyResult::Abandoned);
258            }
259        }
260        EmailTemplateType::DeleteUserEmail => {
261            if let Some(code) =
262                headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
263                    conn, user_id,
264                )
265                .await?
266            {
267                replacements.insert("CODE".to_string(), code.code);
268            } else {
269                let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
270                record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
271                    .await?;
272                return Ok(TemplateApplyResult::Abandoned);
273            }
274        }
275        EmailTemplateType::ConfirmEmailCode => {
276            if let Some(code) =
277                headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
278                    conn, user_id,
279                )
280                .await?
281            {
282                replacements.insert("CODE".to_string(), code.code);
283            } else {
284                let msg = anyhow::anyhow!("No verification code found for user {}", user_id);
285                record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
286                    .await?;
287                return Ok(TemplateApplyResult::Abandoned);
288            }
289        }
290        EmailTemplateType::Generic => {
291            return Ok(TemplateApplyResult::Ready(blocks));
292        }
293    }
294
295    Ok(TemplateApplyResult::Ready(insert_placeholders(
296        blocks,
297        &replacements,
298    )))
299}
300
301fn insert_placeholders(
302    blocks: Vec<EmailGutenbergBlock>,
303    replacements: &HashMap<String, String>,
304) -> Vec<EmailGutenbergBlock> {
305    blocks
306        .into_iter()
307        .map(|mut block| {
308            if let BlockAttributes::Paragraph {
309                content,
310                drop_cap,
311                rest,
312            } = block.attributes
313            {
314                let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
315                    acc.replace(&format!("{{{{{}}}}}", key), value)
316                });
317
318                block.attributes = BlockAttributes::Paragraph {
319                    content: replaced_content,
320                    drop_cap,
321                    rest,
322                };
323            }
324            block
325        })
326        .collect()
327}
328
329fn build_email_message(
330    email: &Email,
331    attempt: i32,
332    msg_as_plaintext: String,
333    msg_as_html: String,
334) -> Result<Message> {
335    Message::builder()
336        .from(SMTP_FROM.parse()?)
337        .to(email
338            .to
339            .parse()
340            .with_context(|| format!("Invalid recipient address for email_id {}", email.id))?)
341        .subject(email.subject.clone().context("No subject")?)
342        .message_id(Some(format!(
343            "<{}-{}@{}>",
344            email.id,
345            attempt,
346            SMTP_MESSAGE_ID_DOMAIN.as_str()
347        )))
348        .multipart(
349            MultiPart::alternative()
350                .singlepart(
351                    SinglePart::builder()
352                        .header(header::ContentType::TEXT_PLAIN)
353                        .body(msg_as_plaintext),
354                )
355                .singlepart(
356                    SinglePart::builder()
357                        .header(header::ContentType::TEXT_HTML)
358                        .body(msg_as_html),
359                ),
360        )
361        .context("Failed to build email message")
362}
363
/// Extracts the domain part from an address such as `user@example.org` or
/// `Display Name <user@example.org>`.
///
/// Only the text after the last `<` is considered; surrounding whitespace and trailing `>`
/// characters are stripped before the address is split at its last `@`.
///
/// Returns `None` when there is no `@`, when the domain is empty, or when the domain contains
/// any whitespace, control character or angle bracket. The result is used inside a Message-ID
/// header, so anything that could break or extend the header line is rejected rather than
/// only the plain ASCII space.
fn infer_email_domain(value: &str) -> Option<String> {
    let candidate = value
        .rsplit('<')
        .next()
        .unwrap_or(value)
        .trim()
        .trim_end_matches('>')
        .trim();
    let (_, domain) = candidate.rsplit_once('@')?;
    let domain = domain.trim();

    let malformed = domain.is_empty()
        || domain
            .chars()
            .any(|c| c.is_whitespace() || c.is_control() || matches!(c, '<' | '>'));

    if malformed {
        None
    } else {
        Some(domain.to_string())
    }
}
380
381async fn record_message_build_failure(
382    conn: &mut PgConnection,
383    email: &Email,
384    attempt: i32,
385    err: &anyhow::Error,
386) -> Result<()> {
387    tracing::error!(
388        "Message construction failed for email {} (attempt {}): {:#}",
389        email.id,
390        attempt,
391        err
392    );
393    record_non_retryable_failure(
394        conn,
395        email.id,
396        attempt,
397        "message_build",
398        format!("Message construction failed: {err:#}"),
399    )
400    .await
401}
402
403pub async fn main() -> anyhow::Result<()> {
404    tracing_subscriber::fmt().init();
405    dotenvy::dotenv().ok();
406    tracing::info!("Email sender starting up...");
407
408    if ProgramConfig::optional("SMTP_USER").is_none()
409        || ProgramConfig::optional("SMTP_PASS").is_none()
410    {
411        tracing::warn!("SMTP user or password is missing or incorrect");
412    }
413
414    let pool = PgPool::connect(&DB_URL.to_string()).await?;
415    let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
416
417    let mailer = match SmtpTransport::relay(&SMTP_HOST) {
418        Ok(builder) => builder.credentials(creds).build(),
419        Err(e) => {
420            tracing::error!("Could not configure SMTP transport: {}", e);
421            return Err(e.into());
422        }
423    };
424
425    let mut interval = tokio::time::interval(Duration::from_secs(10));
426    loop {
427        interval.tick().await;
428        mail_sender(&pool, &mailer).await?;
429    }
430}
431
432fn retry_window_expired(first_failed_at: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
433    match first_failed_at {
434        Some(ts) => (now - ts).num_seconds() > MAX_RETRY_AGE_SECS,
435        None => false,
436    }
437}
438
439fn compute_next_retry_at(now: DateTime<Utc>, retry_count: i32) -> DateTime<Utc> {
440    // Saturating math + MAX_BACKOFF_SECS clamp intentionally handles outlier values safely.
441    let exponent = retry_count.max(0) as u32;
442    let multiplier = 2_i64.checked_pow(exponent).unwrap_or(i64::MAX);
443    let backoff = BASE_BACKOFF_SECS.saturating_mul(multiplier);
444    let capped = backoff.min(MAX_BACKOFF_SECS);
445    let jitter = rand::rng().random_range(0..=JITTER_SECS);
446    now + ChronoDuration::seconds(capped + jitter)
447}
448
449fn is_transient_smtp_error(err: &SmtpError) -> bool {
450    if err.is_transient() {
451        return true;
452    }
453    if err.is_timeout() || err.is_transport_shutdown() {
454        return true;
455    }
456    has_io_error(err)
457}
458
459fn has_io_error(err: &SmtpError) -> bool {
460    let mut source = err.source();
461    while let Some(inner) = source {
462        if inner.is::<std::io::Error>() {
463            return true;
464        }
465        source = inner.source();
466    }
467    false
468}
469
470fn extract_smtp_error_details(err: &SmtpError) -> (Option<String>, Option<String>, Option<i32>) {
471    let smtp_response_code = err.status().map(|code| i32::from(u16::from(code)));
472
473    let error_code = if err.is_transient() {
474        Some("transient".to_string())
475    } else if err.is_permanent() {
476        Some("permanent".to_string())
477    } else if err.is_timeout() {
478        Some("timeout".to_string())
479    } else if has_io_error(err) {
480        Some("network_io".to_string())
481    } else if err.is_transport_shutdown() {
482        Some("transport_shutdown".to_string())
483    } else if err.is_response() {
484        Some("response".to_string())
485    } else if err.is_client() {
486        Some("client".to_string())
487    } else {
488        None
489    };
490
491    let smtp_response = err.source().map(|source| source.to_string());
492
493    (error_code, smtp_response, smtp_response_code)
494}
495
496async fn record_non_retryable_failure(
497    conn: &mut PgConnection,
498    email_id: Uuid,
499    attempt: i32,
500    error_code: &'static str,
501    message: String,
502) -> Result<()> {
503    let mut tx = (*conn)
504        .begin()
505        .await
506        .context("Couldn't start template failure transaction")?;
507
508    insert_email_delivery_error(
509        &mut tx,
510        EmailDeliveryErrorInsert {
511            email_delivery_id: email_id,
512            attempt,
513            error_message: message,
514            error_code: Some(error_code.to_string()),
515            smtp_response: None,
516            smtp_response_code: None,
517            is_transient: false,
518        },
519    )
520    .await
521    .context("Couldn't insert email delivery error history")?;
522
523    increment_retry_and_mark_non_retryable(&mut tx, email_id)
524        .await
525        .context("Couldn't mark email as non-retryable for template error")?;
526
527    tx.commit()
528        .await
529        .context("Couldn't commit template failure transaction")?;
530
531    Ok(())
532}