headless_lms_chatbot/
azure_search_indexer.rs

1use crate::prelude::*;
2use serde_json::json;
3
/// REST API version sent as the `api-version` query parameter on every request in this module.
const API_VERSION: &str = "2024-07-01";
5
6#[derive(Debug, Deserialize)]
7struct IndexerStatusResponse {
8    pub status: String,
9    #[serde(rename = "lastResult")]
10    pub last_result: Option<LastResult>,
11}
12
13#[derive(Debug, Deserialize)]
14struct LastResult {
15    pub status: String,
16    pub errors: Vec<IndexerError>,
17    pub warnings: Vec<IndexerWarning>,
18}
19
20#[derive(Debug, Deserialize)]
21struct IndexerError {
22    pub key: Option<String>,
23    pub name: Option<String>,
24    pub message: Option<String>,
25    pub details: Option<String>,
26    #[serde(rename = "documentationLink")]
27    pub documentation_link: Option<String>,
28}
29
30#[derive(Debug, Deserialize)]
31struct IndexerWarning {
32    key: Option<String>,
33    name: Option<String>,
34    message: Option<String>,
35    details: Option<String>,
36    #[serde(rename = "documentationLink")]
37    documentation_link: Option<String>,
38}
39
40pub async fn does_search_indexer_exist(
41    indexer_name: &str,
42    app_config: &ApplicationConfiguration,
43) -> anyhow::Result<bool> {
44    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
45        anyhow::anyhow!("Azure configuration is missing from the application configuration")
46    })?;
47
48    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
49        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
50    })?;
51    let mut url = search_config.search_endpoint.clone();
52    url.set_path(&format!("indexers('{}')", indexer_name));
53    url.set_query(Some(&format!("api-version={}", API_VERSION)));
54
55    let response = REQWEST_CLIENT
56        .get(url)
57        .header("Content-Type", "application/json")
58        .header("api-key", search_config.search_api_key.clone())
59        .send()
60        .await?;
61
62    if response.status().is_success() {
63        Ok(true)
64    } else if response.status() == 404 {
65        Ok(false)
66    } else {
67        let status = response.status();
68        let error_text = response.text().await?;
69        Err(anyhow::anyhow!(
70            "Error checking if index exists. Status: {}. Error: {}",
71            status,
72            error_text
73        ))
74    }
75}
76
77pub async fn create_search_indexer(
78    indexer_name: &str,
79    data_source_name: &str,
80    skillset_name: &str,
81    target_index_name: &str,
82    app_config: &ApplicationConfiguration,
83) -> anyhow::Result<()> {
84    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
85        anyhow::anyhow!("Azure configuration is missing from the application configuration")
86    })?;
87
88    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
89        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
90    })?;
91
92    let mut url = search_config.search_endpoint.clone();
93    url.set_path(&format!("indexers/{}", indexer_name));
94    url.set_query(Some(&format!("api-version={}", API_VERSION)));
95
96    let indexer_definition = json!({
97        "name": indexer_name,
98        "description": null,
99        "dataSourceName": data_source_name,
100        "skillsetName": skillset_name,
101        "targetIndexName": target_index_name,
102        "disabled": null,
103        "schedule": null,
104        "parameters": {
105            "batchSize": null,
106            "maxFailedItems": null,
107            "maxFailedItemsPerBatch": null,
108            "base64EncodeKeys": null,
109            "configuration": {
110                "dataToExtract": "contentAndMetadata"
111            }
112        },
113        "fieldMappings": [],
114        "outputFieldMappings": [],
115        "encryptionKey": null
116    });
117
118    let response = REQWEST_CLIENT
119        .put(url)
120        .header("Content-Type", "application/json")
121        .header("api-key", search_config.search_api_key.clone())
122        .json(&indexer_definition)
123        .send()
124        .await?;
125
126    if response.status().is_success() {
127        Ok(())
128    } else {
129        let status = response.status();
130        let error_text = response.text().await?;
131        Err(anyhow::anyhow!(
132            "Error creating search indexer. Status: {}. Error: {}",
133            status,
134            error_text
135        ))
136    }
137}
138
139pub async fn run_search_indexer_now(
140    indexer_name: &str,
141    app_config: &ApplicationConfiguration,
142) -> anyhow::Result<()> {
143    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
144        anyhow::anyhow!("Azure configuration is missing from the application configuration")
145    })?;
146
147    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
148        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
149    })?;
150
151    let mut url = search_config.search_endpoint.clone();
152    url.set_path(&format!("indexers/{}/run", indexer_name));
153    url.set_query(Some(&format!("api-version={}", API_VERSION)));
154
155    let response = REQWEST_CLIENT
156        .post(url)
157        .header("Content-Type", "application/json")
158        .header("api-key", search_config.search_api_key.clone())
159        .send()
160        .await?;
161
162    if response.status().is_success() {
163        Ok(())
164    } else {
165        let status = response.status();
166        let error_text = response.text().await?;
167        Err(anyhow::anyhow!(
168            "Error triggering search indexer. Status: {}. Error: {}",
169            status,
170            error_text
171        ))
172    }
173}
174
175/// Checks if the search indexer exists, is not running, and the last execution is not in progress.
176/// Also prints any errors and warnings from the last execution in a nicely formatted manner.
177///
178/// # Arguments
179///
180/// * `indexer_name` - The name of the indexer to check.
181/// * `app_config` - The application configuration containing Azure settings.
182///
183/// # Returns
184///
185/// * `Ok(true)` if the indexer exists, is not running, and the last execution is not in progress.
186/// * `Ok(false)` if the indexer is running or the last execution is in progress.
187/// * An error if the indexer does not exist or if there's an issue fetching its status.
188pub async fn check_search_indexer_status(
189    indexer_name: &str,
190    app_config: &ApplicationConfiguration,
191) -> anyhow::Result<bool> {
192    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
193        anyhow::anyhow!("Azure configuration is missing from the application configuration")
194    })?;
195
196    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
197        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
198    })?;
199
200    let mut url = search_config.search_endpoint.clone();
201    url.set_path(&format!("indexers('{}')/search.status", indexer_name));
202    url.set_query(Some(&format!("api-version={}", API_VERSION)));
203
204    let response = REQWEST_CLIENT
205        .get(url)
206        .header("Content-Type", "application/json")
207        .header("api-key", search_config.search_api_key.clone())
208        .send()
209        .await?;
210
211    if response.status().is_success() {
212        let response_text = response.text().await?;
213        let indexer_status: IndexerStatusResponse = match serde_json::from_str(&response_text) {
214            Ok(status) => status,
215            Err(e) => {
216                error!("Failed to parse indexer status JSON: {}", e);
217                error!(
218                    "{}",
219                    serde_json::to_string_pretty(&response_text)
220                        .unwrap_or_else(|_| "Invalid JSON".to_string())
221                );
222                return Err(anyhow::anyhow!(
223                    "Failed to parse indexer status JSON: {}",
224                    e
225                ));
226            }
227        };
228
229        // If the indexer is not running, it is not healthy
230        let is_running = indexer_status.status.eq_ignore_ascii_case("running");
231
232        // If the last run is in progress, we cannot start a new run.
233        let last_result_in_progress = indexer_status
234            .last_result
235            .as_ref()
236            .is_some_and(|lr| lr.status.eq_ignore_ascii_case("inprogress"));
237
238        if !is_running {
239            info!("Indexer '{}' is not running normally.", indexer_name);
240        }
241
242        if last_result_in_progress {
243            warn!(
244                "Last execution of indexer '{}' is in progress.",
245                indexer_name
246            );
247        }
248
249        if let Some(last_result) = &indexer_status.last_result {
250            if !last_result.errors.is_empty() {
251                error!("Errors in the last execution:");
252                for error in &last_result.errors {
253                    error!(
254                        "  - **Key**: {}\n    **Name**: {}\n    **Message**: {}\n    **Details**: {}\n    **Documentation**: {}\n",
255                        error.key.as_deref().unwrap_or("N/A"),
256                        error.name.as_deref().unwrap_or("N/A"),
257                        error.message.as_deref().unwrap_or("N/A"),
258                        error.details.as_deref().unwrap_or("N/A"),
259                        error.documentation_link.as_deref().unwrap_or("N/A"),
260                    );
261                }
262            }
263
264            if !last_result.warnings.is_empty() {
265                warn!("Warnings in the last execution:");
266                for warning in &last_result.warnings {
267                    warn!(
268                        "  - **Key**: {}\n    **Name**: {}\n    **Message**: {}\n    **Details**: {}\n    **Documentation**: {}\n",
269                        warning.key.as_deref().unwrap_or("N/A"),
270                        warning.name.as_deref().unwrap_or("N/A"),
271                        warning.message.as_deref().unwrap_or("N/A"),
272                        warning.details.as_deref().unwrap_or("N/A"),
273                        warning.documentation_link.as_deref().unwrap_or("N/A"),
274                    );
275                }
276            }
277        } else {
278            warn!(
279                "No last result information available for indexer '{}'. Assuming the index is not ready yet.",
280                indexer_name
281            );
282            return Ok(false);
283        }
284
285        if is_running && !last_result_in_progress {
286            Ok(true)
287        } else {
288            Ok(false)
289        }
290    } else if response.status() == reqwest::StatusCode::NOT_FOUND {
291        error!("Indexer '{}' does not exist.", indexer_name);
292        Ok(false)
293    } else {
294        let status = response.status();
295        let error_text = response
296            .text()
297            .await
298            .unwrap_or_else(|_| "No error text".to_string());
299        error!(
300            "Error fetching indexer status. Status: {}. Error: {}",
301            status, error_text
302        );
303        Err(anyhow::anyhow!(
304            "Error fetching indexer status. Status: {}. Error: {}",
305            status,
306            error_text
307        ))
308    }
309}