headless_lms_server/programs/
ended_exams_processor.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4};
5
6use crate::setup_tracing;
7use chrono::{Duration, Utc};
8use dotenv::dotenv;
9use headless_lms_models::{self as models, ModelError, ModelErrorType};
10use headless_lms_utils::prelude::BackendError;
11use sqlx::{Connection, PgConnection, PgPool};
12use uuid::Uuid;
13
14pub async fn main() -> anyhow::Result<()> {
15    // TODO: Audit that the environment access only happens in single-threaded code.
16    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
17    dotenv().ok();
18    setup_tracing()?;
19    let database_url = env::var("DATABASE_URL")
20        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
21    let db_pool = PgPool::connect(&database_url).await?;
22    let mut conn = db_pool.acquire().await?;
23    process_ended_exams(&mut conn).await?;
24    process_ended_exam_enrollments(&mut conn).await
25}
26
27/// Fetches ended exams that haven't yet been processed and updates completions for them.
28async fn process_ended_exams(conn: &mut sqlx::PgConnection) -> anyhow::Result<()> {
29    let now = Utc::now();
30    let exam_ids =
31        models::ended_processed_exams::get_unprocessed_ended_exams_by_timestamp(conn, now).await?;
32    tracing::info!("Processing completions for {} ended exams.", exam_ids.len());
33    let mut processed_courses_cache = HashSet::new();
34    let mut success = 0;
35    for exam_id in exam_ids.iter() {
36        match process_ended_exam(conn, *exam_id, &mut processed_courses_cache).await {
37            Ok(_) => success += 1,
38            Err(err) => {
39                tracing::error!("Failed to process exam {}: {:#?}", exam_id, err);
40            }
41        }
42    }
43    tracing::info!(
44        "Exams processed. Succeeded: {}, failed: {}.",
45        success,
46        exam_ids.len() - success
47    );
48    Ok(())
49}
50
51/// Processes completions for courses associated with the given exam.
52///
53/// Because the same course can belong to multiple exams at the same time, a cache for already
54/// processed courses can be provided to avoid unnecessarily reprocessing those courses again.
55async fn process_ended_exam(
56    conn: &mut PgConnection,
57    exam_id: Uuid,
58    already_processed_courses: &mut HashSet<Uuid>,
59) -> anyhow::Result<()> {
60    let course_ids = models::course_exams::get_course_ids_by_exam_id(conn, exam_id).await?;
61    let mut tx = conn.begin().await?;
62    for course_id in course_ids {
63        if already_processed_courses.contains(&course_id) {
64            continue;
65        } else {
66            models::library::progressing::process_all_course_completions(&mut tx, course_id)
67                .await?;
68            already_processed_courses.insert(course_id);
69        }
70    }
71    models::ended_processed_exams::upsert(&mut tx, exam_id).await?;
72    tx.commit().await?;
73    Ok(())
74}
75
76/// Processes ended exam enrollments
77async fn process_ended_exam_enrollments(conn: &mut PgConnection) -> anyhow::Result<()> {
78    let mut tx = conn.begin().await?;
79    let mut success = 0;
80    let mut failed = 0;
81
82    let ongoing_exam_enrollments: Vec<headless_lms_models::exams::ExamEnrollment> =
83        models::exams::get_ongoing_exam_enrollments(&mut tx).await?;
84    let exams = models::exams::get_exams(&mut tx).await?;
85
86    let mut needs_ended_at_date: HashMap<Uuid, Vec<Uuid>> = HashMap::new();
87    for enrollment in ongoing_exam_enrollments {
88        let exam = exams
89            .get(&enrollment.exam_id)
90            .ok_or_else(|| ModelError::new(ModelErrorType::Generic, "Exam not found", None))?;
91
92        //Check if users exams should have ended
93        if Utc::now() > enrollment.started_at + Duration::minutes(exam.time_minutes.into()) {
94            needs_ended_at_date
95                .entry(exam.id)
96                .or_default()
97                .push(enrollment.user_id);
98        }
99    }
100
101    for entry in needs_ended_at_date.into_iter() {
102        let exam_id = entry.0;
103        let user_ids = entry.1;
104        match models::exams::update_exam_ended_at_for_users_with_exam_id(
105            &mut tx,
106            exam_id,
107            &user_ids,
108            Utc::now(),
109        )
110        .await
111        {
112            Ok(_) => success += user_ids.len(),
113            Err(err) => {
114                failed += user_ids.len();
115                tracing::error!(
116                    "Failed to end exam enrolments for exam {}: {:#?}",
117                    exam_id,
118                    err
119                );
120            }
121        }
122    }
123
124    tracing::info!(
125        "Exam enrollments processed. Succeeded: {}, failed: {}.",
126        success,
127        failed
128    );
129    tx.commit().await?;
130
131    Ok(())
132}