headless_lms_server/programs/
email_deliver.rs1use std::{env, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{FutureExt, StreamExt};
5use headless_lms_models::email_deliveries::{Email, fetch_emails, mark_as_sent, save_err_to_email};
6use headless_lms_models::email_templates::EmailTemplateType;
7use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
8use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
9use lettre::transport::smtp::authentication::Credentials;
10use lettre::{
11 Message, SmtpTransport, Transport,
12 message::{MultiPart, SinglePart, header},
13};
14use once_cell::sync::Lazy;
15use sqlx::PgConnection;
16use sqlx::PgPool;
17use std::collections::HashMap;
18use uuid::Uuid;
19const BATCH_SIZE: usize = 100;
20
21const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
22
23static SMTP_FROM: Lazy<String> =
24 Lazy::new(|| env::var("SMTP_FROM").expect("No moocfi email found in the env variables."));
25static SMTP_HOST: Lazy<String> =
26 Lazy::new(|| env::var("SMTP_HOST").expect("No email relay found in the env variables."));
27static DB_URL: Lazy<String> =
28 Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
29static SMTP_USER: Lazy<String> =
30 Lazy::new(|| env::var("SMTP_USER").expect("No smtp user found in env variables."));
31static SMTP_PASS: Lazy<String> =
32 Lazy::new(|| env::var("SMTP_PASS").expect("No smtp password found in env variables."));
33
34pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
35 let mut conn = pool.acquire().await?;
36
37 let emails = fetch_emails(&mut conn).await?;
38
39 let mut futures = tokio_stream::iter(emails)
40 .map(|email| {
41 let email_id = email.id;
42 send_message(email, mailer, pool.clone()).inspect(move |r| {
43 if let Err(err) = r {
44 tracing::error!("Failed to send email {}: {}", email_id, err)
45 }
46 })
47 })
48 .buffer_unordered(BATCH_SIZE);
49
50 while futures.next().await.is_some() {}
51
52 Ok(())
53}
54
55pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
56 let mut conn = pool.acquire().await?;
57 tracing::info!("Email send messages...");
58
59 let mut email_block: Vec<EmailGutenbergBlock> =
60 serde_json::from_value(email.body.context("No body")?)?;
61
62 if let Some(template_type) = email.template_type {
63 email_block = apply_email_template_replacements(
64 &mut conn,
65 template_type,
66 email.id,
67 email.user_id,
68 email_block,
69 )
70 .await?;
71 }
72
73 let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
74 let msg_as_html = email_processor::process_content_to_html(&email_block);
75
76 let email_to = &email.to;
77 let msg = Message::builder()
78 .from(SMTP_FROM.parse()?)
79 .to(email
80 .to
81 .parse()
82 .with_context(|| format!("Invalid address: {}", email_to))?)
83 .subject(email.subject.context("No subject")?)
84 .multipart(
85 MultiPart::alternative()
86 .singlepart(
87 SinglePart::builder()
88 .header(header::ContentType::TEXT_PLAIN)
89 .body(msg_as_plaintext),
90 )
91 .singlepart(
92 SinglePart::builder()
93 .header(header::ContentType::TEXT_HTML)
94 .body(msg_as_html),
95 ),
96 )
97 .expect("Failed to build email");
99
100 match mailer.send(&msg) {
101 Ok(_) => {
102 tracing::info!("Email sent successfully {}", email.id);
103 mark_as_sent(email.id, &mut conn)
104 .await
105 .context("Couldn't mark as sent")?;
106 }
107 Err(err) => {
108 tracing::error!("SMTP send failed for {}: {:?}", email.id, err);
109 save_err_to_email(email.id, err, &mut conn)
110 .await
111 .context("Couldn't save sent err to db")?;
112 }
113 };
114
115 Ok(())
116}
117
118async fn apply_email_template_replacements(
119 conn: &mut PgConnection,
120 template_type: EmailTemplateType,
121 email_id: Uuid,
122 user_id: Uuid,
123 blocks: Vec<EmailGutenbergBlock>,
124) -> anyhow::Result<Vec<EmailGutenbergBlock>> {
125 let mut replacements = HashMap::new();
126
127 match template_type {
128 EmailTemplateType::ResetPasswordEmail => {
129 if let Some(token_str) =
130 get_unused_reset_password_token_with_user_id(conn, user_id).await?
131 {
132 let base =
133 std::env::var("FRONTEND_BASE_URL").unwrap_or(FRONTEND_BASE_URL.to_string());
134
135 let reset_url = format!(
136 "{}/reset-user-password/{}",
137 base.trim_end_matches('/'),
138 token_str.token
139 );
140
141 replacements.insert("RESET_LINK".to_string(), reset_url);
142 } else {
143 let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
144 save_err_to_email(email_id, msg, conn).await?;
145 return Ok(blocks);
146 }
147 }
148 EmailTemplateType::DeleteUserEmail => {
149 if let Some(code) =
150 headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
151 conn, user_id,
152 )
153 .await?
154 {
155 replacements.insert("CODE".to_string(), code.code);
156 } else {
157 let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
158 save_err_to_email(email_id, msg, conn).await?;
159 return Ok(blocks);
160 }
161 }
162 EmailTemplateType::ConfirmEmailCode => {
163 if let Some(code) =
164 headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
165 conn, user_id,
166 )
167 .await?
168 {
169 replacements.insert("CODE".to_string(), code.code);
170 } else {
171 let msg = anyhow::anyhow!("No verification code found for user {}", user_id);
172 save_err_to_email(email_id, msg, conn).await?;
173 return Ok(blocks);
174 }
175 }
176 EmailTemplateType::Generic => {
177 return Ok(blocks);
178 }
179 }
180
181 Ok(insert_placeholders(blocks, &replacements))
182}
183
184fn insert_placeholders(
185 blocks: Vec<EmailGutenbergBlock>,
186 replacements: &HashMap<String, String>,
187) -> Vec<EmailGutenbergBlock> {
188 blocks
189 .into_iter()
190 .map(|mut block| {
191 if let BlockAttributes::Paragraph {
192 content,
193 drop_cap,
194 rest,
195 } = block.attributes
196 {
197 let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
198 acc.replace(&format!("{{{{{}}}}}", key), value)
199 });
200
201 block.attributes = BlockAttributes::Paragraph {
202 content: replaced_content,
203 drop_cap,
204 rest,
205 };
206 }
207 block
208 })
209 .collect()
210}
211
212pub async fn main() -> anyhow::Result<()> {
213 tracing_subscriber::fmt().init();
214 dotenv::dotenv().ok();
215 tracing::info!("Email sender starting up...");
216
217 if std::env::var("SMTP_USER").is_err() || std::env::var("SMTP_PASS").is_err() {
218 tracing::warn!("SMTP user or password is missing or incorrect");
219 }
220
221 let pool = PgPool::connect(&DB_URL.to_string()).await?;
222 let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
223
224 let mailer = match SmtpTransport::relay(&SMTP_HOST) {
225 Ok(builder) => builder.credentials(creds).build(),
226 Err(e) => {
227 tracing::error!("Could not configure SMTP transport: {}", e);
228 return Err(e.into());
229 }
230 };
231
232 let mut interval = tokio::time::interval(Duration::from_secs(10));
233 loop {
234 interval.tick().await;
235 mail_sender(&pool, &mailer).await?;
236 }
237}