headless_lms_chatbot/
azure_blob_storage.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::collections::HashMap;

use crate::prelude::*;
use anyhow::Context;
use azure_core::prelude::Metadata;
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::*;
use bytes::Bytes;
use futures::StreamExt;
use headless_lms_utils::{ApplicationConfiguration, AzureBlobStorageConfiguration};

/// A client for interacting with Azure Blob Storage.
pub struct AzureBlobClient {
    container_client: ContainerClient,
    pub container_name: String,
}

impl AzureBlobClient {
    pub async fn new(
        app_config: &ApplicationConfiguration,
        container_name_prefix: &str,
    ) -> anyhow::Result<Self> {
        let azure_configuration = app_config
            .azure_configuration
            .as_ref()
            .context("Azure configuration is missing")?;
        let AzureBlobStorageConfiguration {
            storage_account,
            access_key,
        } = azure_configuration
            .blob_storage_config
            .clone()
            .context("Azure Blob Storage configuration is missing")?;

        let container_name = format!("{}-chatbot", container_name_prefix);

        let storage_credentials = StorageCredentials::access_key(&storage_account, access_key);
        let blob_service_client = BlobServiceClient::new(storage_account, storage_credentials);
        let container_client = blob_service_client.container_client(container_name.clone());

        Ok(AzureBlobClient {
            container_client,
            container_name,
        })
    }

    /// Ensures the container used to store the blobs exists. If it does not, the container is created.
    pub async fn ensure_container_exists(&self) -> anyhow::Result<()> {
        if self.container_client.exists().await? {
            return Ok(());
        }

        info!(
            "Azure blob storage container '{}' does not exist. Creating...",
            self.container_client.container_name()
        );
        self.container_client
            .create()
            .public_access(PublicAccess::None)
            .await?;
        Ok(())
    }

    /// Uploads a file to the specified container.
    pub async fn upload_file(
        &self,
        blob_path: &str,
        file_bytes: &[u8],
        metadata: Option<HashMap<String, Bytes>>,
    ) -> anyhow::Result<()> {
        let blob_client = self.container_client.blob_client(blob_path);

        let mut put_blob = blob_client.put_block_blob(file_bytes.to_vec());

        if let Some(meta) = metadata {
            let mut m = Metadata::new();
            for (key, value) in meta {
                m.insert(key, value);
            }
            put_blob = put_blob.metadata(m);
        }

        put_blob.await?;

        info!("Blob '{}' uploaded successfully.", blob_path);
        Ok(())
    }

    /// Deletes a file (blob) from the specified container.
    pub async fn delete_file(&self, path: &str) -> anyhow::Result<()> {
        let blob_client = self.container_client.blob_client(path);

        blob_client.delete().await?;

        info!("Blob '{}' deleted successfully.", path);
        Ok(())
    }

    pub async fn list_files_with_prefix(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
        let mut result = Vec::new();
        let prefix_owned = prefix.to_string();
        let response = self.container_client.list_blobs().prefix(prefix_owned);
        let mut stream = response.into_stream();

        while let Some(list) = stream.next().await {
            let list = list?;
            let blobs: Vec<_> = list.blobs.blobs().collect();
            for blob in blobs {
                result.push(blob.name.clone());
            }
        }
        Ok(result)
    }
}