headless_lms_server/programs/
email_deliver.rs1use std::{env, time::Duration};
2
3use anyhow::{Context, Result};
4use futures::{FutureExt, StreamExt};
5use headless_lms_models::email_deliveries::{Email, fetch_emails, mark_as_sent, save_err_to_email};
6use headless_lms_utils::email_processor::{self, EmailGutenbergBlock};
7use lettre::{
8 Message, SmtpTransport, Transport,
9 message::{MultiPart, SinglePart, header},
10};
11use once_cell::sync::Lazy;
12use sqlx::PgPool;
13
14const BATCH_SIZE: usize = 100;
15
16static MOOCFI_EMAIL: Lazy<String> =
17 Lazy::new(|| env::var("MOOCFI_EMAIL").expect("No moocfi email found in the env variables."));
18static EMAIL_RELAY: Lazy<String> =
19 Lazy::new(|| env::var("EMAIL_RELAY").expect("No email relay found in the env variables."));
20static DB_URL: Lazy<String> =
21 Lazy::new(|| env::var("DATABASE_URL").expect("No db url found in the env variables."));
22
23pub async fn mail_sender() -> Result<()> {
24 tracing_subscriber::fmt().init();
25 dotenv::dotenv().ok();
26
27 let pool = PgPool::connect(&DB_URL).await?;
28
29 let mut conn = pool.acquire().await?;
30
31 let emails = fetch_emails(&mut conn).await?;
32 let mailer = SmtpTransport::relay(&EMAIL_RELAY)?.build();
33
34 let mut futures = tokio_stream::iter(emails)
35 .map(|email| {
36 let email_id = email.id;
37 send_message(email, &mailer, pool.clone()).inspect(move |r| {
38 if let Err(err) = r {
39 tracing::error!("Failed to send email {}: {}", email_id, err)
40 }
41 })
42 })
43 .buffer_unordered(BATCH_SIZE);
44
45 while futures.next().await.is_some() {}
46
47 Ok(())
48}
49
50pub async fn send_message(email: Email, mailer: &SmtpTransport, pool: PgPool) -> Result<()> {
51 let email_block: Vec<EmailGutenbergBlock> =
52 serde_json::from_value(email.body.context("No body")?)?;
53
54 let msg_as_plaintext = email_processor::process_content_to_plaintext(&email_block);
55 let msg_as_html = email_processor::process_content_to_html(&email_block);
56
57 let mut conn = pool.acquire().await?;
58 let email_to = email.to;
59 let msg = Message::builder()
60 .from(MOOCFI_EMAIL.parse()?)
61 .to(email
62 .to
63 .to_string()
64 .parse()
65 .with_context(|| format!("Invalid address: {}", email_to))?)
66 .subject(email.subject.context("No subject")?)
67 .multipart(
68 MultiPart::alternative()
69 .singlepart(
70 SinglePart::builder()
71 .header(header::ContentType::TEXT_PLAIN)
72 .body(msg_as_plaintext),
73 )
74 .singlepart(
75 SinglePart::builder()
76 .header(header::ContentType::TEXT_HTML)
77 .body(msg_as_html),
78 ),
79 )
80 .expect("Failed to build email");
82
83 match mailer.send(&msg) {
84 Ok(_) => mark_as_sent(email.id, &mut conn)
85 .await
86 .context("Couldn't mark as sent")?,
87 Err(err) => save_err_to_email(email.id, err, &mut conn)
88 .await
89 .context("Couldn't save sent err to db")?,
90 };
91
92 Ok(())
93}
94
95pub async fn main() -> anyhow::Result<()> {
96 let mut interval = tokio::time::interval(Duration::from_secs(10));
97 loop {
98 interval.tick().await;
99 mail_sender().await?;
100 }
101}