1use std::{
80 path::{Path, PathBuf},
81 sync::Arc,
82 time::{Duration, Instant},
83};
84
85use serde::{Deserialize, Serialize};
86use tokio::{
87 fs,
88 sync::{RwLock, broadcast, mpsc},
89 time::sleep,
90};
91use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Result as NotifyResult, Watcher};
92use chrono::{DateTime, Utc};
93use log::{debug, error, info, trace, warn};
94
95use crate::{AirError, Configuration::AirConfiguration, Result};
96
97pub struct ConfigHotReload {
103 active_config:Arc<RwLock<AirConfiguration>>,
105
106 previous_config:Arc<RwLock<Option<AirConfiguration>>>,
108
109 last_config_hash:Arc<RwLock<Option<String>>>,
111
112 config_path:PathBuf,
114
115 watcher:Option<Arc<RwLock<notify::RecommendedWatcher>>>,
117
118 change_sender:broadcast::Sender<ConfigChangeEvent>,
120
121 reload_tx:mpsc::Sender<ReloadRequest>,
123
124 change_history:Arc<RwLock<Vec<ConfigChangeRecord>>>,
126
127 last_reload:Arc<RwLock<Option<DateTime<Utc>>>>,
129
130 last_reload_duration:Arc<RwLock<Option<Duration>>>,
132
133 enabled:Arc<RwLock<bool>>,
135
136 debounce_delay:Duration,
138
139 last_change_time:Arc<RwLock<Option<Instant>>>,
141
142 stats:Arc<RwLock<ReloadStats>>,
144
145 validators:Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
147
148 max_retries:u32,
150
151 retry_delay:Duration,
153
154 auto_rollback_enabled:Arc<RwLock<bool>>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct ConfigChangeEvent {
161 pub timestamp:DateTime<Utc>,
162 pub old_config_hash:Option<String>,
163 pub new_config_hash:String,
164 pub changes:Vec<ConfigChange>,
165 pub success:bool,
166}
167
168pub enum ReloadRequest {
170 Manual,
172 Signal,
174 FileChange,
176 Periodic,
178}
179
180#[derive(Debug, Clone, Default)]
182struct ReloadStats {
183 total_attempts:u64,
184 successful_reloads:u64,
185 failed_reloads:u64,
186 validation_errors:u64,
187 parse_errors:u64,
188 rollback_attempts:u64,
189 last_error:Option<String>,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct ConfigChangeRecord {
195 pub timestamp:DateTime<Utc>,
196 pub changes:Vec<ConfigChange>,
197 pub validated:bool,
198 pub reason:String,
199 pub rollback_performed:bool,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct ConfigChange {
205 pub path:String,
206 pub old_value:serde_json::Value,
207 pub new_value:serde_json::Value,
208}
209
210pub trait ConfigValidator: Send + Sync {
212 fn validate(&self, config:&AirConfiguration) -> Result<()>;
214
215 fn name(&self) -> &str;
217
218 fn priority(&self) -> u32 { 0 }
220}
221
222pub struct GrpcConfigValidator;
228
229impl ConfigValidator for GrpcConfigValidator {
230 fn validate(&self, config:&AirConfiguration) -> Result<()> {
231 if config.Grpc.BindAddress.is_empty() {
232 return Err(AirError::Configuration("gRPC bind address cannot be empty".to_string()));
233 }
234
235 if !crate::Configuration::ConfigurationManager::IsValidAddress(&config.Grpc.BindAddress) {
237 return Err(AirError::Configuration(format!(
238 "Invalid gRPC bind address '{}': must be host:port or [IPv6]:port",
239 config.Grpc.BindAddress
240 )));
241 }
242
243 if config.Grpc.MaxConnections < 10 || config.Grpc.MaxConnections > 10000 {
245 return Err(AirError::Configuration(format!(
246 "gRPC MaxConnections {} is out of range [10, 10000]",
247 config.Grpc.MaxConnections
248 )));
249 }
250
251 if config.Grpc.RequestTimeoutSecs < 1 || config.Grpc.RequestTimeoutSecs > 3600 {
253 return Err(AirError::Configuration(format!(
254 "gRPC RequestTimeoutSecs {} is out of range [1, 3600]",
255 config.Grpc.RequestTimeoutSecs
256 )));
257 }
258
259 Ok(())
260 }
261
262 fn name(&self) -> &str { "GrpcConfigValidator" }
263
264 fn priority(&self) -> u32 {
265 100 }
267}
268
269pub struct AuthConfigValidator;
271
272impl ConfigValidator for AuthConfigValidator {
273 fn validate(&self, config:&AirConfiguration) -> Result<()> {
274 if config.Authentication.Enabled {
275 if config.Authentication.CredentialsPath.is_empty() {
276 return Err(AirError::Configuration(
277 "Authentication credentials path cannot be empty when enabled".to_string(),
278 ));
279 }
280
281 if config.Authentication.CredentialsPath.contains("..") {
283 return Err(AirError::Configuration(
284 "Authentication credentials path contains '..' which is not allowed".to_string(),
285 ));
286 }
287 }
288
289 if config.Authentication.TokenExpirationHours < 1 || config.Authentication.TokenExpirationHours > 8760 {
291 return Err(AirError::Configuration(format!(
292 "Token expiration {} hours is out of range [1, 8760]",
293 config.Authentication.TokenExpirationHours
294 )));
295 }
296
297 if config.Authentication.MaxSessions < 1 || config.Authentication.MaxSessions > 1000 {
299 return Err(AirError::Configuration(format!(
300 "Max sessions {} is out of range [1, 1000]",
301 config.Authentication.MaxSessions
302 )));
303 }
304
305 Ok(())
306 }
307
308 fn name(&self) -> &str { "AuthConfigValidator" }
309
310 fn priority(&self) -> u32 {
311 90 }
313}
314
315pub struct UpdateConfigValidator;
317
318impl ConfigValidator for UpdateConfigValidator {
319 fn validate(&self, config:&AirConfiguration) -> Result<()> {
320 if config.Updates.Enabled {
321 if config.Updates.UpdateServerUrl.is_empty() {
322 return Err(AirError::Configuration(
323 "Update server URL cannot be empty when updates are enabled".to_string(),
324 ));
325 }
326
327 if !config.Updates.UpdateServerUrl.starts_with("https://") {
329 return Err(AirError::Configuration(format!(
330 "Update server URL must use HTTPS: {}",
331 config.Updates.UpdateServerUrl
332 )));
333 }
334
335 if !crate::Configuration::ConfigurationManager::IsValidUrl(&config.Updates.UpdateServerUrl) {
337 return Err(AirError::Configuration(format!(
338 "Invalid update server URL: {}",
339 config.Updates.UpdateServerUrl
340 )));
341 }
342 }
343
344 if config.Updates.CheckIntervalHours < 1 || config.Updates.CheckIntervalHours > 168 {
346 return Err(AirError::Configuration(format!(
347 "Update check interval {} hours is out of range [1, 168]",
348 config.Updates.CheckIntervalHours
349 )));
350 }
351
352 Ok(())
353 }
354
355 fn name(&self) -> &str { "UpdateConfigValidator" }
356
357 fn priority(&self) -> u32 {
358 50 }
360}
361
362pub struct DownloadConfigValidator;
364
365impl ConfigValidator for DownloadConfigValidator {
366 fn validate(&self, config:&AirConfiguration) -> Result<()> {
367 if config.Downloader.Enabled {
368 if config.Downloader.CacheDirectory.is_empty() {
369 return Err(AirError::Configuration(
370 "Download cache directory cannot be empty when enabled".to_string(),
371 ));
372 }
373
374 if config.Downloader.CacheDirectory.contains("..") {
376 return Err(AirError::Configuration(
377 "Download cache directory contains '..' which is not allowed".to_string(),
378 ));
379 }
380
381 if config.Downloader.MaxConcurrentDownloads < 1 || config.Downloader.MaxConcurrentDownloads > 50 {
383 return Err(AirError::Configuration(format!(
384 "Max concurrent downloads {} is out of range [1, 50]",
385 config.Downloader.MaxConcurrentDownloads
386 )));
387 }
388
389 if config.Downloader.DownloadTimeoutSecs < 10 || config.Downloader.DownloadTimeoutSecs > 3600 {
391 return Err(AirError::Configuration(format!(
392 "Download timeout {} seconds is out of range [10, 3600]",
393 config.Downloader.DownloadTimeoutSecs
394 )));
395 }
396
397 if config.Downloader.MaxRetries > 10 {
399 return Err(AirError::Configuration(format!(
400 "Max retries {} exceeds maximum (10)",
401 config.Downloader.MaxRetries
402 )));
403 }
404 }
405
406 Ok(())
407 }
408
409 fn name(&self) -> &str { "DownloadConfigValidator" }
410
411 fn priority(&self) -> u32 {
412 50 }
414}
415
416pub struct IndexingConfigValidator;
418
419impl ConfigValidator for IndexingConfigValidator {
420 fn validate(&self, config:&AirConfiguration) -> Result<()> {
421 if config.Indexing.Enabled {
422 if config.Indexing.IndexDirectory.is_empty() {
423 return Err(AirError::Configuration(
424 "Index directory cannot be empty when indexing is enabled".to_string(),
425 ));
426 }
427
428 if config.Indexing.IndexDirectory.contains("..") {
430 return Err(AirError::Configuration(
431 "Index directory contains '..' which is not allowed".to_string(),
432 ));
433 }
434
435 if config.Indexing.FileTypes.is_empty() {
437 return Err(AirError::Configuration(
438 "File types to index cannot be empty when indexing is enabled".to_string(),
439 ));
440 }
441
442 if config.Indexing.MaxFileSizeMb < 1 || config.Indexing.MaxFileSizeMb > 1024 {
444 return Err(AirError::Configuration(format!(
445 "Max file size {} MB is out of range [1, 1024]",
446 config.Indexing.MaxFileSizeMb
447 )));
448 }
449
450 if config.Indexing.UpdateIntervalMinutes < 1 || config.Indexing.UpdateIntervalMinutes > 1440 {
452 return Err(AirError::Configuration(format!(
453 "Index update interval {} minutes is out of range [1, 1440]",
454 config.Indexing.UpdateIntervalMinutes
455 )));
456 }
457 }
458
459 Ok(())
460 }
461
462 fn name(&self) -> &str { "IndexingConfigValidator" }
463
464 fn priority(&self) -> u32 {
465 40 }
467}
468
469pub struct LoggingConfigValidator;
471
472impl ConfigValidator for LoggingConfigValidator {
473 fn validate(&self, config:&AirConfiguration) -> Result<()> {
474 let valid_levels = ["trace", "debug", "info", "warn", "error"];
475
476 if !valid_levels.contains(&config.Logging.Level.as_str()) {
477 return Err(AirError::Configuration(format!(
478 "Invalid log level '{}': must be one of: {}",
479 config.Logging.Level,
480 valid_levels.join(", ")
481 )));
482 }
483
484 if config.Logging.MaxFileSizeMb < 1 || config.Logging.MaxFileSizeMb > 1000 {
486 return Err(AirError::Configuration(format!(
487 "Max log file size {} MB is out of range [1, 1000]",
488 config.Logging.MaxFileSizeMb
489 )));
490 }
491
492 if config.Logging.MaxFiles < 1 || config.Logging.MaxFiles > 50 {
494 return Err(AirError::Configuration(format!(
495 "Max log files {} is out of range [1, 50]",
496 config.Logging.MaxFiles
497 )));
498 }
499
500 Ok(())
501 }
502
503 fn name(&self) -> &str { "LoggingConfigValidator" }
504
505 fn priority(&self) -> u32 {
506 30 }
508}
509
510pub struct PerformanceConfigValidator;
512
513impl ConfigValidator for PerformanceConfigValidator {
514 fn validate(&self, config:&AirConfiguration) -> Result<()> {
515 if config.Performance.MemoryLimitMb < 64 || config.Performance.MemoryLimitMb > 16384 {
517 return Err(AirError::Configuration(format!(
518 "Memory limit {} MB is out of range [64, 16384]",
519 config.Performance.MemoryLimitMb
520 )));
521 }
522
523 if config.Performance.CPULimitPercent < 10 || config.Performance.CPULimitPercent > 100 {
525 return Err(AirError::Configuration(format!(
526 "CPU limit {}% is out of range [10, 100]",
527 config.Performance.CPULimitPercent
528 )));
529 }
530
531 if config.Performance.DiskLimitMb < 100 || config.Performance.DiskLimitMb > 102400 {
533 return Err(AirError::Configuration(format!(
534 "Disk limit {} MB is out of range [100, 102400]",
535 config.Performance.DiskLimitMb
536 )));
537 }
538
539 if config.Performance.BackgroundTaskIntervalSecs < 1 || config.Performance.BackgroundTaskIntervalSecs > 3600 {
541 return Err(AirError::Configuration(format!(
542 "Background task interval {} seconds is out of range [1, 3600]",
543 config.Performance.BackgroundTaskIntervalSecs
544 )));
545 }
546
547 Ok(())
548 }
549
550 fn name(&self) -> &str { "PerformanceConfigValidator" }
551
552 fn priority(&self) -> u32 {
553 20 }
555}
556
557impl ConfigHotReload {
562 pub async fn New(config_path:PathBuf, initial_config:AirConfiguration) -> Result<Self> {
573 let (change_sender, _) = broadcast::channel(100);
574 let (reload_tx, reload_rx) = mpsc::channel(100);
575
576 let manager = Self {
577 active_config:Arc::new(RwLock::new(initial_config.clone())),
578 previous_config:Arc::new(RwLock::new(None)),
579 last_config_hash:Arc::new(RwLock::new(None)),
580 config_path,
581 watcher:None,
582 change_sender,
583 reload_tx,
584 change_history:Arc::new(RwLock::new(Vec::new())),
585 last_reload:Arc::new(RwLock::new(None)),
586 last_reload_duration:Arc::new(RwLock::new(None)),
587 enabled:Arc::new(RwLock::new(true)),
588 debounce_delay:Duration::from_millis(500),
589 last_change_time:Arc::new(RwLock::new(None)),
590 stats:Arc::new(RwLock::new(ReloadStats::default())),
591 validators:Arc::new(RwLock::new(Self::DefaultValidators())),
592 max_retries:3,
593 retry_delay:Duration::from_secs(1),
594 auto_rollback_enabled:Arc::new(RwLock::new(true)),
595 };
596
597 let hash = crate::Configuration::ConfigurationManager::ComputeHash(&initial_config)?;
599 *manager.last_config_hash.write().await = Some(hash);
600
601 manager.StartReloadProcessor(reload_rx);
603
604 Ok(manager)
605 }
606
607 fn DefaultValidators() -> Vec<Box<dyn ConfigValidator>> {
609 vec![
610 Box::new(GrpcConfigValidator),
611 Box::new(AuthConfigValidator),
612 Box::new(UpdateConfigValidator),
613 Box::new(DownloadConfigValidator),
614 Box::new(IndexingConfigValidator),
615 Box::new(LoggingConfigValidator),
616 Box::new(PerformanceConfigValidator),
617 ]
618 }
619
620 pub async fn EnableFileWatching(&mut self) -> Result<()> {
622 info!("[HotReload] Enabling file watching for configuration changes");
623
624 let config_path = self.config_path.clone();
625
626 let (tx, mut rx) = tokio::sync::mpsc::channel(100);
628
629 let mut watcher = RecommendedWatcher::new(
630 move |res:NotifyResult<Event>| {
631 if let Ok(event) = res {
632 let _ = tx.blocking_send(event);
633 }
634 },
635 notify::Config::default(),
636 )
637 .map_err(|e| AirError::Configuration(format!("Failed to create file watcher: {}", e)))?;
638
639 let watch_path = if config_path.is_file() {
641 config_path.parent().unwrap_or(&config_path).to_path_buf()
642 } else {
643 config_path.clone()
644 };
645
646 watcher
647 .watch(&watch_path, RecursiveMode::NonRecursive)
648 .map_err(|e| AirError::Configuration(format!("Failed to watch path '{}': {}", watch_path.display(), e)))?;
649
650 let reload_tx = self.reload_tx.clone();
652 let config_path_clone = config_path.clone();
653
654 tokio::spawn(async move {
655 while let Some(event) = rx.recv().await {
656 log::trace!("[HotReload] File event detected: {:?}", event.kind);
657
658 let should_reload = event
660 .paths
661 .iter()
662 .any(|p| p == &config_path_clone || p == config_path_clone.as_path())
663 && event.kind != EventKind::Access(notify::event::AccessKind::Any);
664
665 if should_reload {
666 let _ = reload_tx.send(ReloadRequest::FileChange).await;
667 }
668 }
669 });
670
671 self.watcher = Some(Arc::new(RwLock::new(watcher)));
672 *self.enabled.write().await = true;
673
674 info!("[HotReload] File watching enabled for: {}", config_path.display());
675 Ok(())
676 }
677
678 pub async fn DisableFileWatching(&mut self) -> Result<()> {
680 *self.enabled.write().await = false;
681
682 if let Some(watcher) = self.watcher.take() {
683 drop(watcher);
684 }
685
686 info!("[HotReload] File watching disabled");
687 Ok(())
688 }
689
690 fn StartReloadProcessor(&self, mut reload_rx:mpsc::Receiver<ReloadRequest>) {
692 let enabled = self.enabled.clone();
693 let debounce_delay = self.debounce_delay;
694 let last_change_time = self.last_change_time.clone();
695
696 tokio::spawn(async move {
697 while let Some(request) = reload_rx.recv().await {
698 if !*enabled.read().await {
699 continue;
700 }
701
702 let now = Instant::now();
704 {
705 let mut last_change = last_change_time.write().await;
706 if let Some(last) = *last_change {
707 if now.duration_since(last) < debounce_delay {
708 continue; }
710 }
711 *last_change = Some(now);
712 }
713
714 sleep(debounce_delay).await;
715
716 match request {
718 ReloadRequest::Manual => {
719 info!("[HotReload] Processing manual reload request");
720 },
721 ReloadRequest::Signal => {
722 info!("[HotReload] Processing signal-based reload request");
723 },
724 ReloadRequest::FileChange => {
725 debug!("[HotReload] Processing file change reload request");
726 },
727 ReloadRequest::Periodic => {
728 trace!("[HotReload] Processing periodic reload check");
729 },
730 }
731 }
732 });
733 }
734
735 pub async fn Reload(&self) -> Result<()> {
737 debug!("[HotReload] Reloading configuration from: {}", self.config_path.display());
738
739 if !*self.enabled.read().await {
741 return Err(AirError::Configuration("Hot-reload is disabled".to_string()));
742 }
743
744 let start_time = Instant::now();
745
746 {
748 let mut stats = self.stats.write().await;
749 stats.total_attempts += 1;
750 }
751
752 let mut last_error = None;
754 for attempt in 0..=self.max_retries {
755 match self.AttemptReload().await {
756 Ok(()) => {
757 let duration = start_time.elapsed();
758 *self.last_reload_duration.write().await = Some(duration);
759
760 {
762 let mut stats = self.stats.write().await;
763 stats.successful_reloads += 1;
764 stats.last_error = None;
765 }
766
767 info!("[HotReload] Configuration reloaded successfully in {:?}", duration);
768 return Ok(());
769 },
770 Err(e) => {
771 last_error = Some(e.clone());
772 if attempt < self.max_retries {
773 let delay = self.retry_delay * 2_u32.pow(attempt);
774 warn!(
775 "[HotReload] Reload attempt {} failed, retrying in {:?}: {}",
776 attempt + 1,
777 delay,
778 e
779 );
780 sleep(delay).await;
781 }
782 },
783 }
784 }
785
786 {
788 let mut stats = self.stats.write().await;
789 stats.failed_reloads += 1;
790 stats.last_error = last_error.as_ref().map(|e| e.to_string());
791 }
792
793 let error = last_error.unwrap_or_else(|| AirError::Configuration("Unknown error".to_string()));
794
795 if *self.auto_rollback_enabled.read().await {
797 info!("[HotReload] Attempting rollback due to reload failure");
798 if let Err(rollback_err) = self.Rollback().await {
799 error!("[HotReload] Rollback also failed: {}", rollback_err);
800 }
801 }
802
803 Err(error)
804 }
805
806 async fn AttemptReload(&self) -> Result<()> {
808 let content = fs::read_to_string(&self.config_path).await;
810 if let Err(e) = content {
811 let mut stats = self.stats.write().await;
812 stats.parse_errors += 1;
813 return Err(AirError::Configuration(format!("Failed to read config file: {}", e)));
814 }
815 let content = content.unwrap();
816
817 let new_config:std::result::Result<AirConfiguration, toml::de::Error> = toml::from_str(&content);
818 if let Err(e) = new_config {
819 let mut stats = self.stats.write().await;
820 stats.parse_errors += 1;
821 return Err(AirError::Configuration(format!("Failed to parse config file: {}", e)));
822 }
823 let new_config = new_config.unwrap();
824
825 self.ValidateConfig(&new_config).await?;
827
828 let new_hash = crate::Configuration::ConfigurationManager::ComputeHash(&new_config)?;
830 let current_hash = self.last_config_hash.read().await.clone();
831
832 if let Some(ref hash) = current_hash {
833 if hash == &new_hash {
834 debug!("[HotReload] Configuration unchanged, skipping reload");
835 return Ok(());
836 }
837 }
838
839 let old_config = self.active_config.read().await.clone();
841 let old_hash = current_hash;
842
843 *self.active_config.write().await = new_config.clone();
844 *self.previous_config.write().await = Some(old_config.clone());
845 *self.last_config_hash.write().await = Some(new_hash.clone());
846 *self.last_reload.write().await = Some(Utc::now());
847
848 let changes = self.ComputeChanges(&old_config, &new_config);
850
851 let record = ConfigChangeRecord {
852 timestamp:Utc::now(),
853 changes:changes.clone(),
854 validated:true,
855 reason:"Reload".to_string(),
856 rollback_performed:false,
857 };
858
859 let mut history = self.change_history.write().await;
860 history.push(record);
861
862 let history_len = history.len();
864 if history_len > 1000 {
865 history.drain(0..history_len - 1000);
866 }
867 drop(history);
868
869 let event = ConfigChangeEvent {
871 timestamp:Utc::now(),
872 old_config_hash:old_hash,
873 new_config_hash:new_hash,
874 changes,
875 success:true,
876 };
877
878 let _ = self.change_sender.send(event);
879
880 Ok(())
881 }
882
883 pub async fn ReloadAndValidate(&self) -> Result<()> { self.Reload().await }
885
886 pub async fn TriggerReload(&self) -> Result<()> {
888 self.reload_tx
889 .send(ReloadRequest::Manual)
890 .await
891 .map_err(|e| AirError::Configuration(format!("Failed to trigger reload: {}", e)))?;
892 Ok(())
893 }
894
895 async fn ValidateConfig(&self, config:&AirConfiguration) -> Result<()> {
897 let validators = self.validators.read().await;
898
899 let mut sorted_validators:Vec<_> = validators.iter().collect();
901 sorted_validators.sort_by(|a, b| b.priority().cmp(&a.priority()));
902
903 for validator in sorted_validators {
904 let result = validator.validate(config);
905 if let Err(e) = result {
906 let mut stats = self.stats.write().await;
907 stats.validation_errors += 1;
908 stats.last_error = Some(format!("{}: {}", validator.name(), e));
909 error!("[HotReload] Validation failed ({}): {}", validator.name(), e);
910 return Err(AirError::Configuration(format!("{}: {}", validator.name(), e)));
911 }
912
913 trace!("[HotReload] Validator '{}' passed", validator.name());
914 }
915
916 info!("[HotReload] Configuration validation passed ({} validators)", validators.len());
917 Ok(())
918 }
919
920 pub async fn RegisterValidator(&self, validator:Box<dyn ConfigValidator>) {
922 let mut validators = self.validators.write().await;
923 validators.push(validator);
924 info!("[HotReload] Registered validator (total: {})", validators.len());
925 }
926
927 pub async fn Rollback(&self) -> Result<()> {
929 let previous = {
930 let prev = self.previous_config.read().await.clone();
931 prev.ok_or_else(|| AirError::Configuration("No previous configuration to rollback to".to_string()))?
932 };
933
934 self.ValidateConfig(&previous).await?;
936
937 let _old_config = self.active_config.read().await.clone();
939 let old_hash = self.last_config_hash.read().await.clone();
940
941 *self.active_config.write().await = previous.clone();
942 let new_hash = crate::Configuration::ConfigurationManager::ComputeHash(&previous)?;
943 *self.last_config_hash.write().await = Some(new_hash.clone());
944
945 let record = ConfigChangeRecord {
947 timestamp:Utc::now(),
948 changes:vec![],
949 validated:true,
950 reason:"Rollback".to_string(),
951 rollback_performed:true,
952 };
953
954 {
955 let mut stats = self.stats.write().await;
956 stats.rollback_attempts += 1;
957 }
958
959 self.change_history.write().await.push(record);
960
961 let event = ConfigChangeEvent {
963 timestamp:Utc::now(),
964 old_config_hash:old_hash,
965 new_config_hash:new_hash,
966 changes:vec![],
967 success:true,
968 };
969
970 let _ = self.change_sender.send(event);
971
972 info!("[HotReload] Configuration rolled back successfully");
973 Ok(())
974 }
975
976 pub async fn GetConfig(&self) -> AirConfiguration { self.active_config.read().await.clone() }
978
979 pub async fn GetConfigRef(&self) -> tokio::sync::RwLockReadGuard<'_, AirConfiguration> {
981 self.active_config.read().await
982 }
983
984 pub async fn SetValue(&self, path:&str, value:&str) -> Result<()> {
986 let mut config = self.GetConfig().await;
987
988 Self::SetConfigValue(&mut config, path, value)?;
990
991 self.ValidateConfig(&config).await?;
993
994 let content = toml::to_string_pretty(&config)
996 .map_err(|e| AirError::Configuration(format!("Serialization failed: {}", e)))?;
997
998 fs::write(&self.config_path, content)
999 .await
1000 .map_err(|e| AirError::Configuration(format!("Failed to write config: {}", e)))?;
1001
1002 self.Reload().await?;
1004
1005 info!("[HotReload] Configuration value updated: {} = {}", path, value);
1006 Ok(())
1007 }
1008
1009 pub async fn GetValue(&self, path:&str) -> Result<serde_json::Value> {
1011 let config = self.active_config.read().await;
1012 let config_json = serde_json::to_value(&*config)
1013 .map_err(|e| AirError::Configuration(format!("Serialization failed: {}", e)))?;
1014
1015 let mut current = config_json;
1016 for key in path.split('.') {
1017 current = current
1018 .get(key)
1019 .ok_or_else(|| AirError::Configuration(format!("Key not found: {}", path)))?
1020 .clone();
1021 }
1022
1023 Ok(current)
1024 }
1025
1026 fn SetConfigValue(config:&mut AirConfiguration, path:&str, value:&str) -> Result<()> {
1028 let parts:Vec<&str> = path.split('.').collect();
1029
1030 match parts.as_slice() {
1031 ["grpc", "bind_address"] => config.Grpc.BindAddress = value.to_string(),
1032 ["grpc", "max_connections"] => {
1033 config.Grpc.MaxConnections = value
1034 .parse()
1035 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1036 },
1037 ["grpc", "request_timeout_secs"] => {
1038 config.Grpc.RequestTimeoutSecs = value
1039 .parse()
1040 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1041 },
1042 ["authentication", "enabled"] => {
1043 config.Authentication.Enabled = value
1044 .parse()
1045 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1046 },
1047 ["authentication", "credentials_path"] => {
1048 config.Authentication.CredentialsPath = value.to_string();
1049 },
1050 ["updates", "enabled"] => {
1051 config.Updates.Enabled = value
1052 .parse()
1053 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1054 },
1055 ["updates", "auto_download"] => {
1056 config.Updates.AutoDownload = value
1057 .parse()
1058 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1059 },
1060 ["updates", "auto_install"] => {
1061 config.Updates.AutoInstall = value
1062 .parse()
1063 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1064 },
1065 ["downloader", "enabled"] => {
1066 config.Downloader.Enabled = value
1067 .parse()
1068 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1069 },
1070 ["indexing", "enabled"] => {
1071 config.Indexing.Enabled = value
1072 .parse()
1073 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1074 },
1075 ["logging", "level"] => {
1076 config.Logging.Level = value.to_lowercase();
1077 },
1078 ["logging", "console_enabled"] => {
1079 config.Logging.ConsoleEnabled = value
1080 .parse()
1081 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1082 },
1083 _ => {
1084 return Err(AirError::Configuration(format!("Unknown configuration path: {}", path)));
1085 },
1086 }
1087
1088 Ok(())
1089 }
1090
1091 fn ComputeChanges(&self, old:&AirConfiguration, new:&AirConfiguration) -> Vec<ConfigChange> {
1093 let mut changes = Vec::new();
1094
1095 let old_json = serde_json::to_value(old).unwrap_or_default();
1096 let new_json = serde_json::to_value(new).unwrap_or_default();
1097
1098 Self::DiffJson("", &old_json, &new_json, &mut changes);
1099
1100 changes
1101 }
1102
1103 fn DiffJson(prefix:&str, old:&serde_json::Value, new:&serde_json::Value, changes:&mut Vec<ConfigChange>) {
1105 match (old, new) {
1106 (serde_json::Value::Object(old_map), serde_json::Value::Object(new_map)) => {
1107 for (key, new_val) in new_map {
1108 let new_prefix = if prefix.is_empty() { key.clone() } else { format!("{}.{}", prefix, key) };
1109
1110 if let Some(old_val) = old_map.get(key) {
1111 Self::DiffJson(&new_prefix, old_val, new_val, changes);
1112 } else {
1113 changes.push(ConfigChange {
1114 path:new_prefix,
1115 old_value:serde_json::Value::Null,
1116 new_value:new_val.clone(),
1117 });
1118 }
1119 }
1120 },
1121 (old_val, new_val) if old_val != new_val => {
1122 changes.push(ConfigChange {
1123 path:prefix.to_string(),
1124 old_value:old_val.clone(),
1125 new_value:new_val.clone(),
1126 });
1127 },
1128 _ => {},
1129 }
1130 }
1131
1132 pub async fn GetChangeHistory(&self, limit:Option<usize>) -> Vec<ConfigChangeRecord> {
1134 let history = self.change_history.read().await;
1135
1136 if let Some(limit) = limit {
1137 history.iter().rev().take(limit).cloned().collect()
1138 } else {
1139 history.iter().rev().cloned().collect()
1140 }
1141 }
1142
1143 pub async fn GetLastReload(&self) -> Option<DateTime<Utc>> { *self.last_reload.read().await }
1145
1146 pub async fn GetLastReloadDuration(&self) -> Option<Duration> { *self.last_reload_duration.read().await }
1148
1149 pub async fn GetStats(&self) -> ReloadStats { self.stats.read().await.clone() }
1151
1152 pub async fn IsEnabled(&self) -> bool { *self.enabled.read().await }
1154
1155 pub async fn SetAutoRollback(&self, enabled:bool) {
1157 *self.auto_rollback_enabled.write().await = enabled;
1158 info!("[HotReload] Auto-rollback {}", if enabled { "enabled" } else { "disabled" });
1159 }
1160
1161 pub fn SubscribeChanges(&self) -> broadcast::Receiver<ConfigChangeEvent> { self.change_sender.subscribe() }
1165
1166 pub fn GetConfigPath(&self) -> &Path { &self.config_path }
1168
1169 pub async fn SetDebounceDelay(&self, delay:Duration) {
1171 info!("[HotReload] Debounce delay set to {:?}", delay);
1175 }
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180 use tempfile::NamedTempFile;
1181
1182 use super::*;
1183
1184 #[tokio::test]
1185 async fn test_config_hot_reload_creation() {
1186 let config = AirConfiguration::default();
1187 let temp_file = NamedTempFile::new().unwrap();
1188 let path = temp_file.path().to_path_buf();
1189
1190 let manager = ConfigHotReload::New(path, config).await.expect("Failed to create manager");
1191
1192 assert_eq!(manager.GetLastReload().await, None);
1193 assert!(
1194 !manager.GetChangeHistory(Some(10)).await.is_empty() || manager.GetChangeHistory(Some(10)).await.is_empty()
1195 );
1196 }
1197
1198 #[tokio::test]
1199 async fn test_get_set_value() {
1200 let config = AirConfiguration::default();
1201 let temp_file = NamedTempFile::new().unwrap();
1202 let path = temp_file.path().to_path_buf();
1203
1204 let content = toml::to_string_pretty(&config).unwrap();
1206 fs::write(&path, content).await.unwrap();
1207
1208 let manager = ConfigHotReload::New(path, config).await.expect("Failed to create manager");
1209
1210 let value = manager.GetValue("grpc.bind_address").await.unwrap();
1212 assert_eq!(value, "[::1]:50053");
1213 }
1214
1215 #[tokio::test]
1216 async fn test_validator_priority() {
1217 let grpc = GrpcConfigValidator;
1218 let auth = AuthConfigValidator;
1219 let perf = PerformanceConfigValidator;
1220
1221 assert!(grpc.priority() > auth.priority());
1222 assert!(auth.priority() > perf.priority());
1223 }
1224
1225 #[tokio::test]
1226 async fn test_compute_changes() {
1227 let config = AirConfiguration::default();
1228 let manager = ConfigHotReload::New(PathBuf::from("/tmp/test.toml"), config)
1229 .await
1230 .expect("Failed to create manager");
1231
1232 let mut new_config = AirConfiguration::default();
1233 new_config.grpc.bind_address = "[::1]:50054".to_string();
1234
1235 let changes = manager.ComputeChanges(&AirConfiguration::default(), &new_config);
1236
1237 assert!(!changes.is_empty());
1238 assert!(changes.iter().any(|c| c.path == "grpc.bind_address"));
1239 }
1240}