1use std::{
2 collections::{HashMap, HashSet},
3 env,
4 time::Duration,
5};
6
7use chrono::Utc;
8use dotenv::dotenv;
9use sqlx::{PgConnection, PgPool};
10use url::Url;
11use uuid::Uuid;
12
13use crate::setup_tracing;
14
15use headless_lms_chatbot::{
16 azure_blob_storage::AzureBlobClient,
17 azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
18 azure_search_index::{create_search_index, does_search_index_exist},
19 azure_search_indexer::{
20 check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
21 run_search_indexer_now,
22 },
23 azure_skillset::{create_skillset, does_skillset_exist},
24 content_cleaner::convert_material_blocks_to_markdown_with_llm,
25};
26use headless_lms_models::{
27 chapters::DatabaseChapter,
28 page_history::PageHistory,
29 pages::{Page, PageVisibility},
30};
31use headless_lms_utils::{
32 ApplicationConfiguration,
33 document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
34 url_encoding::url_encode,
35};
36
/// Seconds between two consecutive synchronization rounds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of ticks after which a "still syncing" message is logged and the tick counter resets.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
/// A page whose recorded error is younger than this many seconds is skipped.
const FAILURE_COOLDOWN_SECS: i64 = 300;
/// A page with at least this many consecutive failures is no longer retried.
const MAX_CONSECUTIVE_FAILURES: i32 = 5;
41
42pub async fn main() -> anyhow::Result<()> {
43 initialize_environment()?;
44 let config = initialize_configuration().await?;
45 if config.app_configuration.azure_configuration.is_none() {
46 warn!("Azure configuration not provided. Not running chatbot syncer.");
47 loop {
49 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
50 }
51 }
52 if config.app_configuration.test_chatbot {
53 warn!(
54 "Using mock azure configuration, this must be a test/dev environment. Not running chatbot syncer."
55 );
56 loop {
58 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
59 }
60 }
61
62 let db_pool = initialize_database_pool(&config.database_url).await?;
63 let mut conn = db_pool.acquire().await?;
64 let blob_client = initialize_blob_client(&config).await?;
65
66 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
67 let mut ticks = 0;
68
69 info!("Starting chatbot syncer.");
70
71 loop {
72 interval.tick().await;
73 ticks += 1;
74
75 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
76 ticks = 0;
77 info!("Still syncing for chatbot.");
78 }
79 if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
80 error!("Error during synchronization: {:?}", e);
81 }
82 }
83}
84
85fn initialize_environment() -> anyhow::Result<()> {
86 unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
88 dotenv().ok();
89 setup_tracing()?;
90 Ok(())
91}
92
/// Everything the syncer needs to know about its environment.
struct SyncerConfig {
    /// Connection string for the database, read from `DATABASE_URL`.
    database_url: String,
    /// Host of `BASE_URL` with every `.` replaced by `-`; passed to the blob client and used
    /// as the name of the shared search index.
    name: String,
    /// Application-wide configuration loaded from the environment.
    app_configuration: ApplicationConfiguration,
}
98
99async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
100 let database_url = env::var("DATABASE_URL")
101 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
102
103 let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
104 .expect("BASE_URL must be a valid URL");
105
106 let name = base_url
107 .host_str()
108 .expect("BASE_URL must have a host")
109 .replace(".", "-");
110
111 let app_configuration = ApplicationConfiguration::try_from_env()?;
112
113 Ok(SyncerConfig {
114 database_url,
115 name,
116 app_configuration,
117 })
118}
119
120async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
122 PgPool::connect(database_url).await.map_err(|e| {
123 anyhow::anyhow!(
124 "Failed to connect to the database at {}: {:?}",
125 database_url,
126 e
127 )
128 })
129}
130
131async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
133 let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
134 blob_client.ensure_container_exists().await?;
135 Ok(blob_client)
136}
137
138async fn sync_pages(
140 conn: &mut PgConnection,
141 config: &SyncerConfig,
142 blob_client: &AzureBlobClient,
143) -> anyhow::Result<()> {
144 let base_url = Url::parse(&config.app_configuration.base_url)?;
145 let chatbot_configs =
146 headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;
147
148 let course_ids: Vec<Uuid> = chatbot_configs
149 .iter()
150 .map(|config| config.course_id)
151 .collect::<HashSet<_>>()
152 .into_iter()
153 .collect();
154
155 let sync_statuses =
156 headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
157 conn,
158 &course_ids,
159 )
160 .await?;
161
162 let latest_histories =
163 headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
164 conn,
165 &course_ids,
166 )
167 .await?;
168
169 let shared_index_name = config.name.clone();
170 ensure_search_index_exists(
171 &shared_index_name,
172 &config.app_configuration,
173 &blob_client.container_name,
174 )
175 .await?;
176
177 if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
178 warn!("Search indexer is not ready to index. Skipping synchronization.");
179 return Ok(());
180 }
181
182 let mut any_changes = false;
183
184 for (course_id, statuses) in sync_statuses.iter() {
185 let page_ids: Vec<Uuid> = statuses.iter().map(|s| s.page_id).collect();
186 let public_pages_set: HashSet<Uuid> =
187 headless_lms_models::pages::get_by_ids_and_visibility(
188 conn,
189 &page_ids,
190 PageVisibility::Public,
191 )
192 .await?
193 .into_iter()
194 .map(|p| p.id)
195 .collect();
196
197 let outdated_statuses: Vec<_> = statuses
198 .iter()
199 .filter(|status| {
200 if !public_pages_set.contains(&status.page_id) {
201 return false;
202 }
203
204 let is_outdated = latest_histories
205 .get(&status.page_id)
206 .is_some_and(|history| status.synced_page_revision_id != Some(history.id));
207
208 if !is_outdated {
209 return false;
210 }
211
212 if status.consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
213 debug!(
214 "Skipping page {} due to permanent failure ({} consecutive failures). Manual intervention required.",
215 status.page_id, status.consecutive_failures
216 );
217 return false;
218 }
219
220 if let Some(error_msg) = &status.error_message
221 && !error_msg.is_empty() {
222 let error_age_seconds = (Utc::now() - status.updated_at).num_seconds();
223 if error_age_seconds < FAILURE_COOLDOWN_SECS {
224 debug!(
225 "Skipping page {} due to recent failure ({} seconds ago, {} consecutive failures): {}",
226 status.page_id, error_age_seconds, status.consecutive_failures, error_msg
227 );
228 return false;
229 }
230 }
231
232 true
233 })
234 .collect();
235
236 if outdated_statuses.is_empty() {
237 continue;
238 }
239
240 any_changes = true;
241 info!(
242 "Syncing {} pages for course id: {}.",
243 outdated_statuses.len(),
244 course_id
245 );
246 for status in &outdated_statuses {
247 info!(
248 "Page id: {}, synced page revision id: {:?}.",
249 status.page_id, status.synced_page_revision_id
250 );
251 }
252
253 let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
254 let pages = headless_lms_models::pages::get_by_ids_and_visibility(
255 conn,
256 &page_ids,
257 PageVisibility::Public,
258 )
259 .await?;
260
261 if !pages.is_empty() {
262 sync_pages_batch(
263 conn,
264 &pages,
265 blob_client,
266 &base_url,
267 &config.app_configuration,
268 &latest_histories,
269 )
270 .await?;
271 } else {
272 info!("No pages to sync for course id: {}.", course_id);
273 }
274
275 let hidden_page_ids: Vec<Uuid> = statuses
276 .iter()
277 .filter(|status| {
278 !public_pages_set.contains(&status.page_id)
279 && status.synced_page_revision_id.is_some()
280 })
281 .map(|s| s.page_id)
282 .collect();
283
284 if !hidden_page_ids.is_empty() {
285 info!(
286 "Clearing sync statuses for {} hidden pages: {:?}",
287 hidden_page_ids.len(),
288 hidden_page_ids
289 );
290 headless_lms_models::chatbot_page_sync_statuses::clear_sync_statuses(
291 conn,
292 &hidden_page_ids,
293 )
294 .await?;
295 }
296
297 delete_old_files(conn, *course_id, blob_client).await?;
298 }
299
300 if any_changes {
301 run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
302 info!("New files have been synced and the search indexer has been started.");
303 }
304
305 Ok(())
306}
307
308async fn ensure_search_index_exists(
310 name: &str,
311 app_config: &ApplicationConfiguration,
312 container_name: &str,
313) -> anyhow::Result<()> {
314 if !does_search_index_exist(name, app_config).await? {
315 create_search_index(name.to_owned(), app_config).await?;
316 }
317 if !does_skillset_exist(name, app_config).await? {
318 create_skillset(name, name, app_config).await?;
319 }
320 if !does_azure_datasource_exist(name, app_config).await? {
321 create_azure_datasource(name, container_name, app_config).await?;
322 }
323 if !does_search_indexer_exist(name, app_config).await? {
324 create_search_indexer(name, name, name, name, app_config).await?;
325 }
326
327 Ok(())
328}
329
330async fn sync_pages_batch(
332 conn: &mut PgConnection,
333 pages: &[Page],
334 blob_client: &AzureBlobClient,
335 base_url: &Url,
336 app_config: &ApplicationConfiguration,
337 latest_histories: &HashMap<Uuid, PageHistory>,
338) -> anyhow::Result<()> {
339 let course_id = pages
340 .first()
341 .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
342 .course_id
343 .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;
344
345 let course = headless_lms_models::courses::get_course(conn, course_id).await?;
346 let organization =
347 headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;
348
349 let mut base_url = base_url.clone();
350 base_url.set_path(&format!(
351 "/org/{}/courses/{}",
352 organization.slug, course.slug
353 ));
354
355 let mut allowed_file_paths = Vec::new();
356
357 for page in pages {
358 info!("Syncing page id: {}.", page.id);
359
360 let mut page_url = base_url.clone();
361 page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));
362
363 let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
364 let sanitized_blocks = remove_sensitive_attributes(parsed_content);
365
366 let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
367 &sanitized_blocks,
368 app_config,
369 )
370 .await
371 {
372 Ok(markdown) => {
373 info!("Successfully cleaned content for page {}", page.id);
374 if markdown.trim().is_empty() {
376 warn!(
377 "Markdown is empty for page {}. Generating fallback content with a fake heading.",
378 page.id
379 );
380 format!("# {}", page.title)
381 } else {
382 markdown
383 }
384 }
385 Err(e) => {
386 let error_msg = format!("Sync failed: LLM processing error: {}", e);
387 warn!(
388 "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
389 page.id, error_msg
390 );
391 if let Err(db_err) =
392 headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
393 conn, page.id, &error_msg,
394 )
395 .await
396 {
397 warn!(
398 "Failed to record sync error for page {}: {:?}",
399 page.id, db_err
400 );
401 }
402 serde_json::to_string(&sanitized_blocks)?
404 }
405 };
406
407 let blob_path = generate_blob_path(page)?;
408 let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
409 match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
410 Ok(c) => Some(c),
411 Err(e) => {
412 debug!("Chapter lookup failed for page {}: {}", page.id, e);
413 None
414 }
415 }
416 } else {
417 None
418 };
419
420 allowed_file_paths.push(blob_path.clone());
421 let mut metadata = HashMap::new();
422 metadata.insert("url".to_string(), url_encode(page_url.as_ref()));
426 metadata.insert("title".to_string(), url_encode(&page.title));
427 metadata.insert(
428 "course_id".to_string(),
429 page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
430 );
431 metadata.insert(
432 "language".to_string(),
433 course.language_code.to_string().into(),
434 );
435 metadata.insert("filepath".to_string(), blob_path.clone().into());
436 if let Some(c) = chapter {
437 metadata.insert(
438 "chunk_context".to_string(),
439 url_encode(&format!(
440 "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
441 page.title, c.chapter_number, c.name, course.name,
442 )),
443 );
444 } else {
445 metadata.insert(
446 "chunk_context".to_string(),
447 url_encode(&format!(
448 "This chunk is a snippet from page {} of the course {}.",
449 page.title, course.name,
450 )),
451 );
452 }
453
454 if let Err(e) = blob_client
455 .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
456 .await
457 {
458 let error_msg = format!("Sync failed: Upload error: {}", e);
459 warn!("Failed to upload file {}: {:?}", blob_path, e);
460 if let Err(db_err) =
461 headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
462 conn, page.id, &error_msg,
463 )
464 .await
465 {
466 warn!(
467 "Failed to record upload error for page {}: {:?}",
468 page.id, db_err
469 );
470 }
471 } else if let Some(history_id) = latest_histories.get(&page.id) {
472 let mut page_revision_map = HashMap::new();
473 page_revision_map.insert(page.id, history_id.id);
474 if let Err(e) =
475 headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
476 conn,
477 page_revision_map,
478 )
479 .await
480 {
481 let error_msg = format!("Sync failed: Status update error: {}", e);
482 warn!("Failed to update sync status for page {}: {:?}", page.id, e);
483 if let Err(db_err) =
484 headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
485 conn, page.id, &error_msg,
486 )
487 .await
488 {
489 warn!(
490 "Failed to record status update error for page {}: {:?}",
491 page.id, db_err
492 );
493 }
494 }
495 }
496 }
497
498 Ok(())
499}
500
501fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
503 let course_id = page
504 .course_id
505 .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
506
507 Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
508}
509
510async fn delete_old_files(
513 conn: &mut PgConnection,
514 course_id: Uuid,
515 blob_client: &AzureBlobClient,
516) -> anyhow::Result<()> {
517 let mut courses_prefix = "courses/".to_string();
518 courses_prefix.push_str(&course_id.to_string());
519 let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
520
521 let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
522 conn,
523 course_id,
524 PageVisibility::Public,
525 )
526 .await?;
527
528 let allowed_paths: HashSet<String> = pages
529 .iter()
530 .filter_map(|page| generate_blob_path(page).ok())
531 .collect();
532
533 for file in existing_files {
534 if !allowed_paths.contains(&file) {
535 info!("Deleting obsolete file: {}", file);
536 blob_client.delete_file(&file).await?;
537 }
538 }
539
540 Ok(())
541}