headless_lms_chatbot/
azure_search_indexer.rs

1use crate::prelude::*;
2use serde_json::json;
3
/// Value sent as the `api-version` query parameter on every request this module
/// makes to the configured search endpoint.
const API_VERSION: &str = "2024-07-01";
5
6#[derive(Debug, Deserialize)]
7struct IndexerStatusResponse {
8    pub status: String,
9    #[serde(rename = "lastResult")]
10    pub last_result: Option<LastResult>,
11}
12
13#[derive(Debug, Deserialize)]
14struct LastResult {
15    pub status: String,
16    pub errors: Vec<IndexerError>,
17    pub warnings: Vec<IndexerWarning>,
18}
19
20#[derive(Debug, Deserialize)]
21struct IndexerError {
22    pub key: Option<String>,
23    pub name: Option<String>,
24    pub message: Option<String>,
25    pub details: Option<String>,
26    #[serde(rename = "documentationLink")]
27    pub documentation_link: Option<String>,
28}
29
30#[derive(Debug, Deserialize)]
31struct IndexerWarning {
32    key: Option<String>,
33    name: Option<String>,
34    message: Option<String>,
35    details: Option<String>,
36    #[serde(rename = "documentationLink")]
37    documentation_link: Option<String>,
38}
39
40pub async fn does_search_indexer_exist(
41    indexer_name: &str,
42    app_config: &ApplicationConfiguration,
43) -> anyhow::Result<bool> {
44    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
45        anyhow::anyhow!("Azure configuration is missing from the application configuration")
46    })?;
47
48    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
49        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
50    })?;
51    let mut url = search_config.search_endpoint.clone();
52    url.set_path(&format!("indexers('{}')", indexer_name));
53    url.set_query(Some(&format!("api-version={}", API_VERSION)));
54
55    let response = REQWEST_CLIENT
56        .get(url)
57        .header("Content-Type", "application/json")
58        .header("api-key", search_config.search_api_key.clone())
59        .send()
60        .await?;
61
62    if response.status().is_success() {
63        Ok(true)
64    } else if response.status() == 404 {
65        Ok(false)
66    } else {
67        let status = response.status();
68        let error_text = response.text().await?;
69        Err(anyhow::anyhow!(
70            "Error checking if index exists. Status: {}. Error: {}",
71            status,
72            error_text
73        ))
74    }
75}
76
77pub async fn create_search_indexer(
78    indexer_name: &str,
79    data_source_name: &str,
80    skillset_name: &str,
81    target_index_name: &str,
82    app_config: &ApplicationConfiguration,
83) -> anyhow::Result<()> {
84    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
85        anyhow::anyhow!("Azure configuration is missing from the application configuration")
86    })?;
87
88    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
89        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
90    })?;
91
92    let mut url = search_config.search_endpoint.clone();
93    url.set_path(&format!("indexers/{}", indexer_name));
94    url.set_query(Some(&format!("api-version={}", API_VERSION)));
95
96    let indexer_definition = json!({
97        "name": indexer_name,
98        "description": null,
99        "dataSourceName": data_source_name,
100        "skillsetName": skillset_name,
101        "targetIndexName": target_index_name,
102        "disabled": null,
103        "schedule": null,
104        "parameters": {
105            "batchSize": null,
106            "maxFailedItems": null,
107            "maxFailedItemsPerBatch": null,
108            "base64EncodeKeys": null,
109            "configuration": {
110                "dataToExtract": "contentAndMetadata"
111            }
112        },
113        "fieldMappings": [
114            {
115                "sourceFieldName": "metadata_storage_path",
116                "targetFieldName": "chunk_id",
117                "mappingFunction": { "name": "base64Encode" }
118            },
119        ],
120        "outputFieldMappings": [
121
122        ],
123        "encryptionKey": null
124    });
125
126    let response = REQWEST_CLIENT
127        .put(url)
128        .header("Content-Type", "application/json")
129        .header("api-key", search_config.search_api_key.clone())
130        .json(&indexer_definition)
131        .send()
132        .await?;
133
134    if response.status().is_success() {
135        Ok(())
136    } else {
137        let status = response.status();
138        let error_text = response.text().await?;
139        Err(anyhow::anyhow!(
140            "Error creating search indexer. Status: {}. Error: {}",
141            status,
142            error_text
143        ))
144    }
145}
146
147pub async fn run_search_indexer_now(
148    indexer_name: &str,
149    app_config: &ApplicationConfiguration,
150) -> anyhow::Result<()> {
151    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
152        anyhow::anyhow!("Azure configuration is missing from the application configuration")
153    })?;
154
155    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
156        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
157    })?;
158
159    let mut url = search_config.search_endpoint.clone();
160    url.set_path(&format!("indexers/{}/run", indexer_name));
161    url.set_query(Some(&format!("api-version={}", API_VERSION)));
162
163    let response = REQWEST_CLIENT
164        .post(url)
165        .header("Content-Type", "application/json")
166        .header("api-key", search_config.search_api_key.clone())
167        .send()
168        .await?;
169
170    if response.status().is_success() {
171        Ok(())
172    } else {
173        let status = response.status();
174        let error_text = response.text().await?;
175        Err(anyhow::anyhow!(
176            "Error triggering search indexer. Status: {}. Error: {}",
177            status,
178            error_text
179        ))
180    }
181}
182
183/// Checks if the search indexer exists, is not running, and the last execution is not in progress.
184/// Also prints any errors and warnings from the last execution in a nicely formatted manner.
185///
186/// # Arguments
187///
188/// * `indexer_name` - The name of the indexer to check.
189/// * `app_config` - The application configuration containing Azure settings.
190///
191/// # Returns
192///
193/// * `Ok(true)` if the indexer exists, is not running, and the last execution is not in progress.
194/// * `Ok(false)` if the indexer is running or the last execution is in progress.
195/// * An error if the indexer does not exist or if there's an issue fetching its status.
196pub async fn check_search_indexer_status(
197    indexer_name: &str,
198    app_config: &ApplicationConfiguration,
199) -> anyhow::Result<bool> {
200    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
201        anyhow::anyhow!("Azure configuration is missing from the application configuration")
202    })?;
203
204    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
205        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
206    })?;
207
208    let mut url = search_config.search_endpoint.clone();
209    url.set_path(&format!("indexers('{}')/search.status", indexer_name));
210    url.set_query(Some(&format!("api-version={}", API_VERSION)));
211
212    let response = REQWEST_CLIENT
213        .get(url)
214        .header("Content-Type", "application/json")
215        .header("api-key", search_config.search_api_key.clone())
216        .send()
217        .await?;
218
219    if response.status().is_success() {
220        let response_text = response.text().await?;
221        let indexer_status: IndexerStatusResponse = match serde_json::from_str(&response_text) {
222            Ok(status) => status,
223            Err(e) => {
224                error!("Failed to parse indexer status JSON: {}", e);
225                error!(
226                    "{}",
227                    serde_json::to_string_pretty(&response_text)
228                        .unwrap_or_else(|_| "Invalid JSON".to_string())
229                );
230                return Err(anyhow::anyhow!(
231                    "Failed to parse indexer status JSON: {}",
232                    e
233                ));
234            }
235        };
236
237        // If the indexer is not running, it is not healthy
238        let is_running = indexer_status.status.eq_ignore_ascii_case("running");
239
240        // If the last run is in progress, we cannot start a new run.
241        let last_result_in_progress = indexer_status
242            .last_result
243            .as_ref()
244            .is_some_and(|lr| lr.status.eq_ignore_ascii_case("inprogress"));
245
246        if !is_running {
247            info!("Indexer '{}' is not running normally.", indexer_name);
248        }
249
250        if last_result_in_progress {
251            warn!(
252                "Last execution of indexer '{}' is in progress.",
253                indexer_name
254            );
255        }
256
257        if let Some(last_result) = &indexer_status.last_result {
258            if !last_result.errors.is_empty() {
259                error!("Errors in the last execution:");
260                for error in &last_result.errors {
261                    error!(
262                        "  - **Key**: {}\n    **Name**: {}\n    **Message**: {}\n    **Details**: {}\n    **Documentation**: {}\n",
263                        error.key.as_deref().unwrap_or("N/A"),
264                        error.name.as_deref().unwrap_or("N/A"),
265                        error.message.as_deref().unwrap_or("N/A"),
266                        error.details.as_deref().unwrap_or("N/A"),
267                        error.documentation_link.as_deref().unwrap_or("N/A"),
268                    );
269                }
270            }
271
272            if !last_result.warnings.is_empty() {
273                warn!("Warnings in the last execution:");
274                for warning in &last_result.warnings {
275                    warn!(
276                        "  - **Key**: {}\n    **Name**: {}\n    **Message**: {}\n    **Details**: {}\n    **Documentation**: {}\n",
277                        warning.key.as_deref().unwrap_or("N/A"),
278                        warning.name.as_deref().unwrap_or("N/A"),
279                        warning.message.as_deref().unwrap_or("N/A"),
280                        warning.details.as_deref().unwrap_or("N/A"),
281                        warning.documentation_link.as_deref().unwrap_or("N/A"),
282                    );
283                }
284            }
285        } else {
286            warn!(
287                "No last result information available for indexer '{}'. Assuming the index is not ready yet.",
288                indexer_name
289            );
290            return Ok(false);
291        }
292
293        if is_running && !last_result_in_progress {
294            Ok(true)
295        } else {
296            Ok(false)
297        }
298    } else if response.status() == reqwest::StatusCode::NOT_FOUND {
299        error!("Indexer '{}' does not exist.", indexer_name);
300        Ok(false)
301    } else {
302        let status = response.status();
303        let error_text = response
304            .text()
305            .await
306            .unwrap_or_else(|_| "No error text".to_string());
307        error!(
308            "Error fetching indexer status. Status: {}. Error: {}",
309            status, error_text
310        );
311        Err(anyhow::anyhow!(
312            "Error fetching indexer status. Status: {}. Error: {}",
313            status,
314            error_text
315        ))
316    }
317}