headless_lms_server/domain/csv_export/
mod.rs

1pub mod code_giveaway_codes;
2pub mod course_instance_export;
3pub mod course_research_form_questions_answers_export;
4pub mod exercise_tasks_export;
5pub mod points;
6pub mod submissions;
7pub mod user_exercise_states_export;
8pub mod users_export;
9
10use anyhow::{Context, Result};
11use bytes::Bytes;
12use csv::Writer;
13use futures::{Stream, StreamExt, stream::FuturesUnordered};
14
15use async_trait::async_trait;
16
17use models::course_module_completions::CourseModuleCompletionWithRegistrationInfo;
18use serde::Serialize;
19use sqlx::PgConnection;
20use std::{
21    io,
22    io::Write,
23    sync::{Arc, Mutex},
24};
25use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28use crate::prelude::*;
29
30use super::authorization::{AuthorizationToken, AuthorizedResponse};
31/// Convenience struct for creating CSV data.
32struct CsvWriter<W: Write> {
33    csv_writer: Arc<Mutex<Writer<W>>>,
34    handles: FuturesUnordered<JoinHandle<Result<()>>>,
35}
36
37impl<W: Write + Send + 'static> CsvWriter<W> {
38    /// Creates a new CsvWriter, and also writes the given headers before returning.
39    async fn new_with_initialized_headers<I, T>(writer: W, headers: I) -> Result<Self>
40    where
41        I: IntoIterator<Item = T> + Send + 'static,
42        T: AsRef<[u8]>,
43    {
44        let mut writer = csv::WriterBuilder::new()
45            .has_headers(false)
46            .from_writer(writer);
47
48        // write headers first
49        let writer = tokio::task::spawn_blocking(move || {
50            writer
51                .write_record(headers)
52                .context("Failed to write headers")?;
53            Result::<_, anyhow::Error>::Ok(writer)
54        })
55        .await??;
56
57        Ok(Self {
58            csv_writer: Arc::new(Mutex::new(writer)),
59            handles: FuturesUnordered::new(),
60        })
61    }
62
63    /// Spawns a task that writes a single CSV record
64    fn write_record<I, T>(&self, csv_row: I)
65    where
66        I: IntoIterator<Item = T> + Send + 'static,
67        T: AsRef<[u8]>,
68    {
69        let writer = self.csv_writer.clone();
70        let handle = tokio::task::spawn_blocking(move || {
71            writer
72                .lock()
73                .map_err(|_| anyhow::anyhow!("Failed to lock mutex"))?
74                .write_record(csv_row)
75                .context("Failed to serialize points")
76        });
77        self.handles.push(handle);
78    }
79
80    /// Waits for handles to finish, flushes the writer and extracts the inner writer.
81    /// Should always be called before dropping the writer to make sure writing the CSV finishes properly.
82    async fn finish(mut self) -> Result<W> {
83        // ensure every task is finished before the writer is extracted
84        while let Some(handle) = self.handles.next().await {
85            handle??;
86        }
87
88        let writer = tokio::task::spawn_blocking(move || {
89            let _ = &self;
90            Arc::try_unwrap(self.csv_writer)
91                .map_err(|_| anyhow::anyhow!("Failed to extract inner writer from arc"))?
92                .into_inner()
93                .map_err(|e| anyhow::anyhow!("Failed to extract inner writer from mutex: {}", e))?
94                .into_inner()
95                .map_err(|e| {
96                    anyhow::anyhow!("Failed to extract inner writer from CSV writer: {}", e)
97                })
98        })
99        .await??;
100        Ok(writer)
101    }
102}
103
104/**
105 * For csv export. Return the grade as a number if there is a numeric grade. If the grade is not numeric, returns pass/fail/
106 * If course module has not been completed yet, returns "-".
107 */
108fn course_module_completion_info_to_grade_string(
109    input: Option<&CourseModuleCompletionWithRegistrationInfo>,
110) -> String {
111    let grade_string = input.map(|info| {
112        if let Some(grade) = info.grade {
113            return grade.to_string();
114        }
115        if info.passed {
116            return "pass".to_string();
117        };
118        "fail".to_string()
119    });
120    if let Some(grade_string) = grade_string {
121        if let Some(info) = input {
122            if !info.prerequisite_modules_completed {
123                return format!("{grade_string}*");
124            }
125        }
126        return grade_string;
127    }
128    "-".to_string()
129}
130
131pub struct CSVExportAdapter {
132    pub sender: UnboundedSender<ControllerResult<Bytes>>,
133    pub authorization_token: AuthorizationToken,
134}
135impl Write for CSVExportAdapter {
136    fn flush(&mut self) -> std::io::Result<()> {
137        Ok(())
138    }
139
140    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
141        let bytes = Bytes::copy_from_slice(buf);
142        let token = self.authorization_token;
143        self.sender
144            .send(token.authorized_ok(bytes))
145            .map_err(|e| io::Error::other(e.to_string()))?;
146        Ok(buf.len())
147    }
148}
149
150/** Without this one, actix cannot stream our authorized streams as responses
151
152```ignore
153HttpResponse::Ok()
154    .append_header((
155        "Content-Disposition",
156        format!(
157            "attachment; filename=\"Exam: {} - Submissions {}.csv\"",
158            exam.name,
159            Utc::now().format("%Y-%m-%d")
160        ),
161    ))
162    .streaming(make_authorized_streamable(UnboundedReceiverStream::new(
163        receiver,
164    ))),
165```
166*/
167pub fn make_authorized_streamable(
168    stream: UnboundedReceiverStream<Result<AuthorizedResponse<bytes::Bytes>, ControllerError>>,
169) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
170    stream.map(|item| item.map(|item2| item2.data))
171}
172
173/**
174  For streaming arrays of json objects.
175*/
176pub fn serializable_sqlx_result_stream_to_json_stream(
177    stream: impl Stream<Item = sqlx::Result<impl Serialize>>,
178) -> impl Stream<Item = Result<bytes::Bytes, ControllerError>> {
179    let res_stream = stream.enumerate().map(|(n, item)| {
180        item.map(|item2| {
181            match serde_json::to_vec(&item2) {
182                Ok(mut v) => {
183                    // Only item index available, we don't know the length of the stream
184                    if n == 0 {
185                        // Start of array character to the beginning of the stream
186                        v.insert(0, b'[');
187                    } else {
188                        // Separator character before every item excluding the first item
189                        v.insert(0, b',');
190                    }
191                    Bytes::from(v)
192                }
193                Err(e) => {
194                    // Since we're already streaming a response, we have no way to change the status code of the response anymore.
195                    // Our best option at this point is to write the error to the response, hopefully causing the response to be invalid json.
196                    error!("Failed to serialize item: {}", e);
197                    Bytes::from(format!(
198                        "Streaming error: Failed to serialize item. Details: {:?}",
199                        e
200                    ))
201                }
202            }
203        })
204        .map_err(|original_error| {
205            ControllerError::new(
206                ControllerErrorType::InternalServerError,
207                original_error.to_string(),
208                Some(original_error.into()),
209            )
210        })
211    });
212    // Chaining the end of the json array character here because in the previous map we don't know the length of the stream
213    res_stream.chain(tokio_stream::iter(vec![Ok(Bytes::from_static(b"]"))]))
214}
215
216#[async_trait]
217pub trait CsvExportDataLoader {
218    async fn load_data(
219        &self,
220        sender: UnboundedSender<Result<AuthorizedResponse<Bytes>, ControllerError>>,
221        conn: &mut PgConnection,
222        token: AuthorizationToken,
223    ) -> anyhow::Result<CSVExportAdapter>;
224}
225
226pub async fn general_export(
227    pool: web::Data<PgPool>,
228    content_disposition: &str,
229    data_loader: impl CsvExportDataLoader + std::marker::Send + 'static,
230    token: AuthorizationToken,
231) -> ControllerResult<HttpResponse> {
232    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<ControllerResult<Bytes>>();
233    // spawn handle that writes the csv row by row into the sender
234    let mut handle_conn = pool.acquire().await?;
235    let _handle = tokio::spawn(async move {
236        let fut = data_loader.load_data(sender, &mut handle_conn, token);
237        let res = fut.await;
238        if let Err(err) = res {
239            tracing::error!("Failed to export: {}", err);
240        }
241    });
242
243    // return response that streams data from the receiver
244    token.authorized_ok(
245        HttpResponse::Ok()
246            .append_header(("Content-Disposition", content_disposition))
247            .streaming(make_authorized_streamable(UnboundedReceiverStream::new(
248                receiver,
249            ))),
250    )
251}
252
#[cfg(test)]
mod test {
    use std::{collections::HashMap, io::Cursor};

    use headless_lms_models::{
        exercise_slides,
        exercise_task_gradings::ExerciseTaskGradingResult,
        exercise_tasks::{self, NewExerciseTask},
        exercises::{self, GradingProgress},
        library::grading::{
            GradingPolicy, StudentExerciseSlideSubmission, StudentExerciseTaskSubmission,
        },
        user_exercise_states::{self, ExerciseWithUserState},
        users,
    };
    use models::chapters::{self, NewChapter};
    use serde_json::Value;

    use super::*;
    use crate::{
        domain::{
            csv_export::points::export_course_instance_points,
            models_requests::{self, JwtKey},
        },
        test_helper::*,
    };

    /// Exports the points of a course instance with two users and checks the
    /// two numeric columns of each user's row.
    #[actix_web::test]
    async fn exports() {
        insert_data!(:tx, :user, :org, :course, :instance, :course_module, :chapter, :page, :exercise, :slide, :task);

        // A second user and a second chapter next to the ones from `insert_data!`.
        let second_user = users::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            "second@example.org",
            None,
            None,
        )
        .await
        .unwrap();
        let second_chapter = chapters::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            &NewChapter {
                name: "".to_string(),
                color: Some("#065853".to_string()),
                course_id: course,
                chapter_number: 2,
                front_page_id: None,
                opens_at: None,
                deadline: None,
                course_module_id: Some(course_module.id),
            },
        )
        .await
        .unwrap();

        // First exercise of the second chapter, with one slide and one task.
        let exercise_2 = exercises::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            course,
            "",
            page,
            second_chapter,
            0,
        )
        .await
        .unwrap();
        let slide_2 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, exercise_2, 0)
            .await
            .unwrap();
        let task_2 = exercise_tasks::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            NewExerciseTask {
                exercise_slide_id: slide_2,
                exercise_type: "".to_string(),
                assignment: vec![],
                public_spec: Some(Value::Null),
                private_spec: Some(Value::Null),
                model_solution_spec: Some(Value::Null),
                order_number: 1,
            },
        )
        .await
        .unwrap();

        // Second exercise of the second chapter, with one slide and one task.
        let exercise_3 = exercises::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            course,
            "",
            page,
            second_chapter,
            1,
        )
        .await
        .unwrap();
        let slide_3 = exercise_slides::insert(tx.as_mut(), PKeyPolicy::Generate, exercise_3, 0)
            .await
            .unwrap();
        let task_3 = exercise_tasks::insert(
            tx.as_mut(),
            PKeyPolicy::Generate,
            NewExerciseTask {
                exercise_slide_id: slide_3,
                exercise_type: "".to_string(),
                assignment: vec![],
                public_spec: Some(Value::Null),
                private_spec: Some(Value::Null),
                model_solution_spec: Some(Value::Null),
                order_number: 2,
            },
        )
        .await
        .unwrap();

        // The first user answers the original exercise and one exercise of the
        // second chapter; the second user answers both exercises of the second chapter.
        submit_and_grade(tx.as_mut(), exercise, slide, task, user, course, 12.34).await;
        submit_and_grade(tx.as_mut(), exercise_2, slide_2, task_2, user, course, 23.45).await;
        submit_and_grade(
            tx.as_mut(),
            exercise_2,
            slide_2,
            task_2,
            second_user,
            course,
            34.56,
        )
        .await;
        submit_and_grade(
            tx.as_mut(),
            exercise_3,
            slide_3,
            task_3,
            second_user,
            course,
            45.67,
        )
        .await;

        // Export into an in-memory buffer and read the result back as CSV.
        let csv_bytes = export_course_instance_points(tx.as_mut(), instance.id, Vec::<u8>::new())
            .await
            .unwrap();
        let mut reader = csv::Reader::from_reader(Cursor::new(csv_bytes));

        // The exported values are compared with a generous tolerance.
        let is_close = |actual: f32, expected: f32| (actual - expected).abs() < 0.1;

        let mut rows_seen = 0;
        for row in reader.records() {
            rows_seen += 1;
            let row = row.unwrap();
            println!("{}", row.as_slice());
            let row_user = Uuid::parse_str(&row[0]).unwrap();
            let first = row[1].parse::<f32>().unwrap();
            let second = row[2].parse::<f32>().unwrap();
            if row_user == user {
                assert!(is_close(first, 0.1234) && is_close(second, 0.2345));
            } else if row_user == second_user {
                assert!(is_close(first, 0.0) && is_close(second, 0.8023));
            } else {
                panic!("unexpected user id");
            }
        }
        assert_eq!(rows_seen, 2);
    }

    /// Makes user `u` submit an answer to task `et` on slide `ex_slide` of
    /// exercise `ex` and grades it with the fixed score `score_given` out of a
    /// maximum of 100.
    async fn submit_and_grade(
        tx: &mut PgConnection,
        ex: Uuid,
        ex_slide: Uuid,
        et: Uuid,
        u: Uuid,
        course_id: Uuid,
        score_given: f32,
    ) {
        let exercise = exercises::get_by_id(tx, ex).await.unwrap();

        // Make sure the state exists, select the slide, then fetch the state
        // again so that it is read after the slide selection.
        user_exercise_states::get_or_create_user_exercise_state(tx, u, ex, Some(course_id), None)
            .await
            .unwrap();
        user_exercise_states::upsert_selected_exercise_slide_id(
            tx,
            u,
            ex,
            Some(course_id),
            None,
            Some(ex_slide),
        )
        .await
        .unwrap();
        let user_exercise_state = user_exercise_states::get_or_create_user_exercise_state(
            tx,
            u,
            ex,
            Some(course_id),
            None,
        )
        .await
        .unwrap();
        let mut exercise_with_user_state =
            ExerciseWithUserState::new(exercise, user_exercise_state).unwrap();

        let submission = StudentExerciseSlideSubmission {
            exercise_slide_id: ex_slide,
            exercise_task_submissions: vec![StudentExerciseTaskSubmission {
                exercise_task_id: et,
                data_json: Value::Null,
            }],
        };
        // Fixed grading result for the task, so the given score is known in advance.
        let fixed_grading = ExerciseTaskGradingResult {
            feedback_json: None,
            feedback_text: None,
            grading_progress: GradingProgress::FullyGraded,
            score_given,
            score_maximum: 100,
            set_user_variables: Some(HashMap::new()),
        };
        let jwt_key = Arc::new(JwtKey::test_key());

        headless_lms_models::library::grading::grade_user_submission(
            tx,
            &mut exercise_with_user_state,
            &submission,
            GradingPolicy::Fixed(HashMap::from([(et, fixed_grading)])),
            models_requests::fetch_service_info,
            models_requests::make_grading_request_sender(jwt_key),
        )
        .await
        .unwrap();
    }
}
447}