headless_lms_server/programs/
chatbot_syncer.rs1use std::{
2 collections::{HashMap, HashSet},
3 env,
4 time::Duration,
5};
6
7use dotenv::dotenv;
8use sqlx::{PgConnection, PgPool};
9use url::Url;
10use uuid::Uuid;
11
12use crate::setup_tracing;
13
14use headless_lms_chatbot::{
15 azure_blob_storage::AzureBlobClient,
16 azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
17 azure_search_index::{create_search_index, does_search_index_exist},
18 azure_search_indexer::{
19 check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
20 run_search_indexer_now,
21 },
22 azure_skillset::{create_skillset, does_skillset_exist},
23 content_cleaner::convert_material_blocks_to_markdown_with_llm,
24};
25use headless_lms_models::{
26 chapters::DatabaseChapter,
27 page_history::PageHistory,
28 pages::{Page, PageVisibility},
29};
30use headless_lms_utils::{
31 ApplicationConfiguration,
32 document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
33};
34
/// How often a synchronization pass is attempted, in seconds.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Number of sync ticks between "still running" heartbeat log messages.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;
37
38pub async fn main() -> anyhow::Result<()> {
39 initialize_environment()?;
40 let config = initialize_configuration().await?;
41 if config.app_configuration.azure_configuration.is_none() {
42 warn!("Azure configuration not provided. Not running chatbot syncer.");
43 loop {
45 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
46 }
47 }
48 if config.app_configuration.test_chatbot {
49 warn!(
50 "Using mock azure configuration, this must be a test/dev environment. Not running chatbot syncer."
51 );
52 loop {
54 tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
55 }
56 }
57
58 let db_pool = initialize_database_pool(&config.database_url).await?;
59 let mut conn = db_pool.acquire().await?;
60 let blob_client = initialize_blob_client(&config).await?;
61
62 let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
63 let mut ticks = 0;
64
65 info!("Starting chatbot syncer.");
66
67 loop {
68 interval.tick().await;
69 ticks += 1;
70
71 if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
72 ticks = 0;
73 info!("Still syncing for chatbot.");
74 }
75 if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
76 error!("Error during synchronization: {:?}", e);
77 }
78 }
79}
80
/// Prepares process-wide environment state: sets a default `RUST_LOG` filter,
/// loads a `.env` file if present, and installs the tracing subscriber.
fn initialize_environment() -> anyhow::Result<()> {
    // SAFETY: assumed to run once at startup before other threads are
    // spawned, so mutating the process environment cannot race with
    // concurrent reads. NOTE(review): confirm no threads exist at this point.
    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
    dotenv().ok();
    setup_tracing()?;
    Ok(())
}
88
/// Runtime configuration for the chatbot syncer.
struct SyncerConfig {
    // Postgres connection string.
    database_url: String,
    // Deployment name derived from the BASE_URL host (dots replaced with
    // dashes); used to name the shared search index and blob container.
    name: String,
    // Application-wide configuration read from the environment.
    app_configuration: ApplicationConfiguration,
}
94
95async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
96 let database_url = env::var("DATABASE_URL")
97 .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());
98
99 let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
100 .expect("BASE_URL must be a valid URL");
101
102 let name = base_url
103 .host_str()
104 .expect("BASE_URL must have a host")
105 .replace(".", "-");
106
107 let app_configuration = ApplicationConfiguration::try_from_env()?;
108
109 Ok(SyncerConfig {
110 database_url,
111 name,
112 app_configuration,
113 })
114}
115
116async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
118 PgPool::connect(database_url).await.map_err(|e| {
119 anyhow::anyhow!(
120 "Failed to connect to the database at {}: {:?}",
121 database_url,
122 e
123 )
124 })
125}
126
127async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
129 let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
130 blob_client.ensure_container_exists().await?;
131 Ok(blob_client)
132}
133
/// Runs one synchronization pass for every course that has a chatbot
/// configured.
///
/// For each course, pages whose latest history revision differs from the
/// recorded synced revision are re-uploaded to blob storage, sync statuses are
/// updated, and obsolete blobs are deleted. If anything changed, the Azure
/// search indexer is asked to run.
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Deduplicate course ids through a HashSet — several chatbot
    // configurations may reference the same course.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    // Make sure every page in these courses has a sync status row.
    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    // Latest history entry per page; used to detect stale synced revisions.
    let latest_histories =
        headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    // All courses share one search index named after this deployment.
    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        // A page is outdated when its recorded synced revision is not the
        // latest history entry for that page.
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                latest_histories
                    .get(&status.page_id)
                    .is_some_and(|history| status.synced_page_revision_id != Some(history.id))
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );

        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        // Only publicly visible pages get uploaded.
        let mut pages = headless_lms_models::pages::get_by_ids_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;

        if !pages.is_empty() {
            sync_pages_batch(
                conn,
                &pages,
                blob_client,
                &base_url,
                &config.app_configuration,
            )
            .await?;
        }

        // Deleted pages are included in the status update so their revisions
        // are marked as handled, even though nothing is uploaded for them.
        let deleted_pages = headless_lms_models::pages::get_by_ids_deleted_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;
        pages.extend(deleted_pages);

        update_sync_statuses(conn, &pages, &latest_histories).await?;

        // Remove blobs for pages that are no longer publicly visible.
        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}
240
241async fn ensure_search_index_exists(
243 name: &str,
244 app_config: &ApplicationConfiguration,
245 container_name: &str,
246) -> anyhow::Result<()> {
247 if !does_search_index_exist(name, app_config).await? {
248 create_search_index(name.to_owned(), app_config).await?;
249 }
250 if !does_skillset_exist(name, app_config).await? {
251 create_skillset(name, name, app_config).await?;
252 }
253 if !does_azure_datasource_exist(name, app_config).await? {
254 create_azure_datasource(name, container_name, app_config).await?;
255 }
256 if !does_search_indexer_exist(name, app_config).await? {
257 create_search_indexer(name, name, name, name, app_config).await?;
258 }
259
260 Ok(())
261}
262
/// Converts the given pages to markdown and uploads them to blob storage.
///
/// All pages are expected to belong to the same course; the course is resolved
/// from the first page. Content is cleaned with an LLM when possible, falling
/// back to the serialized sanitized blocks on failure. Individual upload
/// failures are logged but do not abort the batch.
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
) -> anyhow::Result<()> {
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;

    // Base URL for this course's pages: /org/<org-slug>/courses/<course-slug>.
    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    // NOTE(review): allowed_file_paths is populated below but never read in
    // this function — possibly vestigial; confirm before removing.
    let mut allowed_file_paths = Vec::new();

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        // Full public URL of the page; stored as blob metadata.
        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                if markdown.trim().is_empty() {
                    // An empty document would be useless to the indexer, so
                    // upload at least the page title as a heading.
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                // Fall back to raw (sanitized) block JSON when LLM cleaning
                // fails, so the page is still searchable.
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, e
                );
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;
        // Chapter info enriches the chunk context; a failed lookup only
        // degrades the context text, it is not fatal.
        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
                Ok(c) => Some(c),
                Err(e) => {
                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
                    None
                }
            }
        } else {
            None
        };

        allowed_file_paths.push(blob_path.clone());
        // Metadata attached to the uploaded blob, consumed downstream by the
        // search pipeline.
        let mut metadata = HashMap::new();
        metadata.insert("url".to_string(), page_url.to_string().into());
        metadata.insert("title".to_string(), page.title.to_string().into());
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );
        metadata.insert("filepath".to_string(), blob_path.clone().into());
        if let Some(c) = chapter {
            metadata.insert(
                "chunk_context".to_string(),
                format!(
                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
                    page.title, c.chapter_number, c.name, course.name,
                )
                .into(),
            );
        } else {
            metadata.insert(
                "chunk_context".to_string(),
                format!(
                    "This chunk is a snippet from page {} of the course {}.",
                    page.title, course.name,
                )
                .into(),
            );
        }

        // Best-effort upload: log and continue so one bad page does not
        // abort the rest of the batch.
        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            warn!("Failed to upload file {}: {:?}", blob_path, e);
        }
    }

    Ok(())
}
383
384async fn update_sync_statuses(
386 conn: &mut PgConnection,
387 pages: &[Page],
388 latest_histories: &HashMap<Uuid, PageHistory>,
389) -> anyhow::Result<()> {
390 let page_revision_map: HashMap<Uuid, Uuid> = pages
391 .iter()
392 .map(|page| (page.id, latest_histories[&page.id].id))
393 .collect();
394
395 headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
396 conn,
397 page_revision_map,
398 )
399 .await?;
400
401 Ok(())
402}
403
404fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
406 let course_id = page
407 .course_id
408 .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;
409
410 Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
411}
412
413async fn delete_old_files(
415 conn: &mut PgConnection,
416 course_id: Uuid,
417 blob_client: &AzureBlobClient,
418) -> anyhow::Result<()> {
419 let mut courses_prefix = "courses/".to_string();
420 courses_prefix.push_str(&course_id.to_string());
421 let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;
422
423 let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
424 conn,
425 course_id,
426 PageVisibility::Public,
427 )
428 .await?;
429
430 let allowed_paths: HashSet<String> = pages
431 .iter()
432 .filter_map(|page| generate_blob_path(page).ok())
433 .collect();
434
435 for file in existing_files {
436 if !allowed_paths.contains(&file) {
437 info!("Deleting obsolete file: {}", file);
438 blob_client.delete_file(&file).await?;
439 }
440 }
441
442 Ok(())
443}