Skip to main content

headless_lms_utils/file_store/
google_cloud_file_store.rs

1use std::path::{Path, PathBuf};
2
3use crate::prelude::*;
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures::StreamExt;
7use google_cloud_auth::credentials::{
8    Builder as CredentialsBuilder, Credentials,
9    service_account::Builder as ServiceAccountCredentialsBuilder,
10};
11use google_cloud_auth::signer::Signer;
12use google_cloud_storage::builder::storage::SignedUrlBuilder;
13use google_cloud_storage::client::{Storage, StorageControl};
14use serde_json::Value;
15use std::{env, io};
16
17use super::{FileStore, GenericPayload, generate_cache_folder_dir, path_to_str};
18
/// [`FileStore`] implementation that keeps its objects in a single Google Cloud
/// Storage bucket.
pub struct GoogleCloudFileStore {
    /// Bare bucket id; expanded to `projects/_/buckets/{bucket_name}` by
    /// `bucket_resource_name` before it is handed to the API.
    bucket_name: String,
    /// Client used to read and write object contents.
    storage_client: Storage,
    /// Client used for object lookups (existence checks) and deletions.
    control_client: StorageControl,
    /// Signer used to produce signed download URLs.
    signer: Signer,
    /// Local cache folder, obtained from `generate_cache_folder_dir` in `new` and
    /// exposed through `get_cache_files_folder_path`.
    pub cache_files_path: PathBuf,
}
26
27impl GoogleCloudFileStore {
28    /// Creates a Google Cloud file store using ADC-backed SDK clients.
29    #[instrument]
30    pub async fn new(bucket_name: String) -> UtilResult<Self> {
31        let (credentials, signer) = Self::build_credentials_and_signer()?;
32        let storage_client = match credentials.clone() {
33            Some(credentials) => Storage::builder()
34                .with_credentials(credentials)
35                .build()
36                .await
37                .map_err(Self::map_init_error)?,
38            None => Storage::builder()
39                .build()
40                .await
41                .map_err(Self::map_init_error)?,
42        };
43        let control_client = match credentials {
44            Some(credentials) => StorageControl::builder()
45                .with_credentials(credentials)
46                .build()
47                .await
48                .map_err(Self::map_init_error)?,
49            None => StorageControl::builder()
50                .build()
51                .await
52                .map_err(Self::map_init_error)?,
53        };
54        let cache_files_path = generate_cache_folder_dir()?;
55
56        Ok(Self {
57            bucket_name,
58            storage_client,
59            control_client,
60            signer,
61            cache_files_path,
62        })
63    }
64
65    /// Converts a raw bucket id into the GCS API resource name format.
66    fn bucket_resource_name(&self) -> String {
67        format!("projects/_/buckets/{}", self.bucket_name)
68    }
69
70    /// Builds Google Cloud credentials while preserving the legacy `cloud-storage`
71    /// crate's support for JSON credentials supplied directly in an environment variable.
72    fn build_credentials_and_signer() -> UtilResult<(Option<Credentials>, Signer)> {
73        if let Some(credentials_json) = Self::credentials_json_from_env()? {
74            let credentials = ServiceAccountCredentialsBuilder::new(credentials_json.clone())
75                .build()
76                .map_err(Self::map_init_error)?;
77            let signer = ServiceAccountCredentialsBuilder::new(credentials_json)
78                .build_signer()
79                .map_err(Self::map_init_error)?;
80            return Ok((Some(credentials), signer));
81        }
82
83        let signer = CredentialsBuilder::default()
84            .build_signer()
85            .map_err(Self::map_init_error)?;
86        Ok((None, signer))
87    }
88
89    fn credentials_json_from_env() -> UtilResult<Option<Value>> {
90        let raw_json = env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON")
91            .or_else(|_| env::var("SERVICE_ACCOUNT_JSON"))
92            .ok();
93        raw_json
94            .map(|value| serde_json::from_str(&value).map_err(Self::map_init_error))
95            .transpose()
96    }
97
98    /// Maps SDK initialization errors into the cloud storage util error.
99    fn map_init_error<E: std::error::Error + Send + Sync + 'static>(error: E) -> UtilError {
100        UtilError::new(
101            UtilErrorType::CloudStorage,
102            error.to_string(),
103            Some(error.into()),
104        )
105    }
106}
107
108#[async_trait(?Send)]
109impl FileStore for GoogleCloudFileStore {
110    async fn upload(&self, path: &Path, file: Vec<u8>, mime_type: &str) -> UtilResult<()> {
111        self.storage_client
112            .write_object(
113                self.bucket_resource_name(),
114                path_to_str(path)?,
115                Bytes::from(file),
116            )
117            .set_content_type(mime_type)
118            .send_buffered()
119            .await?;
120        Ok(())
121    }
122
123    async fn download(&self, path: &Path) -> UtilResult<Vec<u8>> {
124        let mut reader = self
125            .storage_client
126            .read_object(self.bucket_resource_name(), path_to_str(path)?)
127            .send()
128            .await?;
129        let mut out = Vec::new();
130        while let Some(chunk) = reader.next().await.transpose()? {
131            out.extend_from_slice(&chunk);
132        }
133        Ok(out)
134    }
135
136    async fn get_direct_download_url(&self, path: &Path) -> UtilResult<String> {
137        let object_name = path_to_str(path)?;
138        // Keep old behavior: verify object exists before returning a URL.
139        self.control_client
140            .get_object()
141            .set_bucket(self.bucket_resource_name())
142            .set_object(object_name)
143            .send()
144            .await?;
145        let url = SignedUrlBuilder::for_object(self.bucket_resource_name(), object_name)
146            .with_method(google_cloud_storage::http::Method::GET)
147            .with_expiration(std::time::Duration::from_secs(300))
148            .sign_with(&self.signer)
149            .await
150            .map_err(|e| {
151                UtilError::new(UtilErrorType::CloudStorage, e.to_string(), Some(e.into()))
152            })?;
153        Ok(url)
154    }
155
156    async fn delete(&self, path: &Path) -> UtilResult<()> {
157        self.control_client
158            .delete_object()
159            .set_bucket(self.bucket_resource_name())
160            .set_object(path_to_str(path)?)
161            .send()
162            .await?;
163        Ok(())
164    }
165
166    async fn upload_stream(
167        &self,
168        path: &Path,
169        mut contents: GenericPayload,
170        mime_type: &str,
171    ) -> UtilResult<()> {
172        let mut collected = Vec::new();
173        while let Some(chunk) = contents.next().await {
174            let chunk = chunk
175                .map_err(|e| UtilError::new(UtilErrorType::CloudStorage, e.to_string(), None))?;
176            collected.extend_from_slice(&chunk);
177        }
178        self.storage_client
179            .write_object(
180                self.bucket_resource_name(),
181                path_to_str(path)?,
182                Bytes::from(collected),
183            )
184            .set_content_type(mime_type)
185            .send_buffered()
186            .await?;
187        Ok(())
188    }
189
190    async fn download_stream(
191        &self,
192        path: &Path,
193    ) -> UtilResult<Box<dyn futures::Stream<Item = std::io::Result<bytes::Bytes>>>> {
194        let mut reader = self
195            .storage_client
196            .read_object(self.bucket_resource_name(), path_to_str(path)?)
197            .send()
198            .await?;
199        let stream = async_stream::stream! {
200            while let Some(item) = reader.next().await {
201                yield item.map_err(io::Error::other);
202            }
203        };
204        Ok(Box::new(stream))
205    }
206
207    fn get_cache_files_folder_path(&self) -> UtilResult<&Path> {
208        Ok(&self.cache_files_path)
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Environment variables read by `credentials_json_from_env`.
    const GOOGLE_CREDENTIALS_JSON_VAR: &str = "GOOGLE_APPLICATION_CREDENTIALS_JSON";
    const SERVICE_ACCOUNT_JSON_VAR: &str = "SERVICE_ACCOUNT_JSON";

    /// Serializes the tests in this module that modify the process environment.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Snapshot of the credential variables that writes the saved values back to
    /// the environment when dropped.
    struct EnvRestore {
        google_application_credentials_json: Option<String>,
        service_account_json: Option<String>,
    }

    impl EnvRestore {
        /// Records the current values of both credential variables.
        fn capture() -> Self {
            let google_application_credentials_json = env::var(GOOGLE_CREDENTIALS_JSON_VAR).ok();
            let service_account_json = env::var(SERVICE_ACCOUNT_JSON_VAR).ok();
            Self {
                google_application_credentials_json,
                service_account_json,
            }
        }

        /// Sets `name` back to `saved`, or removes it when nothing was saved.
        fn restore_var(name: &str, saved: Option<&str>) {
            // SAFETY: tests create this guard while holding `ENV_MUTEX` and drop it
            // before releasing the lock, which serializes environment writes within
            // this module. Assumes no other thread touches the environment
            // concurrently -- TODO confirm for the wider test suite.
            unsafe {
                match saved {
                    Some(value) => env::set_var(name, value),
                    None => env::remove_var(name),
                }
            }
        }
    }

    impl Drop for EnvRestore {
        fn drop(&mut self) {
            Self::restore_var(
                GOOGLE_CREDENTIALS_JSON_VAR,
                self.google_application_credentials_json.as_deref(),
            );
            Self::restore_var(
                SERVICE_ACCOUNT_JSON_VAR,
                self.service_account_json.as_deref(),
            );
        }
    }

    /// Raw JSON in `GOOGLE_APPLICATION_CREDENTIALS_JSON` must yield a signer that
    /// reports the service account's client email.
    #[tokio::test]
    async fn uses_google_application_credentials_json_contents() {
        // The lock and the restore guard live only inside this block, so neither is
        // held across the `.await` below.
        let signer = {
            let _guard = ENV_MUTEX.lock().expect("environment mutex was poisoned");
            let _restore = EnvRestore::capture();
            // SAFETY: `ENV_MUTEX` is held, serializing environment writes within this
            // module. Assumes no other thread touches the environment concurrently.
            unsafe {
                env::set_var(
                    GOOGLE_CREDENTIALS_JSON_VAR,
                    r#"{
                        "type": "service_account",
                        "client_email": "json-creds@example.iam.gserviceaccount.com",
                        "private_key_id": "test-private-key-id",
                        "private_key": "not-a-real-private-key",
                        "project_id": "test-project-id"
                    }"#,
                );
                env::remove_var(SERVICE_ACCOUNT_JSON_VAR);
            }

            let (_, signer) = GoogleCloudFileStore::build_credentials_and_signer()
                .expect("raw JSON credentials should build a signer");
            signer
        };

        let client_email = signer
            .client_email()
            .await
            .expect("service account signer should expose client email without IAM API");
        assert_eq!(client_email, "json-creds@example.iam.gserviceaccount.com");
    }
}