headless_lms_server/programs/
chatbot_syncer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use dotenv::dotenv;
8use sqlx::{PgConnection, PgPool};
9use url::Url;
10use uuid::Uuid;
11
12use crate::setup_tracing;
13
14use headless_lms_chatbot::{
15    azure_blob_storage::AzureBlobClient,
16    azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
17    azure_search_index::{create_search_index, does_search_index_exist},
18    azure_search_indexer::{
19        check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
20        run_search_indexer_now,
21    },
22    azure_skillset::{create_skillset, does_skillset_exist},
23    content_cleaner::convert_material_blocks_to_markdown_with_llm,
24};
25use headless_lms_models::{
26    chapters::DatabaseChapter,
27    page_history::PageHistory,
28    pages::{Page, PageVisibility},
29};
30use headless_lms_utils::{
31    ApplicationConfiguration,
32    document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
33};
34
/// How often a synchronization pass is attempted, in seconds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of sync ticks between "still running" log messages
/// (60 ticks * 10 s interval = roughly every 10 minutes).
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
37
38pub async fn main() -> anyhow::Result<()> {
39    initialize_environment()?;
40    let config = initialize_configuration().await?;
41    if config.app_configuration.azure_configuration.is_none() {
42        warn!("Azure configuration not provided. Not running chatbot syncer.");
43        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
44        loop {
45            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
46        }
47    }
48
49    let db_pool = initialize_database_pool(&config.database_url).await?;
50    let mut conn = db_pool.acquire().await?;
51    let blob_client = initialize_blob_client(&config).await?;
52
53    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
54    let mut ticks = 0;
55
56    info!("Starting chatbot syncer.");
57
58    loop {
59        interval.tick().await;
60        ticks += 1;
61
62        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
63            ticks = 0;
64            info!("Still syncing for chatbot.");
65        }
66        if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
67            error!("Error during synchronization: {:?}", e);
68        }
69    }
70}
71
72fn initialize_environment() -> anyhow::Result<()> {
73    // TODO: Audit that the environment access only happens in single-threaded code.
74    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
75    dotenv().ok();
76    setup_tracing()?;
77    Ok(())
78}
79
/// Runtime configuration for the syncer, assembled from environment variables.
struct SyncerConfig {
    // PostgreSQL connection string (DATABASE_URL, with a dev default).
    database_url: String,
    // Host of BASE_URL with dots replaced by dashes; used to name the Azure
    // search index, skillset, datasource, indexer and blob container.
    name: String,
    // Full application configuration, including the optional Azure settings.
    app_configuration: ApplicationConfiguration,
}
85
86async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
87    let database_url = env::var("DATABASE_URL")
88        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
89
90    let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
91        .expect("BASE_URL must be a valid URL");
92
93    let name = base_url
94        .host_str()
95        .expect("BASE_URL must have a host")
96        .replace(".", "-");
97
98    let app_configuration = ApplicationConfiguration::try_from_env()?;
99
100    Ok(SyncerConfig {
101        database_url,
102        name,
103        app_configuration,
104    })
105}
106
107/// Initializes the PostgreSQL connection pool.
108async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
109    PgPool::connect(database_url).await.map_err(|e| {
110        anyhow::anyhow!(
111            "Failed to connect to the database at {}: {:?}",
112            database_url,
113            e
114        )
115    })
116}
117
118/// Initializes the Azure Blob Storage client.
119async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
120    let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
121    blob_client.ensure_container_exists().await?;
122    Ok(blob_client)
123}
124
/// Synchronizes pages to the chatbot backend.
///
/// One full pass: finds every course that has a chatbot configuration,
/// provisions the shared Azure search resources, uploads changed pages to blob
/// storage, records the synced revisions, deletes stale blobs, and finally
/// triggers the search indexer if anything changed.
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Deduplicate via a HashSet: several chatbot configurations may reference
    // the same course.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    let latest_histories =
        headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    // All courses share one search index, named after the BASE_URL host.
    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    // If the indexer is busy/unavailable, skip this pass; the next tick retries.
    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        // A page is outdated when its latest history entry differs from the
        // revision recorded at the previous successful sync.
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                latest_histories
                    .get(&status.page_id)
                    .is_some_and(|history| status.synced_page_revision_id != Some(history.id))
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );

        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        let mut pages = headless_lms_models::pages::get_by_ids_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;

        if !pages.is_empty() {
            sync_pages_batch(
                conn,
                &pages,
                blob_client,
                &base_url,
                &config.app_configuration,
            )
            .await?;
        }

        // Deleted pages are not uploaded, but their sync statuses still get
        // advanced below so they are not retried on every pass.
        let deleted_pages = headless_lms_models::pages::get_by_ids_deleted_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;
        pages.extend(deleted_pages);

        update_sync_statuses(conn, &pages, &latest_histories).await?;

        // Remove blobs whose pages are no longer publicly visible in this course.
        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}
231
232/// Ensures that the specified search index exists, creating it if necessary.
233async fn ensure_search_index_exists(
234    name: &str,
235    app_config: &ApplicationConfiguration,
236    container_name: &str,
237) -> anyhow::Result<()> {
238    if !does_search_index_exist(name, app_config).await? {
239        create_search_index(name.to_owned(), app_config).await?;
240    }
241    if !does_skillset_exist(name, app_config).await? {
242        create_skillset(name, name, app_config).await?;
243    }
244    if !does_azure_datasource_exist(name, app_config).await? {
245        create_azure_datasource(name, container_name, app_config).await?;
246    }
247    if !does_search_indexer_exist(name, app_config).await? {
248        create_search_indexer(name, name, name, name, app_config).await?;
249    }
250
251    Ok(())
252}
253
/// Processes and synchronizes a batch of pages.
///
/// For each page: sanitizes its Gutenberg content, converts it to markdown
/// with an LLM (falling back to the serialized sanitized JSON on failure, or a
/// fake heading if the markdown comes back empty), and uploads the result to
/// blob storage together with search metadata (url, title, course id,
/// language, file path, chunk context). All pages are expected to belong to
/// the same course as the first one.
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
) -> anyhow::Result<()> {
    // The course is derived from the first page; an empty batch is an error.
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;

    // Base URL for the course's pages: /org/<org-slug>/courses/<course-slug>.
    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    let mut allowed_file_paths = Vec::new();

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        // Strip attributes that must not leak into the search index.
        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                // Check if the markdown is empty, or if it just contains all spaces or newlines
                if markdown.trim().is_empty() {
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, e
                );
                // Fallback to original content
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;
        // Chapter info is optional: a failed lookup only degrades the chunk
        // context metadata below, it does not abort the sync.
        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
                Ok(c) => Some(c),
                Err(e) => {
                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
                    None
                }
            }
        } else {
            None
        };

        // NOTE(review): allowed_file_paths is only pushed to, never read in
        // this function — possibly vestigial; confirm before removing.
        allowed_file_paths.push(blob_path.clone());
        // Blob metadata consumed by the Azure search pipeline.
        let mut metadata = HashMap::new();
        metadata.insert("url".to_string(), page_url.to_string().into());
        metadata.insert("title".to_string(), page.title.to_string().into());
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );
        metadata.insert("filepath".to_string(), blob_path.clone().into());
        if let Some(c) = chapter {
            metadata.insert(
                "chunk_context".to_string(),
                format!(
                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
                    page.title, c.chapter_number, c.name, course.name,
                )
                .into(),
            );
        } else {
            metadata.insert(
                "chunk_context".to_string(),
                format!(
                    "This chunk is a snippet from page {} of the course {}.",
                    page.title, course.name,
                )
                .into(),
            );
        }

        // Upload failures are logged but do not abort the rest of the batch.
        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            warn!("Failed to upload file {}: {:?}", blob_path, e);
        }
    }

    Ok(())
}
374
375/// Update sync statuses for pages after syncing
376async fn update_sync_statuses(
377    conn: &mut PgConnection,
378    pages: &[Page],
379    latest_histories: &HashMap<Uuid, PageHistory>,
380) -> anyhow::Result<()> {
381    let page_revision_map: HashMap<Uuid, Uuid> = pages
382        .iter()
383        .map(|page| (page.id, latest_histories[&page.id].id))
384        .collect();
385
386    headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
387        conn,
388        page_revision_map,
389    )
390    .await?;
391
392    Ok(())
393}
394
395/// Generates the blob storage path for a given page.
396fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
397    let course_id = page
398        .course_id
399        .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
400
401    Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
402}
403
404/// Deletes files from blob storage that are no longer associated with any page.
405async fn delete_old_files(
406    conn: &mut PgConnection,
407    course_id: Uuid,
408    blob_client: &AzureBlobClient,
409) -> anyhow::Result<()> {
410    let mut courses_prefix = "courses/".to_string();
411    courses_prefix.push_str(&course_id.to_string());
412    let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
413
414    let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
415        conn,
416        course_id,
417        PageVisibility::Public,
418    )
419    .await?;
420
421    let allowed_paths: HashSet<String> = pages
422        .iter()
423        .filter_map(|page| generate_blob_path(page).ok())
424        .collect();
425
426    for file in existing_files {
427        if !allowed_paths.contains(&file) {
428            info!("Deleting obsolete file: {}", file);
429            blob_client.delete_file(&file).await?;
430        }
431    }
432
433    Ok(())
434}