headless_lms_utils/file_store/
google_cloud_file_store.rs

1use std::path::{Path, PathBuf};
2
3use crate::prelude::*;
4use async_trait::async_trait;
5use bytes::Bytes;
6use cloud_storage::Client;
7use futures::{StreamExt, future::try_join};
8use tokio_stream::wrappers::ReceiverStream;
9
10use super::{FileStore, GenericPayload, generate_cache_folder_dir, path_to_str};
11
/// Shared sizing constant for the streaming helpers in this file: it is the
/// capacity of the channel used by `upload_stream` and the number of bytes
/// grouped into each chunk by `download_stream`.
const BUFFER_SIZE: usize = 512;
13
/// A `FileStore` implementation that keeps files as objects in a single
/// Google Cloud Storage bucket, accessed through a `cloud_storage::Client`.
pub struct GoogleCloudFileStore {
    /// Name of the bucket passed to every storage client call.
    bucket_name: String,
    /// Client used for all object operations.
    client: Client,
    /// Folder returned by `get_cache_files_folder_path`; initialised from
    /// `generate_cache_folder_dir()` in `new`.
    pub cache_files_path: PathBuf,
}
19
20impl GoogleCloudFileStore {
21    /// Needs to not be async because of how this is used in worker factories
22    #[instrument]
23    pub fn new(bucket_name: String) -> UtilResult<Self> {
24        let client = Client::default();
25        let cache_files_path = generate_cache_folder_dir()?;
26
27        Ok(Self {
28            bucket_name,
29            client,
30            cache_files_path,
31        })
32    }
33}
34
35#[async_trait(?Send)]
36impl FileStore for GoogleCloudFileStore {
37    async fn upload(&self, path: &Path, file: Vec<u8>, mime_type: &str) -> UtilResult<()> {
38        self.client
39            .object()
40            .create(&self.bucket_name, file, path_to_str(path)?, mime_type)
41            .await?;
42        Ok(())
43    }
44
45    async fn download(&self, path: &Path) -> UtilResult<Vec<u8>> {
46        let res = self
47            .client
48            .object()
49            .download(&self.bucket_name, path_to_str(path)?)
50            .await?;
51        Ok(res)
52    }
53
54    async fn get_direct_download_url(&self, path: &Path) -> UtilResult<String> {
55        let object = self
56            .client
57            .object()
58            .read(&self.bucket_name, path_to_str(path)?)
59            .await?;
60        let url = object.download_url(300)?;
61        Ok(url)
62    }
63
64    async fn delete(&self, path: &Path) -> UtilResult<()> {
65        self.client
66            .object()
67            .delete(&self.bucket_name, path_to_str(path)?)
68            .await?;
69        Ok(())
70    }
71
72    async fn upload_stream(
73        &self,
74        path: &Path,
75        mut contents: GenericPayload,
76        mime_type: &str,
77    ) -> UtilResult<()> {
78        let object_client = self.client.object();
79        let (sender, receiver) = tokio::sync::mpsc::channel(BUFFER_SIZE);
80        let receiver_stream = ReceiverStream::new(receiver);
81        let send_fut = async {
82            while let Some(bytes) = contents.next().await {
83                sender
84                    .send(bytes)
85                    .await
86                    .map_err(|e| cloud_storage::Error::Other(e.to_string()))?;
87            }
88            drop(sender);
89            Ok(())
90        };
91        let recv_fut = object_client.create_streamed(
92            &self.bucket_name,
93            receiver_stream,
94            None,
95            path_to_str(path)?,
96            mime_type,
97        );
98        try_join(send_fut, recv_fut).await?;
99        Ok(())
100    }
101
102    async fn download_stream(
103        &self,
104        path: &Path,
105    ) -> UtilResult<Box<dyn futures::Stream<Item = std::io::Result<bytes::Bytes>>>> {
106        let stream = self
107            .client
108            .object()
109            .download_streamed(&self.bucket_name, path_to_str(path)?)
110            .await?;
111        let stream_with_corrected_type = stream
112            // cloud_storage download_streamed returns the bytes one by one which is not optimal for us
113            // that's why why group the singular bytes to chunks and convert those chunks to Bytes objects.
114            .chunks(BUFFER_SIZE)
115            .map(|chunked_bytes_results| {
116                // Turn Vec<Result<u8>> -> Result<Vec<u8>>
117                let with_combined_result = chunked_bytes_results
118                    .into_iter()
119                    .collect::<Result<Vec<_>, _>>();
120                with_combined_result
121                    .map(Bytes::from)
122                    .map_err(std::io::Error::other)
123            });
124        Ok(Box::new(stream_with_corrected_type))
125    }
126
127    fn get_cache_files_folder_path(&self) -> UtilResult<&Path> {
128        Ok(&self.cache_files_path)
129    }
130}