headless_lms_server/programs/
email_deliver.rs

1use std::{env, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{FutureExt, StreamExt};
5use headless_lms_models::email_deliveries::{Email, fetch_emails, mark_as_sent, save_err_to_email};
6use headless_lms_utils::email_processor::{self, EmailGutenbergBlock};
7use lettre::{
8    Message, SmtpTransport, Transport,
9    message::{MultiPart, SinglePart, header},
10};
11use once_cell::sync::Lazy;
12use sqlx::PgPool;
13
14const BATCH_SIZE: usize = 100;
15
16static MOOCFI_EMAIL: Lazy<String> =
17    Lazy::new(|| env::var("MOOCFI_EMAIL").expect("No moocfi email found in the env variables."));
18static EMAIL_RELAY: Lazy<String> =
19    Lazy::new(|| env::var("EMAIL_RELAY").expect("No email relay found in the env variables."));
20static DB_URL: Lazy<String> =
21    Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
22
23pub async fn mail_sender() -> Result<()> {
24    tracing_subscriber::fmt().init();
25    dotenv::dotenv().ok();
26
27    let pool = PgPool::connect(&DB_URL).await?;
28
29    let mut conn = pool.acquire().await?;
30
31    let emails = fetch_emails(&mut conn).await?;
32    let mailer = SmtpTransport::relay(&EMAIL_RELAY)?.build();
33
34    let mut futures = tokio_stream::iter(emails)
35        .map(|email| {
36            let email_id = email.id;
37            send_message(email, &mailer, pool.clone()).inspect(move |r| {
38                if let Err(err) = r {
39                    tracing::error!("Failed to send email {}: {}", email_id, err)
40                }
41            })
42        })
43        .buffer_unordered(BATCH_SIZE);
44
45    while futures.next().await.is_some() {}
46
47    Ok(())
48}
49
50pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
51    let email_block: Vec<EmailGutenbergBlock> =
52        serde_json::from_value(email.body.context("No body")?)?;
53
54    let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
55    let msg_as_html = email_processor::process_content_to_html(&email_block);
56
57    let mut conn = pool.acquire().await?;
58    let email_to = email.to;
59    let msg = Message::builder()
60        .from(MOOCFI_EMAIL.parse()?)
61        .to(email
62            .to
63            .to_string()
64            .parse()
65            .with_context(|| format!("Invalid address: {}", email_to))?)
66        .subject(email.subject.context("No subject")?)
67        .multipart(
68            MultiPart::alternative()
69                .singlepart(
70                    SinglePart::builder()
71                        .header(header::ContentType::TEXT_PLAIN)
72                        .body(msg_as_plaintext),
73                )
74                .singlepart(
75                    SinglePart::builder()
76                        .header(header::ContentType::TEXT_HTML)
77                        .body(msg_as_html),
78                ),
79        )
80        // should never fail
81        .expect("Failed to build email");
82
83    match mailer.send(&msg) {
84        Ok(_) => mark_as_sent(email.id, &mut conn)
85            .await
86            .context("Couldn't mark as sent")?,
87        Err(err) => save_err_to_email(email.id, err, &mut conn)
88            .await
89            .context("Couldn't save sent err to db")?,
90    };
91
92    Ok(())
93}
94
95pub async fn main() -> anyhow::Result<()> {
96    let mut interval = tokio::time::interval(Duration::from_secs(10));
97    loop {
98        interval.tick().await;
99        mail_sender().await?;
100    }
101}