headless_lms_server/domain/csv_export/
points.rs1use anyhow::{Context, Result};
2use bytes::Bytes;
3
4use futures::TryStreamExt;
5use headless_lms_models::{chapters, course_instances, exercises, user_exercise_states, users};
6
7use async_trait::async_trait;
8
9use crate::domain::csv_export::CsvWriter;
10
11use sqlx::PgConnection;
12use std::{
13 collections::{HashMap, HashSet},
14 io::Write,
15};
16use tokio::sync::mpsc::UnboundedSender;
17
18use uuid::Uuid;
19
20use crate::prelude::*;
21
22use super::{
23 super::authorization::{AuthorizationToken, AuthorizedResponse},
24 CSVExportAdapter, CsvExportDataLoader,
25};
26
27pub struct PointExportOperation {
28 pub course_instance_id: Uuid,
29}
30
31#[async_trait]
32impl CsvExportDataLoader for PointExportOperation {
33 async fn load_data(
34 &self,
35 sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
36 conn: &mut PgConnection,
37 token: AuthorizationToken,
38 ) -> anyhow::Result<CSVExportAdapter> {
39 export_course_instance_points(
40 &mut *conn,
41 self.course_instance_id,
42 CSVExportAdapter {
43 sender,
44 authorization_token: token,
45 },
46 )
47 .await
48 }
49}
50
51pub async fn export_course_instance_points<W>(
53 conn: &mut PgConnection,
54 course_instance_id: Uuid,
55 writer: W,
56) -> Result<W>
57where
58 W: Write + Send + 'static,
59{
60 let csv_fields_before_headers = 1;
61
62 let course_instance = course_instances::get_course_instance(conn, course_instance_id).await?;
63 let mut chapters = chapters::course_chapters(conn, course_instance.course_id).await?;
64 chapters.sort_by_key(|c| c.chapter_number);
65 let mut chapter_number_to_header_idx = HashMap::new();
66 for (idx, chapter) in chapters.iter().enumerate() {
67 chapter_number_to_header_idx
68 .insert(chapter.chapter_number, csv_fields_before_headers + idx);
69 }
70
71 let header_count = csv_fields_before_headers + chapters.len();
72 let headers = IntoIterator::into_iter(["user_id".to_string()])
74 .chain(chapters.into_iter().map(|c| c.chapter_number.to_string()));
75
76 let enrolled_user_ids: HashSet<Uuid> =
77 users::get_users_by_course_instance_enrollment(conn, course_instance_id)
78 .await?
79 .into_iter()
80 .map(|user| user.id)
81 .collect();
82 let mut stream = user_exercise_states::stream_course_points(conn, course_instance.course_id);
83
84 let writer = CsvWriter::new_with_initialized_headers(writer, headers).await?;
85 while let Some(next) = stream.try_next().await? {
86 if !enrolled_user_ids.contains(&next.user_id) {
87 continue;
88 }
89 let mut csv_row = vec!["0".to_string(); header_count];
90 csv_row[0] = next.user_id.to_string();
91 for points in next.points_for_each_chapter {
92 let idx = chapter_number_to_header_idx
93 .get(&points.chapter_number)
94 .with_context(|| format!("Unexpected chapter number {}", points.chapter_number))?;
95 let item = csv_row
96 .get_mut(*idx)
97 .with_context(|| format!("Invalid chapter number {}", idx))?;
98 *item = points.points_for_chapter.to_string();
99 }
100 writer.write_record(csv_row);
101 }
102 let writer = writer.finish().await?;
103 Ok(writer)
104}
105
106pub struct ExamPointExportOperation {
107 pub exam_id: Uuid,
108}
109
110#[async_trait]
112impl CsvExportDataLoader for ExamPointExportOperation {
113 async fn load_data(
114 &self,
115 sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
116 conn: &mut PgConnection,
117 token: AuthorizationToken,
118 ) -> anyhow::Result<CSVExportAdapter> {
119 export_exam_points(
120 &mut *conn,
121 self.exam_id,
122 CSVExportAdapter {
123 sender,
124 authorization_token: token,
125 },
126 )
127 .await
128 }
129}
130
131pub async fn export_exam_points<W>(conn: &mut PgConnection, exam_id: Uuid, writer: W) -> Result<W>
133where
134 W: Write + Send + 'static,
135{
136 let csv_fields_before_headers = 2;
137
138 let mut exercises = exercises::get_exercises_by_exam_id(conn, exam_id).await?;
139 exercises.sort_by_key(|a| a.order_number);
141
142 let mut exercise_id_to_header_idx = HashMap::new();
143 for (idx, exercise) in exercises.iter().enumerate() {
144 exercise_id_to_header_idx.insert(exercise.id, csv_fields_before_headers + idx);
145 }
146
147 let header_count = csv_fields_before_headers + exercises.len();
148 let headers = IntoIterator::into_iter(["user_id".to_string(), "email".to_string()]).chain(
150 exercises
151 .into_iter()
152 .map(|e| format!("{}: {}", e.order_number, e.name)),
153 );
154
155 let mut stream = user_exercise_states::stream_exam_points(conn, exam_id);
156
157 let writer = CsvWriter::new_with_initialized_headers(writer, headers).await?;
158 while let Some(next) = stream.try_next().await? {
159 let mut csv_row = vec!["0".to_string(); header_count];
160 csv_row[0] = next.user_id.to_string();
161 csv_row[1] = next.email;
162 for points in next.points_for_exercise {
163 let idx = exercise_id_to_header_idx
164 .get(&points.exercise_id)
165 .with_context(|| format!("Unexpected exercise id {}", points.exercise_id))?;
166 let item = csv_row
167 .get_mut(*idx)
168 .with_context(|| format!("Invalid index {}", idx))?;
169 *item = points.score_given.to_string();
170 }
171 writer.write_record(csv_row);
172 }
173 let writer = writer.finish().await?;
174 Ok(writer)
175}