headless_lms_server/programs/
email_deliver.rs1use std::{env, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{FutureExt, StreamExt};
5use headless_lms_models::email_deliveries::{Email, fetch_emails, mark_as_sent, save_err_to_email};
6use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
7use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
8use lettre::transport::smtp::authentication::Credentials;
9use lettre::{
10 Message, SmtpTransport, Transport,
11 message::{MultiPart, SinglePart, header},
12};
13use once_cell::sync::Lazy;
14use sqlx::PgConnection;
15use sqlx::PgPool;
16use std::collections::HashMap;
17use uuid::Uuid;
18const BATCH_SIZE: usize = 100;
19
20const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
21
22static SMTP_FROM: Lazy<String> =
23 Lazy::new(|| env::var("SMTP_FROM").expect("No moocfi email found in the env variables."));
24static SMTP_HOST: Lazy<String> =
25 Lazy::new(|| env::var("SMTP_HOST").expect("No email relay found in the env variables."));
26static DB_URL: Lazy<String> =
27 Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
28static SMTP_USER: Lazy<String> =
29 Lazy::new(|| env::var("SMTP_USER").expect("No smtp user found in env variables."));
30static SMTP_PASS: Lazy<String> =
31 Lazy::new(|| env::var("SMTP_PASS").expect("No smtp password found in env variables."));
32
33pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
34 let mut conn = pool.acquire().await?;
35
36 let emails = fetch_emails(&mut conn).await?;
37
38 let mut futures = tokio_stream::iter(emails)
39 .map(|email| {
40 let email_id = email.id;
41 send_message(email, mailer, pool.clone()).inspect(move |r| {
42 if let Err(err) = r {
43 tracing::error!("Failed to send email {}: {}", email_id, err)
44 }
45 })
46 })
47 .buffer_unordered(BATCH_SIZE);
48
49 while futures.next().await.is_some() {}
50
51 Ok(())
52}
53
54pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
55 let mut conn = pool.acquire().await?;
56 tracing::info!("Email send messages...");
57
58 let mut email_block: Vec<EmailGutenbergBlock> =
59 serde_json::from_value(email.body.context("No body")?)?;
60
61 if let Some(name) = &email.name {
62 email_block = apply_email_template_replacements(
63 &mut conn,
64 name,
65 email.id,
66 email.user_id,
67 email_block,
68 )
69 .await?;
70 }
71
72 let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
73 let msg_as_html = email_processor::process_content_to_html(&email_block);
74
75 let email_to = &email.to;
76 let msg = Message::builder()
77 .from(SMTP_FROM.parse()?)
78 .to(email
79 .to
80 .parse()
81 .with_context(|| format!("Invalid address: {}", email_to))?)
82 .subject(email.subject.context("No subject")?)
83 .multipart(
84 MultiPart::alternative()
85 .singlepart(
86 SinglePart::builder()
87 .header(header::ContentType::TEXT_PLAIN)
88 .body(msg_as_plaintext),
89 )
90 .singlepart(
91 SinglePart::builder()
92 .header(header::ContentType::TEXT_HTML)
93 .body(msg_as_html),
94 ),
95 )
96 .expect("Failed to build email");
98
99 match mailer.send(&msg) {
100 Ok(_) => {
101 tracing::info!("Email sent successfully {}", email.id);
102 mark_as_sent(email.id, &mut conn)
103 .await
104 .context("Couldn't mark as sent")?;
105 }
106 Err(err) => {
107 tracing::error!("SMTP send failed for {}: {:?}", email.id, err);
108 save_err_to_email(email.id, err, &mut conn)
109 .await
110 .context("Couldn't save sent err to db")?;
111 }
112 };
113
114 Ok(())
115}
116
117async fn apply_email_template_replacements(
118 conn: &mut PgConnection,
119 template_name: &str,
120 email_id: Uuid,
121 user_id: Uuid,
122 blocks: Vec<EmailGutenbergBlock>,
123) -> anyhow::Result<Vec<EmailGutenbergBlock>> {
124 let mut replacements = HashMap::new();
125
126 match template_name.to_lowercase().as_str() {
127 "reset-password-email" => {
128 if let Some(token_str) =
129 get_unused_reset_password_token_with_user_id(conn, user_id).await?
130 {
131 let base =
132 std::env::var("FRONTEND_BASE_URL").unwrap_or(FRONTEND_BASE_URL.to_string());
133
134 let reset_url = format!(
135 "{}/reset-user-password/{}",
136 base.trim_end_matches('/'),
137 token_str.token
138 );
139
140 replacements.insert("RESET_LINK".to_string(), reset_url);
141 } else {
142 let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
143 save_err_to_email(email_id, msg, conn).await?;
144 return Ok(blocks);
145 }
146 }
147 "delete-user-email" => {
148 if let Some(code) =
149 headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
150 conn, user_id,
151 )
152 .await?
153 {
154 replacements.insert("CODE".to_string(), code.code);
155 } else {
156 let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
157 save_err_to_email(email_id, msg, conn).await?;
158 return Ok(blocks);
159 }
160 }
161 _ => {}
162 }
163
164 Ok(insert_placeholders(blocks, &replacements))
165}
166
167fn insert_placeholders(
168 blocks: Vec<EmailGutenbergBlock>,
169 replacements: &HashMap<String, String>,
170) -> Vec<EmailGutenbergBlock> {
171 blocks
172 .into_iter()
173 .map(|mut block| {
174 if let BlockAttributes::Paragraph {
175 content,
176 drop_cap,
177 rest,
178 } = block.attributes
179 {
180 let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
181 acc.replace(&format!("{{{{{}}}}}", key), value)
182 });
183
184 block.attributes = BlockAttributes::Paragraph {
185 content: replaced_content,
186 drop_cap,
187 rest,
188 };
189 }
190 block
191 })
192 .collect()
193}
194
195pub async fn main() -> anyhow::Result<()> {
196 tracing_subscriber::fmt().init();
197 dotenv::dotenv().ok();
198 tracing::info!("Email sender starting up...");
199
200 if std::env::var("SMTP_USER").is_err() || std::env::var("SMTP_PASS").is_err() {
201 tracing::warn!("SMTP user or password is missing or incorrect");
202 }
203
204 let pool = PgPool::connect(&DB_URL.to_string()).await?;
205 let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
206
207 let mailer = match SmtpTransport::relay(&SMTP_HOST) {
208 Ok(builder) => builder.credentials(creds).build(),
209 Err(e) => {
210 tracing::error!("Could not configure SMTP transport: {}", e);
211 return Err(e.into());
212 }
213 };
214
215 let mut interval = tokio::time::interval(Duration::from_secs(10));
216 loop {
217 interval.tick().await;
218 mail_sender(&pool, &mailer).await?;
219 }
220}