1pub mod code_giveaway_codes;
2pub mod course_instance_export;
3pub mod course_research_form_questions_answers_export;
4pub mod exercise_tasks_export;
5pub mod points;
6pub mod submissions;
7pub mod user_exericse_states_export;
8pub mod users_export;
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use csv::Writer;
13use futures::{Stream, StreamExt, stream::FuturesUnordered};
14
15use async_trait::async_trait;
16
17use models::course_module_completions::CourseModuleCompletionWithRegistrationInfo;
18use serde::Serialize;
19use sqlx::PgConnection;
20use std::{
21 io,
22 io::Write,
23 sync::{Arc, Mutex},
24};
25use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::prelude::*;
29
30use super::authorization::{AuthorizationToken, AuthorizedResponse};
31struct CsvWriter<W: Write> {
33 csv_writer: Arc<Mutex<Writer<W>>>,
34 handles: FuturesUnordered<JoinHandle<Result<()>>>,
35}
36
37impl<W: Write + Send + 'static> CsvWriter<W> {
38 async fn new_with_initialized_headers<I, T>(writer: W, headers: I) -> Result<Self>
40 where
41 I: IntoIterator<Item = T> + Send + 'static,
42 T: AsRef<[u8]>,
43 {
44 let mut writer = csv::WriterBuilder::new()
45 .has_headers(false)
46 .from_writer(writer);
47
48 let writer = tokio::task::spawn_blocking(move || {
50 writer
51 .write_record(headers)
52 .context("Failed to write headers")?;
53 Result::<_, anyhow::Error>::Ok(writer)
54 })
55 .await??;
56
57 Ok(Self {
58 csv_writer: Arc::new(Mutex::new(writer)),
59 handles: FuturesUnordered::new(),
60 })
61 }
62
63 fn write_record<I, T>(&self, csv_row: I)
65 where
66 I: IntoIterator<Item = T> + Send + 'static,
67 T: AsRef<[u8]>,
68 {
69 let writer = self.csv_writer.clone();
70 let handle = tokio::task::spawn_blocking(move || {
71 writer
72 .lock()
73 .map_err(|_| anyhow::anyhow!("Failed to lock mutex"))?
74 .write_record(csv_row)
75 .context("Failed to serialize points")
76 });
77 self.handles.push(handle);
78 }
79
80 async fn finish(mut self) -> Result<W> {
83 while let Some(handle) = self.handles.next().await {
85 handle??;
86 }
87
88 let writer = tokio::task::spawn_blocking(move || {
89 let _ = &self;
90 Arc::try_unwrap(self.csv_writer)
91 .map_err(|_| anyhow::anyhow!("Failed to extract inner writer from arc"))?
92 .into_inner()
93 .map_err(|e| {
94 anyhow::anyhow!(
95 "Failed to extract inner writer from mutex: {}",
96 e.to_string()
97 )
98 })?
99 .into_inner()
100 .map_err(|e| {
101 anyhow::anyhow!(
102 "Failed to extract inner writer from CSV writer: {}",
103 e.to_string()
104 )
105 })
106 })
107 .await??;
108 Ok(writer)
109 }
110}
111
112fn course_module_completion_info_to_grade_string(
117 input: Option<&CourseModuleCompletionWithRegistrationInfo>,
118) -> String {
119 let grade_string = input.map(|info| {
120 if let Some(grade) = info.grade {
121 return grade.to_string();
122 }
123 if info.passed {
124 return "pass".to_string();
125 };
126 "fail".to_string()
127 });
128 if let Some(grade_string) = grade_string {
129 if let Some(info) = input {
130 if !info.prerequisite_modules_completed {
131 return format!("{grade_string}*");
132 }
133 }
134 return grade_string;
135 }
136 "-".to_string()
137}
138
139pub struct CSVExportAdapter {
140 pub sender: UnboundedSender<ControllerResult<Bytes>>,
141 pub authorization_token: AuthorizationToken,
142}
143impl Write for CSVExportAdapter {
144 fn flush(&mut self) -> std::io::Result<()> {
145 Ok(())
146 }
147
148 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
149 let bytes = Bytes::copy_from_slice(buf);
150 let token = self.authorization_token;
151 self.sender
152 .send(token.authorized_ok(bytes))
153 .map_err(|e| io::Error::other(e.to_string()))?;
154 Ok(buf.len())
155 }
156}
157
158pub fn make_authorized_streamable(
176 stream: UnboundedReceiverStream<Result<AuthorizedResponse<bytes::Bytes>, ControllerError>>,
177) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
178 stream.map(|item| item.map(|item2| item2.data))
179}
180
181pub fn serializable_sqlx_result_stream_to_json_stream(
185 stream: impl Stream<Item = sqlx::Result<impl Serialize>>,
186) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
187 let res_stream = stream.enumerate().map(|(n, item)| {
188 item.map(|item2| {
189 match serde_json::to_vec(&item2) {
190 Ok(mut v) => {
191 if n == 0 {
193 v.insert(0, b'[');
195 } else {
196 v.insert(0, b',');
198 }
199 Bytes::from(v)
200 }
201 Err(e) => {
202 error!("Failed to serialize item: {}", e);
205 Bytes::from(format!(
206 "Streaming error: Failed to serialize item. Details: {:?}",
207 e
208 ))
209 }
210 }
211 })
212 .map_err(|original_error| {
213 ControllerError::new(
214 ControllerErrorType::InternalServerError,
215 original_error.to_string(),
216 Some(original_error.into()),
217 )
218 })
219 });
220 res_stream.chain(tokio_stream::iter(vec![Ok(Bytes::from_static(b"]"))]))
222}
223
224#[async_trait]
225pub trait CsvExportDataLoader {
226 async fn load_data(
227 &self,
228 sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
229 conn: &mut PgConnection,
230 token: AuthorizationToken,
231 ) -> anyhow::Result<CSVExportAdapter>;
232}
233
234pub async fn general_export(
235 pool: web::Data<PgPool>,
236 content_disposition: &str,
237 data_loader: impl CsvExportDataLoader + std::marker::Send + 'static,
238 token: AuthorizationToken,
239) -> ControllerResult<HttpResponse> {
240 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<ControllerResult<Bytes>>();
241 let mut handle_conn = pool.acquire().await?;
243 let _handle = tokio::spawn(async move {
244 let fut = data_loader.load_data(sender, &mut handle_conn, token);
245 let res = fut.await;
246 if let Err(err) = res {
247 tracing::error!("Failed to export: {}", err);
248 }
249 });
250
251 token.authorized_ok(
253 HttpResponse::Ok()
254 .append_header(("Content-Disposition", content_disposition))
255 .streaming(make_authorized_streamable(UnboundedReceiverStream::new(
256 receiver,
257 ))),
258 )
259}
260
261#[cfg(test)]
262mod test {
263 use std::{collections::HashMap, io::Cursor};
264
265 use headless_lms_models::{
266 exercise_slides,
267 exercise_task_gradings::ExerciseTaskGradingResult,
268 exercise_tasks::{self, NewExerciseTask},
269 exercises::{self, GradingProgress},
270 library::grading::{
271 GradingPolicy, StudentExerciseSlideSubmission, StudentExerciseTaskSubmission,
272 },
273 user_exercise_states,
274 user_exercise_states::ExerciseWithUserState,
275 users,
276 };
277 use models::chapters::{self, NewChapter};
278 use serde_json::Value;
279
280 use super::*;
281 use crate::{
282 domain::{
283 csv_export::points::export_course_instance_points,
284 models_requests::{self, JwtKey},
285 },
286 test_helper::*,
287 };
288
289 #[actix_web::test]
290 async fn exports() {
291 insert_data!(:tx, :user, :org, :course, :instance, :course_module, :chapter, :page, :exercise, :slide, :task);
292
293 let u2 = users::insert(
294 tx.as_mut(),
295 PKeyPolicy::Generate,
296 "second@example.org",
297 None,
298 None,
299 )
300 .await
301 .unwrap();
302 let c2 = chapters::insert(
303 tx.as_mut(),
304 PKeyPolicy::Generate,
305 &NewChapter {
306 name: "".to_string(),
307 color: Some("#065853".to_string()),
308 course_id: course,
309 chapter_number: 2,
310 front_page_id: None,
311 opens_at: None,
312 deadline: None,
313 course_module_id: Some(course_module.id),
314 },
315 )
316 .await
317 .unwrap();
318
319 let e2 = exercises::insert(tx.as_mut(), PKeyPolicy::Generate, course, "", page, c2, 0)
320 .await
321 .unwrap();
322 let s2 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, e2, 0)
323 .await
324 .unwrap();
325 let et2 = exercise_tasks::insert(
326 tx.as_mut(),
327 PKeyPolicy::Generate,
328 NewExerciseTask {
329 exercise_slide_id: s2,
330 exercise_type: "".to_string(),
331 assignment: vec![],
332 public_spec: Some(Value::Null),
333 private_spec: Some(Value::Null),
334 model_solution_spec: Some(Value::Null),
335 order_number: 1,
336 },
337 )
338 .await
339 .unwrap();
340
341 let e3 = exercises::insert(tx.as_mut(), PKeyPolicy::Generate, course, "", page, c2, 1)
342 .await
343 .unwrap();
344 let s3 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, e3, 0)
345 .await
346 .unwrap();
347 let et3 = exercise_tasks::insert(
348 tx.as_mut(),
349 PKeyPolicy::Generate,
350 NewExerciseTask {
351 exercise_slide_id: s3,
352 exercise_type: "".to_string(),
353 assignment: vec![],
354 public_spec: Some(Value::Null),
355 private_spec: Some(Value::Null),
356 model_solution_spec: Some(Value::Null),
357 order_number: 2,
358 },
359 )
360 .await
361 .unwrap();
362 submit_and_grade(tx.as_mut(), exercise, slide, task, user, instance.id, 12.34).await;
363 submit_and_grade(tx.as_mut(), e2, s2, et2, user, instance.id, 23.45).await;
364 submit_and_grade(tx.as_mut(), e2, s2, et2, u2, instance.id, 34.56).await;
365 submit_and_grade(tx.as_mut(), e3, s3, et3, u2, instance.id, 45.67).await;
366
367 let buf = vec![];
368 let buf = export_course_instance_points(tx.as_mut(), instance.id, buf)
369 .await
370 .unwrap();
371 let buf = Cursor::new(buf);
372
373 let mut reader = csv::Reader::from_reader(buf);
374 let mut count = 0;
375 for record in reader.records() {
376 count += 1;
377 let record = record.unwrap();
378 println!("{}", record.as_slice());
379 let user_id = Uuid::parse_str(&record[0]).unwrap();
380 let first = record[1].parse::<f32>().unwrap();
381 let second = record[2].parse::<f32>().unwrap();
382 if user_id == user {
383 assert!((first - 0.1234).abs() < 0.1 && (second - 0.2345).abs() < 0.1);
384 } else if user_id == u2 {
385 assert!((first - 0.0).abs() < 0.1 && (second - 0.8023).abs() < 0.1);
386 } else {
387 panic!("unexpected user id");
388 }
389 }
390 assert_eq!(count, 2)
391 }
392
393 async fn submit_and_grade(
394 tx: &mut PgConnection,
395 ex: Uuid,
396 ex_slide: Uuid,
397 et: Uuid,
398 u: Uuid,
399 ci: Uuid,
400 score_given: f32,
401 ) {
402 let exercise = exercises::get_by_id(tx, ex).await.unwrap();
403 user_exercise_states::get_or_create_user_exercise_state(tx, u, ex, Some(ci), None)
404 .await
405 .unwrap();
406 user_exercise_states::upsert_selected_exercise_slide_id(
407 tx,
408 u,
409 ex,
410 Some(ci),
411 None,
412 Some(ex_slide),
413 )
414 .await
415 .unwrap();
416 let user_exercise_state =
417 user_exercise_states::get_or_create_user_exercise_state(tx, u, ex, Some(ci), None)
418 .await
419 .unwrap();
420 let mut exercise_with_user_state =
421 ExerciseWithUserState::new(exercise, user_exercise_state).unwrap();
422 let jwt_key = Arc::new(JwtKey::test_key());
423 headless_lms_models::library::grading::grade_user_submission(
424 tx,
425 &mut exercise_with_user_state,
426 &StudentExerciseSlideSubmission {
427 exercise_slide_id: ex_slide,
428 exercise_task_submissions: vec![StudentExerciseTaskSubmission {
429 exercise_task_id: et,
430 data_json: Value::Null,
431 }],
432 },
433 GradingPolicy::Fixed(HashMap::from([(
434 et,
435 ExerciseTaskGradingResult {
436 feedback_json: None,
437 feedback_text: None,
438 grading_progress: GradingProgress::FullyGraded,
439 score_given,
440 score_maximum: 100,
441 set_user_variables: Some(HashMap::new()),
442 },
443 )])),
444 models_requests::fetch_service_info,
445 models_requests::make_grading_request_sender(jwt_key),
446 )
447 .await
448 .unwrap();
449 }
450}