1use std::{collections::HashMap, sync::Arc};
92
93use log::{debug, info, warn};
94use serde::{Deserialize, Serialize};
95use tokio::sync::RwLock;
96
97use crate::{AirError, Result, Utility};
98
/// Tracks the health of registered services, keeps a log of check results,
/// and stores registered recovery actions.
///
/// Each collection lives behind its own `Arc<RwLock<..>>`.
#[derive(Debug)]
pub struct HealthCheckManager {
	/// Latest health snapshot per service, keyed by service name.
	ServiceHealth:Arc<RwLock<HashMap<String, ServiceHealth>>>,
	/// Check results in insertion order (oldest first); `RecordHealthCheck`
	/// trims it against `config.HistoryRetention`.
	HealthHistory:Arc<RwLock<Vec<HealthCheckRecord>>>,
	/// Recovery actions keyed by `RecoveryAction::Name`.
	RecoveryActions:Arc<RwLock<HashMap<String, RecoveryAction>>>,
	/// Thresholds and switches chosen at construction time.
	config:HealthCheckConfig,
}
111
/// Most recent health snapshot for a single registered service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceHealth {
	/// Name the service was registered under (also its key in the manager's map).
	pub ServiceName:String,
	/// Result of the latest check; `Unknown` until the first check runs.
	pub Status:HealthStatus,
	/// `Utility::CurrentTimestamp()` at the latest update; `0` right after
	/// registration. Units are whatever that helper returns — TODO confirm.
	pub LastCheck:u64,
	/// Timestamp of the latest `Healthy` result, if there has been one.
	pub LastSuccess:Option<u64>,
	/// Incremented on every `Degraded`/`Unhealthy` result and reset to `0` on
	/// `Healthy`.
	pub FailureCount:u32,
	/// Message from the latest failing check; cleared on `Healthy`.
	pub ErrorMessage:Option<String>,
	/// Duration of the latest check as measured by `CheckService`.
	pub ResponseTimeMs:Option<u64>,
	/// Check depth supplied at registration; not consulted elsewhere in this file.
	pub CheckLevel:HealthCheckLevel,
}
132
/// Outcome of a health check, for one service or aggregated over all of them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum HealthStatus {
	/// The check passed; `UpdateServiceHealth` resets the failure counter.
	Healthy,
	/// Returned by the per-service checks when the probe is slower than its
	/// limit; counted as a failure.
	Degraded,
	/// Used by `CheckService` for unrecognised service names; counted as a
	/// failure and logged as CRITICAL by `HandleCriticalAlerts`.
	Unhealthy,
	/// Initial status assigned by `RegisterService`; ignored when aggregating.
	Unknown,
}
145
/// Depth of health check requested for a service.
///
/// The value is stored on `ServiceHealth` at registration; nothing in this
/// file branches on it, so the variant notes below are read off the names
/// only — TODO confirm against the real probes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HealthCheckLevel {
	/// Presumably: the service process exists.
	Alive,
	/// Presumably: the service answers requests.
	Responsive,
	/// Presumably: the service produces correct results.
	Functional,
}
156
/// One entry in the health-check history log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckRecord {
	/// `Utility::CurrentTimestamp()` taken when the record was created.
	pub Timestamp:u64,
	/// Service the check was run against.
	pub ServiceName:String,
	/// Result of the check.
	pub Status:HealthStatus,
	/// Measured duration of the check; `RecordHealthCheck` always fills it in.
	pub ResponseTimeMs:Option<u64>,
	/// Failure description, when the check produced one.
	pub ErrorMessage:Option<String>,
}
171
/// Description of a recovery step that can be registered with the manager.
///
/// In this file these values are only stored (keyed by `Name`); none of the
/// other fields are read here, so their notes are assumptions — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecoveryAction {
	/// Unique name; used as the key in the manager's action map.
	pub Name:String,
	/// Presumably the service this action applies to.
	pub ServiceName:String,
	/// Presumably the condition that should fire the action.
	pub Trigger:RecoveryTrigger,
	/// Presumably what to do when the trigger fires.
	pub Action:RecoveryActionType,
	/// Presumably an upper bound on attempts.
	pub MaxRetries:u32,
	/// Presumably the number of attempts made so far.
	pub RetryCount:u32,
}
188
/// Condition attached to a `RecoveryAction`.
///
/// Not evaluated anywhere in this file; variant notes are inferred from the
/// names — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecoveryTrigger {
	/// Presumably: this many failures in a row.
	ConsecutiveFailures(u32),
	/// Presumably: response time above this value (units unverified).
	ResponseTimeExceeds(u64),
	/// Presumably: the service stopped responding altogether.
	ServiceUnresponsive,
}
199
/// Kind of remedy carried by a `RecoveryAction`.
///
/// Not dispatched on anywhere in this file; the variant names are the only
/// description available here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecoveryActionType {
	/// Restart the affected service.
	RestartService,
	/// Reset the affected connection.
	ResetConnection,
	/// Clear a cache.
	ClearCache,
	/// Reload configuration.
	ReloadConfiguration,
	/// Escalate — presumably hand off to an operator; TODO confirm.
	Escalate,
}
214
/// Tunables for `HealthCheckManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckConfig {
	/// Interval between checks. Not read in this file; units are presumably
	/// seconds — TODO confirm.
	pub DefaultCheckInterval:u64,
	/// Maximum number of history records `RecordHealthCheck` aims to keep.
	pub HistoryRetention:usize,
	/// Recovery is triggered once a service's `FailureCount` reaches this value.
	pub ConsecutiveFailuresThreshold:u32,
	/// Response times above this value trigger response-time recovery.
	pub ResponseTimeThresholdMs:u64,
	/// When `false`, `CheckService` never triggers recovery.
	pub EnableAutoRecovery:bool,
	/// Time budget, in seconds, for a single recovery action.
	pub RecoveryTimeoutSec:u64,
}
231
232impl Default for HealthCheckConfig {
233 fn default() -> Self {
234 Self {
235 DefaultCheckInterval:30,
236 HistoryRetention:100,
237 ConsecutiveFailuresThreshold:3,
238 ResponseTimeThresholdMs:5000,
239 EnableAutoRecovery:true,
240 RecoveryTimeoutSec:60,
241 }
242 }
243}
244
245impl HealthCheckManager {
246 pub fn new(config:Option<HealthCheckConfig>) -> Self {
248 Self {
249 ServiceHealth:Arc::new(RwLock::new(HashMap::new())),
250 HealthHistory:Arc::new(RwLock::new(Vec::new())),
251 RecoveryActions:Arc::new(RwLock::new(HashMap::new())),
252 config:config.unwrap_or_default(),
253 }
254 }
255
256 pub async fn RegisterService(&self, ServiceName:String, CheckLevel:HealthCheckLevel) -> Result<()> {
258 let mut HealthMap = self.ServiceHealth.write().await;
259
260 HealthMap.insert(
261 ServiceName.clone(),
262 ServiceHealth {
263 ServiceName:ServiceName.clone(),
264 Status:HealthStatus::Unknown,
265 LastCheck:0,
266 LastSuccess:None,
267 FailureCount:0,
268 ErrorMessage:None,
269 ResponseTimeMs:None,
270 CheckLevel:CheckLevel.clone(),
271 },
272 );
273
274 info!(
275 "[HealthCheck] Registered service for monitoring: {} ({:?})",
276 ServiceName, CheckLevel
277 );
278 Ok(())
279 }
280
281 pub async fn CheckService(&self, ServiceName:&str) -> Result<HealthStatus> {
283 let StartTime = Utility::CurrentTimestamp();
284
285 let CheckTimeout = tokio::time::Duration::from_secs(10);
287
288 let (status, ErrorMessage) = tokio::time::timeout(CheckTimeout, async {
289 match ServiceName {
290 "authentication" => self.CheckAuthenticationService().await,
291 "updates" => self.CheckUpdatesService().await,
292 "downloader" => self.CheckDownloaderService().await,
293 "indexing" => self.CheckIndexingService().await,
294 "grpc" => self.CheckGrpcService().await,
295 "connections" => self.CheckConnectionsService().await,
296 _ => {
297 warn!("[HealthCheck] Unknown service: {}", ServiceName);
298 return (HealthStatus::Unhealthy, Some(format!("Unknown service: {}", ServiceName)));
299 },
300 }
301 })
302 .await
303 .map_err(|_| {
304 warn!("[HealthCheck] Timeout checking service: {}", ServiceName);
305 (
306 HealthStatus::Unhealthy,
307 Some(format!("Health check timeout for service: {}", ServiceName)),
308 )
309 })?;
310
311 let ResponseTime = Utility::CurrentTimestamp() - StartTime;
312
313 self.UpdateServiceHealth(ServiceName, status.clone(), &ErrorMessage, ResponseTime)
315 .await?;
316
317 self.RecordHealthCheck(ServiceName, status.clone(), ResponseTime, &ErrorMessage)
319 .await;
320
321 if self.config.EnableAutoRecovery {
323 self.TriggerRecoveryIfNeeded(ServiceName).await;
324 }
325
326 self.HandleCriticalAlerts(ServiceName, &status).await;
328
329 Ok(status)
330 }
331
332 async fn CheckAuthenticationService(&self) -> (HealthStatus, Option<String>) {
334 debug!("[HealthCheck] Checking authentication service health");
335
336 let start = std::time::Instant::now();
341
342 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
352
353 let elapsed = start.elapsed();
354
355 if elapsed.as_millis() > 1000 {
357 return (
358 HealthStatus::Degraded,
359 Some(format!(
360 "Authentication service response time too slow: {}ms",
361 elapsed.as_millis()
362 )),
363 );
364 }
365
366 debug!("[HealthCheck] Authentication service healthy");
367 (HealthStatus::Healthy, None)
368 }
369
370 async fn CheckUpdatesService(&self) -> (HealthStatus, Option<String>) {
372 debug!("[HealthCheck] Checking updates service health");
373
374 let start = std::time::Instant::now();
375
376 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
386
387 let elapsed = start.elapsed();
388
389 if elapsed.as_millis() > 500 {
391 return (
392 HealthStatus::Degraded,
393 Some(format!("Updates service response time too slow: {}ms", elapsed.as_millis())),
394 );
395 }
396
397 debug!("[HealthCheck] Updates service healthy");
398 (HealthStatus::Healthy, None)
399 }
400
401 async fn CheckDownloaderService(&self) -> (HealthStatus, Option<String>) {
403 debug!("[HealthCheck] Checking downloader service health");
404
405 let start = std::time::Instant::now();
406
407 tokio::time::sleep(tokio::time::Duration::from_millis(40)).await;
418
419 let elapsed = start.elapsed();
420
421 if elapsed.as_millis() > 1000 {
423 return (
424 HealthStatus::Degraded,
425 Some(format!("Downloader service response time too slow: {}ms", elapsed.as_millis())),
426 );
427 }
428
429 debug!("[HealthCheck] Downloader service healthy");
430 (HealthStatus::Healthy, None)
431 }
432
433 async fn CheckIndexingService(&self) -> (HealthStatus, Option<String>) {
435 debug!("[HealthCheck] Checking indexing service health");
436
437 let start = std::time::Instant::now();
438
439 tokio::time::sleep(tokio::time::Duration::from_millis(60)).await;
450
451 let elapsed = start.elapsed();
452
453 if elapsed.as_millis() > 500 {
455 return (
456 HealthStatus::Degraded,
457 Some(format!("Indexing service response time too slow: {}ms", elapsed.as_millis())),
458 );
459 }
460
461 debug!("[HealthCheck] Indexing service healthy");
462 (HealthStatus::Healthy, None)
463 }
464
465 async fn CheckGrpcService(&self) -> (HealthStatus, Option<String>) {
467 debug!("[HealthCheck] Checking gRPC service health");
468
469 let start = std::time::Instant::now();
470
471 tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
482
483 let elapsed = start.elapsed();
484
485 if elapsed.as_millis() > 200 {
487 return (
488 HealthStatus::Degraded,
489 Some(format!("gRPC service response time too slow: {}ms", elapsed.as_millis())),
490 );
491 }
492
493 debug!("[HealthCheck] gRPC service healthy");
494 (HealthStatus::Healthy, None)
495 }
496
497 async fn CheckConnectionsService(&self) -> (HealthStatus, Option<String>) {
499 debug!("[HealthCheck] Checking connections service health");
500
501 let start = std::time::Instant::now();
502
503 tokio::time::sleep(tokio::time::Duration::from_millis(35)).await;
514
515 let elapsed = start.elapsed();
516
517 if elapsed.as_millis() > 300 {
519 return (
520 HealthStatus::Degraded,
521 Some(format!("Connections service response time too slow: {}ms", elapsed.as_millis())),
522 );
523 }
524
525 debug!("[HealthCheck] Connections service healthy");
526 (HealthStatus::Healthy, None)
527 }
528
529 async fn UpdateServiceHealth(
531 &self,
532 ServiceName:&str,
533 status:HealthStatus,
534 ErrorMessage:&Option<String>,
535 ResponseTime:u64,
536 ) -> Result<()> {
537 let mut HealthMap = self.ServiceHealth.write().await;
538
539 if let Some(ServiceHealth) = HealthMap.get_mut(ServiceName) {
540 ServiceHealth.Status = status.clone();
541 ServiceHealth.LastCheck = Utility::CurrentTimestamp();
542 ServiceHealth.ResponseTimeMs = Some(ResponseTime);
543
544 match status {
545 HealthStatus::Healthy => {
546 ServiceHealth.LastSuccess = Some(Utility::CurrentTimestamp());
547 ServiceHealth.FailureCount = 0;
548 ServiceHealth.ErrorMessage = None;
549 },
550 HealthStatus::Degraded | HealthStatus::Unhealthy => {
551 ServiceHealth.FailureCount += 1;
552 ServiceHealth.ErrorMessage = ErrorMessage.clone();
553 },
554 HealthStatus::Unknown => {
555 },
557 }
558 } else {
559 return Err(AirError::Internal(format!("Service not registered: {}", ServiceName)));
560 }
561
562 debug!(
563 "[HealthCheck] Updated health for {}: {:?} ({}ms)",
564 ServiceName, status, ResponseTime
565 );
566 Ok(())
567 }
568
569 async fn RecordHealthCheck(
571 &self,
572 ServiceName:&str,
573 status:HealthStatus,
574 ResponseTime:u64,
575 ErrorMessage:&Option<String>,
576 ) {
577 let mut history = self.HealthHistory.write().await;
578
579 let record = HealthCheckRecord {
580 Timestamp:Utility::CurrentTimestamp(),
581 ServiceName:ServiceName.to_string(),
582 Status:status,
583 ResponseTimeMs:Some(ResponseTime),
584 ErrorMessage:ErrorMessage.clone(),
585 };
586
587 history.push(record);
588
589 if history.len() > self.config.HistoryRetention {
591 history.remove(0);
592 }
593 }
594
595 async fn TriggerRecoveryIfNeeded(&self, ServiceName:&str) {
597 let HealthMap = self.ServiceHealth.read().await;
598
599 if let Some(ServiceHealth) = HealthMap.get(ServiceName) {
600 if ServiceHealth.FailureCount >= self.config.ConsecutiveFailuresThreshold {
602 warn!(
603 "[HealthCheck] Service {} has {} consecutive failures, triggering recovery",
604 ServiceName, ServiceHealth.FailureCount
605 );
606
607 self.PerformRecoveryAction(ServiceName).await;
608 }
609
610 if let Some(ResponseTime) = ServiceHealth.ResponseTimeMs {
612 if ResponseTime > self.config.ResponseTimeThresholdMs {
613 warn!(
614 "[HealthCheck] Service {} response time {}ms exceeds threshold {}ms",
615 ServiceName, ResponseTime, self.config.ResponseTimeThresholdMs
616 );
617
618 self.HandleResponseTimeRecovery(ServiceName, ResponseTime).await;
619 }
620 }
621 }
622 }
623
624 async fn HandleResponseTimeRecovery(&self, ServiceName:&str, ResponseTime:u64) {
626 info!(
627 "[HealthCheck] Handling response time recovery for {}: {}ms",
628 ServiceName, ResponseTime
629 );
630
631 match ServiceName {
632 "grpc" => {
633 warn!(
634 "[HealthCheck] Response time recovery: Optimizing gRPC server for {}",
635 ServiceName
636 );
637 },
642 "connections" => {
643 warn!(
644 "[HealthCheck] Response time recovery: Optimizing connections for {}",
645 ServiceName
646 );
647 },
652 _ => {
653 warn!("[HealthCheck] Response time recovery: Generic optimization for {}", ServiceName);
654 },
655 }
656 }
657
658 async fn HandleCriticalAlerts(&self, ServiceName:&str, status:&HealthStatus) {
660 if *status == HealthStatus::Unhealthy {
661 warn!(
662 "[HealthCheck] CRITICAL: Service {} is UNHEALTHY - immediate attention required",
663 ServiceName
664 );
665
666 }
672 }
673
674 async fn PerformRecoveryAction(&self, ServiceName:&str) {
676 info!("[HealthCheck] Performing recovery action for {}", ServiceName);
677
678 let RecoveryTimeout = tokio::time::Duration::from_secs(self.config.RecoveryTimeoutSec);
679
680 let result = tokio::time::timeout(RecoveryTimeout, async {
681 match ServiceName {
682 "authentication" => self.RestartAuthenticationService().await,
683 "updates" => self.RestartUpdatesService().await,
684 "downloader" => self.RestartDownloaderService().await,
685 "indexing" => self.RestartIndexingService().await,
686 "grpc" => self.RestartGrpcService().await,
687 "connections" => self.ResetConnectionsService().await,
688 _ => {
689 warn!("[HealthCheck] No specific recovery action for {}", ServiceName);
690 Ok(())
691 },
692 }
693 })
694 .await;
695
696 match result {
697 Ok(Ok(())) => {
698 info!("[HealthCheck] Recovery action completed successfully for {}", ServiceName);
699 },
700 Ok(Err(e)) => {
701 warn!("[HealthCheck] Recovery action failed for {}: {:?}", ServiceName, e);
702 },
703 Err(_) => {
704 warn!("[HealthCheck] Recovery action timed out for {}", ServiceName);
705 },
706 }
707 }
708
/// Recovery routine for the authentication service.
///
/// As written it only logs a warning and reports success; no restart is
/// performed in this function.
async fn RestartAuthenticationService(&self) -> Result<()> {
	warn!("[HealthCheck] Recovery: Restarting authentication service");
	Ok(())
}
715
/// Recovery routine for the updates service.
///
/// As written it only logs a warning and reports success; no restart is
/// performed in this function.
async fn RestartUpdatesService(&self) -> Result<()> {
	warn!("[HealthCheck] Recovery: Restarting updates service");
	Ok(())
}
722
/// Recovery routine for the downloader service.
///
/// As written it only logs a warning and reports success; no restart is
/// performed in this function.
async fn RestartDownloaderService(&self) -> Result<()> {
	warn!("[HealthCheck] Recovery: Restarting downloader service");
	Ok(())
}
729
/// Recovery routine for the indexing service.
///
/// As written it only logs a warning and reports success; no restart is
/// performed in this function.
async fn RestartIndexingService(&self) -> Result<()> {
	warn!("[HealthCheck] Recovery: Restarting indexing service");
	Ok(())
}
736
/// Recovery routine for the gRPC server.
///
/// As written it only logs a warning and reports success; no restart is
/// performed in this function.
async fn RestartGrpcService(&self) -> Result<()> {
	warn!("[HealthCheck] Recovery: Restarting gRPC server");
	Ok(())
}
743
/// Recovery routine for the connections service.
///
/// As written it only logs a warning and reports success; no reset is
/// performed in this function.
async fn ResetConnectionsService(&self) -> Result<()> {
	warn!("[HealthCheck] Recovery: Resetting connections service");
	Ok(())
}
750
751 pub async fn GetOverallHealth(&self) -> HealthStatus {
753 let HealthMap = self.ServiceHealth.read().await;
754
755 let mut HealthyCount = 0;
756 let mut DegradedCount = 0;
757 let mut UnhealthyCount = 0;
758
759 for ServiceHealth in HealthMap.values() {
760 match ServiceHealth.Status {
761 HealthStatus::Healthy => HealthyCount += 1,
762 HealthStatus::Degraded => DegradedCount += 1,
763 HealthStatus::Unhealthy => UnhealthyCount += 1,
764 HealthStatus::Unknown => {},
765 }
766 }
767
768 if UnhealthyCount > 0 {
769 HealthStatus::Unhealthy
770 } else if DegradedCount > 0 {
771 HealthStatus::Degraded
772 } else if HealthyCount > 0 {
773 HealthStatus::Healthy
774 } else {
775 HealthStatus::Unknown
776 }
777 }
778
779 pub async fn GetServiceHealth(&self, service_name:&str) -> Option<ServiceHealth> {
781 let HealthMap = self.ServiceHealth.read().await;
782 HealthMap.get(service_name).cloned()
783 }
784
785 pub async fn GetHealthHistory(&self, service_name:Option<&str>, limit:Option<usize>) -> Vec<HealthCheckRecord> {
787 let History = self.HealthHistory.read().await;
788
789 let mut FilteredHistory:Vec<HealthCheckRecord> = if let Some(service) = service_name {
790 History.iter().filter(|Record| Record.ServiceName == service).cloned().collect()
791 } else {
792 History.clone()
793 };
794
795 FilteredHistory.reverse();
797
798 if let Some(limit) = limit {
800 FilteredHistory.truncate(limit);
801 }
802
803 FilteredHistory
804 }
805
806 pub async fn RegisterRecoveryAction(&self, action:RecoveryAction) -> Result<()> {
808 let mut actions = self.RecoveryActions.write().await;
809 actions.insert(action.Name.clone(), action);
810 Ok(())
811 }
812
813 pub async fn GetHealthStatistics(&self) -> HealthStatistics {
815 let HealthMap = self.ServiceHealth.read().await;
816 let history = self.HealthHistory.read().await;
817 let mut HealthyServices = 0;
819 let mut DegradedServices = 0;
820 let mut UnhealthyServices = 0;
821
822 for ServiceHealth in HealthMap.values() {
823 match ServiceHealth.Status {
824 HealthStatus::Healthy => HealthyServices += 1,
825 HealthStatus::Degraded => DegradedServices += 1,
826 HealthStatus::Unhealthy => UnhealthyServices += 1,
827 HealthStatus::Unknown => {},
828 }
829 }
830
831 let mut Statistics = HealthStatistics {
833 TotalServices:HealthMap.len(),
834 HealthyServices,
835 DegradedServices,
836 UnhealthyServices,
837 TotalChecks:history.len(),
838 AverageResponseTimeMs:0.0,
839 SuccessRate:0.0,
840 };
841
842 if !history.is_empty() {
844 let mut TotalResponseTime = 0;
845 let mut SuccessfulChecks = 0;
846
847 for Record in history.iter() {
848 if let Some(ResponseTime) = Record.ResponseTimeMs {
849 TotalResponseTime += ResponseTime;
850 }
851
852 if Record.Status == HealthStatus::Healthy {
853 SuccessfulChecks += 1;
854 }
855 }
856
857 Statistics.AverageResponseTimeMs = TotalResponseTime as f64 / history.len() as f64;
858 Statistics.SuccessRate = SuccessfulChecks as f64 / history.len() as f64 * 100.0;
859 }
860
861 Statistics
862 }
863}
864
/// Aggregate numbers produced by `HealthCheckManager::GetHealthStatistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatistics {
	/// Number of registered services, whatever their status.
	pub TotalServices:usize,
	/// Services currently `Healthy`.
	pub HealthyServices:usize,
	/// Services currently `Degraded`.
	pub DegradedServices:usize,
	/// Services currently `Unhealthy`.
	pub UnhealthyServices:usize,
	/// Number of records in the retained history.
	pub TotalChecks:usize,
	/// Sum of recorded response times divided by `TotalChecks`; `0.0` when
	/// there is no history.
	pub AverageResponseTimeMs:f64,
	/// Percentage (0–100) of history records that were `Healthy`.
	pub SuccessRate:f64,
}
876
877impl HealthStatistics {
878 pub fn OverallHealthPercentage(&self) -> f64 {
880 if self.TotalServices == 0 {
881 return 0.0;
882 }
883
884 (self.HealthyServices as f64 / self.TotalServices as f64) * 100.0
885 }
886}
887
/// Serializable bundle describing the health of the whole system at one
/// point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResponse {
	/// Aggregated status across services.
	pub OverallStatus:HealthStatus,
	/// Per-service snapshots keyed by service name.
	pub ServiceHealth:HashMap<String, ServiceHealth>,
	/// Aggregate counters and rates.
	pub Statistics:HealthStatistics,
	/// Performance metrics; `new` fills in the defaults.
	pub PerformanceIndicators:PerformanceIndicators,
	/// Resource warnings; empty unless set via `with_resource_warnings`.
	pub ResourceWarnings:Vec<ResourceWarning>,
	/// `Utility::CurrentTimestamp()` taken when the response was built.
	pub Timestamp:u64,
}
898
899impl HealthCheckResponse {
900 pub fn new(
902 OverallStatus:HealthStatus,
903 ServiceHealth:HashMap<String, ServiceHealth>,
904 Statistics:HealthStatistics,
905 ) -> Self {
906 Self {
907 OverallStatus,
908 ServiceHealth,
909 Statistics,
910 PerformanceIndicators:PerformanceIndicators::default(),
911 ResourceWarnings:Vec::new(),
912 Timestamp:Utility::CurrentTimestamp(),
913 }
914 }
915
916 pub fn with_performance_indicators(mut self, indicators:PerformanceIndicators) -> Self {
918 self.PerformanceIndicators = indicators;
919 self
920 }
921
922 pub fn with_resource_warnings(mut self, warnings:Vec<ResourceWarning>) -> Self {
924 self.ResourceWarnings = warnings;
925 self
926 }
927}
928
/// Performance metrics attached to a `HealthCheckResponse`.
///
/// Nothing in this file computes these values; the field notes restate the
/// field names and should be confirmed against whatever fills them in.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceIndicators {
	/// 99th-percentile response time, milliseconds (per the name).
	pub ResponseTimeP99Ms:f64,
	/// 95th-percentile response time, milliseconds (per the name).
	pub ResponseTimeP95Ms:f64,
	/// Requests handled per second (per the name).
	pub RequestThroughputPerSec:f64,
	/// Error rate as a percentage (per the name).
	pub ErrorRatePercent:f64,
	/// Coarse degradation classification.
	pub DegradationLevel:DegradationLevel,
	/// Name of the service identified as the bottleneck, if any.
	pub BottleneckService:Option<String>,
}
939
940impl Default for PerformanceIndicators {
941 fn default() -> Self {
942 Self {
943 ResponseTimeP99Ms:0.0,
944 ResponseTimeP95Ms:0.0,
945 RequestThroughputPerSec:0.0,
946 ErrorRatePercent:0.0,
947 DegradationLevel:DegradationLevel::Optimal,
948 BottleneckService:None,
949 }
950 }
951}
952
/// Coarse classification of system performance.
///
/// Only `Optimal` is used in this file (as the default); the ordering from
/// best to worst is inferred from the variant names — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DegradationLevel {
	/// Default level for freshly built `PerformanceIndicators`.
	Optimal,
	/// Presumably: slightly below optimal but within tolerance.
	Acceptable,
	/// Presumably: noticeably impaired.
	Degraded,
	/// Presumably: severely impaired.
	Critical,
}
961
/// A single resource-related warning attached to a `HealthCheckResponse`.
///
/// Nothing in this file creates these; field notes restate the names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceWarning {
	/// Category of the warning.
	pub WarningType:ResourceWarningType,
	/// Affected service, when the warning is tied to one.
	pub ServiceName:Option<String>,
	/// Observed value; units presumably depend on `WarningType` — TODO confirm.
	pub CurrentValue:f64,
	/// Limit the observed value is compared against (same units as `CurrentValue`).
	pub Threshold:f64,
	/// How serious the warning is.
	pub Severity:WarningSeverity,
	/// When the warning was raised; presumably from `Utility::CurrentTimestamp()`.
	pub Timestamp:u64,
}
972
/// Category of a `ResourceWarning`.
///
/// Not matched on anywhere in this file; each variant's name is its only
/// description here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResourceWarningType {
	/// Memory usage is high.
	HighMemoryUsage,
	/// CPU usage is high.
	HighCPUUsage,
	/// Free disk space is low.
	LowDiskSpace,
	/// A connection pool has no free connections.
	ConnectionPoolExhausted,
	/// A thread pool has no free workers.
	ThreadPoolExhausted,
	/// Latency is high.
	HighLatency,
	/// Error rate is high.
	HighErrorRate,
	/// Problems reaching a database.
	DatabaseConnectivityIssue,
}
985
/// Severity attached to a `ResourceWarning`.
///
/// Variants are listed from least to most severe going by their names; the
/// type derives no ordering, so nothing in code relies on that order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WarningSeverity {
	/// Least severe.
	Low,
	/// Moderately severe.
	Medium,
	/// Severe.
	High,
	/// Most severe.
	Critical,
}