headless_lms_server/programs/
chatbot_syncer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use chrono::Utc;
8use dotenv::dotenv;
9use sqlx::{PgConnection, PgPool};
10use url::Url;
11use uuid::Uuid;
12
13use crate::setup_tracing;
14
15use headless_lms_chatbot::{
16    azure_blob_storage::AzureBlobClient,
17    azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
18    azure_search_index::{create_search_index, does_search_index_exist},
19    azure_search_indexer::{
20        check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
21        run_search_indexer_now,
22    },
23    azure_skillset::{create_skillset, does_skillset_exist},
24    content_cleaner::convert_material_blocks_to_markdown_with_llm,
25};
26use headless_lms_models::{
27    application_task_default_language_models::ApplicationTask,
28    chapters::DatabaseChapter,
29    page_history::PageHistory,
30    pages::{Page, PageVisibility},
31};
32use headless_lms_utils::{
33    ApplicationConfiguration,
34    document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
35    url_encoding::url_encode,
36};
37
/// How often the sync loop wakes up to look for work, in seconds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Emit a "still running" heartbeat log line after this many sync ticks
/// (60 ticks x 10 s = every ~10 minutes).
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
/// Minimum time to wait before retrying a page whose last sync attempt
/// failed, in seconds.
const FAILURE_COOLDOWN_SECS: i64 = 300;
/// Stop automatically retrying a page after this many consecutive failures;
/// manual intervention is then required.
const MAX_CONSECUTIVE_FAILURES: i32 = 5;
42
43pub async fn main() -> anyhow::Result<()> {
44    initialize_environment()?;
45    let config = initialize_configuration().await?;
46    if config.app_configuration.azure_configuration.is_none() {
47        warn!("Azure configuration not provided. Not running chatbot syncer.");
48        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
49        loop {
50            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
51        }
52    }
53    if config.app_configuration.test_chatbot {
54        warn!(
55            "Using mock azure configuration, this must be a test/dev environment. Not running chatbot syncer."
56        );
57        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
58        loop {
59            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
60        }
61    }
62
63    let db_pool = initialize_database_pool(&config.database_url).await?;
64    let mut conn = db_pool.acquire().await?;
65    let blob_client = initialize_blob_client(&config).await?;
66
67    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
68    let mut ticks = 0;
69
70    info!("Starting chatbot syncer.");
71
72    loop {
73        interval.tick().await;
74        ticks += 1;
75
76        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
77            ticks = 0;
78            info!("Still syncing for chatbot.");
79        }
80        if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
81            error!("Error during synchronization: {:?}", e);
82        }
83    }
84}
85
86fn initialize_environment() -> anyhow::Result<()> {
87    // TODO: Audit that the environment access only happens in single-threaded code.
88    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
89    dotenv().ok();
90    setup_tracing()?;
91    Ok(())
92}
93
/// Runtime configuration for the syncer, assembled from environment variables
/// in `initialize_configuration`.
struct SyncerConfig {
    // PostgreSQL connection string (from DATABASE_URL, with a dev fallback).
    database_url: String,
    // Deployment name derived from the BASE_URL host with dots replaced by
    // dashes; used to name the blob container and the Azure search resources.
    name: String,
    // Application-wide configuration loaded from the environment.
    app_configuration: ApplicationConfiguration,
}
99
100async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
101    let database_url = env::var("DATABASE_URL")
102        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
103
104    let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
105        .expect("BASE_URL must be a valid URL");
106
107    let name = base_url
108        .host_str()
109        .expect("BASE_URL must have a host")
110        .replace(".", "-");
111
112    let app_configuration = ApplicationConfiguration::try_from_env()?;
113
114    Ok(SyncerConfig {
115        database_url,
116        name,
117        app_configuration,
118    })
119}
120
121/// Initializes the PostgreSQL connection pool.
122async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
123    PgPool::connect(database_url).await.map_err(|e| {
124        anyhow::anyhow!(
125            "Failed to connect to the database at {}: {:?}",
126            database_url,
127            e
128        )
129    })
130}
131
132/// Initializes the Azure Blob Storage client.
133async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
134    let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
135    blob_client.ensure_container_exists().await?;
136    Ok(blob_client)
137}
138
/// Synchronizes pages to the chatbot backend.
///
/// One pass of the sync loop:
/// 1. Collects every course that has a chatbot configuration.
/// 2. Ensures sync-status rows exist and fetches the latest page history
///    entries for those courses.
/// 3. Provisions the shared Azure search resources and bails out early if the
///    indexer is not ready.
/// 4. Per course: uploads outdated public pages, clears sync statuses of
///    previously-synced pages that are no longer public, and deletes
///    obsolete blobs.
/// 5. Triggers the search indexer once at the end if anything was uploaded.
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Several chatbot configurations may point at the same course; dedupe the
    // course ids through a HashSet before querying per-course data.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    // Per-course sync status rows for the courses' pages (created on demand,
    // per the helper's name).
    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    // Latest revision per page id; a page is outdated when its synced
    // revision id differs from this.
    let latest_histories =
        headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    // All courses share one set of search resources named after the deployment.
    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    // If the indexer is not ready, skip this round; the next tick will retry.
    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    // Tracks whether anything was uploaded so the indexer only runs when needed.
    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        let page_ids: Vec<Uuid> = statuses.iter().map(|s| s.page_id).collect();
        // Only publicly visible pages are synced to the chatbot.
        let public_pages_set: HashSet<Uuid> =
            headless_lms_models::pages::get_by_ids_and_visibility(
                conn,
                &page_ids,
                PageVisibility::Public,
            )
            .await?
            .into_iter()
            .map(|p| p.id)
            .collect();

        // A status needs syncing when: the page is public, its latest revision
        // differs from the synced one, it has not permanently failed, and it
        // is not inside the failure cooldown window.
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                if !public_pages_set.contains(&status.page_id) {
                    return false;
                }

                let is_outdated = latest_histories
                    .get(&status.page_id)
                    .is_some_and(|history| status.synced_page_revision_id != Some(history.id));

                if !is_outdated {
                    return false;
                }

                // Too many consecutive failures: stop retrying automatically.
                if status.consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
                    debug!(
                        "Skipping page {} due to permanent failure ({} consecutive failures). Manual intervention required.",
                        status.page_id, status.consecutive_failures
                    );
                    return false;
                }

                // Recent failure: wait out the cooldown before retrying.
                if let Some(error_msg) = &status.error_message
                    && !error_msg.is_empty() {
                        let error_age_seconds = (Utc::now() - status.updated_at).num_seconds();
                        if error_age_seconds < FAILURE_COOLDOWN_SECS {
                            debug!(
                                "Skipping page {} due to recent failure ({} seconds ago, {} consecutive failures): {}",
                                status.page_id, error_age_seconds, status.consecutive_failures, error_msg
                            );
                            return false;
                        }
                    }

                true
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );
        for status in &outdated_statuses {
            info!(
                "Page id: {}, synced page revision id: {:?}.",
                status.page_id, status.synced_page_revision_id
            );
        }

        // Re-fetch the full page rows (content included) for the outdated pages.
        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        let pages = headless_lms_models::pages::get_by_ids_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;

        if !pages.is_empty() {
            sync_pages_batch(
                conn,
                &pages,
                blob_client,
                &base_url,
                &config.app_configuration,
                &latest_histories,
            )
            .await?;
        } else {
            info!("No pages to sync for course id: {}.", course_id);
        }

        // Pages that were synced earlier but are no longer public: clear their
        // sync status so they get re-synced if they become public again.
        let hidden_page_ids: Vec<Uuid> = statuses
            .iter()
            .filter(|status| {
                !public_pages_set.contains(&status.page_id)
                    && status.synced_page_revision_id.is_some()
            })
            .map(|s| s.page_id)
            .collect();

        if !hidden_page_ids.is_empty() {
            info!(
                "Clearing sync statuses for {} hidden pages: {:?}",
                hidden_page_ids.len(),
                hidden_page_ids
            );
            headless_lms_models::chatbot_page_sync_statuses::clear_sync_statuses(
                conn,
                &hidden_page_ids,
            )
            .await?;
        }

        // Remove blobs that no longer correspond to a public page of this course.
        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}
308
/// Ensures that the specified search index exists, creating it if necessary.
///
/// Idempotently provisions the Azure search pipeline, with all four resources
/// sharing the same `name`: the search index, the skillset, the datasource
/// pointing at the blob `container_name`, and the indexer. Each resource is
/// only created when its existence check fails.
///
/// NOTE(review): the creation order (index, skillset, datasource, indexer)
/// looks significant — the indexer is created last and is passed the names of
/// the other resources. Preserve the order when modifying; confirm against
/// the `headless_lms_chatbot` crate's API.
async fn ensure_search_index_exists(
    name: &str,
    app_config: &ApplicationConfiguration,
    container_name: &str,
) -> anyhow::Result<()> {
    if !does_search_index_exist(name, app_config).await? {
        create_search_index(name.to_owned(), app_config).await?;
    }
    if !does_skillset_exist(name, app_config).await? {
        create_skillset(name, name, app_config).await?;
    }
    if !does_azure_datasource_exist(name, app_config).await? {
        create_azure_datasource(name, container_name, app_config).await?;
    }
    if !does_search_indexer_exist(name, app_config).await? {
        create_search_indexer(name, name, name, name, app_config).await?;
    }

    Ok(())
}
330
/// Processes and synchronizes a batch of pages.
///
/// All pages are expected to belong to the same course (the course id is
/// taken from the first page). For each page the content is sanitized,
/// converted to markdown with an LLM (falling back to the serialized
/// sanitized blocks on failure), and uploaded to blob storage together with
/// search metadata. A successful upload records the page's latest revision id
/// as synced; failures are recorded on the sync status row so the
/// cooldown/backoff logic in `sync_pages` can skip the page for a while.
///
/// # Errors
///
/// Returns an error for empty batches, a first page without a course, or
/// failing course/organization/LLM-config lookups. Per-page upload and
/// status-update failures are logged and recorded but do not abort the batch.
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
    latest_histories: &HashMap<Uuid, PageHistory>,
) -> anyhow::Result<()> {
    // The whole batch is attributed to the first page's course.
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;
    // Language model configured for the content-cleaning task.
    let task_lm = headless_lms_models::application_task_default_language_models::get_for_task(
        conn,
        ApplicationTask::ContentCleaning,
    )
    .await?;

    // Base URL for the course: <base>/org/<org-slug>/courses/<course-slug>.
    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    // NOTE(review): allowed_file_paths is pushed to below but never read in
    // this function — possibly dead code or intended for a future caller.
    let mut allowed_file_paths = Vec::new();

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        // Full public URL of the page: course base path + the page's own path.
        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        // Parse the stored Gutenberg block JSON and strip sensitive attributes
        // before anything leaves the database.
        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
            &task_lm,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                // Check if the markdown is empty, or if it just contains all spaces or newlines
                if markdown.trim().is_empty() {
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                // LLM cleaning failed: record the error (feeds the retry
                // cooldown) and upload the sanitized JSON as-is instead.
                let error_msg = format!("Sync failed: LLM processing error: {}", e);
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, error_msg
                );
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record sync error for page {}: {:?}",
                        page.id, db_err
                    );
                }
                // Fallback to original content
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;
        // Chapter info is optional; it only enriches the chunk_context below,
        // so a failed lookup is logged at debug level and ignored.
        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
                Ok(c) => Some(c),
                Err(e) => {
                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
                    None
                }
            }
        } else {
            None
        };

        allowed_file_paths.push(blob_path.clone());
        let mut metadata = HashMap::new();
        // Azure Blob Storage metadata values must be ASCII-only. URL-encode values that may
        // contain non-ASCII characters (e.g., Finnish characters like ä, ö) to ensure they
        // are ASCII-compatible. We decode the url and the title before we save them in our database.
        metadata.insert("url".to_string(), url_encode(page_url.as_ref()));
        metadata.insert("title".to_string(), url_encode(&page.title));
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );
        metadata.insert("filepath".to_string(), blob_path.clone().into());
        // chunk_context gives the search index a human-readable description of
        // where each chunk came from (with chapter info when available).
        if let Some(c) = chapter {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
                    page.title, c.chapter_number, c.name, course.name,
                )),
            );
        } else {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} of the course {}.",
                    page.title, course.name,
                )),
            );
        }

        // Upload failures and status-update failures are recorded on the sync
        // status row instead of aborting the whole batch.
        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            let error_msg = format!("Sync failed: Upload error: {}", e);
            warn!("Failed to upload file {}: {:?}", blob_path, e);
            if let Err(db_err) =
                headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                    conn, page.id, &error_msg,
                )
                .await
            {
                warn!(
                    "Failed to record upload error for page {}: {:?}",
                    page.id, db_err
                );
            }
        } else if let Some(history_id) = latest_histories.get(&page.id) {
            // Upload succeeded: mark the page's latest revision as synced.
            let mut page_revision_map = HashMap::new();
            page_revision_map.insert(page.id, history_id.id);
            if let Err(e) =
                headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
                    conn,
                    page_revision_map,
                )
                .await
            {
                let error_msg = format!("Sync failed: Status update error: {}", e);
                warn!("Failed to update sync status for page {}: {:?}", page.id, e);
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record status update error for page {}: {:?}",
                        page.id, db_err
                    );
                }
            }
        }
    }

    Ok(())
}
507
508/// Generates the blob storage path for a given page.
509fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
510    let course_id = page
511        .course_id
512        .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
513
514    Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
515}
516
517/// Deletes files from blob storage that are no longer associated with any public page.
518/// This includes files for deleted pages, hidden pages, and any other pages that are no longer public.
519async fn delete_old_files(
520    conn: &mut PgConnection,
521    course_id: Uuid,
522    blob_client: &AzureBlobClient,
523) -> anyhow::Result<()> {
524    let mut courses_prefix = "courses/".to_string();
525    courses_prefix.push_str(&course_id.to_string());
526    let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
527
528    let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
529        conn,
530        course_id,
531        PageVisibility::Public,
532    )
533    .await?;
534
535    let allowed_paths: HashSet<String> = pages
536        .iter()
537        .filter_map(|page| generate_blob_path(page).ok())
538        .collect();
539
540    for file in existing_files {
541        if !allowed_paths.contains(&file) {
542            info!("Deleting obsolete file: {}", file);
543            blob_client.delete_file(&file).await?;
544        }
545    }
546
547    Ok(())
548}