headless_lms_utils/file_store/
google_cloud_file_store.rs1use std::path::{Path, PathBuf};
2
3use crate::prelude::*;
4use async_trait::async_trait;
5use bytes::Bytes;
6use cloud_storage::Client;
7use futures::{StreamExt, future::try_join};
8use tokio_stream::wrappers::ReceiverStream;
9
10use super::{FileStore, GenericPayload, generate_cache_folder_dir, path_to_str};
11
12const BUFFER_SIZE: usize = 512;
13
14pub struct GoogleCloudFileStore {
15 bucket_name: String,
16 client: Client,
17 pub cache_files_path: PathBuf,
18}
19
20impl GoogleCloudFileStore {
21 #[instrument]
23 pub fn new(bucket_name: String) -> UtilResult<Self> {
24 let client = Client::default();
25 let cache_files_path = generate_cache_folder_dir()?;
26
27 Ok(Self {
28 bucket_name,
29 client,
30 cache_files_path,
31 })
32 }
33}
34
35#[async_trait(?Send)]
36impl FileStore for GoogleCloudFileStore {
37 async fn upload(&self, path: &Path, file: Vec<u8>, mime_type: &str) -> UtilResult<()> {
38 self.client
39 .object()
40 .create(&self.bucket_name, file, path_to_str(path)?, mime_type)
41 .await?;
42 Ok(())
43 }
44
45 async fn download(&self, path: &Path) -> UtilResult<Vec<u8>> {
46 let res = self
47 .client
48 .object()
49 .download(&self.bucket_name, path_to_str(path)?)
50 .await?;
51 Ok(res)
52 }
53
54 async fn get_direct_download_url(&self, path: &Path) -> UtilResult<String> {
55 let object = self
56 .client
57 .object()
58 .read(&self.bucket_name, path_to_str(path)?)
59 .await?;
60 let url = object.download_url(300)?;
61 Ok(url)
62 }
63
64 async fn delete(&self, path: &Path) -> UtilResult<()> {
65 self.client
66 .object()
67 .delete(&self.bucket_name, path_to_str(path)?)
68 .await?;
69 Ok(())
70 }
71
72 async fn upload_stream(
73 &self,
74 path: &Path,
75 mut contents: GenericPayload,
76 mime_type: &str,
77 ) -> UtilResult<()> {
78 let object_client = self.client.object();
79 let (sender, receiver) = tokio::sync::mpsc::channel(BUFFER_SIZE);
80 let receiver_stream = ReceiverStream::new(receiver);
81 let send_fut = async {
82 while let Some(bytes) = contents.next().await {
83 sender
84 .send(bytes)
85 .await
86 .map_err(|e| cloud_storage::Error::Other(e.to_string()))?;
87 }
88 drop(sender);
89 Ok(())
90 };
91 let recv_fut = object_client.create_streamed(
92 &self.bucket_name,
93 receiver_stream,
94 None,
95 path_to_str(path)?,
96 mime_type,
97 );
98 try_join(send_fut, recv_fut).await?;
99 Ok(())
100 }
101
102 async fn download_stream(
103 &self,
104 path: &Path,
105 ) -> UtilResult<Box<dyn futures::Stream<Item = std::io::Result<bytes::Bytes>>>> {
106 let stream = self
107 .client
108 .object()
109 .download_streamed(&self.bucket_name, path_to_str(path)?)
110 .await?;
111 let stream_with_corrected_type = stream
112 .chunks(BUFFER_SIZE)
115 .map(|chunked_bytes_results| {
116 let with_combined_result = chunked_bytes_results
118 .into_iter()
119 .collect::<Result<Vec<_>, _>>();
120 with_combined_result
121 .map(Bytes::from)
122 .map_err(std::io::Error::other)
123 });
124 Ok(Box::new(stream_with_corrected_type))
125 }
126
127 fn get_cache_files_folder_path(&self) -> UtilResult<&Path> {
128 Ok(&self.cache_files_path)
129 }
130}