// headless_lms_utils/file_store/google_cloud_file_store.rs
use std::path::{Path, PathBuf};
2
3use crate::prelude::*;
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::StreamExt;
7use google_cloud_auth::credentials::{
8 Builder as CredentialsBuilder, Credentials,
9 service_account::Builder as ServiceAccountCredentialsBuilder,
10};
11use google_cloud_auth::signer::Signer;
12use google_cloud_storage::builder::storage::SignedUrlBuilder;
13use google_cloud_storage::client::{Storage, StorageControl};
14use serde_json::Value;
15use std::{env, io};
16
17use super::{FileStore, GenericPayload, generate_cache_folder_dir, path_to_str};
18
19pub struct GoogleCloudFileStore {
20 bucket_name: String,
21 storage_client: Storage,
22 control_client: StorageControl,
23 signer: Signer,
24 pub cache_files_path: PathBuf,
25}
26
27impl GoogleCloudFileStore {
28 #[instrument]
30 pub async fn new(bucket_name: String) -> UtilResult<Self> {
31 let (credentials, signer) = Self::build_credentials_and_signer()?;
32 let storage_client = match credentials.clone() {
33 Some(credentials) => Storage::builder()
34 .with_credentials(credentials)
35 .build()
36 .await
37 .map_err(Self::map_init_error)?,
38 None => Storage::builder()
39 .build()
40 .await
41 .map_err(Self::map_init_error)?,
42 };
43 let control_client = match credentials {
44 Some(credentials) => StorageControl::builder()
45 .with_credentials(credentials)
46 .build()
47 .await
48 .map_err(Self::map_init_error)?,
49 None => StorageControl::builder()
50 .build()
51 .await
52 .map_err(Self::map_init_error)?,
53 };
54 let cache_files_path = generate_cache_folder_dir()?;
55
56 Ok(Self {
57 bucket_name,
58 storage_client,
59 control_client,
60 signer,
61 cache_files_path,
62 })
63 }
64
65 fn bucket_resource_name(&self) -> String {
67 format!("projects/_/buckets/{}", self.bucket_name)
68 }
69
70 fn build_credentials_and_signer() -> UtilResult<(Option<Credentials>, Signer)> {
73 if let Some(credentials_json) = Self::credentials_json_from_env()? {
74 let credentials = ServiceAccountCredentialsBuilder::new(credentials_json.clone())
75 .build()
76 .map_err(Self::map_init_error)?;
77 let signer = ServiceAccountCredentialsBuilder::new(credentials_json)
78 .build_signer()
79 .map_err(Self::map_init_error)?;
80 return Ok((Some(credentials), signer));
81 }
82
83 let signer = CredentialsBuilder::default()
84 .build_signer()
85 .map_err(Self::map_init_error)?;
86 Ok((None, signer))
87 }
88
89 fn credentials_json_from_env() -> UtilResult<Option<Value>> {
90 let raw_json = env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON")
91 .or_else(|_| env::var("SERVICE_ACCOUNT_JSON"))
92 .ok();
93 raw_json
94 .map(|value| serde_json::from_str(&value).map_err(Self::map_init_error))
95 .transpose()
96 }
97
98 fn map_init_error<E: std::error::Error + Send + Sync + 'static>(error: E) -> UtilError {
100 UtilError::new(
101 UtilErrorType::CloudStorage,
102 error.to_string(),
103 Some(error.into()),
104 )
105 }
106}
107
108#[async_trait(?Send)]
109impl FileStore for GoogleCloudFileStore {
110 async fn upload(&self, path: &Path, file: Vec<u8>, mime_type: &str) -> UtilResult<()> {
111 self.storage_client
112 .write_object(
113 self.bucket_resource_name(),
114 path_to_str(path)?,
115 Bytes::from(file),
116 )
117 .set_content_type(mime_type)
118 .send_buffered()
119 .await?;
120 Ok(())
121 }
122
123 async fn download(&self, path: &Path) -> UtilResult<Vec<u8>> {
124 let mut reader = self
125 .storage_client
126 .read_object(self.bucket_resource_name(), path_to_str(path)?)
127 .send()
128 .await?;
129 let mut out = Vec::new();
130 while let Some(chunk) = reader.next().await.transpose()? {
131 out.extend_from_slice(&chunk);
132 }
133 Ok(out)
134 }
135
136 async fn get_direct_download_url(&self, path: &Path) -> UtilResult<String> {
137 let object_name = path_to_str(path)?;
138 self.control_client
140 .get_object()
141 .set_bucket(self.bucket_resource_name())
142 .set_object(object_name)
143 .send()
144 .await?;
145 let url = SignedUrlBuilder::for_object(self.bucket_resource_name(), object_name)
146 .with_method(google_cloud_storage::http::Method::GET)
147 .with_expiration(std::time::Duration::from_secs(300))
148 .sign_with(&self.signer)
149 .await
150 .map_err(|e| {
151 UtilError::new(UtilErrorType::CloudStorage, e.to_string(), Some(e.into()))
152 })?;
153 Ok(url)
154 }
155
156 async fn delete(&self, path: &Path) -> UtilResult<()> {
157 self.control_client
158 .delete_object()
159 .set_bucket(self.bucket_resource_name())
160 .set_object(path_to_str(path)?)
161 .send()
162 .await?;
163 Ok(())
164 }
165
166 async fn upload_stream(
167 &self,
168 path: &Path,
169 mut contents: GenericPayload,
170 mime_type: &str,
171 ) -> UtilResult<()> {
172 let mut collected = Vec::new();
173 while let Some(chunk) = contents.next().await {
174 let chunk = chunk
175 .map_err(|e| UtilError::new(UtilErrorType::CloudStorage, e.to_string(), None))?;
176 collected.extend_from_slice(&chunk);
177 }
178 self.storage_client
179 .write_object(
180 self.bucket_resource_name(),
181 path_to_str(path)?,
182 Bytes::from(collected),
183 )
184 .set_content_type(mime_type)
185 .send_buffered()
186 .await?;
187 Ok(())
188 }
189
190 async fn download_stream(
191 &self,
192 path: &Path,
193 ) -> UtilResult<Box<dyn futures::Stream<Item = std::io::Result<bytes::Bytes>>>> {
194 let mut reader = self
195 .storage_client
196 .read_object(self.bucket_resource_name(), path_to_str(path)?)
197 .send()
198 .await?;
199 let stream = async_stream::stream! {
200 while let Some(item) = reader.next().await {
201 yield item.map_err(io::Error::other);
202 }
203 };
204 Ok(Box::new(stream))
205 }
206
207 fn get_cache_files_folder_path(&self) -> UtilResult<&Path> {
208 Ok(&self.cache_files_path)
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Held by tests in this module while they modify the process environment.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Snapshot of the two credential variables; dropping it puts the saved values back.
    struct EnvRestore {
        google_application_credentials_json: Option<String>,
        service_account_json: Option<String>,
    }

    impl EnvRestore {
        /// Records the current values of both credential variables.
        fn capture() -> Self {
            let google_application_credentials_json =
                env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").ok();
            let service_account_json = env::var("SERVICE_ACCOUNT_JSON").ok();
            Self {
                google_application_credentials_json,
                service_account_json,
            }
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            let saved = [
                (
                    "GOOGLE_APPLICATION_CREDENTIALS_JSON",
                    self.google_application_credentials_json.as_deref(),
                ),
                (
                    "SERVICE_ACCOUNT_JSON",
                    self.service_account_json.as_deref(),
                ),
            ];
            for (name, previous) in saved {
                // SAFETY: the test below creates this guard after locking `ENV_MUTEX`, so it
                // is dropped while the lock is still held. NOTE(review): code outside this
                // module that touches the environment concurrently is not covered.
                unsafe {
                    if let Some(previous) = previous {
                        env::set_var(name, previous);
                    } else {
                        env::remove_var(name);
                    }
                }
            }
        }
    }

    #[tokio::test]
    async fn uses_google_application_credentials_json_contents() {
        // The lock guard and the environment snapshot stay inside this block so neither is
        // alive across the `.await` below.
        let signer = {
            let _guard = ENV_MUTEX.lock().expect("environment mutex was poisoned");
            let _restore = EnvRestore::capture();
            // SAFETY: `ENV_MUTEX` is held, serializing environment changes made by tests in
            // this module.
            unsafe {
                env::set_var(
                    "GOOGLE_APPLICATION_CREDENTIALS_JSON",
                    r#"{
                        "type": "service_account",
                        "client_email": "json-creds@example.iam.gserviceaccount.com",
                        "private_key_id": "test-private-key-id",
                        "private_key": "not-a-real-private-key",
                        "project_id": "test-project-id"
                    }"#,
                );
                env::remove_var("SERVICE_ACCOUNT_JSON");
            }

            let (_, signer) = GoogleCloudFileStore::build_credentials_and_signer()
                .expect("raw JSON credentials should build a signer");
            signer
        };

        let client_email = signer
            .client_email()
            .await
            .expect("service account signer should expose client email without IAM API");
        assert_eq!(client_email, "json-creds@example.iam.gserviceaccount.com");
    }
}