1use std::{env, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
167
168use log::{debug, error, info, warn};
169use tokio::{signal, time::interval};
170use AirLibrary::{
172 AirError,
173 ApplicationState::ApplicationState,
174 Authentication::AuthenticationService,
175 CLI::{CliParser, Command, ConfigCommand, DebugCommand, OutputFormatter},
176 Configuration::{AirConfiguration, ConfigurationManager},
177 Daemon::DaemonManager,
178 DefaultBindAddress,
179 DefaultConfigFile,
180 Downloader::DownloadManager,
181 HealthCheck::{HealthCheckLevel, HealthCheckManager, HealthStatistics},
182 Indexing::FileIndexer,
183 Logging,
184 Metrics::{self, MetricsCollector, MetricsData},
185 ProtocolVersion,
186 Tracing,
187 Updates::UpdateManager,
188 VERSION,
189 Vine::Generated::air::air_service_server::AirServiceServer,
190 Vine::Server::AirVinegRPCService::AirVinegRPCService,
191};
192
/// Forwards its arguments unchanged to the `debug!` logging macro.
///
/// Accepts exactly what `debug!` accepts (a format string followed by optional
/// arguments) and expands to a block statement, so it can be used anywhere a
/// statement is allowed.
macro_rules! Trace {
	($($token:tt)*) => {
		{
			debug!($($token)*);
		}
	};
}
203
204async fn WaitForShutdownSignal() {
216 info!("[Shutdown] Waiting for termination signal...");
217
218 let ctrl_c = async {
219 match signal::ctrl_c().await {
220 Ok(()) => info!("[Shutdown] Received Ctrl+C signal"),
221 Err(e) => error!("[Shutdown] Failed to install Ctrl+C handler: {}", e),
222 }
223 };
224
225 #[cfg(unix)]
226 let terminate = async {
227 match signal::unix::signal(signal::unix::SignalKind::terminate()) {
228 Ok(mut sig) => {
229 sig.recv().await;
230 info!("[Shutdown] Received SIGTERM signal");
231 },
232 Err(e) => error!("[Shutdown] Failed to install signal handler: {}", e),
233 }
234 };
235
236 #[cfg(not(unix))]
237 let terminate = std::future::pending::<()>();
238
239 tokio::select! {
240 _ = ctrl_c => {},
241 _ = terminate => {},
242 }
243
244 info!("[Shutdown] Signal received, initiating graceful shutdown");
245}
246
247fn InitializeLogging() {
264 let json_output = match std::env::var("AIR_LOG_JSON") {
266 Ok(val) if !val.is_empty() => {
267 let normalized = val.to_lowercase();
268 if normalized != "true" && normalized != "false" {
269 eprintln!(
270 "Warning: Invalid AIR_LOG_JSON value '{}', expected 'true' or 'false'. Using default: false",
271 val
272 );
273 false
274 } else {
275 normalized == "true"
276 }
277 },
278 Ok(_) => false,
279 Err(_) => false,
280 };
281
282 let log_file_path = std::env::var("AIR_LOG_FILE").ok().and_then(|path| {
284 if path.is_empty() {
285 None
286 } else {
287 if let Some(parent) = std::path::PathBuf::from(&path).parent() {
289 if parent.as_os_str().is_empty() {
290 Some(path)
292 } else if parent.exists() {
293 Some(path)
294 } else {
295 eprintln!(
296 "Warning: Log file directory does not exist: {}. Logging to stdout only.",
297 parent.display()
298 );
299 None
300 }
301 } else {
302 Some(path)
303 }
304 }
305 });
306
307 let log_result = Logging::InitializeLogger(json_output, log_file_path.clone());
309
310 match log_result {
311 Ok(_) => {
312 let log_info = match &log_file_path {
313 Some(path) => format!("file: {}", path),
314 None => "stdout/stderr".to_string(),
315 };
316 info!("[Boot] Logging initialized - JSON: {}, Output: {}", json_output, log_info);
317 },
318 Err(e) => {
319 eprintln!("[ERROR] Failed to initialize structured logging: {}", e);
321 eprintln!("[ERROR] Logging will fall back to stderr-only output");
322 },
323 }
324}
325
326fn ParseArguments() -> (Option<String>, Option<String>, Option<Command>) {
346 let args:Vec<String> = std::env::args().collect();
348
349 if args.len() > 1024 {
351 eprintln!("[ERROR] Too many command line arguments (max: 1024)");
352 std::process::exit(1);
353 }
354
355 for (i, arg) in args.iter().enumerate() {
357 if arg.len() > 4096 {
358 eprintln!("[ERROR] Argument at position {} is too long (max: 4096 characters)", i);
359 std::process::exit(1);
360 }
361 }
362
363 if args.len() > 1 {
365 match args[1].as_str() {
366 "status" | "restart" | "config" | "metrics" | "logs" | "debug" | "help" | "version" | "-h" | "--help"
367 | "-v" | "--version" => {
368 match CliParser::parse(args.clone()) {
370 Ok(cmd) => {
371 debug!("[Boot] CLI command parsed: {:?}", cmd);
372 return (None, None, Some(cmd));
373 },
374 Err(e) => {
375 eprintln!("[ERROR] Error parsing CLI command: {}", e);
376 eprintln!("[ERROR] Run 'Air help' for usage information");
377 std::process::exit(1);
378 },
379 }
380 },
381 _ => {},
382 }
383 }
384
385 let mut config_path:Option<String> = None;
387 let mut bind_address:Option<String> = None;
388
389 let mut i = 0;
390 while i < args.len() {
391 match args[i].as_str() {
392 "--config" | "-c" => {
393 if i + 1 < args.len() {
394 let path = &args[i + 1];
395 if path.contains("..") || path.contains('\0') {
397 eprintln!("[ERROR] Invalid config path: contains '..' or null character");
398 std::process::exit(1);
399 }
400 config_path = Some(path.clone());
401 i += 1;
402 } else {
403 eprintln!("[ERROR] --config flag requires a path argument");
404 std::process::exit(1);
405 }
406 },
407 "--bind" | "-b" => {
408 if i + 1 < args.len() {
409 let addr = &args[i + 1];
410 if addr.is_empty() || addr.len() > 256 {
412 eprintln!("[ERROR] Invalid bind address: must be 1-256 characters");
413 std::process::exit(1);
414 }
415 if addr.contains('\0') {
417 eprintln!("[ERROR] Invalid bind address: contains null character");
418 std::process::exit(1);
419 }
420 bind_address = Some(addr.clone());
421 i += 1;
422 } else {
423 eprintln!("[ERROR] --bind flag requires an address argument");
424 std::process::exit(1);
425 }
426 },
427 _ => {
428 },
431 }
432 i += 1;
433 }
434
435 debug!("[Boot] Daemon mode - config: {:?}, bind: {:?}", config_path, bind_address);
436
437 (config_path, bind_address, None)
438}
439
440async fn HandleCommand(cmd:Command) -> Result<(), Box<dyn std::error::Error>> {
463 let validation_result = validate_command(&cmd);
465 if let Err(e) = validation_result {
466 eprintln!("[ERROR] Command validation failed: {}", e);
467 return Err(e.into());
468 }
469
470 match cmd {
471 Command::Help { command } => {
472 if let Some(ref cmd) = command {
474 if cmd.len() > 128 {
475 eprintln!("[ERROR] Command name too long (max: 128 characters)");
476 return Err("Command name too long".into());
477 }
478 }
479 println!("{}", OutputFormatter::format_help(command.as_deref(), VERSION));
480 Ok(())
481 },
482
483 Command::Version => {
484 println!("Air {} ({})", VERSION, env!("CARGO_PKG_NAME"));
485 println!("Protocol: Version {} (gRPC)", ProtocolVersion);
486 println!("Port: {} (Air), {} (Cocoon)", DefaultBindAddress, "[::1]:50052");
487 println!("Build: {} {}", env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_NAME"));
488 Ok(())
489 },
490
491 Command::Status { service, verbose, json } => {
492 if let Some(ref svc) = service {
494 if svc.is_empty() || svc.len() > 64 {
495 return Err("Service name must be 1-64 characters".into());
496 }
497 }
498
499 if let Some(svc) = service {
504 println!("📊 Status for service: {}", svc);
505
506 match attempt_daemon_connection().await {
508 Ok(_) => {
509 println!(" Status: ⚠️ Running (basic check)");
510 println!(" Note: Detailed status not yet implemented");
511 },
512 Err(e) => {
513 println!(" Status: ❌ Cannot connect to daemon");
514 println!(" Error: {}", e);
515 println!("");
516 println!(" To start the daemon, run: Air --daemon");
517 return Err(format!("Cannot connect to daemon: {}", e).into());
518 },
519 }
520 } else {
521 println!("📊 Air Daemon Status");
522 println!("");
523
524 match attempt_daemon_connection().await {
526 Ok(_) => {
527 println!(" Overall: ⚠️ Running (basic check)");
528 println!(" Note: Detailed status monitoring not yet implemented");
529 println!("");
530 println!(" Services:");
531 println!(" gRPC Server: ✅ Listening");
532 println!(" Authentication: ⚠️ Not checked");
533 println!(" Updates: ⚠️ Not checked");
534 println!(" Download Manager: ⚠️ Not checked");
535 println!(" File Indexer: ⚠️ Not checked");
536 },
537 Err(e) => {
538 println!(" Overall: ❌ Daemon not running");
539 println!(" Error: {}", e);
540 println!("");
541 println!(" To start the daemon, run: Air --daemon");
542 return Err("Daemon not running".into());
543 },
544 }
545 }
546
547 if verbose {
548 println!("");
549 println!("🔍 Verbose Information:");
550 println!(" Debug mode: Disabled by default");
551 println!(" Log level: info");
552 println!(" Config file: {}", DefaultConfigFile);
553 println!("");
554 println!(" TODO: Implement detailed service status with:");
555 println!(" - Service uptime");
556 println!(" - Request/response statistics");
557 println!(" - Error rates and recent errors");
558 println!(" - Resource usage");
559 println!(" - Active connections");
560 }
561
562 if json {
563 println!("");
564 println!("📋 JSON Output:");
565 println!(
566 "{}",
567 serde_json::json!({
568 "overall": "running",
569 "services": {
570 "grpc": "listening",
571 "status": "not_implemented"
572 },
573 "note": "Detailed JSON output not yet implemented"
574 })
575 );
576 }
577
578 Ok(())
579 },
580
581 Command::Restart { service, force } => {
582 if let Some(ref svc) = service {
584 if svc.is_empty() || svc.len() > 64 {
585 return Err("Service name must be 1-64 characters".into());
586 }
587 }
588
589 println!("🔄 Restart Command");
591 println!("");
592
593 if let Some(svc) = service {
594 println!("Restarting service: {}", svc);
595 println!(" Note: Individual service restart not yet implemented");
596 println!(" Workaround: Restart the entire daemon");
597 } else {
598 println!("Restarting all services...");
599 println!(" Note: Full daemon restart not yet implemented");
600 println!(" Workaround: Use: kill <pid> && Air --daemon");
601 }
602
603 if force {
604 println!("");
605 println!("⚠️ Force mode enabled");
606 println!(" Warning: This will terminate in-progress operations");
607 println!(" TODO: Implement force restart with proper coordination");
608 }
609
610 Err("Restart command not yet implemented".into())
611 },
612
613 Command::Config(config_cmd) => {
614 match config_cmd {
615 ConfigCommand::Get { key } => {
616 if key.is_empty() || key.len() > 256 {
618 return Err("Configuration key must be 1-256 characters".into());
619 }
620 if key.contains('\0') || key.contains('\n') {
621 return Err("Configuration key contains invalid characters".into());
622 }
623
624 println!("⚙️ Get Configuration");
626 println!(" Key: {}", key);
627 println!("");
628 println!(" Note: Config retrieval not yet implemented");
629 println!(" Workaround: Check config file directly: cat {}", DefaultConfigFile);
630
631 Err("Config 'get' command not yet implemented".into())
632 },
633
634 ConfigCommand::Set { key, value } => {
635 if key.is_empty() || key.len() > 256 {
637 return Err("Configuration key must be 1-256 characters".into());
638 }
639 if value.len() > 8192 {
640 return Err("Configuration value too long (max: 8192 characters)".into());
641 }
642 if key.contains('\0') || key.contains('\n') {
643 return Err("Configuration key contains invalid characters".into());
644 }
645
646 println!("⚙️ Set Configuration");
648 println!(" Key: {}", key);
649 println!(" Value: {}", value);
650 println!("");
651 println!(" Note: Config update not yet implemented");
652 println!(" Workaround: Edit config file directly, then use 'Air config reload'");
653
654 println!("");
655 println!(" ⚠️ Warning: Config changes without reload won't take effect");
656 println!(" ⚠️ Warning: Some settings may require full daemon restart");
657
658 Err("Config 'set' command not yet implemented".into())
659 },
660
661 ConfigCommand::Reload { validate } => {
662 println!("🔄 Reload Configuration");
664 println!("");
665
666 match attempt_daemon_connection().await {
667 Ok(_) => {
668 println!(" Status: ⚠️ Daemon is running");
669 println!("");
670 println!(" Note: Config reload not yet implemented");
671 println!(" Workaround: Restart daemon to apply config changes");
672 println!("");
673 if validate {
674 println!(" ℹ️ Validation mode requested");
675 println!(" (Will be implemented with config reload)");
676 }
677 },
678 Err(e) => {
679 println!(" Status: ❌ Cannot connect to daemon");
680 println!(" Error: {}", e);
681 return Err(format!("Cannot reload config: {}", e).into());
682 },
683 }
684
685 Err("Config 'reload' command not yet implemented".into())
686 },
687
688 ConfigCommand::Show { json } => {
689 println!("⚙️ Show Configuration");
691 println!("");
692
693 if json {
694 println!(" JSON output requested");
695 println!(" Note: JSON config export not yet implemented");
696 } else {
697 println!(" Current Configuration:");
698 println!(" Note: Config display not yet implemented");
699 println!(" Workaround: View config file: cat {}", DefaultConfigFile);
700 }
701
702 println!("");
703 println!(" Default config file: {}", DefaultConfigFile);
704 println!(" Config directory: ~/.config/Air/");
705
706 Err("Config 'show' command not yet implemented".into())
707 },
708
709 ConfigCommand::Validate { path } => {
710 if let Some(ref p) = path {
712 if p.is_empty() || p.len() > 512 {
713 return Err("Config path must be 1-512 characters".into());
714 }
715 if p.contains("..") || p.contains('\0') {
716 return Err("Config path contains invalid characters".into());
717 }
718 }
719
720 println!("✅ Validate Configuration");
721 println!("");
722
723 let config_path = path.unwrap_or_else(|| DefaultConfigFile.to_string());
724 println!(" Config file: {}", config_path);
725 println!("");
726
727 match std::path::Path::new(&config_path).exists() {
729 true => {
730 println!(" ✅ Config file exists");
731 println!(" Note: Detailed validation not yet implemented");
732 println!(" Workaround: Use: Air --validate-config");
733 },
734 false => {
735 println!(" ❌ Config file not found");
736 println!(" Hint: Create a config file or use defaults");
737 },
738 }
739
740 Err("Config 'validate' command not yet implemented".into())
741 },
742 }
743 },
744
745 Command::Metrics { json, service } => {
746 if let Some(ref svc) = service {
748 if svc.is_empty() || svc.len() > 64 {
749 return Err("Service name must be 1-64 characters".into());
750 }
751 }
752
753 println!("📊 Metrics");
754 println!("");
755
756 match attempt_daemon_connection().await {
758 Ok(_) => {
759 println!(" Status: ✅ Daemon is running");
760 println!("");
761 println!(" Note: Metrics collection is partially implemented");
762 println!("");
763 println!(" Current Metrics (basic):");
764 println!(" Uptime: Not tracked yet");
765 println!(" Requests: Not tracked yet");
766 println!(" Errors: Not tracked yet");
767 println!(" Memory: Not tracked yet");
768 println!(" CPU: Not tracked yet");
769 println!("");
770 println!(" TODO: Implement comprehensive metrics:");
771 println!(" - Request/response counters");
772 println!(" - Latency percentiles");
773 println!(" - Error rate tracking");
774 println!(" - Resource usage");
775 println!(" - Connection pool stats");
776 println!(" - Background queue depth");
777 },
778 Err(e) => {
779 println!(" Status: ❌ Cannot connect to daemon");
780 println!(" Error: {}", e);
781 return Err(format!("Cannot retrieve metrics: {}", e).into());
782 },
783 }
784
785 if json {
786 println!("");
787 println!("📋 JSON Output:");
788 println!(
789 "{}",
790 serde_json::json!({
791 "note": "Detailed metrics not yet implemented",
792 "suggestion": "Use /metrics endpoint when daemon is running"
793 })
794 );
795 }
796
797 if let Some(svc) = service {
798 println!("");
799 println!(" Service-specific metrics requested: {}", svc);
800 println!(" Note: Service isolation not yet implemented");
801 }
802
803 Ok(())
804 },
805
806 Command::Logs { service, tail, filter, follow } => {
807 if let Some(ref svc) = service {
809 if svc.is_empty() || svc.len() > 64 {
810 return Err("Service name must be 1-64 characters".into());
811 }
812 }
813 if let Some(n) = tail {
814 if n < 1 || n > 10000 {
815 return Err("Tail count must be 1-10000 lines".into());
816 }
817 }
818 if let Some(ref f) = filter {
819 if f.is_empty() || f.len() > 512 {
820 return Err("Filter string must be 1-512 characters".into());
821 }
822 }
823
824 println!("📝 Logs");
825 println!("");
826
827 let log_file = std::env::var("AIR_LOG_FILE").ok();
829 let log_dir = std::env::var("AIR_LOG_DIR").ok();
830
831 match (log_file, log_dir) {
832 (Some(file), _) => {
833 println!(" Log file: {}", file);
834
835 if std::path::Path::new(&file).exists() {
837 println!(" Status: ✅ Log file exists");
838 println!("");
839
840 println!(" Note: Log viewing not yet implemented");
842 println!(" Workaround: Use standard tools:");
843 println!(" - tail -n {} {}", tail.unwrap_or(100), file);
844
845 if let Some(f) = filter {
846 println!(" - grep '{}' {} | tail -n {}", f, file, tail.unwrap_or(100));
847 }
848
849 if follow {
850 println!(" - tail -f {}", file);
851 }
852 } else {
853 println!(" Status: ❌ Log file not found");
854 println!(" Check logging configuration");
855 }
856 },
857 (_, Some(dir)) => {
858 println!(" Log directory: {}", dir);
859 println!(" Note: Log file viewing not yet implemented");
860 println!(" Workaround: Find and view log files in the directory");
861 },
862 _ => {
863 println!(" Log file: Not configured");
864 println!(" Set via: AIR_LOG_FILE=/path/to/Air.log");
865 println!("");
866 println!(" Logs are likely going to stdout/stderr");
867 println!(" Use journalctl (Linux/macOS) or Event Viewer (Windows)");
868 },
869 }
870
871 if let Some(svc) = service {
872 println!("");
873 println!(" Service-specific logs requested: {}", svc);
874 println!(" Note: Service log isolation not yet implemented");
875 }
876
877 Err("Logs command not yet fully implemented".into())
879 },
880
881 Command::Debug(debug_cmd) => {
882 match debug_cmd {
883 DebugCommand::DumpState { service, json } => {
884 if let Some(ref svc) = service {
886 if svc.is_empty() || svc.len() > 64 {
887 return Err("Service name must be 1-64 characters".into());
888 }
889 }
890
891 println!("🔧 Debug: Dump State");
892 println!("");
893
894 if let Some(svc) = service {
895 println!(" Service: {}", svc);
896 println!(" Note: Service state isolation not yet implemented");
897 } else {
898 println!(" Dumping all service states...");
899 println!(" Note: State dumping not yet implemented");
900 }
901
902 if json {
903 println!("");
904 println!(" JSON format requested");
905 println!(" Note: JSON state export not yet implemented");
906 }
907
908 println!("");
909 println!(" TODO: Implement state dump for:");
910 println!(" - Application state");
911 println!(" - Service states");
912 println!(" - Connection pool");
913 println!(" - Background tasks");
914 println!(" - Metrics cache");
915 println!(" - Configuration snapshot");
916
917 Err("Debug 'dump-state' command not yet implemented".into())
918 },
919
920 DebugCommand::DumpConnections { format } => {
921 println!("🔧 Debug: Dump Connections");
922 println!("");
923
924 match attempt_daemon_connection().await {
925 Ok(_) => {
926 println!(" Status: ✅ Daemon is running");
927 println!("");
928 println!(" Active Connections: 0");
929 println!(" Note: Connection tracking not yet implemented");
930 },
931 Err(e) => {
932 println!(" Status: ❌ Cannot connect to daemon");
933 println!(" Error: {}", e);
934 return Err(format!("Cannot dump connections: {}", e).into());
935 },
936 }
937
938 if let Some(fmt) = format {
939 println!("");
940 println!(" Format: {}", fmt);
941 println!(" Note: Custom format not yet implemented");
942 }
943
944 println!("");
945 println!(" TODO: Implement connection dump with:");
946 println!(" - Connection ID");
947 println!(" - Remote address");
948 println!(" - Connected at timestamp");
949 println!(" - Last activity");
950 println!(" - Active requests");
951 println!(" - Bytes transferred");
952
953 Err("Debug 'dump-connections' command not yet implemented".into())
954 },
955
956 DebugCommand::HealthCheck { verbose, service } => {
957 if let Some(ref svc) = service {
959 if svc.is_empty() || svc.len() > 64 {
960 return Err("Service name must be 1-64 characters".into());
961 }
962 }
963
964 println!("🔧 Debug: Health Check");
965 println!("");
966
967 match attempt_daemon_connection().await {
968 Ok(_) => {
969 println!(" Overall: ⚠️ Basic check passed");
970 println!("");
971
972 if let Some(svc) = service {
973 println!(" Service: {}", svc);
974 println!(" Status: Not checked (detailed checks not implemented)");
975 } else {
976 println!(" Services:");
977 println!(" gRPC Server: ✅ Responding");
978 println!(" Authentication: ⏸️ Not checked");
979 println!(" Updates: ⏸️ Not checked");
980 println!(" Download Manager: ⏸️ Not checked");
981 println!(" File Indexer: ⏸️ Not checked");
982 }
983
984 if verbose {
985 println!("");
986 println!(" 🔍 Verbose Information:");
987 println!(" Last health check: Not tracked");
988 println!(" Health check interval: 30s (default)");
989 println!(" Failure threshold: 3 (configurable)");
990 println!(" Recovery threshold: 2 (configurable)");
991 }
992 },
993 Err(e) => {
994 println!(" Overall: ❌ Daemon unreachable");
995 println!(" Error: {}", e);
996 return Err(format!("Health check failed: {}", e).into());
997 },
998 }
999
1000 Err("Debug 'health-check' not detailed yet".into())
1001 },
1002
1003 DebugCommand::Diagnostics { level } => {
1004 println!("🔧 Debug: Diagnostics");
1005 println!("");
1006 println!(" Level: {:?}", level);
1007 println!("");
1008
1009 println!(" System Information:");
1011 println!(" OS: {}", std::env::consts::OS);
1012 println!(" Arch: {}", std::env::consts::ARCH);
1013 println!(" Air Version: {}", VERSION);
1014 println!("");
1015
1016 match attempt_daemon_connection().await {
1017 Ok(_) => {
1018 println!(" Daemon: ✅ Running");
1019 },
1020 Err(e) => {
1021 println!(" Daemon: ❌ Running");
1022 println!(" Error: {}", e);
1023 },
1024 }
1025
1026 println!("");
1027 println!(" TODO: Implement diagnostics:");
1028 println!(" - Thread dump");
1029 println!(" - Memory profiling");
1030 println!(" - Lock contention analysis");
1031 println!(" - Resource leak detection");
1032 println!(" - Performance bottlenecks");
1033
1034 Ok(())
1035 },
1036 }
1037 },
1038 }
1039}
1040
1041fn validate_command(cmd:&Command) -> Result<(), String> {
1048 match cmd {
1049 Command::Help { command } => {
1050 if let Some(cmd) = command {
1051 if cmd.len() > 128 {
1052 return Err("Command name too long (max: 128)".to_string());
1053 }
1054 }
1055 },
1056 _ => {},
1057 }
1058 Ok(())
1059}
1060
1061async fn attempt_daemon_connection() -> Result<(), String> {
1072 use tokio::{
1073 net::TcpStream,
1074 time::{Duration, timeout},
1075 };
1076
1077 let addr = DefaultBindAddress;
1078
1079 let connection_result = timeout(Duration::from_secs(5), async { TcpStream::connect(addr).await }).await;
1081
1082 match connection_result {
1083 Ok(Ok(_)) => Ok(()),
1084 Ok(Err(e)) => Err(format!("Connection failed: {}", e)),
1085 Err(_) => Err("Connection timeout (5s)".to_string()),
1086 }
1087}
1088
1089fn HandleMetricsRequest() -> String {
1109 let timeout_duration = std::time::Duration::from_millis(100);
1111
1112 let metrics_collector = Metrics::GetMetrics();
1113
1114 let export_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| metrics_collector.ExportMetrics()));
1116
1117 match export_result {
1118 Ok(Ok(metrics_text)) => {
1119 if metrics_text.len() > 10_000_000 {
1121 error!(
1122 "[Metrics] Exported metrics unreasonably large (size: {} bytes)",
1123 metrics_text.len()
1124 );
1125 format!("# ERROR: Metrics export too large (max: 10MB)\n")
1126 } else {
1127 metrics_text
1128 }
1129 },
1130 Ok(Err(e)) => {
1131 error!("[Metrics] Failed to export metrics: {}", e);
1132 format!("# ERROR: Failed to export metrics: {}\n", e)
1133 },
1134 Err(_) => {
1135 error!("[Metrics] Metrics export panicked");
1136 format!("# ERROR: Metrics export failed due to internal error\n")
1137 },
1138 }
1139}
1140
1141async fn Main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1182 InitializeLogging();
1186
1187 info!("[Boot] ===========================================");
1188 info!("[Boot] Starting Air Daemon 🪁");
1189 info!("[Boot] ===========================================");
1190 info!("[Boot] Version: {} ({})", env!("CARGO_PKG_VERSION"), env!("CARGO_PKG_NAME"));
1191 let build_timestamp = env::var("BUILD_TIMESTAMP").unwrap_or_else(|_| "unknown".to_string());
1192 info!("[Boot] Build: {}", build_timestamp);
1193 info!("[Boot] Target: {}-{}", std::env::consts::OS, std::env::consts::ARCH);
1194
1195 info!("[Boot] Validating environment...");
1199
1200 if let Err(e) = validate_environment().await {
1201 error!("[Boot] Environment validation failed: {}", e);
1202 return Err(format!("Environment validation failed: {}", e).into());
1203 }
1204
1205 info!("[Boot] Environment validation passed");
1206
1207 Trace!("[Boot] [Observability] Initializing observability systems...");
1211
1212 if let Err(e) = Metrics::InitializeMetrics() {
1214 error!("[Boot] Failed to initialize metrics: {}", e);
1215 } else {
1217 info!("[Boot] [Observability] Metrics system initialized");
1218 }
1219
1220 if let Err(e) = Tracing::initialize_tracing(None) {
1222 error!("[Boot] Failed to initialize tracing: {}", e);
1223 } else {
1225 info!("[Boot] [Observability] Tracing system initialized");
1226 }
1227
1228 info!("[Boot] [Observability] Observability systems initialized");
1229
1230 Trace!("[Boot] [Args] Parsing command line arguments...");
1234
1235 let (config_path, bind_address, cli_command) = ParseArguments();
1236
1237 if let Some(cmd) = cli_command {
1239 info!("[Boot] CLI command detected, executing...");
1240 let result = HandleCommand(cmd).await;
1241
1242 match &result {
1243 Ok(_) => {
1244 info!("[Boot] CLI command completed successfully");
1245 std::process::exit(0);
1246 },
1247 Err(e) => {
1248 error!("[Boot] CLI command failed: {}", e);
1249 std::process::exit(1);
1250 },
1251 }
1252 }
1253
1254 Trace!("[Boot] [Configuration] Loading configuration...");
1258
1259 let config_manager = match ConfigurationManager::New(config_path) {
1260 Ok(cm) => cm,
1261 Err(e) => {
1262 error!("[Boot] Failed to create configuration manager: {}", e);
1263 return Err(format!("Configuration manager initialization failed: {}", e).into());
1264 },
1265 };
1266
1267 let configuration:std::sync::Arc<AirLibrary::Configuration::AirConfiguration> =
1269 match tokio::time::timeout(Duration::from_secs(10), config_manager.LoadConfiguration()).await {
1270 Ok(Ok(config)) => {
1271 info!("[Boot] [Configuration] Configuration loaded successfully");
1272 std::sync::Arc::new(config)
1273 },
1274 Ok(Err(e)) => {
1275 error!("[Boot] Failed to load configuration: {}", e);
1276 return Err(format!("Configuration load failed: {}", e).into());
1277 },
1278 Err(_) => {
1279 error!("[Boot] Configuration load timed out");
1280 return Err("Configuration load timed out".into());
1281 },
1282 };
1283
1284 validate_configuration(&configuration)?;
1286
1287 Trace!("[Boot] [Daemon] Initializing daemon lifecycle management...");
1291
1292 let daemon_manager = match DaemonManager::New(None) {
1293 Ok(dm) => dm,
1294 Err(e) => {
1295 error!("[Boot] Failed to create daemon manager: {}", e);
1296 return Err(format!("Daemon manager initialization failed: {}", e).into());
1297 },
1298 };
1299
1300 match tokio::time::timeout(Duration::from_secs(5), daemon_manager.AcquireLock()).await {
1302 Ok(Ok(_)) => {
1303 info!("[Boot] [Daemon] Daemon lock acquired successfully");
1304 },
1305 Ok(Err(e)) => {
1306 error!("[Boot] Failed to acquire daemon lock: {}", e);
1307 error!("[Boot] Another instance may already be running");
1308 return Err(format!("Daemon lock acquisition failed: {}", e).into());
1309 },
1310 Err(_) => {
1311 error!("[Boot] Daemon lock acquisition timed out");
1312 return Err("Daemon lock acquisition timed out".into());
1313 },
1314 }
1315
1316 Trace!("[Boot] [Health] Initializing health check system...");
1320
1321 let health_manager:std::sync::Arc<HealthCheckManager> = Arc::new(HealthCheckManager::new(None));
1322
1323 info!("[Boot] [Health] Health check system initialized");
1324
1325 Trace!("[Boot] [State] Initializing application state...");
1329
1330 let AppState:std::sync::Arc<ApplicationState> =
1331 match tokio::time::timeout(Duration::from_secs(10), ApplicationState::New(configuration.clone())).await {
1332 Ok(Ok(state)) => {
1333 info!("[Boot] [State] Application state initialized");
1334 Arc::new(state)
1335 },
1336 Ok(Err(e)) => {
1337 error!("[Boot] Failed to initialize application state: {}", e);
1338 let _ = daemon_manager.ReleaseLock().await;
1340 return Err(format!("Application state initialization failed: {}", e).into());
1341 },
1342 Err(_) => {
1343 error!("[Boot] Application state initialization timed out");
1344 let _ = daemon_manager.ReleaseLock().await;
1345 return Err("Application state initialization timed out".into());
1346 },
1347 };
1348
1349 Trace!("[Boot] [Services] Initializing core services...");
1353
1354 let auth_service:std::sync::Arc<AuthenticationService> =
1356 match tokio::time::timeout(Duration::from_secs(10), AuthenticationService::new(AppState.clone())).await {
1357 Ok(Ok(svc)) => Arc::new(svc),
1358 Ok(Err(e)) => {
1359 error!("[Boot] Failed to initialize authentication service: {}", e);
1360 return Err(format!("Authentication service initialization failed: {}", e).into());
1361 },
1362 Err(_) => {
1363 error!("[Boot] Authentication service initialization timed out");
1364 return Err("Authentication service initialization timed out".into());
1365 },
1366 };
1367
1368 let update_manager:std::sync::Arc<UpdateManager> =
1369 match tokio::time::timeout(Duration::from_secs(10), UpdateManager::new(AppState.clone())).await {
1370 Ok(Ok(svc)) => Arc::new(svc),
1371 Ok(Err(e)) => {
1372 error!("[Boot] Failed to initialize update manager: {}", e);
1373 return Err(format!("Update manager initialization failed: {}", e).into());
1374 },
1375 Err(_) => {
1376 error!("[Boot] Update manager initialization timed out");
1377 return Err("Update manager initialization timed out".into());
1378 },
1379 };
1380
1381 let download_manager:std::sync::Arc<DownloadManager> =
1382 match tokio::time::timeout(Duration::from_secs(10), DownloadManager::new(AppState.clone())).await {
1383 Ok(Ok(svc)) => Arc::new(svc),
1384 Ok(Err(e)) => {
1385 error!("[Boot] Failed to initialize download manager: {}", e);
1386 return Err(format!("Download manager initialization failed: {}", e).into());
1387 },
1388 Err(_) => {
1389 error!("[Boot] Download manager initialization timed out");
1390 return Err("Download manager initialization timed out".into());
1391 },
1392 };
1393
1394 let file_indexer:std::sync::Arc<FileIndexer> =
1395 match tokio::time::timeout(Duration::from_secs(10), FileIndexer::new(AppState.clone())).await {
1396 Ok(Ok(svc)) => Arc::new(svc),
1397 Ok(Err(e)) => {
1398 error!("[Boot] Failed to initialize file indexer: {}", e);
1399 return Err(format!("File indexer initialization failed: {}", e).into());
1400 },
1401 Err(_) => {
1402 error!("[Boot] File indexer initialization timed out");
1403 return Err("File indexer initialization timed out".into());
1404 },
1405 };
1406
1407 info!("[Boot] [Services] All core services initialized successfully");
1408
1409 Trace!("[Boot] [Health] Registering services for health monitoring...");
1413
1414 let service_registrations = vec![
1416 ("authentication", HealthCheckLevel::Functional),
1417 ("updates", HealthCheckLevel::Functional),
1418 ("downloader", HealthCheckLevel::Functional),
1419 ("indexing", HealthCheckLevel::Functional),
1420 ("grpc", HealthCheckLevel::Responsive),
1421 ("connections", HealthCheckLevel::Alive),
1422 ];
1423
1424 for (service_name, level) in service_registrations {
1425 match tokio::time::timeout(
1426 Duration::from_secs(5),
1427 health_manager.RegisterService(service_name.to_string(), level),
1428 )
1429 .await
1430 {
1431 Ok(result) => {
1432 match result {
1433 Ok(_) => {
1434 debug!("[Boot] [Health] Registered service: {}", service_name);
1435 },
1436 Err(e) => {
1437 warn!("[Boot] Failed to register service {}: {}", service_name, e);
1438 },
1441 }
1442 },
1443 Err(_) => {
1444 warn!("[Boot] Service registration timed out: {}", service_name);
1445 },
1446 }
1447 }
1448
1449 info!("[Boot] [Health] Service health monitoring configured");
1450
1451 Trace!("[Boot] [Vine] Initializing gRPC server...");
1455
1456 let bind_addr:SocketAddr = match bind_address {
1458 Some(addr) => {
1459 match addr.parse() {
1460 Ok(parsed) => {
1461 info!("[Boot] [Vine] Using custom bind address: {}", parsed);
1462 parsed
1463 },
1464 Err(e) => {
1465 error!("[Boot] Invalid bind address '{}': {}", addr, e);
1466 return Err(format!("Invalid bind address: {}", e).into());
1467 },
1468 }
1469 },
1470 None => {
1471 match DefaultBindAddress.parse() {
1472 Ok(parsed) => parsed,
1473 Err(e) => {
1474 error!("[Boot] Invalid default bind address '{}': {}", DefaultBindAddress, e);
1475 return Err(format!("Invalid default bind address: {}", e).into());
1476 },
1477 }
1478 },
1479 };
1480
1481 info!("[Boot] [Vine] Configuring gRPC server on {}", bind_addr);
1482
1483 let vine_service = AirVinegRPCService::new(
1485 AppState.clone(),
1486 auth_service.clone(),
1487 update_manager.clone(),
1488 download_manager.clone(),
1489 file_indexer.clone(),
1490 );
1491
1492 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
1494
1495 let server_handle:tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> =
1497 tokio::spawn(async move {
1498 info!("[Vine] Starting gRPC server on {}", bind_addr);
1499
1500 let svc = AirServiceServer::new(vine_service);
1501
1502 let server = tonic::transport::Server::builder()
1503 .add_service(svc)
1504 .serve_with_shutdown(bind_addr, async {
1505 let _ = shutdown_rx.await;
1507 info!("[Vine] Shutdown signal received, stopping server...");
1508 });
1509
1510 info!("[Vine] gRPC server listening on {}", bind_addr);
1511
1512 match server.await {
1513 Ok(_) => {
1514 info!("[Vine] gRPC server stopped cleanly");
1515 Ok(())
1516 },
1517 Err(e) => {
1518 error!("[Vine] gRPC server error: {}", e);
1519 Err(e.into())
1520 },
1521 }
1522 });
1523
1524 tokio::time::sleep(Duration::from_millis(100)).await;
1526
1527 if server_handle.is_finished() {
1529 error!("[Boot] gRPC server failed to start");
1530 let _ = daemon_manager.ReleaseLock().await;
1531 return Err("gRPC server failed to start".into());
1532 }
1533
1534 Trace!("[Boot] [Monitoring] Starting background monitoring tasks...");
1538
1539 let connection_monitor_handle:tokio::task::JoinHandle<()> = tokio::spawn({
1541 let AppState = AppState.clone();
1542 let health_manager = health_manager.clone();
1543 async move {
1544 let mut interval = interval(Duration::from_secs(60)); loop {
1546 interval.tick().await;
1547
1548 if let Err(e) = AppState.UpdateResourceUsage().await {
1550 warn!("[ConnectionMonitor] Failed to update resource usage: {}", e);
1551 }
1552
1553 let resources = AppState.GetResourceUsage().await;
1555
1556 let metrics_collector = Metrics::GetMetrics();
1558 metrics_collector.UpdateResourceMetrics(
1559 (resources.MemoryUsageMb * 1024.0 * 1024.0) as u64, resources.CPUUsagePercent,
1561 AppState.GetActiveConnectionCount().await as u64,
1562 0, );
1564
1565 if let Err(e) = AppState.CleanupStaleConnections(300).await {
1567 warn!("[ConnectionMonitor] Failed to cleanup stale connections: {}", e);
1568 }
1569
1570 match health_manager.CheckService("connections").await {
1572 Ok(_) => {},
1573 Err(e) => {
1574 warn!("[ConnectionMonitor] Health check failed: {}", e);
1575
1576 let metrics_collector = Metrics::GetMetrics();
1578 metrics_collector.RecordRequestFailure("health_check_failed", 0.0);
1579 },
1580 }
1581
1582 debug!(
1583 "[ConnectionMonitor] Active connections: {}",
1584 AppState.GetActiveConnectionCount().await
1585 );
1586 }
1587 }
1588 });
1589
1590 if let Err(e) = AppState.RegisterBackgroundTask(connection_monitor_handle).await {
1592 warn!("[Boot] Failed to register connection monitor: {}", e);
1593 }
1595
1596 let health_monitor_handle:tokio::task::JoinHandle<()> = tokio::spawn({
1598 let health_manager = health_manager.clone();
1599 async move {
1600 let mut interval = interval(Duration::from_secs(30)); loop {
1602 interval.tick().await;
1603
1604 let services = ["authentication", "updates", "downloader", "indexing", "grpc"];
1606 for service in services.iter() {
1607 if let Err(e) = health_manager.CheckService(service).await {
1608 warn!("[HealthMonitor] Health check failed for {}: {}", service, e);
1609 }
1610 }
1611
1612 let overall_health = health_manager.GetOverallHealth().await;
1614 debug!("[HealthMonitor] Overall health: {:?}", overall_health);
1615 }
1616 }
1617 });
1618
1619 if let Err(e) = AppState.RegisterBackgroundTask(health_monitor_handle).await {
1621 warn!("[Boot] Failed to register health monitor: {}", e);
1622 }
1624
1625 Trace!("[Boot] [Startup] Starting background services...");
1629
1630 let auth_handle = auth_service.StartBackgroundTasks().await?;
1632 let update_handle = update_manager.StartBackgroundTasks().await?;
1633 let download_handle = download_manager.StartBackgroundTasks().await?;
1634 let _indexing_handle = None::<tokio::task::JoinHandle<()>>;
1636
1637 info!("[Boot] [Startup] All services started successfully");
1638
1639 info!("===========================================");
1643 info!("[Runtime] Air Daemon 🪁 is now running");
1644 info!("[Runtime] Listening on {} for Mountain connections", bind_addr);
1645 info!("[Runtime] Protocol Version: {}", ProtocolVersion);
1646 info!("[Runtime] Cocoon Port: 50052");
1647 info!("===========================================");
1648 info!("");
1649 info!("Running. Press Ctrl+C to stop.");
1650 info!("");
1651
1652 WaitForShutdownSignal().await;
1654
1655 info!("[Shutdown] Signaling gRPC server to stop...");
1657 let _ = shutdown_tx.send(());
1658
1659 match tokio::time::timeout(Duration::from_secs(30), server_handle).await {
1661 Ok(Ok(Ok(_))) => {
1662 info!("[Shutdown] gRPC server stopped normally");
1663 },
1664 Ok(Ok(Err(e))) => {
1665 warn!("[Shutdown] gRPC server stopped with error: {}", e);
1666 },
1667 Ok(Err(e)) => {
1668 warn!("[Shutdown] gRPC server task panicked: {:?}", e);
1669 },
1670 Err(_) => {
1671 warn!("[Shutdown] gRPC server shutdown timed out");
1672 },
1673 }
1674
1675 info!("===========================================");
1679 info!("[Shutdown] Initiating graceful shutdown...");
1680 info!("===========================================");
1681
1682 info!("[Shutdown] Stopping background tasks...");
1684 if let Err(_) =
1685 tokio::time::timeout(Duration::from_secs(10), async { AppState.StopAllBackgroundTasks().await }).await
1686 {
1687 warn!("[Shutdown] Background tasks stop timed out or failed");
1688 }
1689
1690 info!("[Shutdown] Stopping background services...");
1692 auth_service.StopBackgroundTasks().await;
1693 update_manager.StopBackgroundTasks().await;
1694 download_manager.StopBackgroundTasks().await;
1695
1696 info!("[Shutdown] Waiting for services to complete...");
1698 let _ = tokio::time::timeout(Duration::from_secs(10), async {
1699 let _ = auth_handle.await;
1700 let _ = update_handle.await;
1701 let _ = download_handle.await;
1702 })
1703 .await;
1704
1705 info!("[Shutdown] Collecting final statistics...");
1707
1708 let metrics = AppState.GetMetrics().await;
1709 let resources = AppState.GetResourceUsage().await;
1710 let health_stats:HealthStatistics = health_manager.GetHealthStatistics().await;
1711
1712 let metrics_data = Metrics::GetMetrics().GetMetricsData();
1714
1715 info!("===========================================");
1716 info!("[Shutdown] Final Statistics");
1717 info!("===========================================");
1718 info!("[Shutdown] Requests:");
1719 info!(" - Successful: {}", metrics.SuccessfulRequests);
1720 info!(" - Failed: {}", metrics.FailedRequests);
1721 info!("[Shutdown] Metrics:");
1722 info!(" - Success rate: {:.2}%", metrics_data.SuccessRate());
1723 info!(" - Error rate: {:.2}%", metrics_data.ErrorRate());
1724 info!("[Shutdown] Resources:");
1725 info!(" - Memory: {:.2} MB", resources.MemoryUsageMb);
1726 info!(" - CPU: {:.2}%", resources.CPUUsagePercent);
1727 info!("[Shutdown] Health:");
1728 info!(" - Overall: {:.2}%", health_stats.OverallHealthPercentage());
1729 info!(
1730 " - Healthy services: {}/{}",
1731 health_stats.HealthyServices, health_stats.TotalServices
1732 );
1733 info!("===========================================");
1734
1735 info!("[Shutdown] Releasing daemon lock...");
1737 if let Err(e) = daemon_manager.ReleaseLock().await {
1738 warn!("[Shutdown] Failed to release daemon lock: {}", e);
1739 }
1740
1741 info!("[Shutdown] All services stopped");
1742 info!("[Shutdown] Air Daemon 🪁 has shut down gracefully");
1743 info!("===========================================");
1744
1745 Ok(())
1746}
1747
1748async fn validate_environment() -> Result<(), String> {
1757 info!("[Environment] OS: {}, Arch: {}", std::env::consts::OS, std::env::consts::ARCH);
1759
1760 if let Ok(home) = std::env::var("HOME") {
1762 if home.is_empty() {
1763 return Err("HOME environment variable is not set".to_string());
1764 }
1765 }
1766
1767 let lock_path = "/tmp/Air-test-lock.tmp";
1769 if std::fs::write(lock_path, b"test").is_err() {
1770 return Err("Cannot write to /tmp directory".to_string());
1771 }
1772 let _ = std::fs::remove_file(lock_path);
1773
1774 Ok(())
1775}
1776
1777fn validate_configuration(config:&AirConfiguration) -> Result<(), String> {
1786 debug!("[Config] Configuration passed basic validation");
1788 Ok(())
1789}
1790
1791#[tokio::main]
1792async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { Main().await }