headless_lms_server/domain/csv_export/
mod.rs

1pub mod code_giveaway_codes;
2pub mod course_instance_export;
3pub mod course_research_form_questions_answers_export;
4pub mod exercise_tasks_export;
5pub mod points;
6pub mod submissions;
7pub mod user_exericse_states_export;
8pub mod users_export;
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use csv::Writer;
13use futures::{Stream, StreamExt, stream::FuturesUnordered};
14
15use async_trait::async_trait;
16
17use models::course_module_completions::CourseModuleCompletionWithRegistrationInfo;
18use serde::Serialize;
19use sqlx::PgConnection;
20use std::{
21    io,
22    io::Write,
23    sync::{Arc, Mutex},
24};
25use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::prelude::*;
29
30use super::authorization::{AuthorizationToken, AuthorizedResponse};
31/// Convenience struct for creating CSV data.
32struct CsvWriter<W: Write> {
33    csv_writer: Arc<Mutex<Writer<W>>>,
34    handles: FuturesUnordered<JoinHandle<Result<()>>>,
35}
36
37impl<W: Write + Send + 'static> CsvWriter<W> {
38    /// Creates a new CsvWriter, and also writes the given headers before returning.
39    async fn new_with_initialized_headers<I, T>(writer: W, headers: I) -> Result<Self>
40    where
41        I: IntoIterator<Item = T> + Send + 'static,
42        T: AsRef<[u8]>,
43    {
44        let mut writer = csv::WriterBuilder::new()
45            .has_headers(false)
46            .from_writer(writer);
47
48        // write headers first
49        let writer = tokio::task::spawn_blocking(move || {
50            writer
51                .write_record(headers)
52                .context("Failed to write headers")?;
53            Result::<_, anyhow::Error>::Ok(writer)
54        })
55        .await??;
56
57        Ok(Self {
58            csv_writer: Arc::new(Mutex::new(writer)),
59            handles: FuturesUnordered::new(),
60        })
61    }
62
63    /// Spawns a task that writes a single CSV record
64    fn write_record<I, T>(&self, csv_row: I)
65    where
66        I: IntoIterator<Item = T> + Send + 'static,
67        T: AsRef<[u8]>,
68    {
69        let writer = self.csv_writer.clone();
70        let handle = tokio::task::spawn_blocking(move || {
71            writer
72                .lock()
73                .map_err(|_| anyhow::anyhow!("Failed to lock mutex"))?
74                .write_record(csv_row)
75                .context("Failed to serialize points")
76        });
77        self.handles.push(handle);
78    }
79
80    /// Waits for handles to finish, flushes the writer and extracts the inner writer.
81    /// Should always be called before dropping the writer to make sure writing the CSV finishes properly.
82    async fn finish(mut self) -> Result<W> {
83        // ensure every task is finished before the writer is extracted
84        while let Some(handle) = self.handles.next().await {
85            handle??;
86        }
87
88        let writer = tokio::task::spawn_blocking(move || {
89            let _ = &self;
90            Arc::try_unwrap(self.csv_writer)
91                .map_err(|_| anyhow::anyhow!("Failed to extract inner writer from arc"))?
92                .into_inner()
93                .map_err(|e| {
94                    anyhow::anyhow!(
95                        "Failed to extract inner writer from mutex: {}",
96                        e.to_string()
97                    )
98                })?
99                .into_inner()
100                .map_err(|e| {
101                    anyhow::anyhow!(
102                        "Failed to extract inner writer from CSV writer: {}",
103                        e.to_string()
104                    )
105                })
106        })
107        .await??;
108        Ok(writer)
109    }
110}
111
112/**
113 * For csv export. Return the grade as a number if there is a numeric grade. If the grade is not numeric, returns pass/fail/
114 * If course module has not been completed yet, returns "-".
115 */
116fn course_module_completion_info_to_grade_string(
117    input: Option<&CourseModuleCompletionWithRegistrationInfo>,
118) -> String {
119    let grade_string = input.map(|info| {
120        if let Some(grade) = info.grade {
121            return grade.to_string();
122        }
123        if info.passed {
124            return "pass".to_string();
125        };
126        "fail".to_string()
127    });
128    if let Some(grade_string) = grade_string {
129        if let Some(info) = input {
130            if !info.prerequisite_modules_completed {
131                return format!("{grade_string}*");
132            }
133        }
134        return grade_string;
135    }
136    "-".to_string()
137}
138
139pub struct CSVExportAdapter {
140    pub sender: UnboundedSender<ControllerResult<Bytes>>,
141    pub authorization_token: AuthorizationToken,
142}
143impl Write for CSVExportAdapter {
144    fn flush(&mut self) -> std::io::Result<()> {
145        Ok(())
146    }
147
148    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
149        let bytes = Bytes::copy_from_slice(buf);
150        let token = self.authorization_token;
151        self.sender
152            .send(token.authorized_ok(bytes))
153            .map_err(|e| io::Error::other(e.to_string()))?;
154        Ok(buf.len())
155    }
156}
157
158/** Without this one, actix cannot stream our authorized streams as responses
159
160```ignore
161HttpResponse::Ok()
162    .append_header((
163        "Content-Disposition",
164        format!(
165            "attachment; filename=\"Exam: {} - Submissions {}.csv\"",
166            exam.name,
167            Utc::now().format("%Y-%m-%d")
168        ),
169    ))
170    .streaming(make_authorized_streamable(UnboundedReceiverStream::new(
171        receiver,
172    ))),
173```
174*/
175pub fn make_authorized_streamable(
176    stream: UnboundedReceiverStream<Result<AuthorizedResponse<bytes::Bytes>, ControllerError>>,
177) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
178    stream.map(|item| item.map(|item2| item2.data))
179}
180
181/**
182  For streaming arrays of json objects.
183*/
184pub fn serializable_sqlx_result_stream_to_json_stream(
185    stream: impl Stream<Item = sqlx::Result<impl Serialize>>,
186) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
187    let res_stream = stream.enumerate().map(|(n, item)| {
188        item.map(|item2| {
189            match serde_json::to_vec(&item2) {
190                Ok(mut v) => {
191                    // Only item index available, we don't know the length of the stream
192                    if n == 0 {
193                        // Start of array character to the beginning of the stream
194                        v.insert(0, b'[');
195                    } else {
196                        // Separator character before every item excluding the first item
197                        v.insert(0, b',');
198                    }
199                    Bytes::from(v)
200                }
201                Err(e) => {
202                    // Since we're already streaming a response, we have no way to change the status code of the response anymore.
203                    // Our best option at this point is to write the error to the response, hopefully causing the response to be invalid json.
204                    error!("Failed to serialize item: {}", e);
205                    Bytes::from(format!(
206                        "Streaming error: Failed to serialize item. Details: {:?}",
207                        e
208                    ))
209                }
210            }
211        })
212        .map_err(|original_error| {
213            ControllerError::new(
214                ControllerErrorType::InternalServerError,
215                original_error.to_string(),
216                Some(original_error.into()),
217            )
218        })
219    });
220    // Chaining the end of the json array character here because in the previous map we don't know the length of the stream
221    res_stream.chain(tokio_stream::iter(vec![Ok(Bytes::from_static(b"]"))]))
222}
223
224#[async_trait]
225pub trait CsvExportDataLoader {
226    async fn load_data(
227        &self,
228        sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
229        conn: &mut PgConnection,
230        token: AuthorizationToken,
231    ) -> anyhow::Result<CSVExportAdapter>;
232}
233
234pub async fn general_export(
235    pool: web::Data<PgPool>,
236    content_disposition: &str,
237    data_loader: impl CsvExportDataLoader + std::marker::Send + 'static,
238    token: AuthorizationToken,
239) -> ControllerResult<HttpResponse> {
240    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<ControllerResult<Bytes>>();
241    // spawn handle that writes the csv row by row into the sender
242    let mut handle_conn = pool.acquire().await?;
243    let _handle = tokio::spawn(async move {
244        let fut = data_loader.load_data(sender, &mut handle_conn, token);
245        let res = fut.await;
246        if let Err(err) = res {
247            tracing::error!("Failed to export: {}", err);
248        }
249    });
250
251    // return response that streams data from the receiver
252    token.authorized_ok(
253        HttpResponse::Ok()
254            .append_header(("Content-Disposition", content_disposition))
255            .streaming(make_authorized_streamable(UnboundedReceiverStream::new(
256                receiver,
257            ))),
258    )
259}
260
261#[cfg(test)]
262mod test {
263    use std::{collections::HashMap, io::Cursor};
264
265    use headless_lms_models::{
266        exercise_slides,
267        exercise_task_gradings::ExerciseTaskGradingResult,
268        exercise_tasks::{self, NewExerciseTask},
269        exercises::{self, GradingProgress},
270        library::grading::{
271            GradingPolicy, StudentExerciseSlideSubmission, StudentExerciseTaskSubmission,
272        },
273        user_exercise_states,
274        user_exercise_states::ExerciseWithUserState,
275        users,
276    };
277    use models::chapters::{self, NewChapter};
278    use serde_json::Value;
279
280    use super::*;
281    use crate::{
282        domain::{
283            csv_export::points::export_course_instance_points,
284            models_requests::{self, JwtKey},
285        },
286        test_helper::*,
287    };
288
289    #[actix_web::test]
290    async fn exports() {
291        insert_data!(:tx, :user, :org, :course, :instance, :course_module, :chapter, :page, :exercise, :slide, :task);
292
293        let u2 = users::insert(
294            tx.as_mut(),
295            PKeyPolicy::Generate,
296            "second@example.org",
297            None,
298            None,
299        )
300        .await
301        .unwrap();
302        let c2 = chapters::insert(
303            tx.as_mut(),
304            PKeyPolicy::Generate,
305            &NewChapter {
306                name: "".to_string(),
307                color: Some("#065853".to_string()),
308                course_id: course,
309                chapter_number: 2,
310                front_page_id: None,
311                opens_at: None,
312                deadline: None,
313                course_module_id: Some(course_module.id),
314            },
315        )
316        .await
317        .unwrap();
318
319        let e2 = exercises::insert(tx.as_mut(), PKeyPolicy::Generate, course, "", page, c2, 0)
320            .await
321            .unwrap();
322        let s2 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, e2, 0)
323            .await
324            .unwrap();
325        let et2 = exercise_tasks::insert(
326            tx.as_mut(),
327            PKeyPolicy::Generate,
328            NewExerciseTask {
329                exercise_slide_id: s2,
330                exercise_type: "".to_string(),
331                assignment: vec![],
332                public_spec: Some(Value::Null),
333                private_spec: Some(Value::Null),
334                model_solution_spec: Some(Value::Null),
335                order_number: 1,
336            },
337        )
338        .await
339        .unwrap();
340
341        let e3 = exercises::insert(tx.as_mut(), PKeyPolicy::Generate, course, "", page, c2, 1)
342            .await
343            .unwrap();
344        let s3 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, e3, 0)
345            .await
346            .unwrap();
347        let et3 = exercise_tasks::insert(
348            tx.as_mut(),
349            PKeyPolicy::Generate,
350            NewExerciseTask {
351                exercise_slide_id: s3,
352                exercise_type: "".to_string(),
353                assignment: vec![],
354                public_spec: Some(Value::Null),
355                private_spec: Some(Value::Null),
356                model_solution_spec: Some(Value::Null),
357                order_number: 2,
358            },
359        )
360        .await
361        .unwrap();
362        submit_and_grade(tx.as_mut(), exercise, slide, task, user, instance.id, 12.34).await;
363        submit_and_grade(tx.as_mut(), e2, s2, et2, user, instance.id, 23.45).await;
364        submit_and_grade(tx.as_mut(), e2, s2, et2, u2, instance.id, 34.56).await;
365        submit_and_grade(tx.as_mut(), e3, s3, et3, u2, instance.id, 45.67).await;
366
367        let buf = vec![];
368        let buf = export_course_instance_points(tx.as_mut(), instance.id, buf)
369            .await
370            .unwrap();
371        let buf = Cursor::new(buf);
372
373        let mut reader = csv::Reader::from_reader(buf);
374        let mut count = 0;
375        for record in reader.records() {
376            count += 1;
377            let record = record.unwrap();
378            println!("{}", record.as_slice());
379            let user_id = Uuid::parse_str(&record[0]).unwrap();
380            let first = record[1].parse::<f32>().unwrap();
381            let second = record[2].parse::<f32>().unwrap();
382            if user_id == user {
383                assert!((first - 0.1234).abs() < 0.1 && (second - 0.2345).abs() < 0.1);
384            } else if user_id == u2 {
385                assert!((first - 0.0).abs() < 0.1 && (second - 0.8023).abs() < 0.1);
386            } else {
387                panic!("unexpected user id");
388            }
389        }
390        assert_eq!(count, 2)
391    }
392
393    async fn submit_and_grade(
394        tx: &mut PgConnection,
395        ex: Uuid,
396        ex_slide: Uuid,
397        et: Uuid,
398        u: Uuid,
399        ci: Uuid,
400        score_given: f32,
401    ) {
402        let exercise = exercises::get_by_id(tx, ex).await.unwrap();
403        user_exercise_states::get_or_create_user_exercise_state(tx, u, ex, Some(ci), None)
404            .await
405            .unwrap();
406        user_exercise_states::upsert_selected_exercise_slide_id(
407            tx,
408            u,
409            ex,
410            Some(ci),
411            None,
412            Some(ex_slide),
413        )
414        .await
415        .unwrap();
416        let user_exercise_state =
417            user_exercise_states::get_or_create_user_exercise_state(tx, u, ex, Some(ci), None)
418                .await
419                .unwrap();
420        let mut exercise_with_user_state =
421            ExerciseWithUserState::new(exercise, user_exercise_state).unwrap();
422        let jwt_key = Arc::new(JwtKey::test_key());
423        headless_lms_models::library::grading::grade_user_submission(
424            tx,
425            &mut exercise_with_user_state,
426            &StudentExerciseSlideSubmission {
427                exercise_slide_id: ex_slide,
428                exercise_task_submissions: vec![StudentExerciseTaskSubmission {
429                    exercise_task_id: et,
430                    data_json: Value::Null,
431                }],
432            },
433            GradingPolicy::Fixed(HashMap::from([(
434                et,
435                ExerciseTaskGradingResult {
436                    feedback_json: None,
437                    feedback_text: None,
438                    grading_progress: GradingProgress::FullyGraded,
439                    score_given,
440                    score_maximum: 100,
441                    set_user_variables: Some(HashMap::new()),
442                },
443            )])),
444            models_requests::fetch_service_info,
445            models_requests::make_grading_request_sender(jwt_key),
446        )
447        .await
448        .unwrap();
449    }
450}