headless_lms_server/programs/
email_deliver.rs

1use std::{env, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{FutureExt, StreamExt};
5use headless_lms_models::email_deliveries::{Email, fetch_emails, mark_as_sent, save_err_to_email};
6use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
7use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
8use lettre::transport::smtp::authentication::Credentials;
9use lettre::{
10    Message, SmtpTransport, Transport,
11    message::{MultiPart, SinglePart, header},
12};
13use once_cell::sync::Lazy;
14use sqlx::PgConnection;
15use sqlx::PgPool;
16use std::collections::HashMap;
17use uuid::Uuid;
18const BATCH_SIZE: usize = 100;
19
20const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
21
22static SMTP_FROM: Lazy<String> =
23    Lazy::new(|| env::var("SMTP_FROM").expect("No moocfi email found in the env variables."));
24static SMTP_HOST: Lazy<String> =
25    Lazy::new(|| env::var("SMTP_HOST").expect("No email relay found in the env variables."));
26static DB_URL: Lazy<String> =
27    Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
28static SMTP_USER: Lazy<String> =
29    Lazy::new(|| env::var("SMTP_USER").expect("No smtp user found in env variables."));
30static SMTP_PASS: Lazy<String> =
31    Lazy::new(|| env::var("SMTP_PASS").expect("No smtp password found in env variables."));
32
33pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
34    let mut conn = pool.acquire().await?;
35
36    let emails = fetch_emails(&mut conn).await?;
37
38    let mut futures = tokio_stream::iter(emails)
39        .map(|email| {
40            let email_id = email.id;
41            send_message(email, mailer, pool.clone()).inspect(move |r| {
42                if let Err(err) = r {
43                    tracing::error!("Failed to send email {}: {}", email_id, err)
44                }
45            })
46        })
47        .buffer_unordered(BATCH_SIZE);
48
49    while futures.next().await.is_some() {}
50
51    Ok(())
52}
53
54pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
55    let mut conn = pool.acquire().await?;
56    tracing::info!("Email send messages...");
57
58    let mut email_block: Vec<EmailGutenbergBlock> =
59        serde_json::from_value(email.body.context("No body")?)?;
60
61    if let Some(name) = &email.name {
62        email_block = apply_email_template_replacements(
63            &mut conn,
64            name,
65            email.id,
66            email.user_id,
67            email_block,
68        )
69        .await?;
70    }
71
72    let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
73    let msg_as_html = email_processor::process_content_to_html(&email_block);
74
75    let email_to = &email.to;
76    let msg = Message::builder()
77        .from(SMTP_FROM.parse()?)
78        .to(email
79            .to
80            .parse()
81            .with_context(|| format!("Invalid address: {}", email_to))?)
82        .subject(email.subject.context("No subject")?)
83        .multipart(
84            MultiPart::alternative()
85                .singlepart(
86                    SinglePart::builder()
87                        .header(header::ContentType::TEXT_PLAIN)
88                        .body(msg_as_plaintext),
89                )
90                .singlepart(
91                    SinglePart::builder()
92                        .header(header::ContentType::TEXT_HTML)
93                        .body(msg_as_html),
94                ),
95        )
96        // should never fail
97        .expect("Failed to build email");
98
99    match mailer.send(&msg) {
100        Ok(_) => {
101            tracing::info!("Email sent successfully {}", email.id);
102            mark_as_sent(email.id, &mut conn)
103                .await
104                .context("Couldn't mark as sent")?;
105        }
106        Err(err) => {
107            tracing::error!("SMTP send failed for {}: {:?}", email.id, err);
108            save_err_to_email(email.id, err, &mut conn)
109                .await
110                .context("Couldn't save sent err to db")?;
111        }
112    };
113
114    Ok(())
115}
116
117async fn apply_email_template_replacements(
118    conn: &mut PgConnection,
119    template_name: &str,
120    email_id: Uuid,
121    user_id: Uuid,
122    blocks: Vec<EmailGutenbergBlock>,
123) -> anyhow::Result<Vec<EmailGutenbergBlock>> {
124    let mut replacements = HashMap::new();
125
126    match template_name.to_lowercase().as_str() {
127        "reset-password-email" => {
128            if let Some(token_str) =
129                get_unused_reset_password_token_with_user_id(conn, user_id).await?
130            {
131                let base =
132                    std::env::var("FRONTEND_BASE_URL").unwrap_or(FRONTEND_BASE_URL.to_string());
133
134                let reset_url = format!(
135                    "{}/reset-user-password/{}",
136                    base.trim_end_matches('/'),
137                    token_str.token
138                );
139
140                replacements.insert("RESET_LINK".to_string(), reset_url);
141            } else {
142                let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
143                save_err_to_email(email_id, msg, conn).await?;
144                return Ok(blocks);
145            }
146        }
147        "delete-user-email" => {
148            if let Some(code) =
149                headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
150                    conn, user_id,
151                )
152                .await?
153            {
154                replacements.insert("CODE".to_string(), code.code);
155            } else {
156                let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
157                save_err_to_email(email_id, msg, conn).await?;
158                return Ok(blocks);
159            }
160        }
161        _ => {}
162    }
163
164    Ok(insert_placeholders(blocks, &replacements))
165}
166
167fn insert_placeholders(
168    blocks: Vec<EmailGutenbergBlock>,
169    replacements: &HashMap<String, String>,
170) -> Vec<EmailGutenbergBlock> {
171    blocks
172        .into_iter()
173        .map(|mut block| {
174            if let BlockAttributes::Paragraph {
175                content,
176                drop_cap,
177                rest,
178            } = block.attributes
179            {
180                let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
181                    acc.replace(&format!("{{{{{}}}}}", key), value)
182                });
183
184                block.attributes = BlockAttributes::Paragraph {
185                    content: replaced_content,
186                    drop_cap,
187                    rest,
188                };
189            }
190            block
191        })
192        .collect()
193}
194
195pub async fn main() -> anyhow::Result<()> {
196    tracing_subscriber::fmt().init();
197    dotenv::dotenv().ok();
198    tracing::info!("Email sender starting up...");
199
200    if std::env::var("SMTP_USER").is_err() || std::env::var("SMTP_PASS").is_err() {
201        tracing::warn!("SMTP user or password is missing or incorrect");
202    }
203
204    let pool = PgPool::connect(&DB_URL.to_string()).await?;
205    let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
206
207    let mailer = match SmtpTransport::relay(&SMTP_HOST) {
208        Ok(builder) => builder.credentials(creds).build(),
209        Err(e) => {
210            tracing::error!("Could not configure SMTP transport: {}", e);
211            return Err(e.into());
212        }
213    };
214
215    let mut interval = tokio::time::interval(Duration::from_secs(10));
216    loop {
217        interval.tick().await;
218        mail_sender(&pool, &mailer).await?;
219    }
220}