headless_lms_server/programs/
email_deliver.rs

1use std::{env, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{FutureExt, StreamExt};
5use headless_lms_models::email_deliveries::{Email, fetch_emails, mark_as_sent, save_err_to_email};
6use headless_lms_models::email_templates::EmailTemplateType;
7use headless_lms_models::user_passwords::get_unused_reset_password_token_with_user_id;
8use headless_lms_utils::email_processor::{self, BlockAttributes, EmailGutenbergBlock};
9use lettre::transport::smtp::authentication::Credentials;
10use lettre::{
11    Message, SmtpTransport, Transport,
12    message::{MultiPart, SinglePart, header},
13};
14use once_cell::sync::Lazy;
15use sqlx::PgConnection;
16use sqlx::PgPool;
17use std::collections::HashMap;
18use uuid::Uuid;
19const BATCH_SIZE: usize = 100;
20
21const FRONTEND_BASE_URL: &str = "https://courses.mooc.fi";
22
23static SMTP_FROM: Lazy<String> =
24    Lazy::new(|| env::var("SMTP_FROM").expect("No moocfi email found in the env variables."));
25static SMTP_HOST: Lazy<String> =
26    Lazy::new(|| env::var("SMTP_HOST").expect("No email relay found in the env variables."));
27static DB_URL: Lazy<String> =
28    Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
29static SMTP_USER: Lazy<String> =
30    Lazy::new(|| env::var("SMTP_USER").expect("No smtp user found in env variables."));
31static SMTP_PASS: Lazy<String> =
32    Lazy::new(|| env::var("SMTP_PASS").expect("No smtp password found in env variables."));
33
34pub async fn mail_sender(pool: &PgPool, mailer: &SmtpTransport) -> Result<()> {
35    let mut conn = pool.acquire().await?;
36
37    let emails = fetch_emails(&mut conn).await?;
38
39    let mut futures = tokio_stream::iter(emails)
40        .map(|email| {
41            let email_id = email.id;
42            send_message(email, mailer, pool.clone()).inspect(move |r| {
43                if let Err(err) = r {
44                    tracing::error!("Failed to send email {}: {}", email_id, err)
45                }
46            })
47        })
48        .buffer_unordered(BATCH_SIZE);
49
50    while futures.next().await.is_some() {}
51
52    Ok(())
53}
54
55pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
56    let mut conn = pool.acquire().await?;
57    tracing::info!("Email send messages...");
58
59    let mut email_block: Vec<EmailGutenbergBlock> =
60        serde_json::from_value(email.body.context("No body")?)?;
61
62    if let Some(template_type) = email.template_type {
63        email_block = apply_email_template_replacements(
64            &mut conn,
65            template_type,
66            email.id,
67            email.user_id,
68            email_block,
69        )
70        .await?;
71    }
72
73    let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
74    let msg_as_html = email_processor::process_content_to_html(&email_block);
75
76    let email_to = &email.to;
77    let msg = Message::builder()
78        .from(SMTP_FROM.parse()?)
79        .to(email
80            .to
81            .parse()
82            .with_context(|| format!("Invalid address: {}", email_to))?)
83        .subject(email.subject.context("No subject")?)
84        .multipart(
85            MultiPart::alternative()
86                .singlepart(
87                    SinglePart::builder()
88                        .header(header::ContentType::TEXT_PLAIN)
89                        .body(msg_as_plaintext),
90                )
91                .singlepart(
92                    SinglePart::builder()
93                        .header(header::ContentType::TEXT_HTML)
94                        .body(msg_as_html),
95                ),
96        )
97        // should never fail
98        .expect("Failed to build email");
99
100    match mailer.send(&msg) {
101        Ok(_) => {
102            tracing::info!("Email sent successfully {}", email.id);
103            mark_as_sent(email.id, &mut conn)
104                .await
105                .context("Couldn't mark as sent")?;
106        }
107        Err(err) => {
108            tracing::error!("SMTP send failed for {}: {:?}", email.id, err);
109            save_err_to_email(email.id, err, &mut conn)
110                .await
111                .context("Couldn't save sent err to db")?;
112        }
113    };
114
115    Ok(())
116}
117
118async fn apply_email_template_replacements(
119    conn: &mut PgConnection,
120    template_type: EmailTemplateType,
121    email_id: Uuid,
122    user_id: Uuid,
123    blocks: Vec<EmailGutenbergBlock>,
124) -> anyhow::Result<Vec<EmailGutenbergBlock>> {
125    let mut replacements = HashMap::new();
126
127    match template_type {
128        EmailTemplateType::ResetPasswordEmail => {
129            if let Some(token_str) =
130                get_unused_reset_password_token_with_user_id(conn, user_id).await?
131            {
132                let base =
133                    std::env::var("FRONTEND_BASE_URL").unwrap_or(FRONTEND_BASE_URL.to_string());
134
135                let reset_url = format!(
136                    "{}/reset-user-password/{}",
137                    base.trim_end_matches('/'),
138                    token_str.token
139                );
140
141                replacements.insert("RESET_LINK".to_string(), reset_url);
142            } else {
143                let msg = anyhow::anyhow!("No reset token found for user {}", user_id);
144                save_err_to_email(email_id, msg, conn).await?;
145                return Ok(blocks);
146            }
147        }
148        EmailTemplateType::DeleteUserEmail => {
149            if let Some(code) =
150                headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
151                    conn, user_id,
152                )
153                .await?
154            {
155                replacements.insert("CODE".to_string(), code.code);
156            } else {
157                let msg = anyhow::anyhow!("No deletion code found for user {}", user_id);
158                save_err_to_email(email_id, msg, conn).await?;
159                return Ok(blocks);
160            }
161        }
162        EmailTemplateType::ConfirmEmailCode => {
163            if let Some(code) =
164                headless_lms_models::user_email_codes::get_unused_user_email_code_with_user_id(
165                    conn, user_id,
166                )
167                .await?
168            {
169                replacements.insert("CODE".to_string(), code.code);
170            } else {
171                let msg = anyhow::anyhow!("No verification code found for user {}", user_id);
172                save_err_to_email(email_id, msg, conn).await?;
173                return Ok(blocks);
174            }
175        }
176        EmailTemplateType::Generic => {
177            return Ok(blocks);
178        }
179    }
180
181    Ok(insert_placeholders(blocks, &replacements))
182}
183
184fn insert_placeholders(
185    blocks: Vec<EmailGutenbergBlock>,
186    replacements: &HashMap<String, String>,
187) -> Vec<EmailGutenbergBlock> {
188    blocks
189        .into_iter()
190        .map(|mut block| {
191            if let BlockAttributes::Paragraph {
192                content,
193                drop_cap,
194                rest,
195            } = block.attributes
196            {
197                let replaced_content = replacements.iter().fold(content, |acc, (key, value)| {
198                    acc.replace(&format!("{{{{{}}}}}", key), value)
199                });
200
201                block.attributes = BlockAttributes::Paragraph {
202                    content: replaced_content,
203                    drop_cap,
204                    rest,
205                };
206            }
207            block
208        })
209        .collect()
210}
211
212pub async fn main() -> anyhow::Result<()> {
213    tracing_subscriber::fmt().init();
214    dotenv::dotenv().ok();
215    tracing::info!("Email sender starting up...");
216
217    if std::env::var("SMTP_USER").is_err() || std::env::var("SMTP_PASS").is_err() {
218        tracing::warn!("SMTP user or password is missing or incorrect");
219    }
220
221    let pool = PgPool::connect(&DB_URL.to_string()).await?;
222    let creds = Credentials::new(SMTP_USER.to_string(), SMTP_PASS.to_string());
223
224    let mailer = match SmtpTransport::relay(&SMTP_HOST) {
225        Ok(builder) => builder.credentials(creds).build(),
226        Err(e) => {
227            tracing::error!("Could not configure SMTP transport: {}", e);
228            return Err(e.into());
229        }
230    };
231
232    let mut interval = tokio::time::interval(Duration::from_secs(10));
233    loop {
234        interval.tick().await;
235        mail_sender(&pool, &mailer).await?;
236    }
237}