headless_lms_server/programs/mailchimp_syncer/
mailchimp_ops.rs1use headless_lms_models::marketing_consents::MarketingMailingListAccessToken;
2use headless_lms_utils::http::REQWEST_CLIENT;
3use reqwest::Method;
4use secrecy::ExposeSecret;
5use serde_json::Value;
6use std::collections::HashMap;
7use std::time::Duration;
8
9use super::batch_client::{
10 BatchOperation, parse_batch_results_from_tar_gz, parse_operation_response_value,
11 poll_batch_until_finished_with_response_url, submit_batch,
12};
13
/// Operation count at which [`MailchimpExecutor::execute`] stops sending
/// individual requests and submits the operations as a batch instead.
const DEFAULT_BATCH_THRESHOLD: usize = 5;
15
16#[derive(Debug, Clone)]
17pub struct MailchimpOperation {
18 pub method: Method,
19 pub path: String,
20 pub body: Option<Value>,
21 pub operation_id: Option<String>,
22}
23
24#[derive(Debug, Clone)]
25pub struct MailchimpOpResult {
26 pub operation_id: Option<String>,
27 pub status_code: u16,
28 pub response_raw: Value,
29 pub response_json: Option<Value>,
30 pub error: Option<String>,
31}
32
33impl MailchimpOpResult {
34 pub fn is_success(&self) -> bool {
35 (200..=299).contains(&self.status_code) && self.error.is_none()
36 }
37}
38
/// Runs Mailchimp operations, choosing between direct requests and the batch
/// API based on how many operations are submitted.
pub struct MailchimpExecutor {
    /// Operation count at or above which the batch path is used.
    batch_threshold: usize,
    /// Timeout handed to the batch polling helper.
    timeout: Duration,
    /// Poll interval handed to the batch polling helper.
    poll_interval: Duration,
}
44
45impl MailchimpExecutor {
46 pub fn new(timeout: Duration, poll_interval: Duration) -> Self {
48 Self {
49 batch_threshold: DEFAULT_BATCH_THRESHOLD,
50 timeout,
51 poll_interval,
52 }
53 }
54
55 pub async fn execute(
56 &self,
57 token: &MarketingMailingListAccessToken,
58 ops: Vec<MailchimpOperation>,
59 ) -> anyhow::Result<Vec<MailchimpOpResult>> {
60 if ops.is_empty() {
61 return Ok(vec![]);
62 }
63 if ops.len() < self.batch_threshold {
64 Ok(execute_direct(token, ops).await)
65 } else {
66 execute_batch(token, ops, self.timeout, self.poll_interval).await
67 }
68 }
69}
70
71async fn execute_direct(
72 token: &MarketingMailingListAccessToken,
73 ops: Vec<MailchimpOperation>,
74) -> Vec<MailchimpOpResult> {
75 let mut results = Vec::with_capacity(ops.len());
76 for op in ops {
77 let url = format!(
78 "https://{}.api.mailchimp.com/3.0{}",
79 token.server_prefix, op.path
80 );
81 let mut request = REQWEST_CLIENT.request(op.method.clone(), &url).header(
82 "Authorization",
83 format!("apikey {}", token.access_token.expose_secret()),
84 );
85 if let Some(body) = op.body.clone() {
86 request = request.json(&body);
87 }
88 let response = match request.send().await {
89 Ok(resp) => resp,
90 Err(err) => {
91 results.push(MailchimpOpResult {
92 operation_id: op.operation_id.clone(),
93 status_code: 0,
94 response_raw: Value::Null,
95 response_json: None,
96 error: Some(format!("Transport error: {}", err)),
97 });
98 continue;
99 }
100 };
101
102 let status_code = response.status().as_u16();
103 let body_bytes = match response.bytes().await {
104 Ok(bytes) => bytes,
105 Err(err) => {
106 results.push(MailchimpOpResult {
107 operation_id: op.operation_id.clone(),
108 status_code,
109 response_raw: Value::Null,
110 response_json: None,
111 error: Some(format!("Body read error: {}", err)),
112 });
113 continue;
114 }
115 };
116
117 let response_raw = match serde_json::from_slice::<Value>(&body_bytes) {
118 Ok(json) => json,
119 Err(_) => Value::String(String::from_utf8_lossy(&body_bytes).to_string()),
120 };
121 let response_json = parse_operation_response_value(&response_raw);
122 results.push(MailchimpOpResult {
123 operation_id: op.operation_id.clone(),
124 status_code,
125 response_raw,
126 response_json,
127 error: None,
128 });
129 }
130 results
131}
132
133async fn execute_batch(
134 token: &MarketingMailingListAccessToken,
135 ops: Vec<MailchimpOperation>,
136 timeout: Duration,
137 poll_interval: Duration,
138) -> anyhow::Result<Vec<MailchimpOpResult>> {
139 if ops.is_empty() {
140 return Ok(vec![]);
141 }
142
143 let batch_ops: Vec<BatchOperation> = ops
144 .iter()
145 .map(|op| BatchOperation {
146 method: op.method.as_str().to_string(),
147 path: op.path.clone(),
148 body: op.body.as_ref().map(|b| b.to_string()).unwrap_or_default(),
149 operation_id: op.operation_id.clone(),
150 })
151 .collect();
152
153 let batch_ids = submit_batch(&token.server_prefix, &token.access_token, batch_ops).await?;
154
155 let mut results = Vec::new();
156 for batch_id in &batch_ids {
157 let poll_result = poll_batch_until_finished_with_response_url(
158 &token.server_prefix,
159 &token.access_token,
160 batch_id,
161 timeout,
162 poll_interval,
163 )
164 .await?;
165 if poll_result.total_operations > 0 {
166 info!(
167 "Mailchimp batch {} finished with {} errored operations out of {}",
168 batch_id, poll_result.errored_operations, poll_result.total_operations
169 );
170 }
171 let response_url = match poll_result.response_body_url {
172 Some(url) => url,
173 None => {
174 return Err(anyhow::anyhow!(
175 "Mailchimp batch {} finished without response_body_url",
176 batch_id
177 ));
178 }
179 };
180 let response = REQWEST_CLIENT.get(&response_url).send().await?;
181 if !response.status().is_success() {
182 let error_text = response.text().await.unwrap_or_default();
183 return Err(anyhow::anyhow!(
184 "Failed to fetch Mailchimp batch results: {}",
185 error_text
186 ));
187 }
188 let bytes = response.bytes().await?;
189 let batch_results = parse_batch_results_from_tar_gz(&bytes)?;
190 for result in batch_results {
191 let response_json = parse_operation_response_value(&result.response);
192 results.push(MailchimpOpResult {
193 operation_id: result.operation_id.clone(),
194 status_code: result.status_code,
195 response_raw: result.response,
196 response_json,
197 error: None,
198 });
199 }
200 }
201
202 Ok(order_results(&ops, results))
203}
204
205fn order_results(
206 ops: &[MailchimpOperation],
207 results: Vec<MailchimpOpResult>,
208) -> Vec<MailchimpOpResult> {
209 let mut ordered = Vec::new();
210 let mut results_by_id: HashMap<String, Vec<MailchimpOpResult>> = HashMap::new();
211 let mut unmatched = Vec::new();
212
213 for result in results {
214 if let Some(ref op_id) = result.operation_id {
215 results_by_id.entry(op_id.clone()).or_default().push(result);
216 } else {
217 unmatched.push(result);
218 }
219 }
220
221 for op in ops {
222 if let Some(ref op_id) = op.operation_id
223 && let Some(mut entries) = results_by_id.remove(op_id)
224 {
225 ordered.append(&mut entries);
226 }
227 }
228
229 for (_, mut entries) in results_by_id {
230 ordered.append(&mut entries);
231 }
232
233 ordered.extend(unmatched);
234 ordered
235}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a body-less GET operation for `path` tagged with `id`.
    fn get_op(path: &str, id: &str) -> MailchimpOperation {
        MailchimpOperation {
            method: Method::GET,
            path: path.to_string(),
            body: None,
            operation_id: Some(id.to_string()),
        }
    }

    /// Builds an empty 200 result tagged with `id`.
    fn ok_result(id: &str) -> MailchimpOpResult {
        MailchimpOpResult {
            operation_id: Some(id.to_string()),
            status_code: 200,
            response_raw: Value::Null,
            response_json: None,
            error: None,
        }
    }

    #[test]
    fn order_results_respects_input_order() {
        let ops = [get_op("/a", "u1"), get_op("/b", "u2")];
        // Results arrive in the opposite order to the operations.
        let results = vec![ok_result("u2"), ok_result("u1")];

        let ordered = order_results(&ops, results);

        assert_eq!(ordered[0].operation_id.as_deref(), Some("u1"));
        assert_eq!(ordered[1].operation_id.as_deref(), Some("u2"));
    }
}