// headless_lms_server/programs/chatbot_syncer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use dotenv::dotenv;
8use sqlx::{PgConnection, PgPool};
9use url::Url;
10use uuid::Uuid;
11
12use crate::setup_tracing;
13
14use headless_lms_chatbot::{
15    azure_blob_storage::AzureBlobClient,
16    azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
17    azure_search_index::{create_search_index, does_search_index_exist},
18    azure_search_indexer::{
19        check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
20        run_search_indexer_now,
21    },
22    azure_skillset::{create_skillset, does_skillset_exist},
23    content_cleaner::convert_material_blocks_to_markdown_with_llm,
24};
25use headless_lms_models::{
26    chapters::DatabaseChapter,
27    page_history::PageHistory,
28    pages::{Page, PageVisibility},
29};
30use headless_lms_utils::{
31    ApplicationConfiguration,
32    document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
33};
34
/// Seconds between synchronization rounds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of ticks between "still running" heartbeat log lines
/// (60 ticks at 10 s per tick ≈ one message every 10 minutes).
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
37
38pub async fn main() -> anyhow::Result<()> {
39    initialize_environment()?;
40    let config = initialize_configuration().await?;
41    if config.app_configuration.azure_configuration.is_none() {
42        warn!("Azure configuration not provided. Not running chatbot syncer.");
43        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
44        loop {
45            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
46        }
47    }
48    if config.app_configuration.test_chatbot {
49        warn!(
50            "Using mock azure configuration, this must be a test/dev environment. Not running chatbot syncer."
51        );
52        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
53        loop {
54            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
55        }
56    }
57
58    let db_pool = initialize_database_pool(&config.database_url).await?;
59    let mut conn = db_pool.acquire().await?;
60    let blob_client = initialize_blob_client(&config).await?;
61
62    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
63    let mut ticks = 0;
64
65    info!("Starting chatbot syncer.");
66
67    loop {
68        interval.tick().await;
69        ticks += 1;
70
71        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
72            ticks = 0;
73            info!("Still syncing for chatbot.");
74        }
75        if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
76            error!("Error during synchronization: {:?}", e);
77        }
78    }
79}
80
/// Prepares process-wide state: sets the default log filter, loads variables
/// from a `.env` file when present, and installs the tracing subscriber.
fn initialize_environment() -> anyhow::Result<()> {
    // TODO: Audit that the environment access only happens in single-threaded code.
    // SAFETY: assumed to run once during startup before any other thread reads
    // the environment concurrently — NOTE(review): confirm no runtime worker
    // threads are already running when this is called.
    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
    dotenv().ok();
    setup_tracing()?;
    Ok(())
}
88
/// Runtime configuration for the chatbot syncer.
struct SyncerConfig {
    /// PostgreSQL connection string used to build the connection pool.
    database_url: String,
    /// The `BASE_URL` host with dots replaced by dashes; used as the shared
    /// name for the Azure search resources and passed to the blob client
    /// (presumably as the container name — confirm in `AzureBlobClient::new`).
    name: String,
    /// Application-wide configuration, including the optional Azure settings.
    app_configuration: ApplicationConfiguration,
}
94
95async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
96    let database_url = env::var("DATABASE_URL")
97        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
98
99    let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
100        .expect("BASE_URL must be a valid URL");
101
102    let name = base_url
103        .host_str()
104        .expect("BASE_URL must have a host")
105        .replace(".", "-");
106
107    let app_configuration = ApplicationConfiguration::try_from_env()?;
108
109    Ok(SyncerConfig {
110        database_url,
111        name,
112        app_configuration,
113    })
114}
115
116/// Initializes the PostgreSQL connection pool.
117async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
118    PgPool::connect(database_url).await.map_err(|e| {
119        anyhow::anyhow!(
120            "Failed to connect to the database at {}: {:?}",
121            database_url,
122            e
123        )
124    })
125}
126
127/// Initializes the Azure Blob Storage client.
128async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
129    let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
130    blob_client.ensure_container_exists().await?;
131    Ok(blob_client)
132}
133
/// Synchronizes pages to the chatbot backend.
///
/// One sync round: collects every course that has a chatbot configuration,
/// ensures the shared Azure search resources exist, uploads outdated pages to
/// blob storage, records the synced revisions, deletes blobs for pages that
/// are no longer public, and finally triggers the search indexer if anything
/// changed.
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Several chatbot configurations may point at the same course; dedupe
    // through a HashSet before running per-course queries.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    // Make sure every page of every chatbot-enabled course has a sync-status
    // row we can compare against.
    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    // Latest history entry per page, used below to detect stale revisions.
    let latest_histories =
        headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    // All courses share a single index/skillset/datasource/indexer named
    // after the deployment host (see initialize_configuration).
    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    // If the indexer is not ready, skip this round entirely; the next tick
    // will retry.
    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        // A page is outdated when its latest history entry differs from the
        // revision recorded as synced. Pages with no history entry at all are
        // skipped here.
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                latest_histories
                    .get(&status.page_id)
                    .is_some_and(|history| status.synced_page_revision_id != Some(history.id))
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );

        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        let mut pages = headless_lms_models::pages::get_by_ids_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;

        if !pages.is_empty() {
            sync_pages_batch(
                conn,
                &pages,
                blob_client,
                &base_url,
                &config.app_configuration,
            )
            .await?;
        }

        // Deleted pages get no upload, but their sync status must still
        // advance so they are not retried forever.
        let deleted_pages = headless_lms_models::pages::get_by_ids_deleted_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;
        pages.extend(deleted_pages);

        update_sync_statuses(conn, &pages, &latest_histories).await?;

        // Remove blobs that no longer correspond to a public page of this
        // course.
        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}
240
241/// Ensures that the specified search index exists, creating it if necessary.
242async fn ensure_search_index_exists(
243    name: &str,
244    app_config: &ApplicationConfiguration,
245    container_name: &str,
246) -> anyhow::Result<()> {
247    if !does_search_index_exist(name, app_config).await? {
248        create_search_index(name.to_owned(), app_config).await?;
249    }
250    if !does_skillset_exist(name, app_config).await? {
251        create_skillset(name, name, app_config).await?;
252    }
253    if !does_azure_datasource_exist(name, app_config).await? {
254        create_azure_datasource(name, container_name, app_config).await?;
255    }
256    if !does_search_indexer_exist(name, app_config).await? {
257        create_search_indexer(name, name, name, name, app_config).await?;
258    }
259
260    Ok(())
261}
262
263/// Processes and synchronizes a batch of pages.
264async fn sync_pages_batch(
265    conn: &mut PgConnection,
266    pages: &[Page],
267    blob_client: &AzureBlobClient,
268    base_url: &Url,
269    app_config: &ApplicationConfiguration,
270) -> anyhow::Result<()> {
271    let course_id = pages
272        .first()
273        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
274        .course_id
275        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;
276
277    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
278    let organization =
279        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;
280
281    let mut base_url = base_url.clone();
282    base_url.set_path(&format!(
283        "/org/{}/courses/{}",
284        organization.slug, course.slug
285    ));
286
287    let mut allowed_file_paths = Vec::new();
288
289    for page in pages {
290        info!("Syncing page id: {}.", page.id);
291
292        let mut page_url = base_url.clone();
293        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));
294
295        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
296        let sanitized_blocks = remove_sensitive_attributes(parsed_content);
297
298        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
299            &sanitized_blocks,
300            app_config,
301        )
302        .await
303        {
304            Ok(markdown) => {
305                info!("Successfully cleaned content for page {}", page.id);
306                // Check if the markdown is empty, or if it just contains all spaces or newlines
307                if markdown.trim().is_empty() {
308                    warn!(
309                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
310                        page.id
311                    );
312                    format!("# {}", page.title)
313                } else {
314                    markdown
315                }
316            }
317            Err(e) => {
318                warn!(
319                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
320                    page.id, e
321                );
322                // Fallback to original content
323                serde_json::to_string(&sanitized_blocks)?
324            }
325        };
326
327        let blob_path = generate_blob_path(page)?;
328        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
329            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
330                Ok(c) => Some(c),
331                Err(e) => {
332                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
333                    None
334                }
335            }
336        } else {
337            None
338        };
339
340        allowed_file_paths.push(blob_path.clone());
341        let mut metadata = HashMap::new();
342        metadata.insert("url".to_string(), page_url.to_string().into());
343        metadata.insert("title".to_string(), page.title.to_string().into());
344        metadata.insert(
345            "course_id".to_string(),
346            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
347        );
348        metadata.insert(
349            "language".to_string(),
350            course.language_code.to_string().into(),
351        );
352        metadata.insert("filepath".to_string(), blob_path.clone().into());
353        if let Some(c) = chapter {
354            metadata.insert(
355                "chunk_context".to_string(),
356                format!(
357                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
358                    page.title, c.chapter_number, c.name, course.name,
359                )
360                .into(),
361            );
362        } else {
363            metadata.insert(
364                "chunk_context".to_string(),
365                format!(
366                    "This chunk is a snippet from page {} of the course {}.",
367                    page.title, course.name,
368                )
369                .into(),
370            );
371        }
372
373        if let Err(e) = blob_client
374            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
375            .await
376        {
377            warn!("Failed to upload file {}: {:?}", blob_path, e);
378        }
379    }
380
381    Ok(())
382}
383
384/// Update sync statuses for pages after syncing
385async fn update_sync_statuses(
386    conn: &mut PgConnection,
387    pages: &[Page],
388    latest_histories: &HashMap<Uuid, PageHistory>,
389) -> anyhow::Result<()> {
390    let page_revision_map: HashMap<Uuid, Uuid> = pages
391        .iter()
392        .map(|page| (page.id, latest_histories[&page.id].id))
393        .collect();
394
395    headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
396        conn,
397        page_revision_map,
398    )
399    .await?;
400
401    Ok(())
402}
403
404/// Generates the blob storage path for a given page.
405fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
406    let course_id = page
407        .course_id
408        .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
409
410    Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
411}
412
413/// Deletes files from blob storage that are no longer associated with any page.
414async fn delete_old_files(
415    conn: &mut PgConnection,
416    course_id: Uuid,
417    blob_client: &AzureBlobClient,
418) -> anyhow::Result<()> {
419    let mut courses_prefix = "courses/".to_string();
420    courses_prefix.push_str(&course_id.to_string());
421    let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
422
423    let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
424        conn,
425        course_id,
426        PageVisibility::Public,
427    )
428    .await?;
429
430    let allowed_paths: HashSet<String> = pages
431        .iter()
432        .filter_map(|page| generate_blob_path(page).ok())
433        .collect();
434
435    for file in existing_files {
436        if !allowed_paths.contains(&file) {
437            info!("Deleting obsolete file: {}", file);
438            blob_client.delete_file(&file).await?;
439        }
440    }
441
442    Ok(())
443}