headless_lms_models/
chatbot_page_sync_statuses.rs

1use std::collections::HashMap;
2
3use crate::prelude::*;
4
5#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
6#[cfg_attr(feature = "ts_rs", derive(TS))]
7pub struct ChatbotPageSyncStatus {
8    pub id: Uuid,
9    pub created_at: DateTime<Utc>,
10    pub updated_at: DateTime<Utc>,
11    pub deleted_at: Option<DateTime<Utc>>,
12    pub course_id: Uuid,
13    pub page_id: Uuid,
14    pub error_message: Option<String>,
15    pub synced_page_revision_id: Option<Uuid>,
16    pub consecutive_failures: i32,
17}
18
19pub async fn ensure_sync_statuses_exist(
20    conn: &mut PgConnection,
21    course_ids: &[Uuid],
22) -> ModelResult<HashMap<Uuid, Vec<ChatbotPageSyncStatus>>> {
23    sqlx::query!(
24        r#"
25INSERT INTO chatbot_page_sync_statuses (course_id, page_id)
26SELECT course_id,
27  id
28FROM pages
29WHERE course_id = ANY($1)
30  AND deleted_at IS NULL
31  AND hidden IS FALSE ON CONFLICT (page_id, deleted_at) DO NOTHING
32        "#,
33        course_ids
34    )
35    .execute(&mut *conn)
36    .await?;
37
38    let all_statuses = sqlx::query_as!(
39        ChatbotPageSyncStatus,
40        r#"
41SELECT *
42FROM chatbot_page_sync_statuses
43WHERE course_id = ANY($1)
44        "#,
45        course_ids
46    )
47    .fetch_all(&mut *conn)
48    .await?
49    .into_iter()
50    .fold(
51        HashMap::<Uuid, Vec<ChatbotPageSyncStatus>>::new(),
52        |mut map, status| {
53            map.entry(status.course_id).or_default().push(status);
54            map
55        },
56    );
57
58    Ok(all_statuses)
59}
60
61// Given a mapping from page id to the new revision id, update the sync statuses
62pub async fn update_page_revision_ids(
63    conn: &mut PgConnection,
64    page_id_to_new_revision_id: HashMap<Uuid, Uuid>,
65) -> ModelResult<()> {
66    // If there are no updates to perform, return early
67    if page_id_to_new_revision_id.is_empty() {
68        return Ok(());
69    }
70    let (page_ids, revision_ids): (Vec<Uuid>, Vec<Uuid>) =
71        page_id_to_new_revision_id.into_iter().unzip();
72
73    sqlx::query!(
74        r#"
75UPDATE chatbot_page_sync_statuses AS cps
76SET synced_page_revision_id = data.synced_page_revision_id,
77    error_message = NULL,
78    consecutive_failures = 0
79FROM (
80    SELECT unnest($1::uuid []) AS page_id,
81      unnest($2::uuid []) AS synced_page_revision_id
82  ) AS data
83WHERE cps.page_id = data.page_id
84AND cps.deleted_at IS NULL
85    "#,
86        &page_ids,
87        &revision_ids
88    )
89    .execute(conn)
90    .await?;
91
92    Ok(())
93}
94
95pub async fn set_page_sync_error(
96    conn: &mut PgConnection,
97    page_id: Uuid,
98    error_message: &str,
99) -> ModelResult<()> {
100    sqlx::query!(
101        r#"
102UPDATE chatbot_page_sync_statuses
103SET error_message = $2,
104    consecutive_failures = consecutive_failures + 1
105WHERE page_id = $1
106AND deleted_at IS NULL
107    "#,
108        page_id,
109        error_message
110    )
111    .execute(conn)
112    .await?;
113
114    Ok(())
115}
116
117/// Clears sync statuses for the given page IDs.
118/// This is used when pages become hidden to ensure they'll be re-synced if unhidden.
119pub async fn clear_sync_statuses(conn: &mut PgConnection, page_ids: &[Uuid]) -> ModelResult<()> {
120    if page_ids.is_empty() {
121        return Ok(());
122    }
123
124    sqlx::query!(
125        r#"
126UPDATE chatbot_page_sync_statuses
127SET synced_page_revision_id = NULL,
128    error_message = NULL,
129    consecutive_failures = 0
130WHERE page_id = ANY($1)
131AND deleted_at IS NULL
132        "#,
133        page_ids
134    )
135    .execute(conn)
136    .await?;
137
138    Ok(())
139}