1pub mod code_giveaway_codes;
2pub mod course_instance_export;
3pub mod course_research_form_questions_answers_export;
4pub mod exercise_tasks_export;
5pub mod points;
6pub mod submissions;
7pub mod user_exercise_states_export;
8pub mod users_export;
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use csv::Writer;
13use futures::{Stream, StreamExt, stream::FuturesUnordered};
14
15use async_trait::async_trait;
16
17use models::course_module_completions::CourseModuleCompletionWithRegistrationInfo;
18use serde::Serialize;
19use sqlx::PgConnection;
20use std::{
21 io,
22 io::Write,
23 sync::{Arc, Mutex},
24};
25use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::prelude::*;
29
30use super::authorization::{AuthorizationToken, AuthorizedResponse};
31struct CsvWriter<W: Write> {
33 csv_writer: Arc<Mutex<Writer<W>>>,
34 handles: FuturesUnordered<JoinHandle<Result<()>>>,
35}
36
37impl<W: Write + Send + 'static> CsvWriter<W> {
38 async fn new_with_initialized_headers<I, T>(writer: W, headers: I) -> Result<Self>
40 where
41 I: IntoIterator<Item = T> + Send + 'static,
42 T: AsRef<[u8]>,
43 {
44 let mut writer = csv::WriterBuilder::new()
45 .has_headers(false)
46 .from_writer(writer);
47
48 let writer = tokio::task::spawn_blocking(move || {
50 writer
51 .write_record(headers)
52 .context("Failed to write headers")?;
53 Result::<_, anyhow::Error>::Ok(writer)
54 })
55 .await??;
56
57 Ok(Self {
58 csv_writer: Arc::new(Mutex::new(writer)),
59 handles: FuturesUnordered::new(),
60 })
61 }
62
63 fn write_record<I, T>(&self, csv_row: I)
65 where
66 I: IntoIterator<Item = T> + Send + 'static,
67 T: AsRef<[u8]>,
68 {
69 let writer = self.csv_writer.clone();
70 let handle = tokio::task::spawn_blocking(move || {
71 writer
72 .lock()
73 .map_err(|_| anyhow::anyhow!("Failed to lock mutex"))?
74 .write_record(csv_row)
75 .context("Failed to serialize points")
76 });
77 self.handles.push(handle);
78 }
79
80 async fn finish(mut self) -> Result<W> {
83 while let Some(handle) = self.handles.next().await {
85 handle??;
86 }
87
88 let writer = tokio::task::spawn_blocking(move || {
89 let _ = &self;
90 Arc::try_unwrap(self.csv_writer)
91 .map_err(|_| anyhow::anyhow!("Failed to extract inner writer from arc"))?
92 .into_inner()
93 .map_err(|e| anyhow::anyhow!("Failed to extract inner writer from mutex: {}", e))?
94 .into_inner()
95 .map_err(|e| {
96 anyhow::anyhow!("Failed to extract inner writer from CSV writer: {}", e)
97 })
98 })
99 .await??;
100 Ok(writer)
101 }
102}
103
104fn course_module_completion_info_to_grade_string(
109 input: Option<&CourseModuleCompletionWithRegistrationInfo>,
110) -> String {
111 let grade_string = input.map(|info| {
112 if let Some(grade) = info.grade {
113 return grade.to_string();
114 }
115 if info.passed {
116 return "pass".to_string();
117 };
118 "fail".to_string()
119 });
120 if let Some(grade_string) = grade_string {
121 if let Some(info) = input {
122 if !info.prerequisite_modules_completed {
123 return format!("{grade_string}*");
124 }
125 }
126 return grade_string;
127 }
128 "-".to_string()
129}
130
131pub struct CSVExportAdapter {
132 pub sender: UnboundedSender<ControllerResult<Bytes>>,
133 pub authorization_token: AuthorizationToken,
134}
135impl Write for CSVExportAdapter {
136 fn flush(&mut self) -> std::io::Result<()> {
137 Ok(())
138 }
139
140 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
141 let bytes = Bytes::copy_from_slice(buf);
142 let token = self.authorization_token;
143 self.sender
144 .send(token.authorized_ok(bytes))
145 .map_err(|e| io::Error::other(e.to_string()))?;
146 Ok(buf.len())
147 }
148}
149
150pub fn make_authorized_streamable(
168 stream: UnboundedReceiverStream<Result<AuthorizedResponse<bytes::Bytes>, ControllerError>>,
169) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
170 stream.map(|item| item.map(|item2| item2.data))
171}
172
173pub fn serializable_sqlx_result_stream_to_json_stream(
177 stream: impl Stream<Item = sqlx::Result<impl Serialize>>,
178) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
179 let res_stream = stream.enumerate().map(|(n, item)| {
180 item.map(|item2| {
181 match serde_json::to_vec(&item2) {
182 Ok(mut v) => {
183 if n == 0 {
185 v.insert(0, b'[');
187 } else {
188 v.insert(0, b',');
190 }
191 Bytes::from(v)
192 }
193 Err(e) => {
194 error!("Failed to serialize item: {}", e);
197 Bytes::from(format!(
198 "Streaming error: Failed to serialize item. Details: {:?}",
199 e
200 ))
201 }
202 }
203 })
204 .map_err(|original_error| {
205 ControllerError::new(
206 ControllerErrorType::InternalServerError,
207 original_error.to_string(),
208 Some(original_error.into()),
209 )
210 })
211 });
212 res_stream.chain(tokio_stream::iter(vec![Ok(Bytes::from_static(b"]"))]))
214}
215
216#[async_trait]
217pub trait CsvExportDataLoader {
218 async fn load_data(
219 &self,
220 sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
221 conn: &mut PgConnection,
222 token: AuthorizationToken,
223 ) -> anyhow::Result<CSVExportAdapter>;
224}
225
226pub async fn general_export(
227 pool: web::Data<PgPool>,
228 content_disposition: &str,
229 data_loader: impl CsvExportDataLoader + std::marker::Send + 'static,
230 token: AuthorizationToken,
231) -> ControllerResult<HttpResponse> {
232 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<ControllerResult<Bytes>>();
233 let mut handle_conn = pool.acquire().await?;
235 let _handle = tokio::spawn(async move {
236 let fut = data_loader.load_data(sender, &mut handle_conn, token);
237 let res = fut.await;
238 if let Err(err) = res {
239 tracing::error!("Failed to export: {}", err);
240 }
241 });
242
243 token.authorized_ok(
245 HttpResponse::Ok()
246 .append_header(("Content-Disposition", content_disposition))
247 .streaming(make_authorized_streamable(UnboundedReceiverStream::new(
248 receiver,
249 ))),
250 )
251}
252
#[cfg(test)]
mod test {
    use std::{collections::HashMap, io::Cursor};

    use headless_lms_models::{
        exercise_slides,
        exercise_task_gradings::ExerciseTaskGradingResult,
        exercise_tasks::{self, NewExerciseTask},
        exercises::{self, GradingProgress},
        library::grading::{
            GradingPolicy, StudentExerciseSlideSubmission, StudentExerciseTaskSubmission,
        },
        user_exercise_states::{self, ExerciseWithUserState},
        users,
    };
    use models::chapters::{self, NewChapter};
    use serde_json::Value;

    use super::*;
    use crate::{
        domain::{
            csv_export::points::export_course_instance_points,
            models_requests::{self, JwtKey},
        },
        test_helper::*,
    };

    /// Exports the points of a course instance with two users and checks the
    /// exported row of each user.
    ///
    /// The first user is graded in the exercise created by `insert_data!` and
    /// in one exercise of a second chapter; the second user is graded in both
    /// exercises of the second chapter.
    #[actix_web::test]
    async fn exports() {
        insert_data!(:tx, :user, :org, :course, :instance, :course_module, :chapter, :page, :exercise, :slide, :task);

        let second_user = users::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            "second@example.org",
            None,
            None,
        )
        .await
        .unwrap();

        let second_chapter_spec = NewChapter {
            name: "".to_string(),
            color: Some("#065853".to_string()),
            course_id: course,
            chapter_number: 2,
            front_page_id: None,
            opens_at: None,
            deadline: None,
            course_module_id: Some(course_module.id),
        };
        let second_chapter =
            chapters::insert(tx.as_mut(), PKeyPolicy::Generate, &second_chapter_spec)
                .await
                .unwrap();

        // First exercise of the second chapter.
        let exercise_a = exercises::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            course,
            "",
            page,
            second_chapter,
            0,
        )
        .await
        .unwrap();
        let slide_a = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, exercise_a, 0)
            .await
            .unwrap();
        let task_a = exercise_tasks::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            NewExerciseTask {
                exercise_slide_id: slide_a,
                exercise_type: "".to_string(),
                assignment: vec![],
                public_spec: Some(Value::Null),
                private_spec: Some(Value::Null),
                model_solution_spec: Some(Value::Null),
                order_number: 1,
            },
        )
        .await
        .unwrap();

        // Second exercise of the second chapter.
        let exercise_b = exercises::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            course,
            "",
            page,
            second_chapter,
            1,
        )
        .await
        .unwrap();
        let slide_b = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, exercise_b, 0)
            .await
            .unwrap();
        let task_b = exercise_tasks::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            NewExerciseTask {
                exercise_slide_id: slide_b,
                exercise_type: "".to_string(),
                assignment: vec![],
                public_spec: Some(Value::Null),
                private_spec: Some(Value::Null),
                model_solution_spec: Some(Value::Null),
                order_number: 2,
            },
        )
        .await
        .unwrap();

        submit_and_grade(tx.as_mut(), exercise, slide, task, user, course, 12.34).await;
        submit_and_grade(tx.as_mut(), exercise_a, slide_a, task_a, user, course, 23.45).await;
        submit_and_grade(
            tx.as_mut(),
            exercise_a,
            slide_a,
            task_a,
            second_user,
            course,
            34.56,
        )
        .await;
        submit_and_grade(
            tx.as_mut(),
            exercise_b,
            slide_b,
            task_b,
            second_user,
            course,
            45.67,
        )
        .await;

        let exported = export_course_instance_points(tx.as_mut(), instance.id, Vec::<u8>::new())
            .await
            .unwrap();
        let mut reader = csv::Reader::from_reader(Cursor::new(exported));

        // Values are compared with an absolute tolerance of 0.1.
        let near = |actual: f32, expected: f32| (actual - expected).abs() < 0.1;

        let mut rows_seen = 0;
        for row in reader.records() {
            rows_seen += 1;
            let row = row.unwrap();
            println!("{}", row.as_slice());
            let row_user = Uuid::parse_str(&row[0]).unwrap();
            let first = row[1].parse::<f32>().unwrap();
            let second = row[2].parse::<f32>().unwrap();
            match row_user {
                id if id == user => {
                    assert!(near(first, 0.1234) && near(second, 0.2345));
                }
                id if id == second_user => {
                    assert!(near(first, 0.0) && near(second, 0.8023));
                }
                _ => panic!("unexpected user id"),
            }
        }
        assert_eq!(rows_seen, 2)
    }

    /// Makes user `u` submit an answer to task `et` on slide `ex_slide` of
    /// exercise `ex` and grades it with the fixed score `score_given` out of a
    /// maximum of 100.
    async fn submit_and_grade(
        tx: &mut PgConnection,
        ex: Uuid,
        ex_slide: Uuid,
        et: Uuid,
        u: Uuid,
        course_id: Uuid,
        score_given: f32,
    ) {
        let exercise = exercises::get_by_id(tx, ex).await.unwrap();

        // Make sure the state exists before a slide is selected for it.
        user_exercise_states::get_or_create_user_exercise_state(tx, u, ex, Some(course_id), None)
            .await
            .unwrap();
        user_exercise_states::upsert_selected_exercise_slide_id(
            tx,
            u,
            ex,
            Some(course_id),
            None,
            Some(ex_slide),
        )
        .await
        .unwrap();
        // Fetch the state again now that the slide has been selected.
        let state_after_selection =
            user_exercise_states::get_or_create_user_exercise_state(
                tx,
                u,
                ex,
                Some(course_id),
                None,
            )
            .await
            .unwrap();
        let mut exercise_with_user_state =
            ExerciseWithUserState::new(exercise, state_after_selection).unwrap();

        let jwt_key = Arc::new(JwtKey::test_key());

        let submission = StudentExerciseSlideSubmission {
            exercise_slide_id: ex_slide,
            exercise_task_submissions: vec![StudentExerciseTaskSubmission {
                exercise_task_id: et,
                data_json: Value::Null,
            }],
        };
        let fixed_grading = ExerciseTaskGradingResult {
            feedback_json: None,
            feedback_text: None,
            grading_progress: GradingProgress::FullyGraded,
            score_given,
            score_maximum: 100,
            set_user_variables: Some(HashMap::new()),
        };
        let policy = GradingPolicy::Fixed(HashMap::from([(et, fixed_grading)]));

        headless_lms_models::library::grading::grade_user_submission(
            tx,
            &mut exercise_with_user_state,
            &submission,
            policy,
            models_requests::fetch_service_info,
            models_requests::make_grading_request_sender(jwt_key),
        )
        .await
        .unwrap();
    }
}