//! headless_lms_server/programs/chatbot_syncer.rs

1use std::{
2    collections::{HashMap, HashSet},
3    env,
4    time::Duration,
5};
6
7use dotenv::dotenv;
8use sqlx::{PgConnection, PgPool};
9use url::Url;
10use uuid::Uuid;
11
12use crate::setup_tracing;
13
14use headless_lms_chatbot::{
15    azure_blob_storage::AzureBlobClient,
16    azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
17    azure_search_index::{create_search_index, does_search_index_exist},
18    azure_search_indexer::{
19        check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
20        run_search_indexer_now,
21    },
22    azure_skillset::{create_skillset, does_skillset_exist},
23    content_cleaner::convert_material_blocks_to_markdown_with_llm,
24};
25use headless_lms_models::{
26    page_history::PageHistory,
27    pages::{Page, PageVisibility},
28};
29use headless_lms_utils::{
30    ApplicationConfiguration,
31    document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
32};
33
34const SYNC_INTERVAL_SECS: u64 = 10;
35const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
36
37pub async fn main() -> anyhow::Result<()> {
38    initialize_environment()?;
39    let config = initialize_configuration().await?;
40    if config.app_configuration.azure_configuration.is_none() {
41        warn!("Azure configuration not provided. Not running chatbot syncer.");
42        // Sleep indefinitely to prevent the program from exiting. This only happens in development.
43        loop {
44            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
45        }
46    }
47
48    let db_pool = initialize_database_pool(&config.database_url).await?;
49    let mut conn = db_pool.acquire().await?;
50    let blob_client = initialize_blob_client(&config).await?;
51
52    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
53    let mut ticks = 0;
54
55    info!("Starting chatbot syncer.");
56
57    loop {
58        interval.tick().await;
59        ticks += 1;
60
61        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
62            ticks = 0;
63            info!("Still syncing for chatbot.");
64        }
65        if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
66            error!("Error during synchronization: {:?}", e);
67        }
68    }
69}
70
/// Sets up logging, environment variables from `.env`, and tracing.
///
/// Must run before anything else: the `RUST_LOG` filter has to be in place
/// before `setup_tracing` initializes the subscriber.
fn initialize_environment() -> anyhow::Result<()> {
    // TODO: Audit that the environment access only happens in single-threaded code.
    // SAFETY: called once at startup before any other threads are spawned —
    // TODO confirm no runtime threads exist yet at this point.
    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
    // A missing .env file is fine (e.g. in production); ignore the result.
    dotenv().ok();
    setup_tracing()?;
    Ok(())
}
78
/// Runtime configuration for the chatbot syncer program.
struct SyncerConfig {
    /// PostgreSQL connection string.
    database_url: String,
    /// Deployment-unique name derived from the BASE_URL host with dots
    /// replaced by dashes; used to name the Azure search index, blob
    /// container, and related resources.
    name: String,
    /// Shared application configuration, including the optional Azure
    /// settings that gate whether the syncer runs at all.
    app_configuration: ApplicationConfiguration,
}
84
85async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
86    let database_url = env::var("DATABASE_URL")
87        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
88
89    let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
90        .expect("BASE_URL must be a valid URL");
91
92    let name = base_url
93        .host_str()
94        .expect("BASE_URL must have a host")
95        .replace(".", "-");
96
97    let app_configuration = ApplicationConfiguration::try_from_env()?;
98
99    Ok(SyncerConfig {
100        database_url,
101        name,
102        app_configuration,
103    })
104}
105
106/// Initializes the PostgreSQL connection pool.
107async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
108    PgPool::connect(database_url).await.map_err(|e| {
109        anyhow::anyhow!(
110            "Failed to connect to the database at {}: {:?}",
111            database_url,
112            e
113        )
114    })
115}
116
/// Initializes the Azure Blob Storage client.
///
/// Creates the deployment-specific container if it does not exist, so later
/// uploads can assume it is present.
async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
    let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
    blob_client.ensure_container_exists().await?;
    Ok(blob_client)
}
123
/// Synchronizes pages to the chatbot backend.
///
/// One pass of the sync loop: for every course that has a chatbot
/// configuration, finds pages whose latest revision differs from the synced
/// revision, uploads them to blob storage, deletes obsolete files, and —
/// only if anything changed — triggers the Azure search indexer so the new
/// content becomes searchable.
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Deduplicate: several chatbot configurations may reference the same
    // course.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    // Map of course id -> per-page sync statuses; rows are created here for
    // any pages not seen before.
    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    // Map of page id -> latest page history entry, used to detect pages
    // whose content changed since the last sync.
    let latest_histories =
        headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    // All courses share one search index/indexer named after the deployment.
    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    // If the indexer is busy or unavailable, skip this pass entirely; the
    // next tick will retry.
    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        // A status is outdated when a newer revision exists for the page
        // than the one recorded as synced. Pages with no history entry are
        // skipped.
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                latest_histories
                    .get(&status.page_id)
                    .is_some_and(|history| status.synced_page_revision_id != Some(history.id))
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );

        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        let pages = headless_lms_models::pages::get_by_ids(conn, &page_ids).await?;

        sync_pages_batch(
            conn,
            &pages,
            &latest_histories,
            blob_client,
            &base_url,
            &config.app_configuration,
        )
        .await?;

        // Remove blobs for pages that no longer exist or are no longer
        // public.
        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}
214
/// Ensures that the specified search index exists, creating it if necessary.
///
/// Provisions the full Azure Cognitive Search pipeline on first run: index,
/// skillset, data source, and indexer, all sharing the same `name`.
/// NOTE(review): the indexer presumably references the index, skillset, and
/// datasource by name, so the creation order here likely matters — confirm
/// before reordering.
async fn ensure_search_index_exists(
    name: &str,
    app_config: &ApplicationConfiguration,
    container_name: &str,
) -> anyhow::Result<()> {
    if !does_search_index_exist(name, app_config).await? {
        create_search_index(name.to_owned(), app_config).await?;
    }
    if !does_skillset_exist(name, app_config).await? {
        create_skillset(name, name, app_config).await?;
    }
    if !does_azure_datasource_exist(name, app_config).await? {
        create_azure_datasource(name, container_name, app_config).await?;
    }
    if !does_search_indexer_exist(name, app_config).await? {
        create_search_indexer(name, name, name, name, app_config).await?;
    }

    Ok(())
}
236
237/// Processes and synchronizes a batch of pages.
238async fn sync_pages_batch(
239    conn: &mut PgConnection,
240    pages: &[Page],
241    latest_histories: &HashMap<Uuid, PageHistory>,
242    blob_client: &AzureBlobClient,
243    base_url: &Url,
244    app_config: &ApplicationConfiguration,
245) -> anyhow::Result<()> {
246    let course_id = pages
247        .first()
248        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
249        .course_id
250        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;
251
252    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
253    let organization =
254        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;
255
256    let mut base_url = base_url.clone();
257    base_url.set_path(&format!(
258        "/org/{}/courses/{}",
259        organization.slug, course.slug
260    ));
261
262    let mut allowed_file_paths = Vec::new();
263
264    for page in pages {
265        info!("Syncing page id: {}.", page.id);
266
267        let mut page_url = base_url.clone();
268        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));
269
270        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
271        let sanitized_blocks = remove_sensitive_attributes(parsed_content);
272
273        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
274            &sanitized_blocks,
275            app_config,
276        )
277        .await
278        {
279            Ok(markdown) => {
280                info!("Successfully cleaned content for page {}", page.id);
281                // Check if the markdown is empty, or if it just contains all spaces or newlines
282                if markdown.trim().is_empty() {
283                    warn!(
284                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
285                        page.id
286                    );
287                    format!("# {}", page.title)
288                } else {
289                    markdown
290                }
291            }
292            Err(e) => {
293                warn!(
294                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
295                    page.id, e
296                );
297                // Fallback to original content
298                serde_json::to_string(&sanitized_blocks)?
299            }
300        };
301
302        let blob_path = generate_blob_path(page)?;
303
304        allowed_file_paths.push(blob_path.clone());
305        let mut metadata = HashMap::new();
306        metadata.insert("url".to_string(), page_url.to_string().into());
307        metadata.insert("title".to_string(), page.title.to_string().into());
308        metadata.insert(
309            "course_id".to_string(),
310            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
311        );
312        metadata.insert(
313            "language".to_string(),
314            course.language_code.to_string().into(),
315        );
316
317        if let Err(e) = blob_client
318            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
319            .await
320        {
321            warn!("Failed to upload file {}: {:?}", blob_path, e);
322        }
323    }
324
325    let page_revision_map: HashMap<Uuid, Uuid> = pages
326        .iter()
327        .map(|page| (page.id, latest_histories[&page.id].id))
328        .collect();
329
330    headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
331        conn,
332        page_revision_map,
333    )
334    .await?;
335
336    Ok(())
337}
338
339/// Generates the blob storage path for a given page.
340fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
341    let course_id = page
342        .course_id
343        .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
344
345    let mut url_path = page.url_path.trim_start_matches('/').to_string();
346    if url_path.is_empty() {
347        url_path = "index".to_string();
348    }
349
350    Ok(format!("courses/{}/{}.md", course_id, url_path))
351}
352
353/// Deletes files from blob storage that are no longer associated with any page.
354async fn delete_old_files(
355    conn: &mut PgConnection,
356    course_id: Uuid,
357    blob_client: &AzureBlobClient,
358) -> anyhow::Result<()> {
359    let existing_files = blob_client
360        .list_files_with_prefix(&course_id.to_string())
361        .await?;
362
363    let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
364        conn,
365        course_id,
366        PageVisibility::Public,
367    )
368    .await?;
369
370    let allowed_paths: HashSet<String> = pages
371        .iter()
372        .filter_map(|page| generate_blob_path(page).ok())
373        .collect();
374
375    for file in existing_files {
376        if !allowed_paths.contains(&file) {
377            info!("Deleting obsolete file: {}", file);
378            blob_client.delete_file(&file).await?;
379        }
380    }
381
382    Ok(())
383}