headless_lms_server/domain/system_health/
kubernetes.rs1use super::{
4 CronJobInfo, DeploymentInfo, EventInfo, IngressInfo, JobInfo, PodDisruptionBudgetInfo, PodInfo,
5 ServiceInfo, ServicePortInfo,
6};
7use anyhow::Result;
8use k8s_openapi::api::{
9 apps::v1::Deployment,
10 batch::v1::{CronJob, Job},
11 core::v1::{Event, Pod, Service},
12 networking::v1::Ingress,
13 policy::v1::PodDisruptionBudget,
14};
15use kube::{
16 Api, Client,
17 api::{ListParams, LogParams},
18};
19use std::collections::HashMap;
20
21pub async fn get_pods(ns: &str) -> Result<Vec<PodInfo>> {
22 let client = Client::try_default().await?;
23 let lp = ListParams::default();
24 let pods: Api<Pod> = Api::namespaced(client, ns);
25 let pod_list = pods.list(&lp).await?;
26 let pods_info: Vec<PodInfo> = pod_list
27 .iter()
28 .map(|p| {
29 let name = p
30 .metadata
31 .name
32 .as_deref()
33 .unwrap_or("<unnamed>")
34 .to_string();
35 let phase = p
36 .status
37 .as_ref()
38 .and_then(|s| s.phase.as_deref())
39 .unwrap_or("Unknown")
40 .to_string();
41 let ready = p.status.as_ref().and_then(|s| {
42 s.conditions.as_ref().and_then(|conditions| {
43 conditions
44 .iter()
45 .find(|c| c.type_ == "Ready")
46 .map(|c| c.status == "True")
47 })
48 });
49 let labels: HashMap<String, String> = p
50 .metadata
51 .labels
52 .clone()
53 .unwrap_or_default()
54 .into_iter()
55 .collect();
56 PodInfo {
57 name,
58 phase,
59 ready,
60 labels,
61 }
62 })
63 .collect();
64 Ok(pods_info)
65}
66
67pub async fn get_deployments(ns: &str) -> Result<Vec<DeploymentInfo>> {
68 let client = Client::try_default().await?;
69 let lp = ListParams::default();
70 let deployments: Api<Deployment> = Api::namespaced(client, ns);
71 let deploy_list = deployments.list(&lp).await?;
72 let deployments_info: Vec<DeploymentInfo> = deploy_list
73 .iter()
74 .map(|d| {
75 let name = d
76 .metadata
77 .name
78 .as_deref()
79 .unwrap_or("<unnamed>")
80 .to_string();
81 let replicas = d.status.as_ref().and_then(|s| s.replicas).unwrap_or(0);
82 let ready_replicas = d
83 .status
84 .as_ref()
85 .and_then(|s| s.ready_replicas)
86 .unwrap_or(0);
87 let selector_labels: HashMap<String, String> = d
88 .spec
89 .as_ref()
90 .and_then(|s| s.selector.match_labels.clone())
91 .unwrap_or_default()
92 .into_iter()
93 .collect();
94 DeploymentInfo {
95 name,
96 replicas,
97 ready_replicas,
98 selector_labels,
99 }
100 })
101 .collect();
102 Ok(deployments_info)
103}
104
105pub async fn get_cronjobs(ns: &str) -> Result<Vec<CronJobInfo>> {
106 let client = Client::try_default().await?;
107 let lp = ListParams::default();
108 let cronjobs: Api<CronJob> = Api::namespaced(client, ns);
109 let cron_list = cronjobs.list(&lp).await?;
110 let cronjobs_info: Vec<CronJobInfo> = cron_list
111 .iter()
112 .map(|cj| {
113 let name = cj
114 .metadata
115 .name
116 .as_deref()
117 .unwrap_or("<unnamed>")
118 .to_string();
119 let schedule = cj
120 .spec
121 .as_ref()
122 .map(|s| s.schedule.clone())
123 .unwrap_or_else(|| "<no schedule>".to_string());
124 let last_schedule_time = cj
125 .status
126 .as_ref()
127 .and_then(|s| s.last_schedule_time.as_ref())
128 .map(|t| t.0.to_string());
129 CronJobInfo {
130 name,
131 schedule,
132 last_schedule_time,
133 }
134 })
135 .collect();
136 Ok(cronjobs_info)
137}
138
139pub async fn get_jobs(ns: &str) -> Result<Vec<JobInfo>> {
140 let client = Client::try_default().await?;
141 let lp = ListParams::default();
142 let jobs: Api<Job> = Api::namespaced(client, ns);
143 let job_list = jobs.list(&lp).await?;
144 let jobs_info: Vec<JobInfo> = job_list
145 .iter()
146 .map(|j| {
147 let name = j
148 .metadata
149 .name
150 .as_deref()
151 .unwrap_or("<unnamed>")
152 .to_string();
153 let succeeded = j.status.as_ref().and_then(|s| s.succeeded);
154 let failed = j.status.as_ref().and_then(|s| s.failed);
155 let active = j.status.as_ref().and_then(|s| s.active);
156 JobInfo {
157 name,
158 succeeded,
159 failed,
160 active,
161 }
162 })
163 .collect();
164 Ok(jobs_info)
165}
166
167pub async fn get_services(ns: &str) -> Result<Vec<ServiceInfo>> {
168 let client = Client::try_default().await?;
169 let lp = ListParams::default();
170 let services: Api<Service> = Api::namespaced(client, ns);
171 let service_list = services.list(&lp).await?;
172 let services_info: Vec<ServiceInfo> = service_list
173 .iter()
174 .map(|s| {
175 let name = s
176 .metadata
177 .name
178 .as_deref()
179 .unwrap_or("<unnamed>")
180 .to_string();
181 let cluster_ip = s.spec.as_ref().and_then(|spec| spec.cluster_ip.clone());
182 let ports: Vec<ServicePortInfo> = s
183 .spec
184 .as_ref()
185 .and_then(|spec| spec.ports.as_ref())
186 .map(|ports| {
187 ports
188 .iter()
189 .map(|p| ServicePortInfo {
190 name: p.name.clone(),
191 port: p.port,
192 target_port: p.target_port.as_ref().map(|tp| match tp {
193 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(i) => {
194 i.to_string()
195 }
196 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::String(s) => {
197 s.clone()
198 }
199 }),
200 protocol: p.protocol.clone(),
201 })
202 .collect()
203 })
204 .unwrap_or_default();
205 ServiceInfo {
206 name,
207 cluster_ip,
208 ports,
209 }
210 })
211 .collect();
212 Ok(services_info)
213}
214
215pub async fn get_events(ns: &str) -> Result<Vec<EventInfo>> {
216 let client = Client::try_default().await?;
217 let lp = ListParams::default();
218 let events: Api<Event> = Api::namespaced(client, ns);
219 let event_list = events.list(&lp).await?;
220 let events_info: Vec<EventInfo> = event_list
221 .iter()
222 .map(|e| {
223 let name = e
224 .metadata
225 .name
226 .as_deref()
227 .unwrap_or("<unnamed>")
228 .to_string();
229 let reason = e.reason.clone();
230 let message = e.message.clone();
231 let type_ = e.type_.clone();
232 let first_timestamp = e.first_timestamp.as_ref().map(|t| t.0.to_string());
233 let last_timestamp = e.last_timestamp.as_ref().map(|t| t.0.to_string());
234 let count = e.count;
235 let involved_object_kind = e.involved_object.kind.clone();
236 let involved_object_name = e.involved_object.name.clone();
237 EventInfo {
238 name,
239 reason,
240 message,
241 type_,
242 first_timestamp,
243 last_timestamp,
244 count,
245 involved_object_kind,
246 involved_object_name,
247 }
248 })
249 .collect();
250 Ok(events_info)
251}
252
253pub async fn get_pod_disruption_budgets(ns: &str) -> Result<Vec<PodDisruptionBudgetInfo>> {
254 let client = Client::try_default().await?;
255 let lp = ListParams::default();
256 let pdbs: Api<PodDisruptionBudget> = Api::namespaced(client, ns);
257 let pdb_list = pdbs.list(&lp).await?;
258 let pdbs_info: Vec<PodDisruptionBudgetInfo> = pdb_list
259 .iter()
260 .map(|pdb| {
261 let name = pdb
262 .metadata
263 .name
264 .as_deref()
265 .unwrap_or("<unnamed>")
266 .to_string();
267 let status = pdb.status.as_ref();
268 let current_healthy = status.map(|s| s.current_healthy).unwrap_or(0);
269 let desired_healthy = status.map(|s| s.desired_healthy).unwrap_or(0);
270 let disruptions_allowed = status.map(|s| s.disruptions_allowed).unwrap_or(0);
271 let expected_pods = status.map(|s| s.expected_pods).unwrap_or(0);
272 let selector_labels: HashMap<String, String> = pdb
273 .spec
274 .as_ref()
275 .and_then(|s| s.selector.as_ref())
276 .and_then(|s| s.match_labels.clone())
277 .unwrap_or_default()
278 .into_iter()
279 .collect();
280 PodDisruptionBudgetInfo {
281 name,
282 current_healthy,
283 desired_healthy,
284 disruptions_allowed,
285 expected_pods,
286 selector_labels,
287 }
288 })
289 .collect();
290 Ok(pdbs_info)
291}
292
293pub async fn get_ingresses(ns: &str) -> Result<Vec<IngressInfo>> {
294 let client = Client::try_default().await?;
295 let lp = ListParams::default();
296 let ingresses: Api<Ingress> = Api::namespaced(client, ns);
297 let ingress_list = ingresses.list(&lp).await?;
298 let ingresses_info: Vec<IngressInfo> = ingress_list
299 .iter()
300 .map(|i| {
301 let name = i
302 .metadata
303 .name
304 .as_deref()
305 .unwrap_or("<unnamed>")
306 .to_string();
307 let class_name = i
308 .spec
309 .as_ref()
310 .and_then(|s| s.ingress_class_name.as_ref())
311 .cloned();
312 let mut hosts = Vec::new();
313 let mut paths = Vec::new();
314 if let Some(spec) = &i.spec
315 && let Some(rules) = &spec.rules
316 {
317 for rule in rules {
318 if let Some(host) = &rule.host {
319 hosts.push(host.clone());
320 }
321 if let Some(http) = &rule.http {
322 for path in &http.paths {
323 paths.push(format!(
324 "{} ({})",
325 path.path.as_deref().unwrap_or("/"),
326 path.path_type.as_str()
327 ));
328 }
329 }
330 }
331 }
332 IngressInfo {
333 name,
334 hosts,
335 paths,
336 class_name,
337 }
338 })
339 .collect();
340 Ok(ingresses_info)
341}
342
343pub async fn get_pod_logs(
344 ns: &str,
345 pod_name: &str,
346 container: Option<&str>,
347 tail_lines: i64,
348) -> Result<String> {
349 let client = Client::try_default().await?;
350 let pods: Api<Pod> = Api::namespaced(client, ns);
351
352 let mut log_params = LogParams {
353 tail_lines: Some(tail_lines),
354 ..Default::default()
355 };
356 if let Some(container_name) = container {
357 log_params.container = Some(container_name.to_string());
358 }
359 log_params.limit_bytes = Some(10_000_000);
360 log_params.since_seconds = Some(3600);
361
362 let logs = pods.logs(pod_name, &log_params).await?;
363 Ok(logs)
364}