1use std::{
2 collections::{HashMap, HashSet},
3 env,
4 time::Duration,
5};
6
7use chrono::Utc;
8use dotenv::dotenv;
9use sqlx::{PgConnection, PgPool};
10use url::Url;
11use uuid::Uuid;
12
13use crate::setup_tracing;
14
15use headless_lms_chatbot::{
16 azure_blob_storage::AzureBlobClient,
17 azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
18 azure_search_index::{create_search_index, does_search_index_exist},
19 azure_search_indexer::{
20 check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
21 run_search_indexer_now,
22 },
23 azure_skillset::{create_skillset, does_skillset_exist},
24 content_cleaner::convert_material_blocks_to_markdown_with_llm,
25};
26use headless_lms_models::{
27 application_task_default_language_models::ApplicationTask,
28 chapters::DatabaseChapter,
29 page_history::PageHistory,
30 pages::{Page, PageVisibility},
31};
32use headless_lms_utils::{
33 ApplicationConfiguration,
34 document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
35 url_encoding::url_encode,
36};
37
/// How often (in seconds) a synchronization round is attempted.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of sync ticks between "still running" heartbeat log messages.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
/// Minimum age (in seconds) of a recorded failure before the page is retried.
const FAILURE_COOLDOWN_SECS: i64 = 300;
/// Pages with at least this many consecutive failures are skipped until
/// manual intervention resets their status.
const MAX_CONSECUTIVE_FAILURES: i32 = 5;
42
43pub async fn main() -> anyhow::Result<()> {
44 initialize_environment()?;
45 let config = initialize_configuration().await?;
46 if config.app_configuration.azure_configuration.is_none() {
47 warn!("Azure configuration not provided. Not running chatbot syncer.");
48 loop {
50 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
51 }
52 }
53 if config.app_configuration.test_chatbot {
54 warn!(
55 "Using mock azure configuration, this must be a test/dev environment. Not running chatbot syncer."
56 );
57 loop {
59 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
60 }
61 }
62
63 let db_pool = initialize_database_pool(&config.database_url).await?;
64 let mut conn = db_pool.acquire().await?;
65 let blob_client = initialize_blob_client(&config).await?;
66
67 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
68 let mut ticks = 0;
69
70 info!("Starting chatbot syncer.");
71
72 loop {
73 interval.tick().await;
74 ticks += 1;
75
76 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
77 ticks = 0;
78 info!("Still syncing for chatbot.");
79 }
80 if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
81 error!("Error during synchronization: {:?}", e);
82 }
83 }
84}
85
86fn initialize_environment() -> anyhow::Result<()> {
87 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
89 dotenv().ok();
90 setup_tracing()?;
91 Ok(())
92}
93
/// Runtime configuration for the chatbot syncer.
struct SyncerConfig {
    // Postgres connection string.
    database_url: String,
    // Deployment name derived from the BASE_URL host (dots replaced with
    // dashes); used to name the blob container and the shared search index.
    name: String,
    // Application-wide configuration loaded from the environment.
    app_configuration: ApplicationConfiguration,
}
99
100async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
101 let database_url = env::var("DATABASE_URL")
102 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
103
104 let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
105 .expect("BASE_URL must be a valid URL");
106
107 let name = base_url
108 .host_str()
109 .expect("BASE_URL must have a host")
110 .replace(".", "-");
111
112 let app_configuration = ApplicationConfiguration::try_from_env()?;
113
114 Ok(SyncerConfig {
115 database_url,
116 name,
117 app_configuration,
118 })
119}
120
121async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
123 PgPool::connect(database_url).await.map_err(|e| {
124 anyhow::anyhow!(
125 "Failed to connect to the database at {}: {:?}",
126 database_url,
127 e
128 )
129 })
130}
131
132async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
134 let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
135 blob_client.ensure_container_exists().await?;
136 Ok(blob_client)
137}
138
139async fn sync_pages(
141 conn: &mut PgConnection,
142 config: &SyncerConfig,
143 blob_client: &AzureBlobClient,
144) -> anyhow::Result<()> {
145 let base_url = Url::parse(&config.app_configuration.base_url)?;
146 let chatbot_configs =
147 headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;
148
149 let course_ids: Vec<Uuid> = chatbot_configs
150 .iter()
151 .map(|config| config.course_id)
152 .collect::<HashSet<_>>()
153 .into_iter()
154 .collect();
155
156 let sync_statuses =
157 headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
158 conn,
159 &course_ids,
160 )
161 .await?;
162
163 let latest_histories =
164 headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
165 conn,
166 &course_ids,
167 )
168 .await?;
169
170 let shared_index_name = config.name.clone();
171 ensure_search_index_exists(
172 &shared_index_name,
173 &config.app_configuration,
174 &blob_client.container_name,
175 )
176 .await?;
177
178 if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
179 warn!("Search indexer is not ready to index. Skipping synchronization.");
180 return Ok(());
181 }
182
183 let mut any_changes = false;
184
185 for (course_id, statuses) in sync_statuses.iter() {
186 let page_ids: Vec<Uuid> = statuses.iter().map(|s| s.page_id).collect();
187 let public_pages_set: HashSet<Uuid> =
188 headless_lms_models::pages::get_by_ids_and_visibility(
189 conn,
190 &page_ids,
191 PageVisibility::Public,
192 )
193 .await?
194 .into_iter()
195 .map(|p| p.id)
196 .collect();
197
198 let outdated_statuses: Vec<_> = statuses
199 .iter()
200 .filter(|status| {
201 if !public_pages_set.contains(&status.page_id) {
202 return false;
203 }
204
205 let is_outdated = latest_histories
206 .get(&status.page_id)
207 .is_some_and(|history| status.synced_page_revision_id != Some(history.id));
208
209 if !is_outdated {
210 return false;
211 }
212
213 if status.consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
214 debug!(
215 "Skipping page {} due to permanent failure ({} consecutive failures). Manual intervention required.",
216 status.page_id, status.consecutive_failures
217 );
218 return false;
219 }
220
221 if let Some(error_msg) = &status.error_message
222 && !error_msg.is_empty() {
223 let error_age_seconds = (Utc::now() - status.updated_at).num_seconds();
224 if error_age_seconds < FAILURE_COOLDOWN_SECS {
225 debug!(
226 "Skipping page {} due to recent failure ({} seconds ago, {} consecutive failures): {}",
227 status.page_id, error_age_seconds, status.consecutive_failures, error_msg
228 );
229 return false;
230 }
231 }
232
233 true
234 })
235 .collect();
236
237 if outdated_statuses.is_empty() {
238 continue;
239 }
240
241 any_changes = true;
242 info!(
243 "Syncing {} pages for course id: {}.",
244 outdated_statuses.len(),
245 course_id
246 );
247 for status in &outdated_statuses {
248 info!(
249 "Page id: {}, synced page revision id: {:?}.",
250 status.page_id, status.synced_page_revision_id
251 );
252 }
253
254 let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
255 let pages = headless_lms_models::pages::get_by_ids_and_visibility(
256 conn,
257 &page_ids,
258 PageVisibility::Public,
259 )
260 .await?;
261
262 if !pages.is_empty() {
263 sync_pages_batch(
264 conn,
265 &pages,
266 blob_client,
267 &base_url,
268 &config.app_configuration,
269 &latest_histories,
270 )
271 .await?;
272 } else {
273 info!("No pages to sync for course id: {}.", course_id);
274 }
275
276 let hidden_page_ids: Vec<Uuid> = statuses
277 .iter()
278 .filter(|status| {
279 !public_pages_set.contains(&status.page_id)
280 && status.synced_page_revision_id.is_some()
281 })
282 .map(|s| s.page_id)
283 .collect();
284
285 if !hidden_page_ids.is_empty() {
286 info!(
287 "Clearing sync statuses for {} hidden pages: {:?}",
288 hidden_page_ids.len(),
289 hidden_page_ids
290 );
291 headless_lms_models::chatbot_page_sync_statuses::clear_sync_statuses(
292 conn,
293 &hidden_page_ids,
294 )
295 .await?;
296 }
297
298 delete_old_files(conn, *course_id, blob_client).await?;
299 }
300
301 if any_changes {
302 run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
303 info!("New files have been synced and the search indexer has been started.");
304 }
305
306 Ok(())
307}
308
309async fn ensure_search_index_exists(
311 name: &str,
312 app_config: &ApplicationConfiguration,
313 container_name: &str,
314) -> anyhow::Result<()> {
315 if !does_search_index_exist(name, app_config).await? {
316 create_search_index(name.to_owned(), app_config).await?;
317 }
318 if !does_skillset_exist(name, app_config).await? {
319 create_skillset(name, name, app_config).await?;
320 }
321 if !does_azure_datasource_exist(name, app_config).await? {
322 create_azure_datasource(name, container_name, app_config).await?;
323 }
324 if !does_search_indexer_exist(name, app_config).await? {
325 create_search_indexer(name, name, name, name, app_config).await?;
326 }
327
328 Ok(())
329}
330
/// Uploads a batch of pages (all belonging to one course) to blob storage.
///
/// For each page the Gutenberg block content is sanitized, converted to
/// markdown with an LLM (falling back to serialized sanitized JSON on LLM
/// failure), and uploaded together with search metadata. Failures at any
/// stage are recorded per page via `set_page_sync_error` and do not abort the
/// rest of the batch; only the final DB error-recording failures are merely
/// logged.
///
/// Errors returned from this function itself come from the initial course /
/// organization / task-LM lookups, content (de)serialization, or blob-path
/// generation.
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
    latest_histories: &HashMap<Uuid, PageHistory>,
) -> anyhow::Result<()> {
    // All pages in a batch belong to the same course; derive it from the first.
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;
    // Language model used for the content-cleaning task below.
    let task_lm = headless_lms_models::application_task_default_language_models::get_for_task(
        conn,
        ApplicationTask::ContentCleaning,
    )
    .await?;

    // Rebase the URL onto the course root; page paths are appended per page.
    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    // NOTE(review): this accumulator is filled but never read after the loop
    // — obsolete-file cleanup happens separately in `delete_old_files`.
    let mut allowed_file_paths = Vec::new();

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        // Strip attributes that must not leak into the search index.
        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
            &task_lm,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                // Empty markdown would produce an unindexable blob; fall back
                // to a heading made from the page title.
                if markdown.trim().is_empty() {
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                // LLM failure: record the error for this page, then fall back
                // to uploading the serialized sanitized blocks as-is.
                let error_msg = format!("Sync failed: LLM processing error: {}", e);
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, error_msg
                );
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record sync error for page {}: {:?}",
                        page.id, db_err
                    );
                }
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;
        // Chapter info enriches the chunk context; a failed lookup only
        // degrades the context text, it never fails the sync.
        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
                Ok(c) => Some(c),
                Err(e) => {
                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
                    None
                }
            }
        } else {
            None
        };

        allowed_file_paths.push(blob_path.clone());
        // Blob metadata consumed by the search indexer / chatbot citations.
        let mut metadata = HashMap::new();
        metadata.insert("url".to_string(), url_encode(page_url.as_ref()));
        metadata.insert("title".to_string(), url_encode(&page.title));
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );
        metadata.insert("filepath".to_string(), blob_path.clone().into());
        // Human-readable provenance sentence attached to every chunk.
        if let Some(c) = chapter {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
                    page.title, c.chapter_number, c.name, course.name,
                )),
            );
        } else {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} of the course {}.",
                    page.title, course.name,
                )),
            );
        }

        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            // Upload failed: record it and move on to the next page.
            let error_msg = format!("Sync failed: Upload error: {}", e);
            warn!("Failed to upload file {}: {:?}", blob_path, e);
            if let Err(db_err) =
                headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                    conn, page.id, &error_msg,
                )
                .await
            {
                warn!(
                    "Failed to record upload error for page {}: {:?}",
                    page.id, db_err
                );
            }
        } else if let Some(history_id) = latest_histories.get(&page.id) {
            // Upload succeeded: mark the page's latest revision as synced so
            // it is not re-uploaded until it changes again.
            let mut page_revision_map = HashMap::new();
            page_revision_map.insert(page.id, history_id.id);
            if let Err(e) =
                headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
                    conn,
                    page_revision_map,
                )
                .await
            {
                let error_msg = format!("Sync failed: Status update error: {}", e);
                warn!("Failed to update sync status for page {}: {:?}", page.id, e);
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record status update error for page {}: {:?}",
                        page.id, db_err
                    );
                }
            }
        }
    }

    Ok(())
}
507
508fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
510 let course_id = page
511 .course_id
512 .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
513
514 Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
515}
516
517async fn delete_old_files(
520 conn: &mut PgConnection,
521 course_id: Uuid,
522 blob_client: &AzureBlobClient,
523) -> anyhow::Result<()> {
524 let mut courses_prefix = "courses/".to_string();
525 courses_prefix.push_str(&course_id.to_string());
526 let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
527
528 let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
529 conn,
530 course_id,
531 PageVisibility::Public,
532 )
533 .await?;
534
535 let allowed_paths: HashSet<String> = pages
536 .iter()
537 .filter_map(|page| generate_blob_path(page).ok())
538 .collect();
539
540 for file in existing_files {
541 if !allowed_paths.contains(&file) {
542 info!("Deleting obsolete file: {}", file);
543 blob_client.delete_file(&file).await?;
544 }
545 }
546
547 Ok(())
548}