1pub mod code_giveaway_codes;
2pub mod course_instance_export;
3pub mod course_research_form_questions_answers_export;
4pub mod exercise_tasks_export;
5pub mod points;
6pub mod submissions;
7pub mod user_exercise_states_export;
8pub mod users_export;
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use csv::Writer;
13use futures::{Stream, StreamExt, stream::FuturesUnordered};
14
15use async_trait::async_trait;
16
17use models::course_module_completions::CourseModuleCompletionWithRegistrationInfo;
18use serde::Serialize;
19use sqlx::PgConnection;
20use std::{
21 io,
22 io::Write,
23 sync::{Arc, Mutex},
24};
25use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::prelude::*;
29
30use super::authorization::{AuthorizationToken, AuthorizedResponse};
31struct CsvWriter<W: Write> {
33 csv_writer: Arc<Mutex<Writer<W>>>,
34 handles: FuturesUnordered<JoinHandle<Result<()>>>,
35}
36
37impl<W: Write + Send + 'static> CsvWriter<W> {
38 async fn new_with_initialized_headers<I, T>(writer: W, headers: I) -> Result<Self>
40 where
41 I: IntoIterator<Item = T> + Send + 'static,
42 T: AsRef<[u8]>,
43 {
44 let mut writer = csv::WriterBuilder::new()
45 .has_headers(false)
46 .from_writer(writer);
47
48 let writer = tokio::task::spawn_blocking(move || {
50 writer
51 .write_record(headers)
52 .context("Failed to write headers")?;
53 Result::<_, anyhow::Error>::Ok(writer)
54 })
55 .await??;
56
57 Ok(Self {
58 csv_writer: Arc::new(Mutex::new(writer)),
59 handles: FuturesUnordered::new(),
60 })
61 }
62
63 fn write_record<I, T>(&self, csv_row: I)
65 where
66 I: IntoIterator<Item = T> + Send + 'static,
67 T: AsRef<[u8]>,
68 {
69 let writer = self.csv_writer.clone();
70 let handle = tokio::task::spawn_blocking(move || {
71 writer
72 .lock()
73 .map_err(|_| anyhow::anyhow!("Failed to lock mutex"))?
74 .write_record(csv_row)
75 .context("Failed to serialize points")
76 });
77 self.handles.push(handle);
78 }
79
80 async fn finish(mut self) -> Result<W> {
83 while let Some(handle) = self.handles.next().await {
85 handle??;
86 }
87
88 let writer = tokio::task::spawn_blocking(move || {
89 let _ = &self;
90 Arc::try_unwrap(self.csv_writer)
91 .map_err(|_| anyhow::anyhow!("Failed to extract inner writer from arc"))?
92 .into_inner()
93 .map_err(|e| anyhow::anyhow!("Failed to extract inner writer from mutex: {}", e))?
94 .into_inner()
95 .map_err(|e| {
96 anyhow::anyhow!("Failed to extract inner writer from CSV writer: {}", e)
97 })
98 })
99 .await??;
100 Ok(writer)
101 }
102}
103
104fn course_module_completion_info_to_grade_string(
109 input: Option<&CourseModuleCompletionWithRegistrationInfo>,
110) -> String {
111 let grade_string = input.map(|info| {
112 if let Some(grade) = info.grade {
113 return grade.to_string();
114 }
115 if info.passed {
116 return "pass".to_string();
117 };
118 "fail".to_string()
119 });
120 if let Some(grade_string) = grade_string {
121 if let Some(info) = input
122 && !info.prerequisite_modules_completed
123 {
124 return format!("{grade_string}*");
125 }
126 return grade_string;
127 }
128 "-".to_string()
129}
130
131pub struct CSVExportAdapter {
132 pub sender: UnboundedSender<ControllerResult<Bytes>>,
133 pub authorization_token: AuthorizationToken,
134}
135impl Write for CSVExportAdapter {
136 fn flush(&mut self) -> std::io::Result<()> {
137 Ok(())
138 }
139
140 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
141 let bytes = Bytes::copy_from_slice(buf);
142 let token = self.authorization_token;
143 self.sender
144 .send(token.authorized_ok(bytes))
145 .map_err(|e| io::Error::other(e.to_string()))?;
146 Ok(buf.len())
147 }
148}
149
150pub fn make_authorized_streamable(
168 stream: UnboundedReceiverStream<Result<AuthorizedResponse<bytes::Bytes>, ControllerError>>,
169) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
170 stream.map(|item| item.map(|item2| item2.data))
171}
172
173pub fn serializable_sqlx_result_stream_to_json_stream(
177 stream: impl Stream<Item = sqlx::Result<impl Serialize>>,
178) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
179 let res_stream = stream.enumerate().map(|(n, item)| {
180 item.map(|item2| {
181 match serde_json::to_vec(&item2) {
182 Ok(mut v) => {
183 if n == 0 {
185 v.insert(0, b'[');
187 } else {
188 v.insert(0, b',');
190 }
191 Bytes::from(v)
192 }
193 Err(e) => {
194 error!("Failed to serialize item: {}", e);
197 Bytes::from(format!(
198 "Streaming error: Failed to serialize item. Details: {:?}",
199 e
200 ))
201 }
202 }
203 })
204 .map_err(|original_error| {
205 ControllerError::new(
206 ControllerErrorType::InternalServerError,
207 original_error.to_string(),
208 Some(original_error.into()),
209 )
210 })
211 });
212 res_stream.chain(tokio_stream::iter(vec![Ok(Bytes::from_static(b"]"))]))
214}
215
216#[async_trait]
217pub trait CsvExportDataLoader {
218 async fn load_data(
219 &self,
220 sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
221 conn: &mut PgConnection,
222 token: AuthorizationToken,
223 ) -> anyhow::Result<CSVExportAdapter>;
224}
225
226pub async fn general_export(
227 pool: web::Data<PgPool>,
228 content_disposition: &str,
229 data_loader: impl CsvExportDataLoader + std::marker::Send + 'static,
230 token: AuthorizationToken,
231) -> ControllerResult<HttpResponse> {
232 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<ControllerResult<Bytes>>();
233 let mut handle_conn = pool.acquire().await?;
235 let _handle = tokio::spawn(async move {
236 let fut = data_loader.load_data(sender, &mut handle_conn, token);
237 let res = fut.await;
238 if let Err(err) = res {
239 tracing::error!("Failed to export: {}", err);
240 }
241 });
242
243 token.authorized_ok(
245 HttpResponse::Ok()
246 .append_header(("Content-Disposition", content_disposition))
247 .streaming(make_authorized_streamable(UnboundedReceiverStream::new(
248 receiver,
249 ))),
250 )
251}
252
253#[cfg(test)]
254mod test {
255 use std::{collections::HashMap, io::Cursor};
256
257 use headless_lms_models::{
258 course_instance_enrollments, exercise_slides,
259 exercise_task_gradings::ExerciseTaskGradingResult,
260 exercise_tasks::{self, NewExerciseTask},
261 exercises::{self, GradingProgress},
262 library::grading::{
263 GradingPolicy, StudentExerciseSlideSubmission, StudentExerciseTaskSubmission,
264 },
265 user_exercise_states,
266 user_exercise_states::ExerciseWithUserState,
267 users,
268 };
269 use models::chapters::{self, NewChapter};
270 use serde_json::Value;
271
272 use super::*;
273 use crate::{
274 domain::{
275 csv_export::points::export_course_instance_points,
276 models_requests::{self, JwtKey},
277 },
278 test_helper::*,
279 };
280
281 #[actix_web::test]
282 async fn exports() {
283 insert_data!(:tx, :user, :org, :course, :instance, :course_module, :chapter, :page, :exercise, :slide, :task);
284
285 let u2 = users::insert(
286 tx.as_mut(),
287 PKeyPolicy::Generate,
288 "second@example.org",
289 None,
290 None,
291 )
292 .await
293 .unwrap();
294
295 course_instance_enrollments::insert(tx.as_mut(), user, course, instance.id)
296 .await
297 .unwrap();
298 course_instance_enrollments::insert(tx.as_mut(), u2, course, instance.id)
299 .await
300 .unwrap();
301
302 let c2 = chapters::insert(
303 tx.as_mut(),
304 PKeyPolicy::Generate,
305 &NewChapter {
306 name: "".to_string(),
307 color: Some("#065853".to_string()),
308 course_id: course,
309 chapter_number: 2,
310 front_page_id: None,
311 opens_at: None,
312 deadline: None,
313 course_module_id: Some(course_module.id),
314 },
315 )
316 .await
317 .unwrap();
318
319 let e2 = exercises::insert(tx.as_mut(), PKeyPolicy::Generate, course, "", page, c2, 0)
320 .await
321 .unwrap();
322 let s2 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, e2, 0)
323 .await
324 .unwrap();
325 let et2 = exercise_tasks::insert(
326 tx.as_mut(),
327 PKeyPolicy::Generate,
328 NewExerciseTask {
329 exercise_slide_id: s2,
330 exercise_type: "".to_string(),
331 assignment: vec![],
332 public_spec: Some(Value::Null),
333 private_spec: Some(Value::Null),
334 model_solution_spec: Some(Value::Null),
335 order_number: 1,
336 },
337 )
338 .await
339 .unwrap();
340
341 let e3 = exercises::insert(tx.as_mut(), PKeyPolicy::Generate, course, "", page, c2, 1)
342 .await
343 .unwrap();
344 let s3 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, e3, 0)
345 .await
346 .unwrap();
347 let et3 = exercise_tasks::insert(
348 tx.as_mut(),
349 PKeyPolicy::Generate,
350 NewExerciseTask {
351 exercise_slide_id: s3,
352 exercise_type: "".to_string(),
353 assignment: vec![],
354 public_spec: Some(Value::Null),
355 private_spec: Some(Value::Null),
356 model_solution_spec: Some(Value::Null),
357 order_number: 2,
358 },
359 )
360 .await
361 .unwrap();
362 submit_and_grade(tx.as_mut(), exercise, slide, task, user, course, 12.34).await;
363 submit_and_grade(tx.as_mut(), e2, s2, et2, user, course, 23.45).await;
364 submit_and_grade(tx.as_mut(), e2, s2, et2, u2, course, 34.56).await;
365 submit_and_grade(tx.as_mut(), e3, s3, et3, u2, course, 45.67).await;
366
367 let buf = vec![];
368 let buf = export_course_instance_points(tx.as_mut(), instance.id, buf)
369 .await
370 .unwrap();
371 let buf = Cursor::new(buf);
372
373 let mut reader = csv::Reader::from_reader(buf);
374 let mut count = 0;
375 for record in reader.records() {
376 count += 1;
377 let record = record.unwrap();
378 println!("{}", record.as_slice());
379 let user_id = Uuid::parse_str(&record[0]).unwrap();
380 let first = record[1].parse::<f32>().unwrap();
381 let second = record[2].parse::<f32>().unwrap();
382 if user_id == user {
383 assert!((first - 0.1234).abs() < 0.1 && (second - 0.2345).abs() < 0.1);
384 } else if user_id == u2 {
385 assert!((first - 0.0).abs() < 0.1 && (second - 0.8023).abs() < 0.1);
386 } else {
387 panic!("unexpected user id");
388 }
389 }
390 assert_eq!(count, 2)
391 }
392
393 async fn submit_and_grade(
394 tx: &mut PgConnection,
395 ex: Uuid,
396 ex_slide: Uuid,
397 et: Uuid,
398 u: Uuid,
399 course_id: Uuid,
400 score_given: f32,
401 ) {
402 let exercise = exercises::get_by_id(tx, ex).await.unwrap();
403 user_exercise_states::get_or_create_user_exercise_state(tx, u, ex, Some(course_id), None)
404 .await
405 .unwrap();
406 user_exercise_states::upsert_selected_exercise_slide_id(
407 tx,
408 u,
409 ex,
410 Some(course_id),
411 None,
412 Some(ex_slide),
413 )
414 .await
415 .unwrap();
416 let user_exercise_state = user_exercise_states::get_or_create_user_exercise_state(
417 tx,
418 u,
419 ex,
420 Some(course_id),
421 None,
422 )
423 .await
424 .unwrap();
425 let mut exercise_with_user_state =
426 ExerciseWithUserState::new(exercise, user_exercise_state).unwrap();
427 let jwt_key = Arc::new(JwtKey::test_key());
428 headless_lms_models::library::grading::grade_user_submission(
429 tx,
430 &mut exercise_with_user_state,
431 &StudentExerciseSlideSubmission {
432 exercise_slide_id: ex_slide,
433 exercise_task_submissions: vec![StudentExerciseTaskSubmission {
434 exercise_task_id: et,
435 data_json: Value::Null,
436 }],
437 },
438 GradingPolicy::Fixed(HashMap::from([(
439 et,
440 ExerciseTaskGradingResult {
441 feedback_json: None,
442 feedback_text: None,
443 grading_progress: GradingProgress::FullyGraded,
444 score_given,
445 score_maximum: 100,
446 set_user_variables: Some(HashMap::new()),
447 },
448 )])),
449 models_requests::fetch_service_info,
450 models_requests::make_grading_request_sender(jwt_key),
451 )
452 .await
453 .unwrap();
454 }
455}