headless_lms_server/domain/csv_export/
points.rs1use anyhow::{Context, Result};
2use bytes::Bytes;
3
4use futures::TryStreamExt;
5use headless_lms_models::{chapters, course_instances, exercises, user_exercise_states};
6
7use async_trait::async_trait;
8
9use crate::domain::csv_export::CsvWriter;
10
11use sqlx::PgConnection;
12use std::{collections::HashMap, io::Write};
13use tokio::sync::mpsc::UnboundedSender;
14
15use uuid::Uuid;
16
17use crate::prelude::*;
18
19use super::{
20 super::authorization::{AuthorizationToken, AuthorizedResponse},
21 CSVExportAdapter, CsvExportDataLoader,
22};
23
24pub struct PointExportOperation {
25 pub course_instance_id: Uuid,
26}
27
28#[async_trait]
29impl CsvExportDataLoader for PointExportOperation {
30 async fn load_data(
31 &self,
32 sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
33 conn: &mut PgConnection,
34 token: AuthorizationToken,
35 ) -> anyhow::Result<CSVExportAdapter> {
36 export_course_instance_points(
37 &mut *conn,
38 self.course_instance_id,
39 CSVExportAdapter {
40 sender,
41 authorization_token: token,
42 },
43 )
44 .await
45 }
46}
47
48pub async fn export_course_instance_points<W>(
50 conn: &mut PgConnection,
51 course_instance_id: Uuid,
52 writer: W,
53) -> Result<W>
54where
55 W: Write + Send + 'static,
56{
57 let csv_fields_before_headers = 1;
58
59 let course_instance = course_instances::get_course_instance(conn, course_instance_id).await?;
60 let mut chapters = chapters::course_chapters(conn, course_instance.course_id).await?;
61 chapters.sort_by_key(|c| c.chapter_number);
62 let mut chapter_number_to_header_idx = HashMap::new();
63 for (idx, chapter) in chapters.iter().enumerate() {
64 chapter_number_to_header_idx
65 .insert(chapter.chapter_number, csv_fields_before_headers + idx);
66 }
67
68 let header_count = csv_fields_before_headers + chapters.len();
69 let headers = IntoIterator::into_iter(["user_id".to_string()])
71 .chain(chapters.into_iter().map(|c| c.chapter_number.to_string()));
72
73 let mut stream = user_exercise_states::stream_course_instance_points(conn, course_instance_id);
74
75 let writer = CsvWriter::new_with_initialized_headers(writer, headers).await?;
76 while let Some(next) = stream.try_next().await? {
77 let mut csv_row = vec!["0".to_string(); header_count];
78 csv_row[0] = next.user_id.to_string();
79 for points in next.points_for_each_chapter {
80 let idx = chapter_number_to_header_idx
81 .get(&points.chapter_number)
82 .with_context(|| format!("Unexpected chapter number {}", points.chapter_number))?;
83 let item = csv_row
84 .get_mut(*idx)
85 .with_context(|| format!("Invalid chapter number {}", idx))?;
86 *item = points.points_for_chapter.to_string();
87 }
88 writer.write_record(csv_row);
89 }
90 let writer = writer.finish().await?;
91 Ok(writer)
92}
93
94pub struct ExamPointExportOperation {
95 pub exam_id: Uuid,
96}
97
98#[async_trait]
100impl CsvExportDataLoader for ExamPointExportOperation {
101 async fn load_data(
102 &self,
103 sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
104 conn: &mut PgConnection,
105 token: AuthorizationToken,
106 ) -> anyhow::Result<CSVExportAdapter> {
107 export_exam_points(
108 &mut *conn,
109 self.exam_id,
110 CSVExportAdapter {
111 sender,
112 authorization_token: token,
113 },
114 )
115 .await
116 }
117}
118
119pub async fn export_exam_points<W>(conn: &mut PgConnection, exam_id: Uuid, writer: W) -> Result<W>
121where
122 W: Write + Send + 'static,
123{
124 let csv_fields_before_headers = 2;
125
126 let mut exercises = exercises::get_exercises_by_exam_id(conn, exam_id).await?;
127 exercises.sort_by(|a, b| a.order_number.cmp(&b.order_number));
129
130 let mut exercise_id_to_header_idx = HashMap::new();
131 for (idx, exercise) in exercises.iter().enumerate() {
132 exercise_id_to_header_idx.insert(exercise.id, csv_fields_before_headers + idx);
133 }
134
135 let header_count = csv_fields_before_headers + exercises.len();
136 let headers = IntoIterator::into_iter(["user_id".to_string(), "email".to_string()]).chain(
138 exercises
139 .into_iter()
140 .map(|e| format!("{}: {}", e.order_number, e.name)),
141 );
142
143 let mut stream = user_exercise_states::stream_exam_points(conn, exam_id);
144
145 let writer = CsvWriter::new_with_initialized_headers(writer, headers).await?;
146 while let Some(next) = stream.try_next().await? {
147 let mut csv_row = vec!["0".to_string(); header_count];
148 csv_row[0] = next.user_id.to_string();
149 csv_row[1] = next.email;
150 for points in next.points_for_exercise {
151 let idx = exercise_id_to_header_idx
152 .get(&points.exercise_id)
153 .with_context(|| format!("Unexpected exercise id {}", points.exercise_id))?;
154 let item = csv_row
155 .get_mut(*idx)
156 .with_context(|| format!("Invalid index {}", idx))?;
157 *item = points.score_given.to_string();
158 }
159 writer.write_record(csv_row);
160 }
161 let writer = writer.finish().await?;
162 Ok(writer)
163}