headless_lms_utils/file_store/
local_file_store.rs

1use std::path::{Path, PathBuf};
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use futures::{Stream, StreamExt};
6
7use tokio::{
8    fs::{self, OpenOptions},
9    io::{self, AsyncWriteExt, BufWriter},
10};
11use tokio_util::io::ReaderStream;
12
13use super::{FileStore, GenericPayload, generate_cache_folder_dir, path_to_str};
14use crate::prelude::*;
15
16#[derive(Debug, Clone)]
17pub struct LocalFileStore {
18    pub base_path: PathBuf,
19    pub base_url: String,
20    pub cache_files_path: PathBuf,
21}
22
23impl LocalFileStore {
24    /// Needs to not be async because of how this is used in worker factories
25    pub fn new(base_path: PathBuf, base_url: String) -> UtilResult<Self> {
26        if base_path.exists() {
27            if !base_path.is_dir() {
28                return Err(UtilError::new(
29                    UtilErrorType::Other,
30                    "Base path should be a folder".to_string(),
31                    None,
32                ));
33            }
34        } else {
35            std::fs::create_dir_all(&base_path)?;
36        }
37        let cache_files_path = generate_cache_folder_dir()?;
38        Ok(Self {
39            base_path,
40            base_url,
41            cache_files_path,
42        })
43    }
44}
45#[async_trait(?Send)]
46impl FileStore for LocalFileStore {
47    async fn upload(&self, path: &Path, contents: Vec<u8>, _mime_type: &str) -> UtilResult<()> {
48        let full_path = self.base_path.join(path);
49        if let Some(parent) = full_path.parent() {
50            fs::create_dir_all(parent).await?;
51        }
52        fs::write(full_path, contents).await?;
53        Ok(())
54    }
55
56    async fn download(&self, path: &Path) -> UtilResult<Vec<u8>> {
57        let full_path = self.base_path.join(path);
58        Ok(fs::read(full_path).await?)
59    }
60
61    async fn delete(&self, path: &Path) -> UtilResult<()> {
62        let full_path = self.base_path.join(path);
63        fs::remove_file(full_path).await?;
64        Ok(())
65    }
66
67    async fn get_direct_download_url(&self, path: &Path) -> UtilResult<String> {
68        let full_path = self.base_path.join(path);
69        if !full_path.exists() {
70            return Err(UtilError::new(
71                UtilErrorType::Other,
72                "File does not exist.".to_string(),
73                None,
74            ));
75        }
76        let path_str = path_to_str(path)?;
77        if self.base_url.ends_with('/') {
78            return Ok(format!("{}{}", self.base_url, path_str));
79        }
80        Ok(format!("{}/{}", self.base_url, path_str))
81    }
82
83    async fn upload_stream(
84        &self,
85        path: &Path,
86        mut contents: GenericPayload,
87        _mime_type: &str,
88    ) -> UtilResult<()> {
89        let full_path = self.base_path.join(path);
90        let parent_option = full_path.parent();
91        if parent_option.is_none() {
92            return Err(UtilError::new(
93                UtilErrorType::Other,
94                "Media path did not have a parent folder".to_string(),
95                None,
96            ));
97        }
98        let parent = parent_option.unwrap();
99        if parent.exists() {
100            if !parent.is_dir() {
101                return Err(UtilError::new(
102                    UtilErrorType::Other,
103                    "Base path should be a folder".to_string(),
104                    None,
105                ));
106            }
107        } else {
108            fs::create_dir_all(&parent).await?;
109        }
110        let file = OpenOptions::new()
111            .truncate(true)
112            .create(true)
113            .write(true)
114            .open(full_path)
115            .await?;
116
117        let mut buf_writer = BufWriter::new(file);
118
119        while let Some(bytes_res) = contents.next().await {
120            let bytes =
121                bytes_res.map_err(|e| UtilError::new(UtilErrorType::Other, e.to_string(), None))?;
122            buf_writer.write_all(&bytes).await?;
123        }
124
125        buf_writer.flush().await?;
126
127        Ok(())
128    }
129
130    async fn download_stream(
131        &self,
132        path: &Path,
133    ) -> UtilResult<Box<dyn Stream<Item = std::io::Result<Bytes>>>> {
134        let full_path = self.base_path.join(path);
135        let file = fs::File::open(full_path).await?;
136        let reader = io::BufReader::new(file);
137        let stream = ReaderStream::new(reader);
138        Ok(Box::new(stream))
139    }
140
141    fn get_cache_files_folder_path(&self) -> UtilResult<&Path> {
142        Ok(&self.cache_files_path)
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use std::{env, path::Path};
149
150    use tempdir::TempDir;
151
152    use super::LocalFileStore;
153    use crate::file_store::FileStore;
154
155    #[tokio::test]
156    async fn upload_download_delete_works() {
157        // TODO: Audit that the environment access only happens in single-threaded code.
158        unsafe { env::set_var("HEADLESS_LMS_CACHE_FILES_PATH", "/tmp") };
159        let dir = TempDir::new("test-local-filestore").expect("Failed to create a temp dir");
160        let base_path = dir.into_path();
161        let local_file_store =
162            LocalFileStore::new(base_path.clone(), "http://localhost:3000".to_string())
163                .expect("Could not create local file storage");
164
165        let path1 = Path::new("file1");
166        let test_file_contents = "Test file contents".as_bytes().to_vec();
167        // Put content to storage and read it back
168        local_file_store
169            .upload(path1, test_file_contents.clone(), "text/plain")
170            .await
171            .expect("Failed to put a file into local file storage.");
172        let retrivied_file = local_file_store
173            .download(path1)
174            .await
175            .expect("Failed to retrieve a file from local file storage");
176        assert_eq!(test_file_contents, retrivied_file);
177
178        local_file_store
179            .delete(path1)
180            .await
181            .expect("Failed to delete a file");
182
183        // After deletion getting the file should fail
184        let retrivied_file2 = local_file_store.download(path1).await;
185        assert!(retrivied_file2.is_err());
186    }
187
188    #[tokio::test]
189    async fn get_download_url_works() {
190        // TODO: Audit that the environment access only happens in single-threaded code.
191        unsafe { env::set_var("HEADLESS_LMS_CACHE_FILES_PATH", "/tmp") };
192        let dir = TempDir::new("test-local-filestore").expect("Failed to create a temp dir");
193        let base_path = dir.into_path();
194        let local_file_store =
195            LocalFileStore::new(base_path.clone(), "http://localhost:3000".to_string())
196                .expect("Could not create local file storage");
197        let test_file_contents = "Test file contents 2".as_bytes().to_vec();
198        let path1 = Path::new("file1");
199        local_file_store
200            .upload(path1, test_file_contents.clone(), "text/plain")
201            .await
202            .expect("Failed to put a file into local file storage.");
203        let url = local_file_store
204            .get_direct_download_url(path1)
205            .await
206            .expect("Failed to get a download url");
207        let expected_url = format!("http://localhost:3000/{}", path1.to_string_lossy());
208        assert_eq!(url, expected_url);
209
210        let nonexistant_file = Path::new("does-not-exist");
211        let res = local_file_store
212            .get_direct_download_url(nonexistant_file)
213            .await;
214        assert!(res.is_err());
215    }
216}