headless_lms_server/programs/chatbot_syncer.rs

use std::{
    collections::{HashMap, HashSet},
    env,
    time::Duration,
};

use dotenv::dotenv;
use sqlx::{PgConnection, PgPool};
// The info!/warn!/error! macros used below come from the tracing crate.
use tracing::{error, info, warn};
use url::Url;
use uuid::Uuid;

use crate::setup_tracing;

use headless_lms_chatbot::{
    azure_blob_storage::AzureBlobClient,
    azure_datasources::{create_azure_datasource, does_azure_datasource_exist},
    azure_search_index::{create_search_index, does_search_index_exist},
    azure_search_indexer::{
        check_search_indexer_status, create_search_indexer, does_search_indexer_exist,
        run_search_indexer_now,
    },
    azure_skillset::{create_skillset, does_skillset_exist},
    content_cleaner::convert_material_blocks_to_markdown_with_llm,
};
use headless_lms_models::{
    page_history::PageHistory,
    pages::{Page, PageVisibility},
};
use headless_lms_utils::{
    ApplicationConfiguration,
    document_schema_processor::{GutenbergBlock, remove_sensitive_attributes},
};

/// How often a synchronization pass is attempted.
const SYNC_INTERVAL_SECS: u64 = 10;
/// Emit a "still running" log line once per this many ticks.
const PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD: u32 = 60;

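/// Entry point of the chatbot syncer program. If no Azure configuration is
/// provided the program idles forever; otherwise it attempts a synchronization
/// pass every `SYNC_INTERVAL_SECS` seconds, logging errors instead of exiting.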
pub async fn main() -> anyhow::Result<()> {
    initialize_environment()?;
    let config = initialize_configuration().await?;
    if config.app_configuration.azure_configuration.is_none() {
        warn!("Azure configuration not provided. Not running chatbot syncer.");
        // Nothing to do without Azure; idle forever instead of returning.
        loop {
            tokio::time::sleep(Duration::from_secs(u64::MAX)).await;
        }
    }

    let db_pool = initialize_database_pool(&config.database_url).await?;
    let mut conn = db_pool.acquire().await?;
    let blob_client = initialize_blob_client(&config).await?;

    let mut interval = tokio::time::interval(Duration::from_secs(SYNC_INTERVAL_SECS));
    let mut ticks = 0;

    info!("Starting chatbot syncer.");

    loop {
        interval.tick().await;
        ticks += 1;

        if ticks >= PRINT_STILL_RUNNING_MESSAGE_TICKS_THRESHOLD {
            ticks = 0;
            info!("Still syncing for chatbot.");
        }
        // Log and continue on errors; transient failures should not kill the syncer.
        if let Err(e) = sync_pages(&mut conn, &config, &blob_client).await {
            error!("Error during synchronization: {:?}", e);
        }
    }
}

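/// Sets default log levels, loads the `.env` file, and initializes tracing.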
fn initialize_environment() -> anyhow::Result<()> {
    // SAFETY: `env::set_var` is unsafe on the 2024 edition because modifying the
    // environment is not thread safe.
    unsafe { env::set_var("RUST_LOG", "info,actix_web=info,sqlx=warn") };
    dotenv().ok();
    setup_tracing()?;
    Ok(())
}

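/// Configuration resolved once at startup.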
struct SyncerConfig {
    database_url: String,
    /// Derived from the `BASE_URL` host; used to name the Azure blob container
    /// and search resources.
    name: String,
    app_configuration: ApplicationConfiguration,
}

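/// Reads `DATABASE_URL`, `BASE_URL`, and the application configuration from the
/// environment. Dots in the `BASE_URL` host are replaced with dashes so the
/// resulting name is valid for Azure resources.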
async fn initialize_configuration() -> anyhow::Result<SyncerConfig> {
    let database_url = env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgres://localhost/headless_lms_dev".to_string());

    let base_url = Url::parse(&env::var("BASE_URL").expect("BASE_URL must be defined"))
        .expect("BASE_URL must be a valid URL");

    let name = base_url
        .host_str()
        .expect("BASE_URL must have a host")
        .replace('.', "-");

    let app_configuration = ApplicationConfiguration::try_from_env()?;

    Ok(SyncerConfig {
        database_url,
        name,
        app_configuration,
    })
}

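/// Connects to Postgres, attaching the attempted URL to any connection error.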
async fn initialize_database_pool(database_url: &str) -> anyhow::Result<PgPool> {
    PgPool::connect(database_url).await.map_err(|e| {
        anyhow::anyhow!(
            "Failed to connect to the database at {}: {:?}",
            database_url,
            e
        )
    })
}

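/// Creates the Azure blob client and ensures the syncer's container exists.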
async fn initialize_blob_client(config: &SyncerConfig) -> anyhow::Result<AzureBlobClient> {
    let blob_client = AzureBlobClient::new(&config.app_configuration, &config.name).await?;
    blob_client.ensure_container_exists().await?;
    Ok(blob_client)
}

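/// Runs one synchronization pass: collects the courses that have chatbot
/// configurations, ensures the Azure resources exist, and, unless the search
/// indexer is busy, uploads every page whose latest history entry differs from
/// its synced revision, deletes obsolete blobs, and triggers the indexer if
/// anything changed.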
async fn sync_pages(
    conn: &mut PgConnection,
    config: &SyncerConfig,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    let base_url = Url::parse(&config.app_configuration.base_url)?;
    let chatbot_configs =
        headless_lms_models::chatbot_configurations::get_for_azure_search_maintenance(conn).await?;

    // Deduplicate: several chatbot configurations may point to the same course.
    let course_ids: Vec<Uuid> = chatbot_configs
        .iter()
        .map(|config| config.course_id)
        .collect::<HashSet<_>>()
        .into_iter()
        .collect();

    let sync_statuses =
        headless_lms_models::chatbot_page_sync_statuses::ensure_sync_statuses_exist(
            conn,
            &course_ids,
        )
        .await?;

    let latest_histories =
        headless_lms_models::page_history::get_latest_history_entries_for_pages_by_course_ids(
            conn,
            &course_ids,
        )
        .await?;

    let shared_index_name = config.name.clone();
    ensure_search_index_exists(
        &shared_index_name,
        &config.app_configuration,
        &blob_client.container_name,
    )
    .await?;

    if !check_search_indexer_status(&shared_index_name, &config.app_configuration).await? {
        warn!("Search indexer is not ready to index. Skipping synchronization.");
        return Ok(());
    }

    let mut any_changes = false;

    for (course_id, statuses) in sync_statuses.iter() {
        // A page is outdated when its latest history entry is not the synced revision.
        let outdated_statuses: Vec<_> = statuses
            .iter()
            .filter(|status| {
                latest_histories
                    .get(&status.page_id)
                    .is_some_and(|history| status.synced_page_revision_id != Some(history.id))
            })
            .collect();

        if outdated_statuses.is_empty() {
            continue;
        }

        any_changes = true;
        info!(
            "Syncing {} pages for course id: {}.",
            outdated_statuses.len(),
            course_id
        );

        let page_ids: Vec<Uuid> = outdated_statuses.iter().map(|s| s.page_id).collect();
        let pages = headless_lms_models::pages::get_by_ids(conn, &page_ids).await?;

        sync_pages_batch(
            conn,
            &pages,
            &latest_histories,
            blob_client,
            &base_url,
            &config.app_configuration,
        )
        .await?;

        delete_old_files(conn, *course_id, blob_client).await?;
    }

    if any_changes {
        run_search_indexer_now(&shared_index_name, &config.app_configuration).await?;
        info!("New files have been synced and the search indexer has been started.");
    }

    Ok(())
}

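/// Idempotently provisions the Azure resources the syncer depends on: the
/// search index, skillset, data source, and indexer, all sharing the same name.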
async fn ensure_search_index_exists(
    name: &str,
    app_config: &ApplicationConfiguration,
    container_name: &str,
) -> anyhow::Result<()> {
    if !does_search_index_exist(name, app_config).await? {
        create_search_index(name.to_owned(), app_config).await?;
    }
    if !does_skillset_exist(name, app_config).await? {
        create_skillset(name, name, app_config).await?;
    }
    if !does_azure_datasource_exist(name, app_config).await? {
        create_azure_datasource(name, container_name, app_config).await?;
    }
    if !does_search_indexer_exist(name, app_config).await? {
        create_search_indexer(name, name, name, name, app_config).await?;
    }

    Ok(())
}

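/// Uploads a batch of pages belonging to a single course. Content is sanitized,
/// converted to Markdown with an LLM (falling back to the serialized sanitized
/// blocks if the conversion fails), and uploaded with URL, title, course, and
/// language metadata. The synced revision ids are then recorded so unchanged
/// pages are skipped on later passes.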
async fn sync_pages_batch(
    conn: &mut PgConnection,
    pages: &[Page],
    latest_histories: &HashMap<Uuid, PageHistory>,
    blob_client: &AzureBlobClient,
    base_url: &Url,
    app_config: &ApplicationConfiguration,
) -> anyhow::Result<()> {
    let course_id = pages
        .first()
        .ok_or_else(|| anyhow::anyhow!("No pages to sync."))?
        .course_id
        .ok_or_else(|| anyhow::anyhow!("The first page does not belong to any course."))?;

    let course = headless_lms_models::courses::get_course(conn, course_id).await?;
    let organization =
        headless_lms_models::organizations::get_organization(conn, course.organization_id).await?;

    let mut base_url = base_url.clone();
    base_url.set_path(&format!(
        "/org/{}/courses/{}",
        organization.slug, course.slug
    ));

    for page in pages {
        info!("Syncing page id: {}.", page.id);

        let mut page_url = base_url.clone();
        page_url.set_path(&format!("{}{}", base_url.path(), page.url_path));

        // Strip attributes that must not leak to the search index before any processing.
        let parsed_content: Vec<GutenbergBlock> = serde_json::from_value(page.content.clone())?;
        let sanitized_blocks = remove_sensitive_attributes(parsed_content);

        let content_to_upload = match convert_material_blocks_to_markdown_with_llm(
            &sanitized_blocks,
            app_config,
        )
        .await
        {
            Ok(markdown) => {
                info!("Successfully cleaned content for page {}", page.id);
                if markdown.trim().is_empty() {
                    warn!(
                        "Markdown is empty for page {}. Generating fallback content with a fake heading.",
                        page.id
                    );
                    format!("# {}", page.title)
                } else {
                    markdown
                }
            }
            Err(e) => {
                warn!(
                    "Failed to clean content with LLM for page {}: {}. Using serialized sanitized content instead.",
                    page.id, e
                );
                serde_json::to_string(&sanitized_blocks)?
            }
        };

        let blob_path = generate_blob_path(page)?;

        // Attach page metadata to the uploaded blob.
        let mut metadata = HashMap::new();
        metadata.insert("url".to_string(), page_url.to_string().into());
        metadata.insert("title".to_string(), page.title.to_string().into());
        metadata.insert(
            "course_id".to_string(),
            page.course_id.unwrap_or(Uuid::nil()).to_string().into(),
        );
        metadata.insert(
            "language".to_string(),
            course.language_code.to_string().into(),
        );

        if let Err(e) = blob_client
            .upload_file(&blob_path, content_to_upload.as_bytes(), Some(metadata))
            .await
        {
            warn!("Failed to upload file {}: {:?}", blob_path, e);
        }
    }

    // The caller only passes pages that have an entry in `latest_histories`,
    // so indexing here cannot panic.
    let page_revision_map: HashMap<Uuid, Uuid> = pages
        .iter()
        .map(|page| (page.id, latest_histories[&page.id].id))
        .collect();

    headless_lms_models::chatbot_page_sync_statuses::update_page_revision_ids(
        conn,
        page_revision_map,
    )
    .await?;

    Ok(())
}

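/// Builds the blob path for a page, e.g. `courses/{course_id}/chapter-1/page.md`.
/// A page with an empty `url_path` (such as the course root) becomes
/// `courses/{course_id}/index.md`.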
fn generate_blob_path(page: &Page) -> anyhow::Result<String> {
    let course_id = page
        .course_id
        .ok_or_else(|| anyhow::anyhow!("Page {} does not belong to any course.", page.id))?;

    let mut url_path = page.url_path.trim_start_matches('/').to_string();
    if url_path.is_empty() {
        url_path = "index".to_string();
    }

    Ok(format!("courses/{}/{}.md", course_id, url_path))
}

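/// Deletes blobs under the course's prefix that no longer correspond to a
/// publicly visible page.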
async fn delete_old_files(
    conn: &mut PgConnection,
    course_id: Uuid,
    blob_client: &AzureBlobClient,
) -> anyhow::Result<()> {
    // Paths are generated as `courses/{course_id}/...` (see `generate_blob_path`),
    // so list with the same prefix.
    let existing_files = blob_client
        .list_files_with_prefix(&format!("courses/{}", course_id))
        .await?;

    let pages = headless_lms_models::pages::get_all_by_course_id_and_visibility(
        conn,
        course_id,
        PageVisibility::Public,
    )
    .await?;

    let allowed_paths: HashSet<String> = pages
        .iter()
        .filter_map(|page| generate_blob_path(page).ok())
        .collect();

    for file in existing_files {
        if !allowed_paths.contains(&file) {
            info!("Deleting obsolete file: {}", file);
            blob_client.delete_file(&file).await?;
        }
    }

    Ok(())
}