Skip to main content

headless_lms_chatbot/
azure_search_indexer.rs

1use crate::prelude::*;
2use secrecy::ExposeSecret;
3use serde_json::json;
4
/// Azure AI Search REST API version, sent as the `api-version` query parameter on every request
/// issued by this module.
const API_VERSION: &str = "2024-07-01";
6
7#[derive(Debug, Deserialize)]
8struct IndexerStatusResponse {
9    pub status: String,
10    #[serde(rename = "lastResult")]
11    pub last_result: Option<LastResult>,
12}
13
14#[derive(Debug, Deserialize)]
15struct LastResult {
16    pub status: String,
17    pub errors: Vec<IndexerError>,
18    pub warnings: Vec<IndexerWarning>,
19}
20
21#[derive(Debug, Deserialize)]
22struct IndexerError {
23    pub key: Option<String>,
24    pub name: Option<String>,
25    pub message: Option<String>,
26    pub details: Option<String>,
27    #[serde(rename = "documentationLink")]
28    pub documentation_link: Option<String>,
29}
30
31#[derive(Debug, Deserialize)]
32struct IndexerWarning {
33    key: Option<String>,
34    name: Option<String>,
35    message: Option<String>,
36    details: Option<String>,
37    #[serde(rename = "documentationLink")]
38    documentation_link: Option<String>,
39}
40
41pub async fn does_search_indexer_exist(
42    indexer_name: &str,
43    app_config: &ApplicationConfiguration,
44) -> anyhow::Result<bool> {
45    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
46        anyhow::anyhow!("Azure configuration is missing from the application configuration")
47    })?;
48
49    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
50        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
51    })?;
52    let mut url = search_config.search_endpoint.clone();
53    url.set_path(&format!("indexers('{}')", indexer_name));
54    url.set_query(Some(&format!("api-version={}", API_VERSION)));
55
56    let response = REQWEST_CLIENT
57        .get(url)
58        .header("Content-Type", "application/json")
59        .header("api-key", search_config.search_api_key.expose_secret())
60        .send()
61        .await?;
62
63    if response.status().is_success() {
64        Ok(true)
65    } else if response.status() == 404 {
66        Ok(false)
67    } else {
68        let status = response.status();
69        let error_text = response.text().await?;
70        Err(anyhow::anyhow!(
71            "Error checking if index exists. Status: {}. Error: {}",
72            status,
73            error_text
74        ))
75    }
76}
77
78pub async fn create_search_indexer(
79    indexer_name: &str,
80    data_source_name: &str,
81    skillset_name: &str,
82    target_index_name: &str,
83    app_config: &ApplicationConfiguration,
84) -> anyhow::Result<()> {
85    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
86        anyhow::anyhow!("Azure configuration is missing from the application configuration")
87    })?;
88
89    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
90        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
91    })?;
92
93    let mut url = search_config.search_endpoint.clone();
94    url.set_path(&format!("indexers/{}", indexer_name));
95    url.set_query(Some(&format!("api-version={}", API_VERSION)));
96
97    let indexer_definition = json!({
98        "name": indexer_name,
99        "description": null,
100        "dataSourceName": data_source_name,
101        "skillsetName": skillset_name,
102        "targetIndexName": target_index_name,
103        "disabled": null,
104        "schedule": null,
105        "parameters": {
106            "batchSize": null,
107            "maxFailedItems": null,
108            "maxFailedItemsPerBatch": null,
109            "base64EncodeKeys": null,
110            "configuration": {
111                "dataToExtract": "contentAndMetadata"
112            }
113        },
114        "fieldMappings": [
115            {
116                "sourceFieldName": "metadata_storage_path",
117                "targetFieldName": "chunk_id",
118                "mappingFunction": { "name": "base64Encode" }
119            },
120        ],
121        "outputFieldMappings": [
122
123        ],
124        "encryptionKey": null
125    });
126
127    let response = REQWEST_CLIENT
128        .put(url)
129        .header("Content-Type", "application/json")
130        .header("api-key", search_config.search_api_key.expose_secret())
131        .json(&indexer_definition)
132        .send()
133        .await?;
134
135    if response.status().is_success() {
136        Ok(())
137    } else {
138        let status = response.status();
139        let error_text = response.text().await?;
140        Err(anyhow::anyhow!(
141            "Error creating search indexer. Status: {}. Error: {}",
142            status,
143            error_text
144        ))
145    }
146}
147
148pub async fn run_search_indexer_now(
149    indexer_name: &str,
150    app_config: &ApplicationConfiguration,
151) -> anyhow::Result<()> {
152    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
153        anyhow::anyhow!("Azure configuration is missing from the application configuration")
154    })?;
155
156    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
157        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
158    })?;
159
160    let mut url = search_config.search_endpoint.clone();
161    url.set_path(&format!("indexers/{}/run", indexer_name));
162    url.set_query(Some(&format!("api-version={}", API_VERSION)));
163
164    let response = REQWEST_CLIENT
165        .post(url)
166        .header("Content-Type", "application/json")
167        .header("api-key", search_config.search_api_key.expose_secret())
168        .send()
169        .await?;
170
171    if response.status().is_success() {
172        Ok(())
173    } else {
174        let status = response.status();
175        let error_text = response.text().await?;
176        Err(anyhow::anyhow!(
177            "Error triggering search indexer. Status: {}. Error: {}",
178            status,
179            error_text
180        ))
181    }
182}
183
184/// Checks if the search indexer exists, is not running, and the last execution is not in progress.
185/// Also prints any errors and warnings from the last execution in a nicely formatted manner.
186///
187/// # Arguments
188///
189/// * `indexer_name` - The name of the indexer to check.
190/// * `app_config` - The application configuration containing Azure settings.
191///
192/// # Returns
193///
194/// * `Ok(true)` if the indexer exists, is not running, and the last execution is not in progress.
195/// * `Ok(false)` if the indexer is running or the last execution is in progress.
196/// * An error if the indexer does not exist or if there's an issue fetching its status.
197pub async fn check_search_indexer_status(
198    indexer_name: &str,
199    app_config: &ApplicationConfiguration,
200) -> anyhow::Result<bool> {
201    let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
202        anyhow::anyhow!("Azure configuration is missing from the application configuration")
203    })?;
204
205    let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
206        anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
207    })?;
208
209    let mut url = search_config.search_endpoint.clone();
210    url.set_path(&format!("indexers('{}')/search.status", indexer_name));
211    url.set_query(Some(&format!("api-version={}", API_VERSION)));
212
213    let response = REQWEST_CLIENT
214        .get(url)
215        .header("Content-Type", "application/json")
216        .header("api-key", search_config.search_api_key.expose_secret())
217        .send()
218        .await?;
219
220    if response.status().is_success() {
221        let response_text = response.text().await?;
222        let indexer_status: IndexerStatusResponse = match serde_json::from_str(&response_text) {
223            Ok(status) => status,
224            Err(e) => {
225                error!("Failed to parse indexer status JSON: {}", e);
226                error!(
227                    "{}",
228                    serde_json::to_string_pretty(&response_text)
229                        .unwrap_or_else(|_| "Invalid JSON".to_string())
230                );
231                return Err(anyhow::anyhow!(
232                    "Failed to parse indexer status JSON: {}",
233                    e
234                ));
235            }
236        };
237
238        // If the indexer is not running, it is not healthy
239        let is_running = indexer_status.status.eq_ignore_ascii_case("running");
240
241        // If the last run is in progress, we cannot start a new run.
242        let last_result_in_progress = indexer_status
243            .last_result
244            .as_ref()
245            .is_some_and(|lr| lr.status.eq_ignore_ascii_case("inprogress"));
246
247        if !is_running {
248            info!("Indexer '{}' is not running normally.", indexer_name);
249        }
250
251        if last_result_in_progress {
252            warn!(
253                "Last execution of indexer '{}' is in progress.",
254                indexer_name
255            );
256        }
257
258        if let Some(last_result) = &indexer_status.last_result {
259            if !last_result.errors.is_empty() {
260                error!("Errors in the last execution:");
261                for error in &last_result.errors {
262                    error!(
263                        "  - **Key**: {}\n    **Name**: {}\n    **Message**: {}\n    **Details**: {}\n    **Documentation**: {}\n",
264                        error.key.as_deref().unwrap_or("N/A"),
265                        error.name.as_deref().unwrap_or("N/A"),
266                        error.message.as_deref().unwrap_or("N/A"),
267                        error.details.as_deref().unwrap_or("N/A"),
268                        error.documentation_link.as_deref().unwrap_or("N/A"),
269                    );
270                }
271            }
272
273            if !last_result.warnings.is_empty() {
274                warn!("Warnings in the last execution:");
275                for warning in &last_result.warnings {
276                    warn!(
277                        "  - **Key**: {}\n    **Name**: {}\n    **Message**: {}\n    **Details**: {}\n    **Documentation**: {}\n",
278                        warning.key.as_deref().unwrap_or("N/A"),
279                        warning.name.as_deref().unwrap_or("N/A"),
280                        warning.message.as_deref().unwrap_or("N/A"),
281                        warning.details.as_deref().unwrap_or("N/A"),
282                        warning.documentation_link.as_deref().unwrap_or("N/A"),
283                    );
284                }
285            }
286        } else {
287            warn!(
288                "No last result information available for indexer '{}'. Assuming the index is not ready yet.",
289                indexer_name
290            );
291            return Ok(false);
292        }
293
294        if is_running && !last_result_in_progress {
295            Ok(true)
296        } else {
297            Ok(false)
298        }
299    } else if response.status() == reqwest::StatusCode::NOT_FOUND {
300        error!("Indexer '{}' does not exist.", indexer_name);
301        Ok(false)
302    } else {
303        let status = response.status();
304        let error_text = response
305            .text()
306            .await
307            .unwrap_or_else(|_| "No error text".to_string());
308        error!(
309            "Error fetching indexer status. Status: {}. Error: {}",
310            status, error_text
311        );
312        Err(anyhow::anyhow!(
313            "Error fetching indexer status. Status: {}. Error: {}",
314            status,
315            error_text
316        ))
317    }
318}