1use crate::prelude::*;
2use secrecy::ExposeSecret;
3use serde_json::json;
4
5const API_VERSION: &str = "2024-07-01";
6
7#[derive(Debug, Deserialize)]
8struct IndexerStatusResponse {
9 pub status: String,
10 #[serde(rename = "lastResult")]
11 pub last_result: Option<LastResult>,
12}
13
14#[derive(Debug, Deserialize)]
15struct LastResult {
16 pub status: String,
17 pub errors: Vec<IndexerError>,
18 pub warnings: Vec<IndexerWarning>,
19}
20
21#[derive(Debug, Deserialize)]
22struct IndexerError {
23 pub key: Option<String>,
24 pub name: Option<String>,
25 pub message: Option<String>,
26 pub details: Option<String>,
27 #[serde(rename = "documentationLink")]
28 pub documentation_link: Option<String>,
29}
30
31#[derive(Debug, Deserialize)]
32struct IndexerWarning {
33 key: Option<String>,
34 name: Option<String>,
35 message: Option<String>,
36 details: Option<String>,
37 #[serde(rename = "documentationLink")]
38 documentation_link: Option<String>,
39}
40
41pub async fn does_search_indexer_exist(
42 indexer_name: &str,
43 app_config: &ApplicationConfiguration,
44) -> anyhow::Result<bool> {
45 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
46 anyhow::anyhow!("Azure configuration is missing from the application configuration")
47 })?;
48
49 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
50 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
51 })?;
52 let mut url = search_config.search_endpoint.clone();
53 url.set_path(&format!("indexers('{}')", indexer_name));
54 url.set_query(Some(&format!("api-version={}", API_VERSION)));
55
56 let response = REQWEST_CLIENT
57 .get(url)
58 .header("Content-Type", "application/json")
59 .header("api-key", search_config.search_api_key.expose_secret())
60 .send()
61 .await?;
62
63 if response.status().is_success() {
64 Ok(true)
65 } else if response.status() == 404 {
66 Ok(false)
67 } else {
68 let status = response.status();
69 let error_text = response.text().await?;
70 Err(anyhow::anyhow!(
71 "Error checking if index exists. Status: {}. Error: {}",
72 status,
73 error_text
74 ))
75 }
76}
77
78pub async fn create_search_indexer(
79 indexer_name: &str,
80 data_source_name: &str,
81 skillset_name: &str,
82 target_index_name: &str,
83 app_config: &ApplicationConfiguration,
84) -> anyhow::Result<()> {
85 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
86 anyhow::anyhow!("Azure configuration is missing from the application configuration")
87 })?;
88
89 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
90 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
91 })?;
92
93 let mut url = search_config.search_endpoint.clone();
94 url.set_path(&format!("indexers/{}", indexer_name));
95 url.set_query(Some(&format!("api-version={}", API_VERSION)));
96
97 let indexer_definition = json!({
98 "name": indexer_name,
99 "description": null,
100 "dataSourceName": data_source_name,
101 "skillsetName": skillset_name,
102 "targetIndexName": target_index_name,
103 "disabled": null,
104 "schedule": null,
105 "parameters": {
106 "batchSize": null,
107 "maxFailedItems": null,
108 "maxFailedItemsPerBatch": null,
109 "base64EncodeKeys": null,
110 "configuration": {
111 "dataToExtract": "contentAndMetadata"
112 }
113 },
114 "fieldMappings": [
115 {
116 "sourceFieldName": "metadata_storage_path",
117 "targetFieldName": "chunk_id",
118 "mappingFunction": { "name": "base64Encode" }
119 },
120 ],
121 "outputFieldMappings": [
122
123 ],
124 "encryptionKey": null
125 });
126
127 let response = REQWEST_CLIENT
128 .put(url)
129 .header("Content-Type", "application/json")
130 .header("api-key", search_config.search_api_key.expose_secret())
131 .json(&indexer_definition)
132 .send()
133 .await?;
134
135 if response.status().is_success() {
136 Ok(())
137 } else {
138 let status = response.status();
139 let error_text = response.text().await?;
140 Err(anyhow::anyhow!(
141 "Error creating search indexer. Status: {}. Error: {}",
142 status,
143 error_text
144 ))
145 }
146}
147
148pub async fn run_search_indexer_now(
149 indexer_name: &str,
150 app_config: &ApplicationConfiguration,
151) -> anyhow::Result<()> {
152 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
153 anyhow::anyhow!("Azure configuration is missing from the application configuration")
154 })?;
155
156 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
157 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
158 })?;
159
160 let mut url = search_config.search_endpoint.clone();
161 url.set_path(&format!("indexers/{}/run", indexer_name));
162 url.set_query(Some(&format!("api-version={}", API_VERSION)));
163
164 let response = REQWEST_CLIENT
165 .post(url)
166 .header("Content-Type", "application/json")
167 .header("api-key", search_config.search_api_key.expose_secret())
168 .send()
169 .await?;
170
171 if response.status().is_success() {
172 Ok(())
173 } else {
174 let status = response.status();
175 let error_text = response.text().await?;
176 Err(anyhow::anyhow!(
177 "Error triggering search indexer. Status: {}. Error: {}",
178 status,
179 error_text
180 ))
181 }
182}
183
184pub async fn check_search_indexer_status(
198 indexer_name: &str,
199 app_config: &ApplicationConfiguration,
200) -> anyhow::Result<bool> {
201 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
202 anyhow::anyhow!("Azure configuration is missing from the application configuration")
203 })?;
204
205 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
206 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
207 })?;
208
209 let mut url = search_config.search_endpoint.clone();
210 url.set_path(&format!("indexers('{}')/search.status", indexer_name));
211 url.set_query(Some(&format!("api-version={}", API_VERSION)));
212
213 let response = REQWEST_CLIENT
214 .get(url)
215 .header("Content-Type", "application/json")
216 .header("api-key", search_config.search_api_key.expose_secret())
217 .send()
218 .await?;
219
220 if response.status().is_success() {
221 let response_text = response.text().await?;
222 let indexer_status: IndexerStatusResponse = match serde_json::from_str(&response_text) {
223 Ok(status) => status,
224 Err(e) => {
225 error!("Failed to parse indexer status JSON: {}", e);
226 error!(
227 "{}",
228 serde_json::to_string_pretty(&response_text)
229 .unwrap_or_else(|_| "Invalid JSON".to_string())
230 );
231 return Err(anyhow::anyhow!(
232 "Failed to parse indexer status JSON: {}",
233 e
234 ));
235 }
236 };
237
238 let is_running = indexer_status.status.eq_ignore_ascii_case("running");
240
241 let last_result_in_progress = indexer_status
243 .last_result
244 .as_ref()
245 .is_some_and(|lr| lr.status.eq_ignore_ascii_case("inprogress"));
246
247 if !is_running {
248 info!("Indexer '{}' is not running normally.", indexer_name);
249 }
250
251 if last_result_in_progress {
252 warn!(
253 "Last execution of indexer '{}' is in progress.",
254 indexer_name
255 );
256 }
257
258 if let Some(last_result) = &indexer_status.last_result {
259 if !last_result.errors.is_empty() {
260 error!("Errors in the last execution:");
261 for error in &last_result.errors {
262 error!(
263 " - **Key**: {}\n **Name**: {}\n **Message**: {}\n **Details**: {}\n **Documentation**: {}\n",
264 error.key.as_deref().unwrap_or("N/A"),
265 error.name.as_deref().unwrap_or("N/A"),
266 error.message.as_deref().unwrap_or("N/A"),
267 error.details.as_deref().unwrap_or("N/A"),
268 error.documentation_link.as_deref().unwrap_or("N/A"),
269 );
270 }
271 }
272
273 if !last_result.warnings.is_empty() {
274 warn!("Warnings in the last execution:");
275 for warning in &last_result.warnings {
276 warn!(
277 " - **Key**: {}\n **Name**: {}\n **Message**: {}\n **Details**: {}\n **Documentation**: {}\n",
278 warning.key.as_deref().unwrap_or("N/A"),
279 warning.name.as_deref().unwrap_or("N/A"),
280 warning.message.as_deref().unwrap_or("N/A"),
281 warning.details.as_deref().unwrap_or("N/A"),
282 warning.documentation_link.as_deref().unwrap_or("N/A"),
283 );
284 }
285 }
286 } else {
287 warn!(
288 "No last result information available for indexer '{}'. Assuming the index is not ready yet.",
289 indexer_name
290 );
291 return Ok(false);
292 }
293
294 if is_running && !last_result_in_progress {
295 Ok(true)
296 } else {
297 Ok(false)
298 }
299 } else if response.status() == reqwest::StatusCode::NOT_FOUND {
300 error!("Indexer '{}' does not exist.", indexer_name);
301 Ok(false)
302 } else {
303 let status = response.status();
304 let error_text = response
305 .text()
306 .await
307 .unwrap_or_else(|_| "No error text".to_string());
308 error!(
309 "Error fetching indexer status. Status: {}. Error: {}",
310 status, error_text
311 );
312 Err(anyhow::anyhow!(
313 "Error fetching indexer status. Status: {}. Error: {}",
314 status,
315 error_text
316 ))
317 }
318}