1use std::{
2 collections::{HashMap, HashSet},
3 env,
4 time::Duration,
5};
6
7use chrono::Utc;
8use dotenvy::dotenv;
9use sqlx::{PgConnection, PgPool};
10use url::Url;
11use uuid::Uuid;
12
13use crate::config::program_config::ProgramConfig;
14use crate::setup_tracing;
15
16use headless_lms_base::config::ApplicationConfiguration;
17use headless_lms_chatbot::{
18 azure_blob_storage::AzureBlobClient,
19 azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
20 azure_search_index::{create_search_index, does_search_index_exist},
21 azure_search_indexer::{
22 check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
23 run_search_indexer_now,
24 },
25 azure_skillset::{create_skillset, does_skillset_exist},
26 content_cleaner::convert_material_blocks_to_markdown_with_llm,
27};
28use headless_lms_models::{
29 application_task_default_language_models::ApplicationTask,
30 chapters::DatabaseChapter,
31 pages::{Page, PageVisibility},
32};
33use headless_lms_utils::{
34 document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
35 url_encoding::url_encode,
36};
37
/// How often a synchronization round is attempted.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of sync ticks between "still syncing" heartbeat log lines
/// (60 ticks * 10 s interval ≈ one heartbeat every 10 minutes).
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
/// Minimum age (seconds) of a recorded page sync failure before that page is retried.
const FAILURE_COOLDOWN_SECS: i64 = 300;
/// After this many consecutive failures a page is skipped entirely until manual intervention.
const MAX_CONSECUTIVE_FAILURES: i32 = 5;
42
43pub async fn main() -> anyhow::Result<()> {
44 initialize_environment()?;
45 let config = initialize_configuration().await?;
46 if config.app_configuration.azure_configuration.is_none() {
47 warn!("Azure configuration not provided. Not running chatbot syncer.");
48 loop {
50 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
51 }
52 }
53 if config.app_configuration.test_chatbot {
54 warn!(
55 "Using mock azure configuration, this must be a test/dev environment. Not running chatbot syncer."
56 );
57 loop {
59 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
60 }
61 }
62
63 let db_pool = initialize_database_pool(&config.database_url).await?;
64 let mut conn = db_pool.acquire().await?;
65 let blob_client = initialize_blob_client(&config).await?;
66
67 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
68 let mut ticks = 0;
69
70 info!("Starting chatbot syncer.");
71
72 loop {
73 interval.tick().await;
74 ticks += 1;
75
76 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
77 ticks = 0;
78 info!("Still syncing for chatbot.");
79 }
80 if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
81 error!("Error during synchronization: {:?}", e);
82 }
83 }
84}
85
86fn initialize_environment() -> anyhow::Result<()> {
87 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
89 dotenv().ok();
90 setup_tracing()?;
91 Ok(())
92}
93
/// Runtime configuration for the chatbot syncer.
struct SyncerConfig {
    // Postgres connection string used to create the connection pool.
    database_url: String,
    // Name shared by the blob container and the Azure search resources;
    // derived from the BASE_URL host with dots replaced by dashes.
    name: String,
    // Application-wide configuration (Azure settings, base URL, test flags).
    app_configuration: ApplicationConfiguration,
}
99
100async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
101 let database_url = ProgramConfig::database_url_with_default();
102 let base_url_raw = ProgramConfig::required("BASE_URL")?;
103 let base_url =
104 Url::parse(&base_url_raw).map_err(|e| anyhow::anyhow!("invalid BASE_URL: {}", e))?;
105
106 let name = base_url
107 .host_str()
108 .ok_or_else(|| anyhow::anyhow!("BASE_URL must have a host"))?
109 .replace(".", "-");
110
111 let app_configuration = ApplicationConfiguration::try_from_env()?;
112
113 Ok(SyncerConfig {
114 database_url,
115 name,
116 app_configuration,
117 })
118}
119
120async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
122 PgPool::connect(database_url).await.map_err(|e| {
123 anyhow::anyhow!(
124 "Failed to connect to the database at {}: {:?}",
125 database_url,
126 e
127 )
128 })
129}
130
131async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
133 let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
134 blob_client.ensure_container_exists().await?;
135 Ok(blob_client)
136}
137
138async fn sync_pages(
140 conn: &mut PgConnection,
141 config: &SyncerConfig,
142 blob_client: &AzureBlobClient,
143) -> anyhow::Result<()> {
144 let base_url = Url::parse(&config.app_configuration.base_url)?;
145 let chatbot_configs =
146 headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;
147
148 let course_ids: Vec<Uuid> = chatbot_configs
149 .iter()
150 .map(|config| config.course_id)
151 .collect::<HashSet<_>>()
152 .into_iter()
153 .collect();
154
155 let sync_statuses =
156 headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
157 conn,
158 &course_ids,
159 )
160 .await?;
161
162 let latest_history_ids =
163 headless_lms_models::page_history::get_latest_page_history_ids_by_course_ids(
164 conn,
165 &course_ids,
166 )
167 .await?;
168
169 let shared_index_name = config.name.clone();
170 ensure_search_index_exists(
171 &shared_index_name,
172 &config.app_configuration,
173 &blob_client.container_name,
174 )
175 .await?;
176
177 if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
178 warn!("Search indexer is not ready to index. Skipping synchronization.");
179 return Ok(());
180 }
181
182 let mut any_changes = false;
183
184 for (course_id, statuses) in sync_statuses.iter() {
185 let page_ids: Vec<Uuid> = statuses.iter().map(|s| s.page_id).collect();
186 let public_pages_set: HashSet<Uuid> =
187 headless_lms_models::pages::get_by_ids_and_visibility(
188 conn,
189 &page_ids,
190 PageVisibility::Public,
191 )
192 .await?
193 .into_iter()
194 .map(|p| p.id)
195 .collect();
196
197 let outdated_statuses: Vec<_> = statuses
198 .iter()
199 .filter(|status| {
200 if !public_pages_set.contains(&status.page_id) {
201 return false;
202 }
203
204 let is_outdated = latest_history_ids
205 .get(&status.page_id)
206 .is_some_and(|history_id| {
207 status.synced_page_revision_id != Some(*history_id)
208 });
209
210 if !is_outdated {
211 return false;
212 }
213
214 if status.consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
215 debug!(
216 "Skipping page {} due to permanent failure ({} consecutive failures). Manual intervention required.",
217 status.page_id, status.consecutive_failures
218 );
219 return false;
220 }
221
222 if let Some(error_msg) = &status.error_message
223 && !error_msg.is_empty() {
224 let error_age_seconds = (Utc::now() - status.updated_at).num_seconds();
225 if error_age_seconds < FAILURE_COOLDOWN_SECS {
226 debug!(
227 "Skipping page {} due to recent failure ({} seconds ago, {} consecutive failures): {}",
228 status.page_id, error_age_seconds, status.consecutive_failures, error_msg
229 );
230 return false;
231 }
232 }
233
234 true
235 })
236 .collect();
237
238 if outdated_statuses.is_empty() {
239 continue;
240 }
241
242 any_changes = true;
243 info!(
244 "Syncing {} pages for course id: {}.",
245 outdated_statuses.len(),
246 course_id
247 );
248 for status in &outdated_statuses {
249 info!(
250 "Page id: {}, synced page revision id: {:?}.",
251 status.page_id, status.synced_page_revision_id
252 );
253 }
254
255 let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
256 let pages = headless_lms_models::pages::get_by_ids_and_visibility(
257 conn,
258 &page_ids,
259 PageVisibility::Public,
260 )
261 .await?;
262
263 if !pages.is_empty() {
264 sync_pages_batch(
265 conn,
266 &pages,
267 blob_client,
268 &base_url,
269 &config.app_configuration,
270 &latest_history_ids,
271 )
272 .await?;
273 } else {
274 info!("No pages to sync for course id: {}.", course_id);
275 }
276
277 let hidden_page_ids: Vec<Uuid> = statuses
278 .iter()
279 .filter(|status| {
280 !public_pages_set.contains(&status.page_id)
281 && status.synced_page_revision_id.is_some()
282 })
283 .map(|s| s.page_id)
284 .collect();
285
286 if !hidden_page_ids.is_empty() {
287 info!(
288 "Clearing sync statuses for {} hidden pages: {:?}",
289 hidden_page_ids.len(),
290 hidden_page_ids
291 );
292 headless_lms_models::chatbot_page_sync_statuses::clear_sync_statuses(
293 conn,
294 &hidden_page_ids,
295 )
296 .await?;
297 }
298
299 delete_old_files(conn, *course_id, blob_client).await?;
300 }
301
302 if any_changes {
303 run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
304 info!("New files have been synced and the search indexer has been started.");
305 }
306
307 Ok(())
308}
309
310async fn ensure_search_index_exists(
312 name: &str,
313 app_config: &ApplicationConfiguration,
314 container_name: &str,
315) -> anyhow::Result<()> {
316 if !does_search_index_exist(name, app_config).await? {
317 create_search_index(name.to_owned(), app_config).await?;
318 }
319 if !does_skillset_exist(name, app_config).await? {
320 create_skillset(name, name, app_config).await?;
321 }
322 if !does_azure_datasource_exist(name, app_config).await? {
323 create_azure_datasource(name, container_name, app_config).await?;
324 }
325 if !does_search_indexer_exist(name, app_config).await? {
326 create_search_indexer(name, name, name, name, app_config).await?;
327 }
328
329 Ok(())
330}
331
/// Uploads one course's batch of changed pages to blob storage.
///
/// Per page: the content is sanitized, converted to markdown with an LLM
/// (falling back to the serialized sanitized blocks when the LLM fails),
/// uploaded together with search metadata, and the sync status is advanced to
/// the page's latest history id. Per-page failures are recorded via
/// `set_page_sync_error` and do not abort the rest of the batch; only
/// database/serialization errors propagated with `?` do.
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
    latest_history_ids: &HashMap<Uuid, Uuid>,
) -> anyhow::Result<()> {
    // The whole batch is keyed off the first page's course: assumes every page
    // in `pages` belongs to the same course — TODO confirm at the call site.
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;
    // Language model configured for the content-cleaning task.
    let task_lm = headless_lms_models::application_task_default_language_models::get_for_task(
        conn,
        ApplicationTask::ContentCleaning,
    )
    .await?;

    // Rebase the URL onto the course root; individual page paths are appended
    // to this inside the loop.
    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    // NOTE(review): allowed_file_paths is populated below but never read in
    // this function — confirm whether it can be removed.
    let mut allowed_file_paths = Vec::new();

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        // Parse the stored Gutenberg block JSON and strip sensitive attributes
        // before any further processing or upload.
        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
            &task_lm,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                if markdown.trim().is_empty() {
                    // Don't upload an empty document; fall back to at least the
                    // page title as a heading.
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                // LLM cleaning failed: record the failure (this feeds the
                // cooldown / consecutive-failure logic in the caller's filter)
                // but still upload the serialized sanitized blocks so the page
                // is not missing from the index.
                let error_msg = format!("Sync failed: LLM processing error: {}", e);
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, error_msg
                );
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record sync error for page {}: {:?}",
                        page.id, db_err
                    );
                }
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;
        // Chapter is optional context for the metadata below; a failed lookup
        // is downgraded to None rather than failing the page.
        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
                Ok(c) => Some(c),
                Err(e) => {
                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
                    None
                }
            }
        } else {
            None
        };

        allowed_file_paths.push(blob_path.clone());
        // Blob metadata attached to the uploaded file (url/title/context are
        // URL-encoded; presumably consumed by the search indexer — confirm).
        let mut metadata = HashMap::new();
        metadata.insert("url".to_string(), url_encode(page_url.as_ref()));
        metadata.insert("title".to_string(), url_encode(&page.title));
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );
        metadata.insert("filepath".to_string(), blob_path.clone().into());
        if let Some(c) = chapter {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
                    page.title, c.chapter_number, c.name, course.name,
                )),
            );
        } else {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} of the course {}.",
                    page.title, course.name,
                )),
            );
        }

        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            // Upload failed: record it so the page is retried after the cooldown.
            let error_msg = format!("Sync failed: Upload error: {}", e);
            warn!("Failed to upload file {}: {:?}", blob_path, e);
            if let Err(db_err) =
                headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                    conn, page.id, &error_msg,
                )
                .await
            {
                warn!(
                    "Failed to record upload error for page {}: {:?}",
                    page.id, db_err
                );
            }
        } else if let Some(history_id) = latest_history_ids.get(&page.id) {
            // Upload succeeded: mark the page as synced at its latest revision.
            let mut page_revision_map = HashMap::new();
            page_revision_map.insert(page.id, *history_id);
            if let Err(e) =
                headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
                    conn,
                    page_revision_map,
                )
                .await
            {
                let error_msg = format!("Sync failed: Status update error: {}", e);
                warn!("Failed to update sync status for page {}: {:?}", page.id, e);
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record status update error for page {}: {:?}",
                        page.id, db_err
                    );
                }
            }
        }
    }

    Ok(())
}
508
509fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
511 let course_id = page
512 .course_id
513 .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
514
515 Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
516}
517
518async fn delete_old_files(
521 conn: &mut PgConnection,
522 course_id: Uuid,
523 blob_client: &AzureBlobClient,
524) -> anyhow::Result<()> {
525 let mut courses_prefix = "courses/".to_string();
526 courses_prefix.push_str(&course_id.to_string());
527 let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
528
529 let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
530 conn,
531 course_id,
532 PageVisibility::Public,
533 )
534 .await?;
535
536 let allowed_paths: HashSet<String> = pages
537 .iter()
538 .filter_map(|page| generate_blob_path(page).ok())
539 .collect();
540
541 for file in existing_files {
542 if !allowed_paths.contains(&file) {
543 info!("Deleting obsolete file: {}", file);
544 blob_client.delete_file(&file).await?;
545 }
546 }
547
548 Ok(())
549}