1use std::{error::Error as StdError, time::Duration};
2
3use crate::config::program_config::ProgramConfig;
4use crate::prelude::*;
5use anyhow::{Context, Result};
6use chrono::{DateTime, Duration as ChronoDuration, Utc};
7use futures::{FutureExt, StreamExt};
8use headless_lms_models::email_deliveries::{
9 Email, EmailDeliveryErrorInsert, FETCH_LIMIT, fetch_emails,
10 increment_retry_and_mark_non_retryable, increment_retry_and_schedule,
11 insert_email_delivery_error, mark_as_sent,
12};
13use headless_lms_models::email_templates::EmailTemplateType;
14use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
15use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
16use lettre::transport::smtp::Error as SmtpError;
17use lettre::transport::smtp::authentication::Credentials;
18use lettre::{
19 Message, SmtpTransport, Transport,
20 message::{MultiPart, SinglePart, header},
21};
22use once_cell::sync::Lazy;
23use sqlx::{Connection, PgConnection, PgPool};
24use std::collections::HashMap;
25use uuid::Uuid;
/// Maximum number of deliveries `mail_sender` drives concurrently; mirrors `FETCH_LIMIT`.
const BATCH_SIZE: usize = FETCH_LIMIT as usize;

/// Fallback frontend base URL for links in emails, used when the `FRONTEND_BASE_URL`
/// setting is unset or blank.
const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
/// Retry delay, in seconds, before exponential growth is applied (doubled per prior retry).
const BASE_BACKOFF_SECS: i64 = 60;
/// Upper bound, in seconds, for the exponential part of the retry delay (24 hours).
const MAX_BACKOFF_SECS: i64 = 24 * 60 * 60;
/// Age, in seconds, since the first failure after which an email is no longer retried (3 days).
const MAX_RETRY_AGE_SECS: i64 = 3 * 24 * 60 * 60;
/// Maximum number of random extra seconds added on top of the capped backoff.
const JITTER_SECS: i64 = 30;
33
/// Sender address for outgoing mail, read from the required `SMTP_FROM` setting.
///
/// Evaluated lazily; the first access panics if the setting is missing.
static SMTP_FROM: Lazy<String> = Lazy::new(|| {
    ProgramConfig::required("SMTP_FROM").expect("No moocfi email found in the env variables.")
});
/// SMTP relay host, read from the required `SMTP_HOST` setting.
///
/// Evaluated lazily; the first access panics if the setting is missing.
static SMTP_HOST: Lazy<String> = Lazy::new(|| {
    ProgramConfig::required("SMTP_HOST").expect("No email relay found in the env variables.")
});
/// Database connection URL, read from the required `DATABASE_URL` setting.
///
/// Evaluated lazily; the first access panics if the setting is missing.
static DB_URL: Lazy<String> = Lazy::new(|| {
    ProgramConfig::required("DATABASE_URL").expect("No db url found in the env variables.")
});
43static SMTP_MESSAGE_ID_DOMAIN: Lazy<String> = Lazy::new(|| {
44 ProgramConfig::optional("SMTP_MESSAGE_ID_DOMAIN")
45 .and_then(|value| {
46 let trimmed = value.trim();
47 if trimmed.is_empty() {
48 None
49 } else {
50 Some(trimmed.to_string())
51 }
52 })
53 .or_else(|| infer_email_domain(SMTP_FROM.as_str()))
54 .unwrap_or_else(|| "courses.mooc.fi".to_string())
55});
/// SMTP user name, read from the required `SMTP_USER` setting.
///
/// Evaluated lazily; the first access panics if the setting is missing.
static SMTP_USER: Lazy<String> = Lazy::new(|| {
    ProgramConfig::required("SMTP_USER").expect("No smtp user found in env variables.")
});
/// SMTP password, read from the required `SMTP_PASS` setting.
///
/// Evaluated lazily; the first access panics if the setting is missing.
static SMTP_PASS: Lazy<String> = Lazy::new(|| {
    ProgramConfig::required("SMTP_PASS").expect("No smtp password found in env variables.")
});
62
63pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
64 let mut conn = pool.acquire().await?;
65
66 let emails = fetch_emails(&mut conn).await?;
67
68 let mut futures = tokio_stream::iter(emails)
69 .map(|email| {
70 let email_id = email.id;
71 send_message(email, mailer, pool.clone()).inspect(move |r| {
72 if let Err(err) = r {
73 tracing::error!("Failed to send email {}: {}", email_id, err)
74 }
75 })
76 })
77 .buffer_unordered(BATCH_SIZE);
78
79 while futures.next().await.is_some() {}
80
81 Ok(())
82}
83
84pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
85 let mut conn = pool.acquire().await?;
86 tracing::info!("Email send messages...");
87
88 let now = Utc::now();
89 let attempt = email.retry_count + 1;
90 if retry_window_expired(email.first_failed_at, now) {
91 tracing::warn!(
92 "Retry window expired for email {} (first_failed_at={:?})",
93 email.id,
94 email.first_failed_at
95 );
96 record_non_retryable_failure(
97 &mut conn,
98 email.id,
99 attempt,
100 "retry_window_expired",
101 format!(
102 "Retry window expired before send attempt (email_id={}, user_id={}, template={:?}, first_failed_at={:?})",
103 email.id, email.user_id, email.template_type, email.first_failed_at
104 ),
105 )
106 .await?;
107 return Ok(());
108 }
109
110 let mut email_block: Vec<EmailGutenbergBlock> =
111 match email.body.as_ref().context("No body").and_then(|value| {
112 serde_json::from_value(value.clone()).context("Failed to parse email body JSON")
113 }) {
114 Ok(blocks) => blocks,
115 Err(err) => {
116 record_message_build_failure(&mut conn, &email, attempt, &err).await?;
117 return Ok(());
118 }
119 };
120
121 if let Some(template_type) = email.template_type {
122 let template_result = apply_email_template_replacements(
123 &mut conn,
124 template_type,
125 email.id,
126 email.user_id,
127 email_block,
128 attempt,
129 )
130 .await?;
131 match template_result {
132 TemplateApplyResult::Ready(blocks) => email_block = blocks,
133 TemplateApplyResult::Abandoned => return Ok(()),
134 }
135 }
136
137 let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
138 let msg_as_html = email_processor::process_content_to_html(&email_block);
139
140 let msg = match build_email_message(&email, attempt, msg_as_plaintext, msg_as_html) {
141 Ok(msg) => msg,
142 Err(err) => {
143 record_message_build_failure(&mut conn, &email, attempt, &err).await?;
144 return Ok(());
145 }
146 };
147
148 match mailer.send(&msg) {
149 Ok(_) => {
150 tracing::info!("Email sent successfully {}", email.id);
151 mark_as_sent(&mut conn, email.id)
152 .await
153 .context("Couldn't mark as sent")?;
154 }
155 Err(err) => {
156 let is_transient = is_transient_smtp_error(&err);
157 let (error_code, smtp_response, smtp_response_code) = extract_smtp_error_details(&err);
158
159 tracing::error!(
160 "SMTP send failed for {} (attempt {}, transient={}): {:?}",
161 email.id,
162 attempt,
163 is_transient,
164 err
165 );
166
167 let mut tx = (*conn)
168 .begin()
169 .await
170 .context("Couldn't start email failure transaction")?;
171
172 insert_email_delivery_error(
173 &mut tx,
174 EmailDeliveryErrorInsert {
175 email_delivery_id: email.id,
176 attempt,
177 error_message: err.to_string(),
178 error_code,
179 smtp_response,
180 smtp_response_code,
181 is_transient,
182 },
183 )
184 .await
185 .context("Couldn't insert email delivery error history")?;
186
187 if is_transient {
188 if retry_window_expired(Some(email.first_failed_at.unwrap_or(now)), now) {
189 increment_retry_and_mark_non_retryable(&mut tx, email.id)
190 .await
191 .context("Couldn't close expired retryable email")?;
192 } else {
193 let next_retry_at = compute_next_retry_at(now, email.retry_count);
196 increment_retry_and_schedule(&mut tx, email.id, Some(next_retry_at))
197 .await
198 .context("Couldn't schedule retry")?;
199 }
200 } else {
201 increment_retry_and_mark_non_retryable(&mut tx, email.id)
202 .await
203 .context("Couldn't close non-retryable email")?;
204 }
205
206 tx.commit()
207 .await
208 .context("Couldn't commit email failure transaction")?;
209 }
210 };
211
212 Ok(())
213}
214
/// Outcome of filling template placeholders for an email.
enum TemplateApplyResult {
    /// Blocks are ready to be rendered and sent.
    Ready(Vec<EmailGutenbergBlock>),
    /// Required template data was missing; the failure has been recorded and the email
    /// must not be sent.
    Abandoned,
}
219
220async fn apply_email_template_replacements(
221 conn: &mut PgConnection,
222 template_type: EmailTemplateType,
223 email_id: Uuid,
224 user_id: Uuid,
225 blocks: Vec<EmailGutenbergBlock>,
226 attempt: i32,
227) -> anyhow::Result<TemplateApplyResult> {
228 let mut replacements = HashMap::new();
229
230 match template_type {
231 EmailTemplateType::ResetPasswordEmail => {
232 if let Some(token_str) =
233 get_unused_reset_password_token_with_user_id(conn, user_id).await?
234 {
235 let base = ProgramConfig::optional("FRONTEND_BASE_URL")
236 .and_then(|value| {
237 let trimmed = value.trim();
238 if trimmed.is_empty() {
239 None
240 } else {
241 Some(trimmed.to_string())
242 }
243 })
244 .unwrap_or_else(|| FRONTEND_BASE_URL.to_string());
245
246 let reset_url = format!(
247 "{}/reset-user-password/{}",
248 base.trim_end_matches('/'),
249 token_str.token
250 );
251
252 replacements.insert("RESET_LINK".to_string(), reset_url);
253 } else {
254 let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
255 record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
256 .await?;
257 return Ok(TemplateApplyResult::Abandoned);
258 }
259 }
260 EmailTemplateType::DeleteUserEmail => {
261 if let Some(code) =
262 headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
263 conn, user_id,
264 )
265 .await?
266 {
267 replacements.insert("CODE".to_string(), code.code);
268 } else {
269 let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
270 record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
271 .await?;
272 return Ok(TemplateApplyResult::Abandoned);
273 }
274 }
275 EmailTemplateType::ConfirmEmailCode => {
276 if let Some(code) =
277 headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
278 conn, user_id,
279 )
280 .await?
281 {
282 replacements.insert("CODE".to_string(), code.code);
283 } else {
284 let msg = anyhow::anyhow!("No verification code found for user {}", user_id);
285 record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
286 .await?;
287 return Ok(TemplateApplyResult::Abandoned);
288 }
289 }
290 EmailTemplateType::Generic => {
291 return Ok(TemplateApplyResult::Ready(blocks));
292 }
293 }
294
295 Ok(TemplateApplyResult::Ready(insert_placeholders(
296 blocks,
297 &replacements,
298 )))
299}
300
301fn insert_placeholders(
302 blocks: Vec<EmailGutenbergBlock>,
303 replacements: &HashMap<String, String>,
304) -> Vec<EmailGutenbergBlock> {
305 blocks
306 .into_iter()
307 .map(|mut block| {
308 if let BlockAttributes::Paragraph {
309 content,
310 drop_cap,
311 rest,
312 } = block.attributes
313 {
314 let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
315 acc.replace(&format!("{{{{{}}}}}", key), value)
316 });
317
318 block.attributes = BlockAttributes::Paragraph {
319 content: replaced_content,
320 drop_cap,
321 rest,
322 };
323 }
324 block
325 })
326 .collect()
327}
328
329fn build_email_message(
330 email: &Email,
331 attempt: i32,
332 msg_as_plaintext: String,
333 msg_as_html: String,
334) -> Result<Message> {
335 Message::builder()
336 .from(SMTP_FROM.parse()?)
337 .to(email
338 .to
339 .parse()
340 .with_context(|| format!("Invalid recipient address for email_id {}", email.id))?)
341 .subject(email.subject.clone().context("No subject")?)
342 .message_id(Some(format!(
343 "<{}-{}@{}>",
344 email.id,
345 attempt,
346 SMTP_MESSAGE_ID_DOMAIN.as_str()
347 )))
348 .multipart(
349 MultiPart::alternative()
350 .singlepart(
351 SinglePart::builder()
352 .header(header::ContentType::TEXT_PLAIN)
353 .body(msg_as_plaintext),
354 )
355 .singlepart(
356 SinglePart::builder()
357 .header(header::ContentType::TEXT_HTML)
358 .body(msg_as_html),
359 ),
360 )
361 .context("Failed to build email message")
362}
363
/// Extracts the domain part from an email address or a `Display Name <user@domain>` mailbox.
///
/// Returns `None` when there is no `@`, the domain is empty, or the domain contains
/// whitespace, control characters or angle brackets. The result is used in a `Message-ID`
/// header, so such characters must never be passed through.
fn infer_email_domain(value: &str) -> Option<String> {
    // Keep only what follows the last '<' (the address of a "Name <addr>" mailbox).
    let address = value
        .rsplit_once('<')
        .map_or(value, |(_, rest)| rest)
        .trim()
        .trim_end_matches('>')
        .trim();

    let (_, domain) = address.rsplit_once('@')?;
    let domain = domain.trim();

    let has_forbidden_char = domain
        .chars()
        .any(|c| c.is_whitespace() || c.is_control() || matches!(c, '<' | '>'));

    (!domain.is_empty() && !has_forbidden_char).then(|| domain.to_string())
}
380
381async fn record_message_build_failure(
382 conn: &mut PgConnection,
383 email: &Email,
384 attempt: i32,
385 err: &anyhow::Error,
386) -> Result<()> {
387 tracing::error!(
388 "Message construction failed for email {} (attempt {}): {:#}",
389 email.id,
390 attempt,
391 err
392 );
393 record_non_retryable_failure(
394 conn,
395 email.id,
396 attempt,
397 "message_build",
398 format!("Message construction failed: {err:#}"),
399 )
400 .await
401}
402
403pub async fn main() -> anyhow::Result<()> {
404 tracing_subscriber::fmt().init();
405 dotenvy::dotenv().ok();
406 tracing::info!("Email sender starting up...");
407
408 if ProgramConfig::optional("SMTP_USER").is_none()
409 || ProgramConfig::optional("SMTP_PASS").is_none()
410 {
411 tracing::warn!("SMTP user or password is missing or incorrect");
412 }
413
414 let pool = PgPool::connect(&DB_URL.to_string()).await?;
415 let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
416
417 let mailer = match SmtpTransport::relay(&SMTP_HOST) {
418 Ok(builder) => builder.credentials(creds).build(),
419 Err(e) => {
420 tracing::error!("Could not configure SMTP transport: {}", e);
421 return Err(e.into());
422 }
423 };
424
425 let mut interval = tokio::time::interval(Duration::from_secs(10));
426 loop {
427 interval.tick().await;
428 mail_sender(&pool, &mailer).await?;
429 }
430}
431
432fn retry_window_expired(first_failed_at: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
433 match first_failed_at {
434 Some(ts) => (now - ts).num_seconds() > MAX_RETRY_AGE_SECS,
435 None => false,
436 }
437}
438
439fn compute_next_retry_at(now: DateTime<Utc>, retry_count: i32) -> DateTime<Utc> {
440 let exponent = retry_count.max(0) as u32;
442 let multiplier = 2_i64.checked_pow(exponent).unwrap_or(i64::MAX);
443 let backoff = BASE_BACKOFF_SECS.saturating_mul(multiplier);
444 let capped = backoff.min(MAX_BACKOFF_SECS);
445 let jitter = rand::rng().random_range(0..=JITTER_SECS);
446 now + ChronoDuration::seconds(capped + jitter)
447}
448
449fn is_transient_smtp_error(err: &SmtpError) -> bool {
450 if err.is_transient() {
451 return true;
452 }
453 if err.is_timeout() || err.is_transport_shutdown() {
454 return true;
455 }
456 has_io_error(err)
457}
458
459fn has_io_error(err: &SmtpError) -> bool {
460 let mut source = err.source();
461 while let Some(inner) = source {
462 if inner.is::<std::io::Error>() {
463 return true;
464 }
465 source = inner.source();
466 }
467 false
468}
469
470fn extract_smtp_error_details(err: &SmtpError) -> (Option<String>, Option<String>, Option<i32>) {
471 let smtp_response_code = err.status().map(|code| i32::from(u16::from(code)));
472
473 let error_code = if err.is_transient() {
474 Some("transient".to_string())
475 } else if err.is_permanent() {
476 Some("permanent".to_string())
477 } else if err.is_timeout() {
478 Some("timeout".to_string())
479 } else if has_io_error(err) {
480 Some("network_io".to_string())
481 } else if err.is_transport_shutdown() {
482 Some("transport_shutdown".to_string())
483 } else if err.is_response() {
484 Some("response".to_string())
485 } else if err.is_client() {
486 Some("client".to_string())
487 } else {
488 None
489 };
490
491 let smtp_response = err.source().map(|source| source.to_string());
492
493 (error_code, smtp_response, smtp_response_code)
494}
495
496async fn record_non_retryable_failure(
497 conn: &mut PgConnection,
498 email_id: Uuid,
499 attempt: i32,
500 error_code: &'static str,
501 message: String,
502) -> Result<()> {
503 let mut tx = (*conn)
504 .begin()
505 .await
506 .context("Couldn't start template failure transaction")?;
507
508 insert_email_delivery_error(
509 &mut tx,
510 EmailDeliveryErrorInsert {
511 email_delivery_id: email_id,
512 attempt,
513 error_message: message,
514 error_code: Some(error_code.to_string()),
515 smtp_response: None,
516 smtp_response_code: None,
517 is_transient: false,
518 },
519 )
520 .await
521 .context("Couldn't insert email delivery error history")?;
522
523 increment_retry_and_mark_non_retryable(&mut tx, email_id)
524 .await
525 .context("Couldn't mark email as non-retryable for template error")?;
526
527 tx.commit()
528 .await
529 .context("Couldn't commit template failure transaction")?;
530
531 Ok(())
532}