headless_lms_server/programs/chatbot_syncer.rs

use std::{
    collections::{HashMap, HashSet},
    env,
    time::Duration,
};

use dotenv::dotenv;
use sqlx::{PgConnection, PgPool};
// Assumed import: the debug!/error!/info!/warn! macros used below come from the
// `tracing` crate (imported explicitly here in case no `#[macro_use]` is in scope).
use tracing::{debug, error, info, warn};
use url::Url;
use uuid::Uuid;

use crate::setup_tracing;

use headless_lms_chatbot::{
    azure_blob_storage::AzureBlobClient,
    azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
    azure_search_index::{create_search_index, does_search_index_exist},
    azure_search_indexer::{
        check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
        run_search_indexer_now,
    },
    azure_skillset::{create_skillset, does_skillset_exist},
    content_cleaner::convert_material_blocks_to_markdown_with_llm,
};
use headless_lms_models::{
    chapters::DatabaseChapter,
    page_history::PageHistory,
    pages::{Page, PageVisibility},
};
use headless_lms_utils::{
    ApplicationConfiguration,
    document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
};

const SYNC_INTERVAL_SECS: u64 = 10;
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;

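/// Entry point of the chatbot syncer program. Runs a sync pass every
/// `SYNC_INTERVAL_SECS` seconds and logs a liveness message every
/// `PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD` ticks. If no Azure
/// configuration is provided, the program idles forever instead of exiting.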
pub async fn main() -> anyhow::Result<()> {
    initialize_environment()?;
    let config = initialize_configuration().await?;
    if config.app_configuration.azure_configuration.is_none() {
        warn!("Azure configuration not provided. Not running chatbot syncer.");
        loop {
            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
        }
    }

    let db_pool = initialize_database_pool(&config.database_url).await?;
    let mut conn = db_pool.acquire().await?;
    let blob_client = initialize_blob_client(&config).await?;

    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
    let mut ticks = 0;

    info!("Starting chatbot syncer.");

    loop {
        interval.tick().await;
        ticks += 1;

        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
            ticks = 0;
            info!("Still syncing for chatbot.");
        }
        if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
            error!("Error during synchronization: {:?}", e);
        }
    }
}

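/// Loads the `.env` file, sets a default `RUST_LOG`, and initializes tracing.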
fn initialize_environment() -> anyhow::Result<()> {
    // `env::set_var` is `unsafe` in recent Rust editions because modifying the
    // process environment is not thread-safe.
    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
    dotenv().ok();
    setup_tracing()?;
    Ok(())
}

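/// Runtime configuration for the syncer, derived from environment variables.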
struct SyncerConfig {
    database_url: String,
    name: String,
    app_configuration: ApplicationConfiguration,
}

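/// Reads `DATABASE_URL` and `BASE_URL` from the environment. The host of
/// `BASE_URL` (with dots replaced by dashes) becomes the shared name used for
/// the blob container and the Azure search resources.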
async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
    let database_url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());

    let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
        .expect("BASE_URL must be a valid URL");

    let name = base_url
        .host_str()
        .expect("BASE_URL must have a host")
        .replace(".", "-");

    let app_configuration = ApplicationConfiguration::try_from_env()?;

    Ok(SyncerConfig {
        database_url,
        name,
        app_configuration,
    })
}

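/// Connects to Postgres, wrapping any connection error with the database URL
/// for easier debugging.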
async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
    PgPool::connect(database_url).await.map_err(|e| {
        anyhow::anyhow!(
            "Failed to connect to the database at {}: {:?}",
            database_url,
            e
        )
    })
}

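/// Creates the Azure blob client and ensures the target container exists.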
async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
    let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
    blob_client.ensure_container_exists().await?;
    Ok(blob_client)
}

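/// Runs one synchronization pass: finds pages whose latest revision differs
/// from the last synced revision, uploads their cleaned content to blob
/// storage, updates sync statuses, removes obsolete files, and triggers the
/// search indexer if anything changed.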
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Collect the distinct course ids from the chatbot configurations.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    let latest_histories =
        headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        // A page is outdated when its latest history entry differs from the
        // revision that was last synced.
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                latest_histories
                    .get(&status.page_id)
                    .is_some_and(|history| status.synced_page_revision_id != Some(history.id))
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );

        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        let mut pages = headless_lms_models::pages::get_by_ids_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;

        if !pages.is_empty() {
            sync_pages_batch(
                conn,
                &pages,
                blob_client,
                &base_url,
                &config.app_configuration,
            )
            .await?;
        }

        // Include deleted pages so their sync statuses are updated as well;
        // their blobs are cleaned up by `delete_old_files` below.
        let deleted_pages = headless_lms_models::pages::get_by_ids_deleted_and_visibility(
            conn,
            &page_ids,
            PageVisibility::Public,
        )
        .await?;
        pages.extend(deleted_pages);

        update_sync_statuses(conn, &pages, &latest_histories).await?;

        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}

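/// Idempotently creates the Azure search resources (index, skillset, data
/// source, and indexer), all sharing the same name.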
async fn ensure_search_index_exists(
    name: &str,
    app_config: &ApplicationConfiguration,
    container_name: &str,
) -> anyhow::Result<()> {
    if !does_search_index_exist(name, app_config).await? {
        create_search_index(name.to_owned(), app_config).await?;
    }
    if !does_skillset_exist(name, app_config).await? {
        create_skillset(name, name, app_config).await?;
    }
    if !does_azure_datasource_exist(name, app_config).await? {
        create_azure_datasource(name, container_name, app_config).await?;
    }
    if !does_search_indexer_exist(name, app_config).await? {
        create_search_indexer(name, name, name, name, app_config).await?;
    }

    Ok(())
}

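/// Uploads the given pages of a single course to blob storage. Page content is
/// sanitized and converted to Markdown with an LLM; if the conversion fails or
/// produces empty output, fallback content is uploaded instead. Each blob
/// carries metadata (url, title, course id, language, file path, and chunk
/// context) for the search index.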
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
) -> anyhow::Result<()> {
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;

    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    let mut allowed_file_paths = Vec::new();

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        // Strip sensitive attributes from the content blocks before any further
        // processing so they cannot leak into the search index.
        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                if markdown.trim().is_empty() {
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a placeholder heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, e
                );
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;
        let chapter: Option<DatabaseChapter> = if page.chapter_id.is_some() {
            match headless_lms_models::chapters::get_chapter_by_page_id(conn, page.id).await {
                Ok(c) => Some(c),
                Err(e) => {
                    debug!("Chapter lookup failed for page {}: {}", page.id, e);
                    None
                }
            }
        } else {
            None
        };

        // Note: collected for bookkeeping; not otherwise read in this function.
        allowed_file_paths.push(blob_path.clone());
        // Blob metadata carries everything needed to attribute an indexed chunk
        // back to its source page.
        let mut metadata = HashMap::new();
        metadata.insert("url".to_string(), page_url.to_string().into());
        metadata.insert("title".to_string(), page.title.to_string().into());
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );
        metadata.insert("filepath".to_string(), blob_path.clone().into());
        if let Some(c) = chapter {
            metadata.insert(
                "chunk_context".to_string(),
                format!(
                    "This chunk is a snippet from page {} from chapter {}: {} of the course {}.",
                    page.title, c.chapter_number, c.name, course.name,
                )
                .into(),
            );
        } else {
            metadata.insert(
                "chunk_context".to_string(),
                format!(
                    "This chunk is a snippet from page {} of the course {}.",
                    page.title, course.name,
                )
                .into(),
            );
        }

        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            warn!("Failed to upload file {}: {:?}", blob_path, e);
        }
    }

    Ok(())
}

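/// Records the latest synced revision id for each page so that the next pass
/// can detect which pages have changed.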
async fn update_sync_statuses(
    conn: &mut PgConnection,
    pages: &[Page],
    latest_histories: &HashMap<Uuid, PageHistory>,
) -> anyhow::Result<()> {
    // Indexing panics if a page has no history entry; the caller only passes
    // pages whose ids appear in `latest_histories`.
    let page_revision_map: HashMap<Uuid, Uuid> = pages
        .iter()
        .map(|page| (page.id, latest_histories[&page.id].id))
        .collect();

    headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
        conn,
        page_revision_map,
    )
    .await?;

    Ok(())
}

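/// Builds the blob path for a page: `courses/{course_id}/pages/{page_id}.md`.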
fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
    let course_id = page
        .course_id
        .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;

    Ok(format!("courses/{}/pages/{}.md", course_id, page.id))
}

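/// Deletes blobs under the course's prefix that no longer correspond to a
/// public page of the course.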
async fn delete_old_files(
    conn: &mut PgConnection,
    course_id: Uuid,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let mut courses_prefix = "courses/".to_string();
    courses_prefix.push_str(&course_id.to_string());
    let existing_files = blob_client.list_files_with_prefix(&courses_prefix).await?;

    let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
        conn,
        course_id,
        PageVisibility::Public,
    )
    .await?;

    let allowed_paths: HashSet<String> = pages
        .iter()
        .filter_map(|page| generate_blob_path(page).ok())
        .collect();

    for file in existing_files {
        if !allowed_paths.contains(&file) {
            info!("Deleting obsolete file: {}", file);
            blob_client.delete_file(&file).await?;
        }
    }

    Ok(())
}