1use crate::prelude::*;
2use serde_json::json;
3
/// Value sent as the `api-version` query parameter on the search service requests built in
/// this file.
const API_VERSION: &str = "2024-07-01";
5
6#[derive(Debug, Deserialize)]
7struct IndexerStatusResponse {
8 pub status: String,
9 #[serde(rename = "lastResult")]
10 pub last_result: Option<LastResult>,
11}
12
13#[derive(Debug, Deserialize)]
14struct LastResult {
15 pub status: String,
16 pub errors: Vec<IndexerError>,
17 pub warnings: Vec<IndexerWarning>,
18}
19
20#[derive(Debug, Deserialize)]
21struct IndexerError {
22 pub key: Option<String>,
23 pub name: Option<String>,
24 pub message: Option<String>,
25 pub details: Option<String>,
26 #[serde(rename = "documentationLink")]
27 pub documentation_link: Option<String>,
28}
29
30#[derive(Debug, Deserialize)]
31struct IndexerWarning {
32 key: Option<String>,
33 name: Option<String>,
34 message: Option<String>,
35 details: Option<String>,
36 #[serde(rename = "documentationLink")]
37 documentation_link: Option<String>,
38}
39
40pub async fn does_search_indexer_exist(
41 indexer_name: &str,
42 app_config: &ApplicationConfiguration,
43) -> anyhow::Result<bool> {
44 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
45 anyhow::anyhow!("Azure configuration is missing from the application configuration")
46 })?;
47
48 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
49 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
50 })?;
51 let mut url = search_config.search_endpoint.clone();
52 url.set_path(&format!("indexers('{}')", indexer_name));
53 url.set_query(Some(&format!("api-version={}", API_VERSION)));
54
55 let response = REQWEST_CLIENT
56 .get(url)
57 .header("Content-Type", "application/json")
58 .header("api-key", search_config.search_api_key.clone())
59 .send()
60 .await?;
61
62 if response.status().is_success() {
63 Ok(true)
64 } else if response.status() == 404 {
65 Ok(false)
66 } else {
67 let status = response.status();
68 let error_text = response.text().await?;
69 Err(anyhow::anyhow!(
70 "Error checking if index exists. Status: {}. Error: {}",
71 status,
72 error_text
73 ))
74 }
75}
76
77pub async fn create_search_indexer(
78 indexer_name: &str,
79 data_source_name: &str,
80 skillset_name: &str,
81 target_index_name: &str,
82 app_config: &ApplicationConfiguration,
83) -> anyhow::Result<()> {
84 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
85 anyhow::anyhow!("Azure configuration is missing from the application configuration")
86 })?;
87
88 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
89 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
90 })?;
91
92 let mut url = search_config.search_endpoint.clone();
93 url.set_path(&format!("indexers/{}", indexer_name));
94 url.set_query(Some(&format!("api-version={}", API_VERSION)));
95
96 let indexer_definition = json!({
97 "name": indexer_name,
98 "description": null,
99 "dataSourceName": data_source_name,
100 "skillsetName": skillset_name,
101 "targetIndexName": target_index_name,
102 "disabled": null,
103 "schedule": null,
104 "parameters": {
105 "batchSize": null,
106 "maxFailedItems": null,
107 "maxFailedItemsPerBatch": null,
108 "base64EncodeKeys": null,
109 "configuration": {
110 "dataToExtract": "contentAndMetadata"
111 }
112 },
113 "fieldMappings": [
114 {
115 "sourceFieldName": "metadata_storage_path",
116 "targetFieldName": "chunk_id",
117 "mappingFunction": { "name": "base64Encode" }
118 },
119 ],
120 "outputFieldMappings": [
121
122 ],
123 "encryptionKey": null
124 });
125
126 let response = REQWEST_CLIENT
127 .put(url)
128 .header("Content-Type", "application/json")
129 .header("api-key", search_config.search_api_key.clone())
130 .json(&indexer_definition)
131 .send()
132 .await?;
133
134 if response.status().is_success() {
135 Ok(())
136 } else {
137 let status = response.status();
138 let error_text = response.text().await?;
139 Err(anyhow::anyhow!(
140 "Error creating search indexer. Status: {}. Error: {}",
141 status,
142 error_text
143 ))
144 }
145}
146
147pub async fn run_search_indexer_now(
148 indexer_name: &str,
149 app_config: &ApplicationConfiguration,
150) -> anyhow::Result<()> {
151 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
152 anyhow::anyhow!("Azure configuration is missing from the application configuration")
153 })?;
154
155 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
156 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
157 })?;
158
159 let mut url = search_config.search_endpoint.clone();
160 url.set_path(&format!("indexers/{}/run", indexer_name));
161 url.set_query(Some(&format!("api-version={}", API_VERSION)));
162
163 let response = REQWEST_CLIENT
164 .post(url)
165 .header("Content-Type", "application/json")
166 .header("api-key", search_config.search_api_key.clone())
167 .send()
168 .await?;
169
170 if response.status().is_success() {
171 Ok(())
172 } else {
173 let status = response.status();
174 let error_text = response.text().await?;
175 Err(anyhow::anyhow!(
176 "Error triggering search indexer. Status: {}. Error: {}",
177 status,
178 error_text
179 ))
180 }
181}
182
183pub async fn check_search_indexer_status(
197 indexer_name: &str,
198 app_config: &ApplicationConfiguration,
199) -> anyhow::Result<bool> {
200 let azure_config = app_config.azure_configuration.as_ref().ok_or_else(|| {
201 anyhow::anyhow!("Azure configuration is missing from the application configuration")
202 })?;
203
204 let search_config = azure_config.search_config.as_ref().ok_or_else(|| {
205 anyhow::anyhow!("Azure search configuration is missing from the Azure configuration")
206 })?;
207
208 let mut url = search_config.search_endpoint.clone();
209 url.set_path(&format!("indexers('{}')/search.status", indexer_name));
210 url.set_query(Some(&format!("api-version={}", API_VERSION)));
211
212 let response = REQWEST_CLIENT
213 .get(url)
214 .header("Content-Type", "application/json")
215 .header("api-key", search_config.search_api_key.clone())
216 .send()
217 .await?;
218
219 if response.status().is_success() {
220 let response_text = response.text().await?;
221 let indexer_status: IndexerStatusResponse = match serde_json::from_str(&response_text) {
222 Ok(status) => status,
223 Err(e) => {
224 error!("Failed to parse indexer status JSON: {}", e);
225 error!(
226 "{}",
227 serde_json::to_string_pretty(&response_text)
228 .unwrap_or_else(|_| "Invalid JSON".to_string())
229 );
230 return Err(anyhow::anyhow!(
231 "Failed to parse indexer status JSON: {}",
232 e
233 ));
234 }
235 };
236
237 let is_running = indexer_status.status.eq_ignore_ascii_case("running");
239
240 let last_result_in_progress = indexer_status
242 .last_result
243 .as_ref()
244 .is_some_and(|lr| lr.status.eq_ignore_ascii_case("inprogress"));
245
246 if !is_running {
247 info!("Indexer '{}' is not running normally.", indexer_name);
248 }
249
250 if last_result_in_progress {
251 warn!(
252 "Last execution of indexer '{}' is in progress.",
253 indexer_name
254 );
255 }
256
257 if let Some(last_result) = &indexer_status.last_result {
258 if !last_result.errors.is_empty() {
259 error!("Errors in the last execution:");
260 for error in &last_result.errors {
261 error!(
262 " - **Key**: {}\n **Name**: {}\n **Message**: {}\n **Details**: {}\n **Documentation**: {}\n",
263 error.key.as_deref().unwrap_or("N/A"),
264 error.name.as_deref().unwrap_or("N/A"),
265 error.message.as_deref().unwrap_or("N/A"),
266 error.details.as_deref().unwrap_or("N/A"),
267 error.documentation_link.as_deref().unwrap_or("N/A"),
268 );
269 }
270 }
271
272 if !last_result.warnings.is_empty() {
273 warn!("Warnings in the last execution:");
274 for warning in &last_result.warnings {
275 warn!(
276 " - **Key**: {}\n **Name**: {}\n **Message**: {}\n **Details**: {}\n **Documentation**: {}\n",
277 warning.key.as_deref().unwrap_or("N/A"),
278 warning.name.as_deref().unwrap_or("N/A"),
279 warning.message.as_deref().unwrap_or("N/A"),
280 warning.details.as_deref().unwrap_or("N/A"),
281 warning.documentation_link.as_deref().unwrap_or("N/A"),
282 );
283 }
284 }
285 } else {
286 warn!(
287 "No last result information available for indexer '{}'. Assuming the index is not ready yet.",
288 indexer_name
289 );
290 return Ok(false);
291 }
292
293 if is_running && !last_result_in_progress {
294 Ok(true)
295 } else {
296 Ok(false)
297 }
298 } else if response.status() == reqwest::StatusCode::NOT_FOUND {
299 error!("Indexer '{}' does not exist.", indexer_name);
300 Ok(false)
301 } else {
302 let status = response.status();
303 let error_text = response
304 .text()
305 .await
306 .unwrap_or_else(|_| "No error text".to_string());
307 error!(
308 "Error fetching indexer status. Status: {}. Error: {}",
309 status, error_text
310 );
311 Err(anyhow::anyhow!(
312 "Error fetching indexer status. Status: {}. Error: {}",
313 status,
314 error_text
315 ))
316 }
317}