headless_lms_server/programs/
ended_exams_processor.rs1use std::{
2 collections::{HashMap, HashSet},
3 env,
4};
5
6use crate::setup_tracing;
7use chrono::{Duration, Utc};
8use dotenv::dotenv;
9use headless_lms_models::{self as models, ModelError, ModelErrorType};
10use headless_lms_utils::prelude::BackendError;
11use sqlx::{Connection, PgConnection, PgPool};
12use uuid::Uuid;
13
14pub async fn main() -> anyhow::Result<()> {
15 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
17 dotenv().ok();
18 setup_tracing()?;
19 let database_url = env::var("DATABASE_URL")
20 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
21 let db_pool = PgPool::connect(&database_url).await?;
22 let mut conn = db_pool.acquire().await?;
23 process_ended_exams(&mut conn).await?;
24 process_ended_exam_enrollments(&mut conn).await
25}
26
27async fn process_ended_exams(conn: &mut sqlx::PgConnection) -> anyhow::Result<()> {
29 let now = Utc::now();
30 let exam_ids =
31 models::ended_processed_exams::get_unprocessed_ended_exams_by_timestamp(conn, now).await?;
32 tracing::info!("Processing completions for {} ended exams.", exam_ids.len());
33 let mut processed_courses_cache = HashSet::new();
34 let mut success = 0;
35 for exam_id in exam_ids.iter() {
36 match process_ended_exam(conn, *exam_id, &mut processed_courses_cache).await {
37 Ok(_) => success += 1,
38 Err(err) => {
39 tracing::error!("Failed to process exam {}: {:#?}", exam_id, err);
40 }
41 }
42 }
43 tracing::info!(
44 "Exams processed. Succeeded: {}, failed: {}.",
45 success,
46 exam_ids.len() - success
47 );
48 Ok(())
49}
50
51async fn process_ended_exam(
56 conn: &mut PgConnection,
57 exam_id: Uuid,
58 already_processed_courses: &mut HashSet<Uuid>,
59) -> anyhow::Result<()> {
60 let course_ids = models::course_exams::get_course_ids_by_exam_id(conn, exam_id).await?;
61 let mut tx = conn.begin().await?;
62 for course_id in course_ids {
63 if already_processed_courses.contains(&course_id) {
64 continue;
65 } else {
66 models::library::progressing::process_all_course_completions(&mut tx, course_id)
67 .await?;
68 already_processed_courses.insert(course_id);
69 }
70 }
71 models::ended_processed_exams::upsert(&mut tx, exam_id).await?;
72 tx.commit().await?;
73 Ok(())
74}
75
76async fn process_ended_exam_enrollments(conn: &mut PgConnection) -> anyhow::Result<()> {
78 let mut tx = conn.begin().await?;
79 let mut success = 0;
80 let mut failed = 0;
81
82 let ongoing_exam_enrollments: Vec<headless_lms_models::exams::ExamEnrollment> =
83 models::exams::get_ongoing_exam_enrollments(&mut tx).await?;
84 let exams = models::exams::get_exams(&mut tx).await?;
85
86 let mut needs_ended_at_date: HashMap<Uuid, Vec<Uuid>> = HashMap::new();
87 for enrollment in ongoing_exam_enrollments {
88 let exam = exams
89 .get(&enrollment.exam_id)
90 .ok_or_else(|| ModelError::new(ModelErrorType::Generic, "Exam not found", None))?;
91
92 if Utc::now() > enrollment.started_at + Duration::minutes(exam.time_minutes.into()) {
94 needs_ended_at_date
95 .entry(exam.id)
96 .or_default()
97 .push(enrollment.user_id);
98 }
99 }
100
101 for entry in needs_ended_at_date.into_iter() {
102 let exam_id = entry.0;
103 let user_ids = entry.1;
104 match models::exams::update_exam_ended_at_for_users_with_exam_id(
105 &mut tx,
106 exam_id,
107 &user_ids,
108 Utc::now(),
109 )
110 .await
111 {
112 Ok(_) => success += user_ids.len(),
113 Err(err) => {
114 failed += user_ids.len();
115 tracing::error!(
116 "Failed to end exam enrolments for exam {}: {:#?}",
117 exam_id,
118 err
119 );
120 }
121 }
122 }
123
124 tracing::info!(
125 "Exam enrollments processed. Succeeded: {}, failed: {}.",
126 success,
127 failed
128 );
129 tx.commit().await?;
130
131 Ok(())
132}