// headless_lms_server/programs/chatbot_syncer.rs
1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use chrono::Utc;
8use dotenvy::dotenv;
9use sqlx::{PgConnection, PgPool};
10use url::Url;
11use uuid::Uuid;
12
13use crate::config::program_config::ProgramConfig;
14use crate::setup_tracing;
15
16use headless_lms_base::config::ApplicationConfiguration;
17use headless_lms_chatbot::{
18    azure_blob_storage::AzureBlobClient,
19    azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
20    azure_search_index::{create_search_index, does_search_index_exist},
21    azure_search_indexer::{
22        check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
23        run_search_indexer_now,
24    },
25    azure_skillset::{create_skillset, does_skillset_exist},
26    content_cleaner::convert_material_blocks_to_markdown_with_llm,
27};
28use headless_lms_models::{
29    application_task_default_language_models::ApplicationTask,
30    chapters::DatabaseChapter,
31    pages::{Page, PageVisibility},
32};
33use headless_lms_utils::{
34    document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
35    url_encoding::url_encode,
36};
37
/// How often the sync loop runs, in seconds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Emit a "still running" heartbeat log after this many ticks
/// (60 ticks at 10 s/tick ≈ every 10 minutes).
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
/// Minimum age (seconds) of a recorded sync error before the page is retried.
const FAILURE_COOLDOWN_SECS: i64 = 300;
/// After this many consecutive failures a page is skipped until manually reset.
const MAX_CONSECUTIVE_FAILURES: i32 = 5;
42
43pub async fn main() -> anyhow::Result<()> {
44    initialize_environment()?;
45    let config = initialize_configuration().await?;
46    if config.app_configuration.azure_configuration.is_none() {
47        warn!("Azure configuration not provided. Not running chatbot syncer.");
48        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
49        loop {
50            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
51        }
52    }
53    if config.app_configuration.test_chatbot {
54        warn!(
55            "Using mock azure configuration, this must be a test/dev environment. Not running chatbot syncer."
56        );
57        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
58        loop {
59            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
60        }
61    }
62
63    let db_pool = initialize_database_pool(&config.database_url).await?;
64    let mut conn = db_pool.acquire().await?;
65    let blob_client = initialize_blob_client(&config).await?;
66
67    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
68    let mut ticks = 0;
69
70    info!("Starting chatbot syncer.");
71
72    loop {
73        interval.tick().await;
74        ticks += 1;
75
76        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
77            ticks = 0;
78            info!("Still syncing for chatbot.");
79        }
80        if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
81            error!("Error during synchronization: {:?}", e);
82        }
83    }
84}
85
86fn initialize_environment() -> anyhow::Result<()> {
87    // TODO: Audit that the environment access only happens in single-threaded code.
88    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
89    dotenv().ok();
90    setup_tracing()?;
91    Ok(())
92}
93
/// Runtime configuration for the chatbot syncer program.
struct SyncerConfig {
    /// PostgreSQL connection string.
    database_url: String,
    /// Deployment-specific resource name derived from BASE_URL's host
    /// (dots replaced with dashes); used to name Azure resources.
    name: String,
    /// Shared application configuration (Azure credentials, base URL, etc.).
    app_configuration: ApplicationConfiguration,
}
99
100async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
101    let database_url = ProgramConfig::database_url_with_default();
102    let base_url_raw = ProgramConfig::required("BASE_URL")?;
103    let base_url =
104        Url::parse(&base_url_raw).map_err(|e| anyhow::anyhow!("invalid BASE_URL: {}", e))?;
105
106    let name = base_url
107        .host_str()
108        .ok_or_else(|| anyhow::anyhow!("BASE_URL must have a host"))?
109        .replace(".", "-");
110
111    let app_configuration = ApplicationConfiguration::try_from_env()?;
112
113    Ok(SyncerConfig {
114        database_url,
115        name,
116        app_configuration,
117    })
118}
119
120/// Initializes the PostgreSQL connection pool.
121async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
122    PgPool::connect(database_url).await.map_err(|e| {
123        anyhow::anyhow!(
124            "Failed to connect to the database at {}: {:?}",
125            database_url,
126            e
127        )
128    })
129}
130
131/// Initializes the Azure Blob Storage client.
132async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
133    let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
134    blob_client.ensure_container_exists().await?;
135    Ok(blob_client)
136}
137
/// Synchronizes pages to the chatbot backend.
///
/// One full pass over every course that has a chatbot configuration:
/// 1. Ensures the shared Azure search index/skillset/datasource/indexer exist.
/// 2. Uploads public pages whose latest revision has not been synced yet.
/// 3. Clears sync statuses for pages that are no longer public.
/// 4. Deletes blob files that no longer correspond to a public page.
/// 5. Triggers the search indexer once if anything changed.
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Deduplicate: multiple chatbot configurations may reference the same course.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    let latest_history_ids =
        headless_lms_models::page_history::get_latest_page_history_ids_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    // One index per deployment, shared by all courses; named after the host.
    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    // If the indexer is busy or unhealthy, skip this pass entirely; the next
    // tick will retry.
    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    // Tracks whether any upload/cleanup happened so the indexer is only run
    // when there is new content.
    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        let page_ids: Vec<Uuid> = statuses.iter().map(|s| s.page_id).collect();
        // Only public pages are synced; everything else is treated as hidden.
        let public_pages_set: HashSet<Uuid> =
            headless_lms_models::pages::get_by_ids_and_visibility(
                conn,
                &page_ids,
                PageVisibility::Public,
            )
            .await?
            .into_iter()
            .map(|p| p.id)
            .collect();

        // A status is outdated when the page is public, its synced revision
        // differs from the latest history id, and it is not in a failure
        // backoff (cooldown or permanent-failure cap).
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                if !public_pages_set.contains(&status.page_id) {
                    return false;
                }

                let is_outdated = latest_history_ids
                    .get(&status.page_id)
                    .is_some_and(|history_id| {
                        status.synced_page_revision_id != Some(*history_id)
                    });

                if !is_outdated {
                    return false;
                }

                // Permanent failure: stop retrying until someone resets the counter.
                if status.consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
                    debug!(
                        "Skipping page {} due to permanent failure ({} consecutive failures). Manual intervention required.",
                        status.page_id, status.consecutive_failures
                    );
                    return false;
                }

                // Temporary failure: wait out the cooldown before retrying.
                if let Some(error_msg) = &status.error_message
                    && !error_msg.is_empty() {
                        let error_age_seconds = (Utc::now() - status.updated_at).num_seconds();
                        if error_age_seconds < FAILURE_COOLDOWN_SECS {
                            debug!(
                                "Skipping page {} due to recent failure ({} seconds ago, {} consecutive failures): {}",
                                status.page_id, error_age_seconds, status.consecutive_failures, error_msg
                            );
                            return false;
                        }
                    }

                true
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );
        for status in &outdated_statuses {
            info!(
                "Page id: {}, synced page revision id: {:?}.",
                status.page_id, status.synced_page_revision_id
            );
        }

        // Re-fetch the full page rows (with content) for the outdated subset.
        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        let pages = headless_lms_models::pages::get_by_ids_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;

        if !pages.is_empty() {
            sync_pages_batch(
                conn,
                &pages,
                blob_client,
                &base_url,
                &config.app_configuration,
                &latest_history_ids,
            )
            .await?;
        } else {
            info!("No pages to sync for course id: {}.", course_id);
        }

        // Pages that were synced before but are no longer public get their
        // sync status cleared; their blobs are removed by delete_old_files.
        let hidden_page_ids: Vec<Uuid> = statuses
            .iter()
            .filter(|status| {
                !public_pages_set.contains(&status.page_id)
                    && status.synced_page_revision_id.is_some()
            })
            .map(|s| s.page_id)
            .collect();

        if !hidden_page_ids.is_empty() {
            info!(
                "Clearing sync statuses for {} hidden pages: {:?}",
                hidden_page_ids.len(),
                hidden_page_ids
            );
            headless_lms_models::chatbot_page_sync_statuses::clear_sync_statuses(
                conn,
                &hidden_page_ids,
            )
            .await?;
        }

        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}
309
310/// Ensures that the specified search index exists, creating it if necessary.
311async fn ensure_search_index_exists(
312    name: &str,
313    app_config: &ApplicationConfiguration,
314    container_name: &str,
315) -> anyhow::Result<()> {
316    if !does_search_index_exist(name, app_config).await? {
317        create_search_index(name.to_owned(), app_config).await?;
318    }
319    if !does_skillset_exist(name, app_config).await? {
320        create_skillset(name, name, app_config).await?;
321    }
322    if !does_azure_datasource_exist(name, app_config).await? {
323        create_azure_datasource(name, container_name, app_config).await?;
324    }
325    if !does_search_indexer_exist(name, app_config).await? {
326        create_search_indexer(name, name, name, name, app_config).await?;
327    }
328
329    Ok(())
330}
331
/// Processes and synchronizes a batch of pages.
///
/// For each page: sanitizes the Gutenberg content, converts it to markdown
/// with an LLM (falling back to the serialized sanitized blocks on failure),
/// uploads the result to blob storage with search metadata, and records the
/// synced revision id. Per-page failures are written to the sync status table
/// and do not abort the rest of the batch.
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
    latest_history_ids: &HashMap<Uuid, Uuid>,
) -> anyhow::Result<()> {
    // NOTE(review): the course/organization lookup uses only the first page,
    // so all pages in the batch are assumed to belong to the same course —
    // the caller groups by course id; confirm if reused elsewhere.
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;
    // Language model configured for the content-cleaning task.
    let task_lm = headless_lms_models::application_task_default_language_models::get_for_task(
        conn,
        ApplicationTask::ContentCleaning,
    )
    .await?;

    // Base URL for all pages of this course: /org/{org-slug}/courses/{course-slug}
    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    let mut allowed_file_paths = Vec::new();

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
            &task_lm,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                // Check if the markdown is empty, or if it just contains all spaces or newlines
                if markdown.trim().is_empty() {
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                // LLM failure: record the error but still upload something
                // usable so the page is searchable.
                let error_msg = format!("Sync failed: LLM processing error: {}", e);
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, error_msg
                );
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record sync error for page {}: {:?}",
                        page.id, db_err
                    );
                }
                // Fallback to original content
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;
        // Chapter info is optional; it only enriches the chunk context string.
        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
                Ok(c) => Some(c),
                Err(e) => {
                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
                    None
                }
            }
        } else {
            None
        };

        allowed_file_paths.push(blob_path.clone());
        let mut metadata = HashMap::new();
        // Azure Blob Storage metadata values must be ASCII-only. URL-encode values that may
        // contain non-ASCII characters (e.g., Finnish characters like ä, ö) to ensure they
        // are ASCII-compatible. We decode the url and the title before we save them in our database.
        metadata.insert("url".to_string(), url_encode(page_url.as_ref()));
        metadata.insert("title".to_string(), url_encode(&page.title));
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );
        metadata.insert("filepath".to_string(), blob_path.clone().into());
        // Human-readable provenance string embedded with each search chunk.
        if let Some(c) = chapter {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
                    page.title, c.chapter_number, c.name, course.name,
                )),
            );
        } else {
            metadata.insert(
                "chunk_context".to_string(),
                url_encode(&format!(
                    "This chunk is a snippet from page {} of the course {}.",
                    page.title, course.name,
                )),
            );
        }

        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            // Upload failure: record it and move on to the next page.
            let error_msg = format!("Sync failed: Upload error: {}", e);
            warn!("Failed to upload file {}: {:?}", blob_path, e);
            if let Err(db_err) =
                headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                    conn, page.id, &error_msg,
                )
                .await
            {
                warn!(
                    "Failed to record upload error for page {}: {:?}",
                    page.id, db_err
                );
            }
        } else if let Some(history_id) = latest_history_ids.get(&page.id) {
            // Upload succeeded: mark the page as synced at this revision.
            let mut page_revision_map = HashMap::new();
            page_revision_map.insert(page.id, *history_id);
            if let Err(e) =
                headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
                    conn,
                    page_revision_map,
                )
                .await
            {
                let error_msg = format!("Sync failed: Status update error: {}", e);
                warn!("Failed to update sync status for page {}: {:?}", page.id, e);
                if let Err(db_err) =
                    headless_lms_models::chatbot_page_sync_statuses::set_page_sync_error(
                        conn, page.id, &error_msg,
                    )
                    .await
                {
                    warn!(
                        "Failed to record status update error for page {}: {:?}",
                        page.id, db_err
                    );
                }
            }
        }
    }

    Ok(())
}
508
509/// Generates the blob storage path for a given page.
510fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
511    let course_id = page
512        .course_id
513        .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
514
515    Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
516}
517
518/// Deletes files from blob storage that are no longer associated with any public page.
519/// This includes files for deleted pages, hidden pages, and any other pages that are no longer public.
520async fn delete_old_files(
521    conn: &mut PgConnection,
522    course_id: Uuid,
523    blob_client: &AzureBlobClient,
524) -> anyhow::Result<()> {
525    let mut courses_prefix = "courses/".to_string();
526    courses_prefix.push_str(&course_id.to_string());
527    let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
528
529    let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
530        conn,
531        course_id,
532        PageVisibility::Public,
533    )
534    .await?;
535
536    let allowed_paths: HashSet<String> = pages
537        .iter()
538        .filter_map(|page| generate_blob_path(page).ok())
539        .collect();
540
541    for file in existing_files {
542        if !allowed_paths.contains(&file) {
543            info!("Deleting obsolete file: {}", file);
544            blob_client.delete_file(&file).await?;
545        }
546    }
547
548    Ok(())
549}