cloud_storage/client/
object.rs

1use futures_util::{stream, Stream, TryStream};
2use reqwest::StatusCode;
3
4use crate::{
5    error::GoogleResponse,
6    object::{percent_encode, ComposeRequest, ObjectList, RewriteResponse, SizedByteStream},
7    ListRequest, Object,
8};
9
// Object uploads have their own base URL (note the `/upload` path segment) instead of using
// `crate::BASE_URL`. Unlike `crate::BASE_URL`, which this file always combines with `/b/...`,
// this constant already ends in `/b`, so only the bucket name is appended to it.
const BASE_URL: &str = "https://storage.googleapis.com/upload/storage/v1/b";
12
/// Operations on [`Object`](Object)s.
///
/// This is a thin wrapper around a borrowed [`Client`](super::Client): every method uses the
/// wrapped client both to obtain the request headers (`get_headers`) and to send the HTTP
/// request. The examples on the methods obtain an instance through `client.object()`.
#[derive(Debug)]
pub struct ObjectClient<'a>(pub(super) &'a super::Client);
16
17impl<'a> ObjectClient<'a> {
18    /// Create a new object.
19    /// Upload a file as that is loaded in memory to google cloud storage, where it will be
20    /// interpreted according to the mime type you specified.
21    /// ## Example
22    /// ```rust,no_run
23    /// # #[tokio::main]
24    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
25    /// # fn read_cute_cat(_in: &str) -> Vec<u8> { vec![0, 1] }
26    /// use cloud_storage::Client;
27    /// use cloud_storage::Object;
28    ///
29    /// let file: Vec<u8> = read_cute_cat("cat.png");
30    /// let client = Client::default();
31    /// client.object().create("cat-photos", file, "recently read cat.png", "image/png").await?;
32    /// # Ok(())
33    /// # }
34    /// ```
35    pub async fn create(
36        &self,
37        bucket: &str,
38        file: Vec<u8>,
39        filename: &str,
40        mime_type: &str,
41    ) -> crate::Result<Object> {
42        use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
43
44        let url = &format!(
45            "{}/{}/o?uploadType=media&name={}",
46            BASE_URL,
47            percent_encode(bucket),
48            percent_encode(filename),
49        );
50        let mut headers = self.0.get_headers().await?;
51        headers.insert(CONTENT_TYPE, mime_type.parse()?);
52        headers.insert(CONTENT_LENGTH, file.len().to_string().parse()?);
53        let response = self
54            .0
55            .client
56            .post(url)
57            .headers(headers)
58            .body(file)
59            .send()
60            .await?;
61        if response.status() == 200 {
62            Ok(serde_json::from_str(&response.text().await?)?)
63        } else {
64            Err(crate::Error::new(&response.text().await?))
65        }
66    }
67
68    /// Create a new object. This works in the same way as `ObjectClient::create`, except it does not need
69    /// to load the entire file in ram.
70    /// ## Example
71    /// ```rust,no_run
72    /// # #[tokio::main]
73    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    /// use cloud_storage::Client;
75    /// use cloud_storage::Object;
76    ///
77    /// let client = Client::default();
78    /// let file = reqwest::Client::new()
79    ///     .get("https://my_domain.rs/nice_cat_photo.png")
80    ///     .send()
81    ///     .await?
82    ///     .bytes_stream();
83    /// client.object().create_streamed("cat-photos", file, 10, "recently read cat.png", "image/png").await?;
84    /// # Ok(())
85    /// # }
86    /// ```
87    pub async fn create_streamed<S>(
88        &self,
89        bucket: &str,
90        stream: S,
91        length: impl Into<Option<u64>>,
92        filename: &str,
93        mime_type: &str,
94    ) -> crate::Result<Object>
95    where
96        S: TryStream + Send + Sync + 'static,
97        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
98        bytes::Bytes: From<S::Ok>,
99    {
100        use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
101
102        let url = &format!(
103            "{}/{}/o?uploadType=media&name={}",
104            BASE_URL,
105            percent_encode(bucket),
106            percent_encode(filename),
107        );
108        let mut headers = self.0.get_headers().await?;
109        headers.insert(CONTENT_TYPE, mime_type.parse()?);
110        if let Some(length) = length.into() {
111            headers.insert(CONTENT_LENGTH, length.into());
112        }
113
114        let body = reqwest::Body::wrap_stream(stream);
115        let response = self
116            .0
117            .client
118            .post(url)
119            .headers(headers)
120            .body(body)
121            .send()
122            .await?;
123        if response.status() == 200 {
124            Ok(serde_json::from_str(&response.text().await?)?)
125        } else {
126            Err(crate::Error::new(&response.text().await?))
127        }
128    }
129
130    /// Obtain a list of objects within this Bucket.
131    /// ### Example
132    /// ```no_run
133    /// # #[tokio::main]
134    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
135    /// use cloud_storage::Client;
136    /// use cloud_storage::{Object, ListRequest};
137    ///
138    /// let client = Client::default();
139    /// let all_objects = client.object().list("my_bucket", ListRequest::default()).await?;
140    /// # Ok(())
141    /// # }
142    /// ```
143    pub async fn list(
144        &self,
145        bucket: &'a str,
146        list_request: ListRequest,
147    ) -> crate::Result<impl Stream<Item = crate::Result<ObjectList>> + 'a> {
148        enum ListState {
149            Start(ListRequest),
150            HasMore(ListRequest),
151            Done,
152        }
153        use ListState::*;
154        impl ListState {
155            fn into_has_more(self) -> Option<ListState> {
156                match self {
157                    Start(req) | HasMore(req) => Some(HasMore(req)),
158                    Done => None,
159                }
160            }
161
162            fn req_mut(&mut self) -> Option<&mut ListRequest> {
163                match self {
164                    Start(ref mut req) | HasMore(ref mut req) => Some(req),
165                    Done => None,
166                }
167            }
168        }
169
170        let client = self.0;
171
172        Ok(stream::unfold(
173            ListState::Start(list_request),
174            move |mut state| async move {
175                let url = format!("{}/b/{}/o", crate::BASE_URL, percent_encode(bucket));
176                let headers = match client.get_headers().await {
177                    Ok(h) => h,
178                    Err(e) => return Some((Err(e), state)),
179                };
180                let req = state.req_mut()?;
181                if req.max_results == Some(0) {
182                    return None;
183                }
184
185                let response = client
186                    .client
187                    .get(&url)
188                    .query(req)
189                    .headers(headers)
190                    .send()
191                    .await;
192
193                let response = match response {
194                    Ok(r) if r.status() == 200 => r,
195                    Ok(r) => {
196                        let e = match r.json::<crate::error::GoogleErrorResponse>().await {
197                            Ok(err_res) => err_res.into(),
198                            Err(serde_err) => serde_err.into(),
199                        };
200                        return Some((Err(e), state));
201                    }
202                    Err(e) => return Some((Err(e.into()), state)),
203                };
204
205                let result: GoogleResponse<ObjectList> = match response.json().await {
206                    Ok(json) => json,
207                    Err(e) => return Some((Err(e.into()), state)),
208                };
209
210                let response_body = match result {
211                    GoogleResponse::Success(success) => success,
212                    GoogleResponse::Error(e) => return Some((Err(e.into()), state)),
213                };
214
215                let next_state = if let Some(ref page_token) = response_body.next_page_token {
216                    req.page_token = Some(page_token.clone());
217                    req.max_results = req
218                        .max_results
219                        .map(|rem| rem.saturating_sub(response_body.items.len()));
220                    state.into_has_more()?
221                } else {
222                    Done
223                };
224
225                Some((Ok(response_body), next_state))
226            },
227        ))
228    }
229
230    /// Obtains a single object with the specified name in the specified bucket.
231    /// ### Example
232    /// ```no_run
233    /// # #[tokio::main]
234    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
235    /// use cloud_storage::Client;
236    /// use cloud_storage::Object;
237    ///
238    /// let client = Client::default();
239    /// let object = client.object().read("my_bucket", "path/to/my/file.png").await?;
240    /// # Ok(())
241    /// # }
242    /// ```
243    pub async fn read(&self, bucket: &str, file_name: &str) -> crate::Result<Object> {
244        let url = format!(
245            "{}/b/{}/o/{}",
246            crate::BASE_URL,
247            percent_encode(bucket),
248            percent_encode(file_name),
249        );
250        let result: GoogleResponse<Object> = self
251            .0
252            .client
253            .get(&url)
254            .headers(self.0.get_headers().await?)
255            .send()
256            .await?
257            .json()
258            .await?;
259        match result {
260            GoogleResponse::Success(s) => Ok(s),
261            GoogleResponse::Error(e) => Err(e.into()),
262        }
263    }
264
265    /// Download the content of the object with the specified name in the specified bucket.
266    /// ### Example
267    /// ```no_run
268    /// # #[tokio::main]
269    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
270    /// use cloud_storage::Client;
271    /// use cloud_storage::Object;
272    ///
273    /// let client = Client::default();
274    /// let bytes = client.object().download("my_bucket", "path/to/my/file.png").await?;
275    /// # Ok(())
276    /// # }
277    /// ```
278    pub async fn download(&self, bucket: &str, file_name: &str) -> crate::Result<Vec<u8>> {
279        let url = format!(
280            "{}/b/{}/o/{}?alt=media",
281            crate::BASE_URL,
282            percent_encode(bucket),
283            percent_encode(file_name),
284        );
285        let resp = self
286            .0
287            .client
288            .get(&url)
289            .headers(self.0.get_headers().await?)
290            .send()
291            .await?;
292        if resp.status() == StatusCode::NOT_FOUND {
293            Err(crate::Error::Other(resp.text().await?))
294        } else {
295            Ok(resp.error_for_status()?.bytes().await?.to_vec())
296        }
297    }
298
299    /// Download the content of the object with the specified name in the specified bucket, without
300    /// allocating the whole file into a vector.
301    /// ### Example
302    /// ```no_run
303    /// # #[tokio::main]
304    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
305    /// use cloud_storage::Client;
306    /// use cloud_storage::Object;
307    /// use futures_util::stream::StreamExt;
308    /// use tokio::fs::File;
309    /// use tokio::io::{AsyncWriteExt, BufWriter};
310    ///
311    /// let client = Client::default();
312    /// let mut stream = client.object().download_streamed("my_bucket", "path/to/my/file.png").await?;
313    /// let mut file = BufWriter::new(File::create("file.png").await.unwrap());
314    /// while let Some(byte) = stream.next().await {
315    ///     file.write_all(&[byte.unwrap()]).await.unwrap();
316    /// }
317    /// file.flush().await?;
318    /// # Ok(())
319    /// # }
320    /// ```
321    pub async fn download_streamed(
322        &self,
323        bucket: &str,
324        file_name: &str,
325    ) -> crate::Result<impl Stream<Item = crate::Result<u8>> + Unpin> {
326        use futures_util::{StreamExt, TryStreamExt};
327        let url = format!(
328            "{}/b/{}/o/{}?alt=media",
329            crate::BASE_URL,
330            percent_encode(bucket),
331            percent_encode(file_name),
332        );
333        let response = self
334            .0
335            .client
336            .get(&url)
337            .headers(self.0.get_headers().await?)
338            .send()
339            .await?
340            .error_for_status()?;
341        let size = response.content_length();
342        let bytes = response
343            .bytes_stream()
344            .map(|chunk| chunk.map(|c| futures_util::stream::iter(c.into_iter().map(Ok))))
345            .try_flatten();
346        Ok(SizedByteStream::new(bytes, size))
347    }
348
349    /// Updates a single object with the specified name in the specified bucket with the new
350    /// information in `object`.
351    ///
352    /// Note that if the `name` or `bucket` fields are changed, the object will not be found.
353    /// See [`rewrite`] or [`copy`] for similar operations.
354    /// ### Example
355    /// ```no_run
356    /// # #[tokio::main]
357    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
358    /// use cloud_storage::Client;
359    /// use cloud_storage::Object;
360    ///
361    /// let client = Client::default();
362    /// let mut object = client.object().read("my_bucket", "path/to/my/file.png").await?;
363    /// object.content_type = Some("application/xml".to_string());
364    /// client.object().update(&object).await?;
365    /// # Ok(())
366    /// # }
367    /// ```
368    pub async fn update(&self, object: &Object) -> crate::Result<Object> {
369        let url = format!(
370            "{}/b/{}/o/{}",
371            crate::BASE_URL,
372            percent_encode(&object.bucket),
373            percent_encode(&object.name),
374        );
375        let result: GoogleResponse<Object> = self
376            .0
377            .client
378            .put(&url)
379            .headers(self.0.get_headers().await?)
380            .json(&object)
381            .send()
382            .await?
383            .json()
384            .await?;
385        match result {
386            GoogleResponse::Success(s) => Ok(s),
387            GoogleResponse::Error(e) => Err(e.into()),
388        }
389    }
390
391    /// Deletes a single object with the specified name in the specified bucket.
392    /// ### Example
393    /// ```no_run
394    /// # #[tokio::main]
395    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
396    /// use cloud_storage::Client;
397    /// use cloud_storage::Object;
398    ///
399    /// let client = Client::default();
400    /// client.object().delete("my_bucket", "path/to/my/file.png").await?;
401    /// # Ok(())
402    /// # }
403    /// ```
404    pub async fn delete(&self, bucket: &str, file_name: &str) -> crate::Result<()> {
405        let url = format!(
406            "{}/b/{}/o/{}",
407            crate::BASE_URL,
408            percent_encode(bucket),
409            percent_encode(file_name),
410        );
411        let response = self
412            .0
413            .client
414            .delete(&url)
415            .headers(self.0.get_headers().await?)
416            .send()
417            .await?;
418        if response.status().is_success() {
419            Ok(())
420        } else {
421            Err(crate::Error::Google(response.json().await?))
422        }
423    }
424
425    /// Concatenates the contents of multiple objects into one.
426    /// ### Example
427    /// ```no_run
428    /// # #[tokio::main]
429    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
430    /// use cloud_storage::Client;
431    /// use cloud_storage::object::{Object, ComposeRequest, SourceObject};
432    ///
433    /// let client = Client::default();
434    /// let obj1 = client.object().read("my_bucket", "file1").await?;
435    /// let obj2 = client.object().read("my_bucket", "file2").await?;
436    /// let compose_request = ComposeRequest {
437    ///     kind: "storage#composeRequest".to_string(),
438    ///     source_objects: vec![
439    ///         SourceObject {
440    ///             name: obj1.name.clone(),
441    ///             generation: None,
442    ///             object_preconditions: None,
443    ///         },
444    ///         SourceObject {
445    ///             name: obj2.name.clone(),
446    ///             generation: None,
447    ///             object_preconditions: None,
448    ///         },
449    ///     ],
450    ///     destination: None,
451    /// };
452    /// let obj3 = client.object().compose("my_bucket", &compose_request, "test-concatted-file").await?;
453    /// // obj3 is now a file with the content of obj1 and obj2 concatted together.
454    /// # Ok(())
455    /// # }
456    /// ```
457    pub async fn compose(
458        &self,
459        bucket: &str,
460        req: &ComposeRequest,
461        destination_object: &str,
462    ) -> crate::Result<Object> {
463        let url = format!(
464            "{}/b/{}/o/{}/compose",
465            crate::BASE_URL,
466            percent_encode(bucket),
467            percent_encode(destination_object)
468        );
469        let result: GoogleResponse<Object> = self
470            .0
471            .client
472            .post(&url)
473            .headers(self.0.get_headers().await?)
474            .json(req)
475            .send()
476            .await?
477            .json()
478            .await?;
479        match result {
480            GoogleResponse::Success(s) => Ok(s),
481            GoogleResponse::Error(e) => Err(e.into()),
482        }
483    }
484
485    /// Copy this object to the target bucket and path.
486    /// ### Example
487    /// ```no_run
488    /// # #[tokio::main]
489    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
490    /// use cloud_storage::Client;
491    /// use cloud_storage::object::{Object, ComposeRequest};
492    ///
493    /// let client = Client::default();
494    /// let obj1 = client.object().read("my_bucket", "file1").await?;
495    /// let obj2 = client.object().copy(&obj1, "my_other_bucket", "file2").await?;
496    /// // obj2 is now a copy of obj1.
497    /// # Ok(())
498    /// # }
499    /// ```
500    pub async fn copy(
501        &self,
502        object: &Object,
503        destination_bucket: &str,
504        path: &str,
505    ) -> crate::Result<Object> {
506        use reqwest::header::CONTENT_LENGTH;
507
508        let url = format!(
509            "{base}/b/{sBucket}/o/{sObject}/copyTo/b/{dBucket}/o/{dObject}",
510            base = crate::BASE_URL,
511            sBucket = percent_encode(&object.bucket),
512            sObject = percent_encode(&object.name),
513            dBucket = percent_encode(destination_bucket),
514            dObject = percent_encode(path),
515        );
516        let mut headers = self.0.get_headers().await?;
517        headers.insert(CONTENT_LENGTH, "0".parse()?);
518        let result: GoogleResponse<Object> = self
519            .0
520            .client
521            .post(&url)
522            .headers(headers)
523            .send()
524            .await?
525            .json()
526            .await?;
527        match result {
528            GoogleResponse::Success(s) => Ok(s),
529            GoogleResponse::Error(e) => Err(e.into()),
530        }
531    }
532
533    /// Moves a file from the current location to the target bucket and path.
534    ///
535    /// ## Limitations
536    /// This function does not yet support rewriting objects to another
537    /// * Geographical Location,
538    /// * Encryption,
539    /// * Storage class.
540    /// These limitations mean that for now, the rewrite and the copy methods do the same thing.
541    /// ### Example
542    /// ```no_run
543    /// # #[tokio::main]
544    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
545    /// use cloud_storage::Client;
546    /// use cloud_storage::object::Object;
547    ///
548    /// let client = Client::default();
549    /// let obj1 = client.object().read("my_bucket", "file1").await?;
550    /// let obj2 = client.object().rewrite(&obj1, "my_other_bucket", "file2").await?;
551    /// // obj2 is now a copy of obj1.
552    /// # Ok(())
553    /// # }
554    /// ```
555    pub async fn rewrite(
556        &self,
557        object: &Object,
558        destination_bucket: &str,
559        path: &str,
560    ) -> crate::Result<Object> {
561        use reqwest::header::CONTENT_LENGTH;
562
563        let url = format!(
564            "{base}/b/{sBucket}/o/{sObject}/rewriteTo/b/{dBucket}/o/{dObject}",
565            base = crate::BASE_URL,
566            sBucket = percent_encode(&object.bucket),
567            sObject = percent_encode(&object.name),
568            dBucket = percent_encode(destination_bucket),
569            dObject = percent_encode(path),
570        );
571        let mut headers = self.0.get_headers().await?;
572        headers.insert(CONTENT_LENGTH, "0".parse()?);
573        let s = self
574            .0
575            .client
576            .post(&url)
577            .headers(headers)
578            .send()
579            .await?
580            .text()
581            .await?;
582
583        let result: RewriteResponse = serde_json::from_str(&s).unwrap();
584        Ok(result.resource)
585        // match result {
586        // GoogleResponse::Success(s) => Ok(s.resource),
587        // GoogleResponse::Error(e) => Err(e.into()),
588        // }
589    }
590}