cloud_storage/client/object.rs
1use futures_util::{stream, Stream, TryStream};
2use reqwest::StatusCode;
3
4use crate::{
5 error::GoogleResponse,
6 object::{percent_encode, ComposeRequest, ObjectList, RewriteResponse, SizedByteStream},
7 ListRequest, Object,
8};
9
10// Object uploads has its own url for some reason
11const BASE_URL: &str = "https://storage.googleapis.com/upload/storage/v1/b";
12
13/// Operations on [`Object`](Object)s.
14#[derive(Debug)]
15pub struct ObjectClient<'a>(pub(super) &'a super::Client);
16
17impl<'a> ObjectClient<'a> {
18 /// Create a new object.
19 /// Upload a file as that is loaded in memory to google cloud storage, where it will be
20 /// interpreted according to the mime type you specified.
21 /// ## Example
22 /// ```rust,no_run
23 /// # #[tokio::main]
24 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
25 /// # fn read_cute_cat(_in: &str) -> Vec<u8> { vec![0, 1] }
26 /// use cloud_storage::Client;
27 /// use cloud_storage::Object;
28 ///
29 /// let file: Vec<u8> = read_cute_cat("cat.png");
30 /// let client = Client::default();
31 /// client.object().create("cat-photos", file, "recently read cat.png", "image/png").await?;
32 /// # Ok(())
33 /// # }
34 /// ```
35 pub async fn create(
36 &self,
37 bucket: &str,
38 file: Vec<u8>,
39 filename: &str,
40 mime_type: &str,
41 ) -> crate::Result<Object> {
42 use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
43
44 let url = &format!(
45 "{}/{}/o?uploadType=media&name={}",
46 BASE_URL,
47 percent_encode(bucket),
48 percent_encode(filename),
49 );
50 let mut headers = self.0.get_headers().await?;
51 headers.insert(CONTENT_TYPE, mime_type.parse()?);
52 headers.insert(CONTENT_LENGTH, file.len().to_string().parse()?);
53 let response = self
54 .0
55 .client
56 .post(url)
57 .headers(headers)
58 .body(file)
59 .send()
60 .await?;
61 if response.status() == 200 {
62 Ok(serde_json::from_str(&response.text().await?)?)
63 } else {
64 Err(crate::Error::new(&response.text().await?))
65 }
66 }
67
68 /// Create a new object. This works in the same way as `ObjectClient::create`, except it does not need
69 /// to load the entire file in ram.
70 /// ## Example
71 /// ```rust,no_run
72 /// # #[tokio::main]
73 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 /// use cloud_storage::Client;
75 /// use cloud_storage::Object;
76 ///
77 /// let client = Client::default();
78 /// let file = reqwest::Client::new()
79 /// .get("https://my_domain.rs/nice_cat_photo.png")
80 /// .send()
81 /// .await?
82 /// .bytes_stream();
83 /// client.object().create_streamed("cat-photos", file, 10, "recently read cat.png", "image/png").await?;
84 /// # Ok(())
85 /// # }
86 /// ```
87 pub async fn create_streamed<S>(
88 &self,
89 bucket: &str,
90 stream: S,
91 length: impl Into<Option<u64>>,
92 filename: &str,
93 mime_type: &str,
94 ) -> crate::Result<Object>
95 where
96 S: TryStream + Send + Sync + 'static,
97 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
98 bytes::Bytes: From<S::Ok>,
99 {
100 use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE};
101
102 let url = &format!(
103 "{}/{}/o?uploadType=media&name={}",
104 BASE_URL,
105 percent_encode(bucket),
106 percent_encode(filename),
107 );
108 let mut headers = self.0.get_headers().await?;
109 headers.insert(CONTENT_TYPE, mime_type.parse()?);
110 if let Some(length) = length.into() {
111 headers.insert(CONTENT_LENGTH, length.into());
112 }
113
114 let body = reqwest::Body::wrap_stream(stream);
115 let response = self
116 .0
117 .client
118 .post(url)
119 .headers(headers)
120 .body(body)
121 .send()
122 .await?;
123 if response.status() == 200 {
124 Ok(serde_json::from_str(&response.text().await?)?)
125 } else {
126 Err(crate::Error::new(&response.text().await?))
127 }
128 }
129
130 /// Obtain a list of objects within this Bucket.
131 /// ### Example
132 /// ```no_run
133 /// # #[tokio::main]
134 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
135 /// use cloud_storage::Client;
136 /// use cloud_storage::{Object, ListRequest};
137 ///
138 /// let client = Client::default();
139 /// let all_objects = client.object().list("my_bucket", ListRequest::default()).await?;
140 /// # Ok(())
141 /// # }
142 /// ```
143 pub async fn list(
144 &self,
145 bucket: &'a str,
146 list_request: ListRequest,
147 ) -> crate::Result<impl Stream<Item = crate::Result<ObjectList>> + 'a> {
148 enum ListState {
149 Start(ListRequest),
150 HasMore(ListRequest),
151 Done,
152 }
153 use ListState::*;
154 impl ListState {
155 fn into_has_more(self) -> Option<ListState> {
156 match self {
157 Start(req) | HasMore(req) => Some(HasMore(req)),
158 Done => None,
159 }
160 }
161
162 fn req_mut(&mut self) -> Option<&mut ListRequest> {
163 match self {
164 Start(ref mut req) | HasMore(ref mut req) => Some(req),
165 Done => None,
166 }
167 }
168 }
169
170 let client = self.0;
171
172 Ok(stream::unfold(
173 ListState::Start(list_request),
174 move |mut state| async move {
175 let url = format!("{}/b/{}/o", crate::BASE_URL, percent_encode(bucket));
176 let headers = match client.get_headers().await {
177 Ok(h) => h,
178 Err(e) => return Some((Err(e), state)),
179 };
180 let req = state.req_mut()?;
181 if req.max_results == Some(0) {
182 return None;
183 }
184
185 let response = client
186 .client
187 .get(&url)
188 .query(req)
189 .headers(headers)
190 .send()
191 .await;
192
193 let response = match response {
194 Ok(r) if r.status() == 200 => r,
195 Ok(r) => {
196 let e = match r.json::<crate::error::GoogleErrorResponse>().await {
197 Ok(err_res) => err_res.into(),
198 Err(serde_err) => serde_err.into(),
199 };
200 return Some((Err(e), state));
201 }
202 Err(e) => return Some((Err(e.into()), state)),
203 };
204
205 let result: GoogleResponse<ObjectList> = match response.json().await {
206 Ok(json) => json,
207 Err(e) => return Some((Err(e.into()), state)),
208 };
209
210 let response_body = match result {
211 GoogleResponse::Success(success) => success,
212 GoogleResponse::Error(e) => return Some((Err(e.into()), state)),
213 };
214
215 let next_state = if let Some(ref page_token) = response_body.next_page_token {
216 req.page_token = Some(page_token.clone());
217 req.max_results = req
218 .max_results
219 .map(|rem| rem.saturating_sub(response_body.items.len()));
220 state.into_has_more()?
221 } else {
222 Done
223 };
224
225 Some((Ok(response_body), next_state))
226 },
227 ))
228 }
229
230 /// Obtains a single object with the specified name in the specified bucket.
231 /// ### Example
232 /// ```no_run
233 /// # #[tokio::main]
234 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
235 /// use cloud_storage::Client;
236 /// use cloud_storage::Object;
237 ///
238 /// let client = Client::default();
239 /// let object = client.object().read("my_bucket", "path/to/my/file.png").await?;
240 /// # Ok(())
241 /// # }
242 /// ```
243 pub async fn read(&self, bucket: &str, file_name: &str) -> crate::Result<Object> {
244 let url = format!(
245 "{}/b/{}/o/{}",
246 crate::BASE_URL,
247 percent_encode(bucket),
248 percent_encode(file_name),
249 );
250 let result: GoogleResponse<Object> = self
251 .0
252 .client
253 .get(&url)
254 .headers(self.0.get_headers().await?)
255 .send()
256 .await?
257 .json()
258 .await?;
259 match result {
260 GoogleResponse::Success(s) => Ok(s),
261 GoogleResponse::Error(e) => Err(e.into()),
262 }
263 }
264
265 /// Download the content of the object with the specified name in the specified bucket.
266 /// ### Example
267 /// ```no_run
268 /// # #[tokio::main]
269 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
270 /// use cloud_storage::Client;
271 /// use cloud_storage::Object;
272 ///
273 /// let client = Client::default();
274 /// let bytes = client.object().download("my_bucket", "path/to/my/file.png").await?;
275 /// # Ok(())
276 /// # }
277 /// ```
278 pub async fn download(&self, bucket: &str, file_name: &str) -> crate::Result<Vec<u8>> {
279 let url = format!(
280 "{}/b/{}/o/{}?alt=media",
281 crate::BASE_URL,
282 percent_encode(bucket),
283 percent_encode(file_name),
284 );
285 let resp = self
286 .0
287 .client
288 .get(&url)
289 .headers(self.0.get_headers().await?)
290 .send()
291 .await?;
292 if resp.status() == StatusCode::NOT_FOUND {
293 Err(crate::Error::Other(resp.text().await?))
294 } else {
295 Ok(resp.error_for_status()?.bytes().await?.to_vec())
296 }
297 }
298
299 /// Download the content of the object with the specified name in the specified bucket, without
300 /// allocating the whole file into a vector.
301 /// ### Example
302 /// ```no_run
303 /// # #[tokio::main]
304 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
305 /// use cloud_storage::Client;
306 /// use cloud_storage::Object;
307 /// use futures_util::stream::StreamExt;
308 /// use tokio::fs::File;
309 /// use tokio::io::{AsyncWriteExt, BufWriter};
310 ///
311 /// let client = Client::default();
312 /// let mut stream = client.object().download_streamed("my_bucket", "path/to/my/file.png").await?;
313 /// let mut file = BufWriter::new(File::create("file.png").await.unwrap());
314 /// while let Some(byte) = stream.next().await {
315 /// file.write_all(&[byte.unwrap()]).await.unwrap();
316 /// }
317 /// file.flush().await?;
318 /// # Ok(())
319 /// # }
320 /// ```
321 pub async fn download_streamed(
322 &self,
323 bucket: &str,
324 file_name: &str,
325 ) -> crate::Result<impl Stream<Item = crate::Result<u8>> + Unpin> {
326 use futures_util::{StreamExt, TryStreamExt};
327 let url = format!(
328 "{}/b/{}/o/{}?alt=media",
329 crate::BASE_URL,
330 percent_encode(bucket),
331 percent_encode(file_name),
332 );
333 let response = self
334 .0
335 .client
336 .get(&url)
337 .headers(self.0.get_headers().await?)
338 .send()
339 .await?
340 .error_for_status()?;
341 let size = response.content_length();
342 let bytes = response
343 .bytes_stream()
344 .map(|chunk| chunk.map(|c| futures_util::stream::iter(c.into_iter().map(Ok))))
345 .try_flatten();
346 Ok(SizedByteStream::new(bytes, size))
347 }
348
349 /// Updates a single object with the specified name in the specified bucket with the new
350 /// information in `object`.
351 ///
352 /// Note that if the `name` or `bucket` fields are changed, the object will not be found.
353 /// See [`rewrite`] or [`copy`] for similar operations.
354 /// ### Example
355 /// ```no_run
356 /// # #[tokio::main]
357 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
358 /// use cloud_storage::Client;
359 /// use cloud_storage::Object;
360 ///
361 /// let client = Client::default();
362 /// let mut object = client.object().read("my_bucket", "path/to/my/file.png").await?;
363 /// object.content_type = Some("application/xml".to_string());
364 /// client.object().update(&object).await?;
365 /// # Ok(())
366 /// # }
367 /// ```
368 pub async fn update(&self, object: &Object) -> crate::Result<Object> {
369 let url = format!(
370 "{}/b/{}/o/{}",
371 crate::BASE_URL,
372 percent_encode(&object.bucket),
373 percent_encode(&object.name),
374 );
375 let result: GoogleResponse<Object> = self
376 .0
377 .client
378 .put(&url)
379 .headers(self.0.get_headers().await?)
380 .json(&object)
381 .send()
382 .await?
383 .json()
384 .await?;
385 match result {
386 GoogleResponse::Success(s) => Ok(s),
387 GoogleResponse::Error(e) => Err(e.into()),
388 }
389 }
390
391 /// Deletes a single object with the specified name in the specified bucket.
392 /// ### Example
393 /// ```no_run
394 /// # #[tokio::main]
395 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
396 /// use cloud_storage::Client;
397 /// use cloud_storage::Object;
398 ///
399 /// let client = Client::default();
400 /// client.object().delete("my_bucket", "path/to/my/file.png").await?;
401 /// # Ok(())
402 /// # }
403 /// ```
404 pub async fn delete(&self, bucket: &str, file_name: &str) -> crate::Result<()> {
405 let url = format!(
406 "{}/b/{}/o/{}",
407 crate::BASE_URL,
408 percent_encode(bucket),
409 percent_encode(file_name),
410 );
411 let response = self
412 .0
413 .client
414 .delete(&url)
415 .headers(self.0.get_headers().await?)
416 .send()
417 .await?;
418 if response.status().is_success() {
419 Ok(())
420 } else {
421 Err(crate::Error::Google(response.json().await?))
422 }
423 }
424
425 /// Concatenates the contents of multiple objects into one.
426 /// ### Example
427 /// ```no_run
428 /// # #[tokio::main]
429 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
430 /// use cloud_storage::Client;
431 /// use cloud_storage::object::{Object, ComposeRequest, SourceObject};
432 ///
433 /// let client = Client::default();
434 /// let obj1 = client.object().read("my_bucket", "file1").await?;
435 /// let obj2 = client.object().read("my_bucket", "file2").await?;
436 /// let compose_request = ComposeRequest {
437 /// kind: "storage#composeRequest".to_string(),
438 /// source_objects: vec![
439 /// SourceObject {
440 /// name: obj1.name.clone(),
441 /// generation: None,
442 /// object_preconditions: None,
443 /// },
444 /// SourceObject {
445 /// name: obj2.name.clone(),
446 /// generation: None,
447 /// object_preconditions: None,
448 /// },
449 /// ],
450 /// destination: None,
451 /// };
452 /// let obj3 = client.object().compose("my_bucket", &compose_request, "test-concatted-file").await?;
453 /// // obj3 is now a file with the content of obj1 and obj2 concatted together.
454 /// # Ok(())
455 /// # }
456 /// ```
457 pub async fn compose(
458 &self,
459 bucket: &str,
460 req: &ComposeRequest,
461 destination_object: &str,
462 ) -> crate::Result<Object> {
463 let url = format!(
464 "{}/b/{}/o/{}/compose",
465 crate::BASE_URL,
466 percent_encode(bucket),
467 percent_encode(destination_object)
468 );
469 let result: GoogleResponse<Object> = self
470 .0
471 .client
472 .post(&url)
473 .headers(self.0.get_headers().await?)
474 .json(req)
475 .send()
476 .await?
477 .json()
478 .await?;
479 match result {
480 GoogleResponse::Success(s) => Ok(s),
481 GoogleResponse::Error(e) => Err(e.into()),
482 }
483 }
484
485 /// Copy this object to the target bucket and path.
486 /// ### Example
487 /// ```no_run
488 /// # #[tokio::main]
489 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
490 /// use cloud_storage::Client;
491 /// use cloud_storage::object::{Object, ComposeRequest};
492 ///
493 /// let client = Client::default();
494 /// let obj1 = client.object().read("my_bucket", "file1").await?;
495 /// let obj2 = client.object().copy(&obj1, "my_other_bucket", "file2").await?;
496 /// // obj2 is now a copy of obj1.
497 /// # Ok(())
498 /// # }
499 /// ```
500 pub async fn copy(
501 &self,
502 object: &Object,
503 destination_bucket: &str,
504 path: &str,
505 ) -> crate::Result<Object> {
506 use reqwest::header::CONTENT_LENGTH;
507
508 let url = format!(
509 "{base}/b/{sBucket}/o/{sObject}/copyTo/b/{dBucket}/o/{dObject}",
510 base = crate::BASE_URL,
511 sBucket = percent_encode(&object.bucket),
512 sObject = percent_encode(&object.name),
513 dBucket = percent_encode(destination_bucket),
514 dObject = percent_encode(path),
515 );
516 let mut headers = self.0.get_headers().await?;
517 headers.insert(CONTENT_LENGTH, "0".parse()?);
518 let result: GoogleResponse<Object> = self
519 .0
520 .client
521 .post(&url)
522 .headers(headers)
523 .send()
524 .await?
525 .json()
526 .await?;
527 match result {
528 GoogleResponse::Success(s) => Ok(s),
529 GoogleResponse::Error(e) => Err(e.into()),
530 }
531 }
532
533 /// Moves a file from the current location to the target bucket and path.
534 ///
535 /// ## Limitations
536 /// This function does not yet support rewriting objects to another
537 /// * Geographical Location,
538 /// * Encryption,
539 /// * Storage class.
540 /// These limitations mean that for now, the rewrite and the copy methods do the same thing.
541 /// ### Example
542 /// ```no_run
543 /// # #[tokio::main]
544 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
545 /// use cloud_storage::Client;
546 /// use cloud_storage::object::Object;
547 ///
548 /// let client = Client::default();
549 /// let obj1 = client.object().read("my_bucket", "file1").await?;
550 /// let obj2 = client.object().rewrite(&obj1, "my_other_bucket", "file2").await?;
551 /// // obj2 is now a copy of obj1.
552 /// # Ok(())
553 /// # }
554 /// ```
555 pub async fn rewrite(
556 &self,
557 object: &Object,
558 destination_bucket: &str,
559 path: &str,
560 ) -> crate::Result<Object> {
561 use reqwest::header::CONTENT_LENGTH;
562
563 let url = format!(
564 "{base}/b/{sBucket}/o/{sObject}/rewriteTo/b/{dBucket}/o/{dObject}",
565 base = crate::BASE_URL,
566 sBucket = percent_encode(&object.bucket),
567 sObject = percent_encode(&object.name),
568 dBucket = percent_encode(destination_bucket),
569 dObject = percent_encode(path),
570 );
571 let mut headers = self.0.get_headers().await?;
572 headers.insert(CONTENT_LENGTH, "0".parse()?);
573 let s = self
574 .0
575 .client
576 .post(&url)
577 .headers(headers)
578 .send()
579 .await?
580 .text()
581 .await?;
582
583 let result: RewriteResponse = serde_json::from_str(&s).unwrap();
584 Ok(result.resource)
585 // match result {
586 // GoogleResponse::Success(s) => Ok(s.resource),
587 // GoogleResponse::Error(e) => Err(e.into()),
588 // }
589 }
590}