1use std::{env, error::Error as StdError, time::Duration};
2
3use anyhow::{Context, Result};
4use chrono::{DateTime, Duration as ChronoDuration, Utc};
5use futures::{FutureExt, StreamExt};
6use headless_lms_models::email_deliveries::{
7 Email, EmailDeliveryErrorInsert, FETCH_LIMIT, fetch_emails,
8 increment_retry_and_mark_non_retryable, increment_retry_and_schedule,
9 insert_email_delivery_error, mark_as_sent,
10};
11use headless_lms_models::email_templates::EmailTemplateType;
12use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
13use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
14use lettre::transport::smtp::Error as SmtpError;
15use lettre::transport::smtp::authentication::Credentials;
16use lettre::{
17 Message, SmtpTransport, Transport,
18 message::{MultiPart, SinglePart, header},
19};
20use once_cell::sync::Lazy;
21use rand::Rng;
22use sqlx::{Connection, PgConnection, PgPool};
23use std::collections::HashMap;
24use uuid::Uuid;
/// Maximum number of sends driven concurrently in one pass; equal to the fetch limit.
const BATCH_SIZE: usize = FETCH_LIMIT as usize;

/// Fallback frontend origin for links when `FRONTEND_BASE_URL` is not set in the environment.
const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
/// Base retry delay in seconds; doubled once per previous retry.
const BASE_BACKOFF_SECS: i64 = 60;
/// Upper bound in seconds (24 hours) for the exponential backoff, before jitter is added.
const MAX_BACKOFF_SECS: i64 = 24 * 60 * 60;
/// Emails whose first failure is more than this many seconds (3 days) old are not retried.
const MAX_RETRY_AGE_SECS: i64 = 3 * 24 * 60 * 60;
/// Maximum random jitter in seconds added to every retry delay.
const JITTER_SECS: i64 = 30;
32
/// Sender address for outgoing mail, read from `SMTP_FROM`. Panics on first use if unset.
static SMTP_FROM: Lazy<String> =
    Lazy::new(|| env::var("SMTP_FROM").expect("No moocfi email found in the env variables."));
/// SMTP relay host, read from `SMTP_HOST`. Panics on first use if unset.
static SMTP_HOST: Lazy<String> =
    Lazy::new(|| env::var("SMTP_HOST").expect("No email relay found in the env variables."));
/// Database connection URL, read from `DATABASE_URL`. Panics on first use if unset.
static DB_URL: Lazy<String> =
    Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
39static SMTP_MESSAGE_ID_DOMAIN: Lazy<String> = Lazy::new(|| {
40 env::var("SMTP_MESSAGE_ID_DOMAIN")
41 .ok()
42 .and_then(|value| {
43 let trimmed = value.trim();
44 if trimmed.is_empty() {
45 None
46 } else {
47 Some(trimmed.to_string())
48 }
49 })
50 .or_else(|| infer_email_domain(SMTP_FROM.as_str()))
51 .unwrap_or_else(|| "courses.mooc.fi".to_string())
52});
/// SMTP username, read from `SMTP_USER`. Panics on first use if unset.
static SMTP_USER: Lazy<String> =
    Lazy::new(|| env::var("SMTP_USER").expect("No smtp user found in env variables."));
/// SMTP password, read from `SMTP_PASS`. Panics on first use if unset.
static SMTP_PASS: Lazy<String> =
    Lazy::new(|| env::var("SMTP_PASS").expect("No smtp password found in env variables."));
57
58pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
59 let mut conn = pool.acquire().await?;
60
61 let emails = fetch_emails(&mut conn).await?;
62
63 let mut futures = tokio_stream::iter(emails)
64 .map(|email| {
65 let email_id = email.id;
66 send_message(email, mailer, pool.clone()).inspect(move |r| {
67 if let Err(err) = r {
68 tracing::error!("Failed to send email {}: {}", email_id, err)
69 }
70 })
71 })
72 .buffer_unordered(BATCH_SIZE);
73
74 while futures.next().await.is_some() {}
75
76 Ok(())
77}
78
79pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
80 let mut conn = pool.acquire().await?;
81 tracing::info!("Email send messages...");
82
83 let now = Utc::now();
84 let attempt = email.retry_count + 1;
85 if retry_window_expired(email.first_failed_at, now) {
86 tracing::warn!(
87 "Retry window expired for email {} (first_failed_at={:?})",
88 email.id,
89 email.first_failed_at
90 );
91 record_non_retryable_failure(
92 &mut conn,
93 email.id,
94 attempt,
95 "retry_window_expired",
96 format!(
97 "Retry window expired before send attempt (email_id={}, user_id={}, template={:?}, first_failed_at={:?})",
98 email.id, email.user_id, email.template_type, email.first_failed_at
99 ),
100 )
101 .await?;
102 return Ok(());
103 }
104
105 let mut email_block: Vec<EmailGutenbergBlock> =
106 match email.body.as_ref().context("No body").and_then(|value| {
107 serde_json::from_value(value.clone()).context("Failed to parse email body JSON")
108 }) {
109 Ok(blocks) => blocks,
110 Err(err) => {
111 record_message_build_failure(&mut conn, &email, attempt, &err).await?;
112 return Ok(());
113 }
114 };
115
116 if let Some(template_type) = email.template_type {
117 let template_result = apply_email_template_replacements(
118 &mut conn,
119 template_type,
120 email.id,
121 email.user_id,
122 email_block,
123 attempt,
124 )
125 .await?;
126 match template_result {
127 TemplateApplyResult::Ready(blocks) => email_block = blocks,
128 TemplateApplyResult::Abandoned => return Ok(()),
129 }
130 }
131
132 let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
133 let msg_as_html = email_processor::process_content_to_html(&email_block);
134
135 let msg = match build_email_message(&email, attempt, msg_as_plaintext, msg_as_html) {
136 Ok(msg) => msg,
137 Err(err) => {
138 record_message_build_failure(&mut conn, &email, attempt, &err).await?;
139 return Ok(());
140 }
141 };
142
143 match mailer.send(&msg) {
144 Ok(_) => {
145 tracing::info!("Email sent successfully {}", email.id);
146 mark_as_sent(&mut conn, email.id)
147 .await
148 .context("Couldn't mark as sent")?;
149 }
150 Err(err) => {
151 let is_transient = is_transient_smtp_error(&err);
152 let (error_code, smtp_response, smtp_response_code) = extract_smtp_error_details(&err);
153
154 tracing::error!(
155 "SMTP send failed for {} (attempt {}, transient={}): {:?}",
156 email.id,
157 attempt,
158 is_transient,
159 err
160 );
161
162 let mut tx = (*conn)
163 .begin()
164 .await
165 .context("Couldn't start email failure transaction")?;
166
167 insert_email_delivery_error(
168 &mut tx,
169 EmailDeliveryErrorInsert {
170 email_delivery_id: email.id,
171 attempt,
172 error_message: err.to_string(),
173 error_code,
174 smtp_response,
175 smtp_response_code,
176 is_transient,
177 },
178 )
179 .await
180 .context("Couldn't insert email delivery error history")?;
181
182 if is_transient {
183 if retry_window_expired(Some(email.first_failed_at.unwrap_or(now)), now) {
184 increment_retry_and_mark_non_retryable(&mut tx, email.id)
185 .await
186 .context("Couldn't close expired retryable email")?;
187 } else {
188 let next_retry_at = compute_next_retry_at(now, email.retry_count);
191 increment_retry_and_schedule(&mut tx, email.id, Some(next_retry_at))
192 .await
193 .context("Couldn't schedule retry")?;
194 }
195 } else {
196 increment_retry_and_mark_non_retryable(&mut tx, email.id)
197 .await
198 .context("Couldn't close non-retryable email")?;
199 }
200
201 tx.commit()
202 .await
203 .context("Couldn't commit email failure transaction")?;
204 }
205 };
206
207 Ok(())
208}
209
/// Outcome of resolving the template placeholders for an email.
enum TemplateApplyResult {
    /// The blocks are ready to be rendered and sent.
    Ready(Vec<EmailGutenbergBlock>),
    /// The required placeholder value was missing; the failure has been recorded and the
    /// email must not be sent.
    Abandoned,
}
214
215async fn apply_email_template_replacements(
216 conn: &mut PgConnection,
217 template_type: EmailTemplateType,
218 email_id: Uuid,
219 user_id: Uuid,
220 blocks: Vec<EmailGutenbergBlock>,
221 attempt: i32,
222) -> anyhow::Result<TemplateApplyResult> {
223 let mut replacements = HashMap::new();
224
225 match template_type {
226 EmailTemplateType::ResetPasswordEmail => {
227 if let Some(token_str) =
228 get_unused_reset_password_token_with_user_id(conn, user_id).await?
229 {
230 let base =
231 std::env::var("FRONTEND_BASE_URL").unwrap_or(FRONTEND_BASE_URL.to_string());
232
233 let reset_url = format!(
234 "{}/reset-user-password/{}",
235 base.trim_end_matches('/'),
236 token_str.token
237 );
238
239 replacements.insert("RESET_LINK".to_string(), reset_url);
240 } else {
241 let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
242 record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
243 .await?;
244 return Ok(TemplateApplyResult::Abandoned);
245 }
246 }
247 EmailTemplateType::DeleteUserEmail => {
248 if let Some(code) =
249 headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
250 conn, user_id,
251 )
252 .await?
253 {
254 replacements.insert("CODE".to_string(), code.code);
255 } else {
256 let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
257 record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
258 .await?;
259 return Ok(TemplateApplyResult::Abandoned);
260 }
261 }
262 EmailTemplateType::ConfirmEmailCode => {
263 if let Some(code) =
264 headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
265 conn, user_id,
266 )
267 .await?
268 {
269 replacements.insert("CODE".to_string(), code.code);
270 } else {
271 let msg = anyhow::anyhow!("No verification code found for user {}", user_id);
272 record_non_retryable_failure(conn, email_id, attempt, "template", msg.to_string())
273 .await?;
274 return Ok(TemplateApplyResult::Abandoned);
275 }
276 }
277 EmailTemplateType::Generic => {
278 return Ok(TemplateApplyResult::Ready(blocks));
279 }
280 }
281
282 Ok(TemplateApplyResult::Ready(insert_placeholders(
283 blocks,
284 &replacements,
285 )))
286}
287
288fn insert_placeholders(
289 blocks: Vec<EmailGutenbergBlock>,
290 replacements: &HashMap<String, String>,
291) -> Vec<EmailGutenbergBlock> {
292 blocks
293 .into_iter()
294 .map(|mut block| {
295 if let BlockAttributes::Paragraph {
296 content,
297 drop_cap,
298 rest,
299 } = block.attributes
300 {
301 let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
302 acc.replace(&format!("{{{{{}}}}}", key), value)
303 });
304
305 block.attributes = BlockAttributes::Paragraph {
306 content: replaced_content,
307 drop_cap,
308 rest,
309 };
310 }
311 block
312 })
313 .collect()
314}
315
316fn build_email_message(
317 email: &Email,
318 attempt: i32,
319 msg_as_plaintext: String,
320 msg_as_html: String,
321) -> Result<Message> {
322 Message::builder()
323 .from(SMTP_FROM.parse()?)
324 .to(email
325 .to
326 .parse()
327 .with_context(|| format!("Invalid recipient address for email_id {}", email.id))?)
328 .subject(email.subject.clone().context("No subject")?)
329 .message_id(Some(format!(
330 "<{}-{}@{}>",
331 email.id,
332 attempt,
333 SMTP_MESSAGE_ID_DOMAIN.as_str()
334 )))
335 .multipart(
336 MultiPart::alternative()
337 .singlepart(
338 SinglePart::builder()
339 .header(header::ContentType::TEXT_PLAIN)
340 .body(msg_as_plaintext),
341 )
342 .singlepart(
343 SinglePart::builder()
344 .header(header::ContentType::TEXT_HTML)
345 .body(msg_as_html),
346 ),
347 )
348 .context("Failed to build email message")
349}
350
/// Extracts the domain from an address such as `user@host` or `Name <user@host>`.
///
/// Only the text after the last `<` is considered, and trailing `>` characters are dropped.
/// Returns `None` when there is no `@`, or when the part after the last `@` is empty or
/// contains a space.
fn infer_email_domain(value: &str) -> Option<String> {
    let after_bracket = match value.rfind('<') {
        Some(idx) => &value[idx + 1..],
        None => value,
    };
    let address = after_bracket.trim().trim_end_matches('>').trim();

    let at = address.rfind('@')?;
    let domain = address[at + 1..].trim();

    (!domain.is_empty() && !domain.contains(' ')).then(|| domain.to_string())
}
367
368async fn record_message_build_failure(
369 conn: &mut PgConnection,
370 email: &Email,
371 attempt: i32,
372 err: &anyhow::Error,
373) -> Result<()> {
374 tracing::error!(
375 "Message construction failed for email {} (attempt {}): {:#}",
376 email.id,
377 attempt,
378 err
379 );
380 record_non_retryable_failure(
381 conn,
382 email.id,
383 attempt,
384 "message_build",
385 format!("Message construction failed: {err:#}"),
386 )
387 .await
388}
389
390pub async fn main() -> anyhow::Result<()> {
391 tracing_subscriber::fmt().init();
392 dotenv::dotenv().ok();
393 tracing::info!("Email sender starting up...");
394
395 if std::env::var("SMTP_USER").is_err() || std::env::var("SMTP_PASS").is_err() {
396 tracing::warn!("SMTP user or password is missing or incorrect");
397 }
398
399 let pool = PgPool::connect(&DB_URL.to_string()).await?;
400 let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
401
402 let mailer = match SmtpTransport::relay(&SMTP_HOST) {
403 Ok(builder) => builder.credentials(creds).build(),
404 Err(e) => {
405 tracing::error!("Could not configure SMTP transport: {}", e);
406 return Err(e.into());
407 }
408 };
409
410 let mut interval = tokio::time::interval(Duration::from_secs(10));
411 loop {
412 interval.tick().await;
413 mail_sender(&pool, &mailer).await?;
414 }
415}
416
417fn retry_window_expired(first_failed_at: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
418 match first_failed_at {
419 Some(ts) => (now - ts).num_seconds() > MAX_RETRY_AGE_SECS,
420 None => false,
421 }
422}
423
424fn compute_next_retry_at(now: DateTime<Utc>, retry_count: i32) -> DateTime<Utc> {
425 let exponent = retry_count.max(0) as u32;
427 let multiplier = 2_i64.checked_pow(exponent).unwrap_or(i64::MAX);
428 let backoff = BASE_BACKOFF_SECS.saturating_mul(multiplier);
429 let capped = backoff.min(MAX_BACKOFF_SECS);
430 let jitter = rand::rng().random_range(0..=JITTER_SECS);
431 now + ChronoDuration::seconds(capped + jitter)
432}
433
434fn is_transient_smtp_error(err: &SmtpError) -> bool {
435 if err.is_transient() {
436 return true;
437 }
438 if err.is_timeout() || err.is_transport_shutdown() {
439 return true;
440 }
441 has_io_error(err)
442}
443
444fn has_io_error(err: &SmtpError) -> bool {
445 let mut source = err.source();
446 while let Some(inner) = source {
447 if inner.is::<std::io::Error>() {
448 return true;
449 }
450 source = inner.source();
451 }
452 false
453}
454
455fn extract_smtp_error_details(err: &SmtpError) -> (Option<String>, Option<String>, Option<i32>) {
456 let smtp_response_code = err.status().map(|code| i32::from(u16::from(code)));
457
458 let error_code = if err.is_transient() {
459 Some("transient".to_string())
460 } else if err.is_permanent() {
461 Some("permanent".to_string())
462 } else if err.is_timeout() {
463 Some("timeout".to_string())
464 } else if has_io_error(err) {
465 Some("network_io".to_string())
466 } else if err.is_transport_shutdown() {
467 Some("transport_shutdown".to_string())
468 } else if err.is_response() {
469 Some("response".to_string())
470 } else if err.is_client() {
471 Some("client".to_string())
472 } else {
473 None
474 };
475
476 let smtp_response = err.source().map(|source| source.to_string());
477
478 (error_code, smtp_response, smtp_response_code)
479}
480
481async fn record_non_retryable_failure(
482 conn: &mut PgConnection,
483 email_id: Uuid,
484 attempt: i32,
485 error_code: &'static str,
486 message: String,
487) -> Result<()> {
488 let mut tx = (*conn)
489 .begin()
490 .await
491 .context("Couldn't start template failure transaction")?;
492
493 insert_email_delivery_error(
494 &mut tx,
495 EmailDeliveryErrorInsert {
496 email_delivery_id: email_id,
497 attempt,
498 error_message: message,
499 error_code: Some(error_code.to_string()),
500 smtp_response: None,
501 smtp_response_code: None,
502 is_transient: false,
503 },
504 )
505 .await
506 .context("Couldn't insert email delivery error history")?;
507
508 increment_retry_and_mark_non_retryable(&mut tx, email_id)
509 .await
510 .context("Couldn't mark email as non-retryable for template error")?;
511
512 tx.commit()
513 .await
514 .context("Couldn't commit template failure transaction")?;
515
516 Ok(())
517}