headless_lms_server/domain/system_health/
kubernetes.rs

1//! Functions for fetching data from Kubernetes API.
2
3use super::{
4    CronJobInfo, DeploymentInfo, EventInfo, IngressInfo, JobInfo, PodDisruptionBudgetInfo, PodInfo,
5    ServiceInfo, ServicePortInfo,
6};
7use anyhow::Result;
8use k8s_openapi::api::{
9    apps::v1::Deployment,
10    batch::v1::{CronJob, Job},
11    core::v1::{Event, Pod, Service},
12    networking::v1::Ingress,
13    policy::v1::PodDisruptionBudget,
14};
15use kube::{
16    Api, Client,
17    api::{ListParams, LogParams},
18};
19use std::collections::HashMap;
20
21pub async fn get_pods(ns: &str) -> Result<Vec<PodInfo>> {
22    let client = Client::try_default().await?;
23    let lp = ListParams::default();
24    let pods: Api<Pod> = Api::namespaced(client, ns);
25    let pod_list = pods.list(&lp).await?;
26    let pods_info: Vec<PodInfo> = pod_list
27        .iter()
28        .map(|p| {
29            let name = p
30                .metadata
31                .name
32                .as_deref()
33                .unwrap_or("<unnamed>")
34                .to_string();
35            let phase = p
36                .status
37                .as_ref()
38                .and_then(|s| s.phase.as_deref())
39                .unwrap_or("Unknown")
40                .to_string();
41            let ready = p.status.as_ref().and_then(|s| {
42                s.conditions.as_ref().and_then(|conditions| {
43                    conditions
44                        .iter()
45                        .find(|c| c.type_ == "Ready")
46                        .map(|c| c.status == "True")
47                })
48            });
49            let labels: HashMap<String, String> = p
50                .metadata
51                .labels
52                .clone()
53                .unwrap_or_default()
54                .into_iter()
55                .collect();
56            PodInfo {
57                name,
58                phase,
59                ready,
60                labels,
61            }
62        })
63        .collect();
64    Ok(pods_info)
65}
66
67pub async fn get_deployments(ns: &str) -> Result<Vec<DeploymentInfo>> {
68    let client = Client::try_default().await?;
69    let lp = ListParams::default();
70    let deployments: Api<Deployment> = Api::namespaced(client, ns);
71    let deploy_list = deployments.list(&lp).await?;
72    let deployments_info: Vec<DeploymentInfo> = deploy_list
73        .iter()
74        .map(|d| {
75            let name = d
76                .metadata
77                .name
78                .as_deref()
79                .unwrap_or("<unnamed>")
80                .to_string();
81            let replicas = d.status.as_ref().and_then(|s| s.replicas).unwrap_or(0);
82            let ready_replicas = d
83                .status
84                .as_ref()
85                .and_then(|s| s.ready_replicas)
86                .unwrap_or(0);
87            let selector_labels: HashMap<String, String> = d
88                .spec
89                .as_ref()
90                .and_then(|s| s.selector.match_labels.clone())
91                .unwrap_or_default()
92                .into_iter()
93                .collect();
94            DeploymentInfo {
95                name,
96                replicas,
97                ready_replicas,
98                selector_labels,
99            }
100        })
101        .collect();
102    Ok(deployments_info)
103}
104
105pub async fn get_cronjobs(ns: &str) -> Result<Vec<CronJobInfo>> {
106    let client = Client::try_default().await?;
107    let lp = ListParams::default();
108    let cronjobs: Api<CronJob> = Api::namespaced(client, ns);
109    let cron_list = cronjobs.list(&lp).await?;
110    let cronjobs_info: Vec<CronJobInfo> = cron_list
111        .iter()
112        .map(|cj| {
113            let name = cj
114                .metadata
115                .name
116                .as_deref()
117                .unwrap_or("<unnamed>")
118                .to_string();
119            let schedule = cj
120                .spec
121                .as_ref()
122                .map(|s| s.schedule.clone())
123                .unwrap_or_else(|| "<no schedule>".to_string());
124            let last_schedule_time = cj
125                .status
126                .as_ref()
127                .and_then(|s| s.last_schedule_time.as_ref())
128                .map(|t| t.0.format("%Y-%m-%d %H:%M:%S UTC").to_string());
129            CronJobInfo {
130                name,
131                schedule,
132                last_schedule_time,
133            }
134        })
135        .collect();
136    Ok(cronjobs_info)
137}
138
139pub async fn get_jobs(ns: &str) -> Result<Vec<JobInfo>> {
140    let client = Client::try_default().await?;
141    let lp = ListParams::default();
142    let jobs: Api<Job> = Api::namespaced(client, ns);
143    let job_list = jobs.list(&lp).await?;
144    let jobs_info: Vec<JobInfo> = job_list
145        .iter()
146        .map(|j| {
147            let name = j
148                .metadata
149                .name
150                .as_deref()
151                .unwrap_or("<unnamed>")
152                .to_string();
153            let succeeded = j.status.as_ref().and_then(|s| s.succeeded);
154            let failed = j.status.as_ref().and_then(|s| s.failed);
155            let active = j.status.as_ref().and_then(|s| s.active);
156            JobInfo {
157                name,
158                succeeded,
159                failed,
160                active,
161            }
162        })
163        .collect();
164    Ok(jobs_info)
165}
166
167pub async fn get_services(ns: &str) -> Result<Vec<ServiceInfo>> {
168    let client = Client::try_default().await?;
169    let lp = ListParams::default();
170    let services: Api<Service> = Api::namespaced(client, ns);
171    let service_list = services.list(&lp).await?;
172    let services_info: Vec<ServiceInfo> = service_list
173        .iter()
174        .map(|s| {
175            let name = s
176                .metadata
177                .name
178                .as_deref()
179                .unwrap_or("<unnamed>")
180                .to_string();
181            let cluster_ip = s.spec.as_ref().and_then(|spec| spec.cluster_ip.clone());
182            let ports: Vec<ServicePortInfo> = s
183                .spec
184                .as_ref()
185                .and_then(|spec| spec.ports.as_ref())
186                .map(|ports| {
187                    ports
188                        .iter()
189                        .map(|p| ServicePortInfo {
190                            name: p.name.clone(),
191                            port: p.port,
192                            target_port: p.target_port.as_ref().map(|tp| match tp {
193                                k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(i) => {
194                                    i.to_string()
195                                }
196                                k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::String(s) => {
197                                    s.clone()
198                                }
199                            }),
200                            protocol: p.protocol.clone(),
201                        })
202                        .collect()
203                })
204                .unwrap_or_default();
205            ServiceInfo {
206                name,
207                cluster_ip,
208                ports,
209            }
210        })
211        .collect();
212    Ok(services_info)
213}
214
215pub async fn get_events(ns: &str) -> Result<Vec<EventInfo>> {
216    let client = Client::try_default().await?;
217    let lp = ListParams::default();
218    let events: Api<Event> = Api::namespaced(client, ns);
219    let event_list = events.list(&lp).await?;
220    let events_info: Vec<EventInfo> = event_list
221        .iter()
222        .map(|e| {
223            let name = e
224                .metadata
225                .name
226                .as_deref()
227                .unwrap_or("<unnamed>")
228                .to_string();
229            let reason = e.reason.clone();
230            let message = e.message.clone();
231            let type_ = e.type_.clone();
232            let first_timestamp = e
233                .first_timestamp
234                .as_ref()
235                .map(|t| t.0.format("%Y-%m-%d %H:%M:%S UTC").to_string());
236            let last_timestamp = e
237                .last_timestamp
238                .as_ref()
239                .map(|t| t.0.format("%Y-%m-%d %H:%M:%S UTC").to_string());
240            let count = e.count;
241            let involved_object_kind = e.involved_object.kind.clone();
242            let involved_object_name = e.involved_object.name.clone();
243            EventInfo {
244                name,
245                reason,
246                message,
247                type_,
248                first_timestamp,
249                last_timestamp,
250                count,
251                involved_object_kind,
252                involved_object_name,
253            }
254        })
255        .collect();
256    Ok(events_info)
257}
258
259pub async fn get_pod_disruption_budgets(ns: &str) -> Result<Vec<PodDisruptionBudgetInfo>> {
260    let client = Client::try_default().await?;
261    let lp = ListParams::default();
262    let pdbs: Api<PodDisruptionBudget> = Api::namespaced(client, ns);
263    let pdb_list = pdbs.list(&lp).await?;
264    let pdbs_info: Vec<PodDisruptionBudgetInfo> = pdb_list
265        .iter()
266        .map(|pdb| {
267            let name = pdb
268                .metadata
269                .name
270                .as_deref()
271                .unwrap_or("<unnamed>")
272                .to_string();
273            let status = pdb.status.as_ref();
274            let current_healthy = status.map(|s| s.current_healthy).unwrap_or(0);
275            let desired_healthy = status.map(|s| s.desired_healthy).unwrap_or(0);
276            let disruptions_allowed = status.map(|s| s.disruptions_allowed).unwrap_or(0);
277            let expected_pods = status.map(|s| s.expected_pods).unwrap_or(0);
278            let selector_labels: HashMap<String, String> = pdb
279                .spec
280                .as_ref()
281                .and_then(|s| s.selector.as_ref())
282                .and_then(|s| s.match_labels.clone())
283                .unwrap_or_default()
284                .into_iter()
285                .collect();
286            PodDisruptionBudgetInfo {
287                name,
288                current_healthy,
289                desired_healthy,
290                disruptions_allowed,
291                expected_pods,
292                selector_labels,
293            }
294        })
295        .collect();
296    Ok(pdbs_info)
297}
298
299pub async fn get_ingresses(ns: &str) -> Result<Vec<IngressInfo>> {
300    let client = Client::try_default().await?;
301    let lp = ListParams::default();
302    let ingresses: Api<Ingress> = Api::namespaced(client, ns);
303    let ingress_list = ingresses.list(&lp).await?;
304    let ingresses_info: Vec<IngressInfo> = ingress_list
305        .iter()
306        .map(|i| {
307            let name = i
308                .metadata
309                .name
310                .as_deref()
311                .unwrap_or("<unnamed>")
312                .to_string();
313            let class_name = i
314                .spec
315                .as_ref()
316                .and_then(|s| s.ingress_class_name.as_ref())
317                .cloned();
318            let mut hosts = Vec::new();
319            let mut paths = Vec::new();
320            if let Some(spec) = &i.spec {
321                if let Some(rules) = &spec.rules {
322                    for rule in rules {
323                        if let Some(host) = &rule.host {
324                            hosts.push(host.clone());
325                        }
326                        if let Some(http) = &rule.http {
327                            for path in &http.paths {
328                                paths.push(format!(
329                                    "{} ({})",
330                                    path.path.as_deref().unwrap_or("/"),
331                                    path.path_type.as_str()
332                                ));
333                            }
334                        }
335                    }
336                }
337            }
338            IngressInfo {
339                name,
340                hosts,
341                paths,
342                class_name,
343            }
344        })
345        .collect();
346    Ok(ingresses_info)
347}
348
349pub async fn get_pod_logs(
350    ns: &str,
351    pod_name: &str,
352    container: Option<&str>,
353    tail_lines: i64,
354) -> Result<String> {
355    let client = Client::try_default().await?;
356    let pods: Api<Pod> = Api::namespaced(client, ns);
357
358    let mut log_params = LogParams {
359        tail_lines: Some(tail_lines),
360        ..Default::default()
361    };
362    if let Some(container_name) = container {
363        log_params.container = Some(container_name.to_string());
364    }
365    log_params.limit_bytes = Some(10_000_000);
366    log_params.since_seconds = Some(3600);
367
368    let logs = pods.logs(pod_name, &log_params).await?;
369    Ok(logs)
370}