AirLibrary/Vine/Server/
AirVinegRPCService.rs

1//! # Air Vine gRPC Service
2//!
3//! Defines the gRPC service implementation for Air. This struct handles
4//! incoming RPC calls from Mountain, dispatches them to the appropriate
5//! services (authentication, updates, downloads, indexing), and returns
6//! the results.
7
8use std::{collections::HashMap, sync::Arc};
9
10use log::{debug, error, info, warn};
11use tonic::{Request, Response, Status};
12use futures::StreamExt;
13use tokio_stream::StreamExt as TokioStreamExt;
14use async_trait::async_trait;
15
16use crate::{
17	AirError,
18	ApplicationState::ApplicationState,
19	Authentication::AuthenticationService,
20	Downloader::DownloadManager,
21	Indexing::{
22		FileIndexer,
23		Store::QueryIndex::{SearchMode, SearchQuery},
24	},
25	Result,
26	Updates::UpdateManager,
27	Utility::CurrentTimestamp,
28	Vine::Generated::{
29		air as air_generated,
30		air::{
31			ApplyUpdateRequest,
32			ApplyUpdateResponse,
33			AuthenticationRequest,
34			AuthenticationResponse,
35			ConfigurationRequest,
36			ConfigurationResponse,
37			DownloadRequest,
38			DownloadResponse,
39			DownloadStreamRequest,
40			DownloadStreamResponse,
41			FileInfoRequest,
42			FileInfoResponse,
43			FileResult,
44			HealthCheckRequest,
45			HealthCheckResponse,
46			IndexRequest,
47			IndexResponse,
48			MetricsRequest,
49			MetricsResponse,
50			ResourceLimitsRequest,
51			ResourceLimitsResponse,
52			ResourceUsageRequest,
53			ResourceUsageResponse,
54			SearchRequest,
55			SearchResponse,
56			StatusRequest,
57			StatusResponse,
58			UpdateCheckRequest,
59			UpdateCheckResponse,
60			UpdateConfigurationRequest,
61			UpdateConfigurationResponse,
62			air_service_server::AirService,
63		},
64	},
65};
66
/// The concrete implementation of the Air gRPC service.
///
/// Bundles shared (`Arc`) handles to the application state and to each
/// backing service so the RPC handlers can dispatch to them. The struct
/// derives `Clone`, which copies the `Arc` handles rather than the services
/// themselves.
#[derive(Clone)]
pub struct AirVinegRPCService {
	/// Shared application state; used for request and connection registration
	/// and for metrics lookups
	AppState:Arc<ApplicationState>,

	/// Authentication service used by the `authenticate` handler
	AuthService:Arc<AuthenticationService>,

	/// Update manager used by the update-check, download-update and
	/// apply-update handlers
	UpdateManager:Arc<UpdateManager>,

	/// Download manager used by the download handlers
	DownloadManager:Arc<DownloadManager>,

	/// File indexer used by the `index_files` handler
	FileIndexer:Arc<FileIndexer>,

	/// Connection tracking: per-connection metadata keyed by connection ID,
	/// guarded by an async read-write lock
	ActiveConnections:Arc<tokio::sync::RwLock<HashMap<String, ConnectionMetadata>>>,
}
88
/// Connection metadata for tracking client state.
///
/// One entry is kept per connection ID in
/// `AirVinegRPCService::ActiveConnections` and refreshed by
/// `TrackConnection` on every tracked request.
#[derive(Debug, Clone)]
struct ConnectionMetadata {
	/// Client identifier taken from the `client-id` request header
	/// (`"unknown"` when the header is absent)
	pub ClientId:String,
	/// Client version taken from the `client-version` request header
	/// (`"unknown"` when the header is absent)
	pub ClientVersion:String,
	/// Protocol version taken from the `protocol-version` request header
	/// (defaults to 1)
	pub ProtocolVersion:u32,
	/// Value of `crate::Utility::CurrentTimestamp()` at the most recent
	/// tracked request (unit not visible here — TODO confirm)
	pub LastRequestTime:u64,
	/// Number of requests tracked on this connection so far
	pub RequestCount:u64,
	/// Kind of connection; `TrackConnection` always records `MountainMain`
	pub ConnectionType:crate::ApplicationState::ConnectionType,
}
99
100impl AirVinegRPCService {
101	/// Creates a new instance of the Air gRPC service
102	pub fn new(
103		AppState:Arc<ApplicationState>,
104		AuthService:Arc<AuthenticationService>,
105		UpdateManager:Arc<UpdateManager>,
106		DownloadManager:Arc<DownloadManager>,
107		FileIndexer:Arc<FileIndexer>,
108	) -> Self {
109		info!("[AirVinegRPCService] New instance created");
110
111		Self {
112			AppState,
113			AuthService,
114			UpdateManager,
115			DownloadManager,
116			FileIndexer,
117			ActiveConnections:Arc::new(tokio::sync::RwLock::new(HashMap::new())),
118		}
119	}
120
121	/// Track connection for a request
122	async fn TrackConnection<RequestType>(
123		&self,
124		Request:&tonic::Request<RequestType>,
125		ServiceName:&str,
126	) -> std::result::Result<String, Status> {
127		let Metadata = Request.metadata();
128		let ConnectionId = Metadata
129			.get("connection-id")
130			.map(|v| v.to_str().unwrap_or_default().to_string())
131			.unwrap_or_else(|| crate::Utility::GenerateRequestId());
132
133		let ClientId = Metadata
134			.get("client-id")
135			.map(|v| v.to_str().unwrap_or_default().to_string())
136			.unwrap_or_else(|| "unknown".to_string());
137
138		let ClientVersion = Metadata
139			.get("client-version")
140			.map(|v| v.to_str().unwrap_or_default().to_string())
141			.unwrap_or_else(|| "unknown".to_string());
142
143		let ProtocolVersion = Metadata
144			.get("protocol-version")
145			.map(|v| v.to_str().unwrap_or_default().parse().unwrap_or(1))
146			.unwrap_or(1);
147
148		// Update connection tracking
149		let mut Connections = self.ActiveConnections.write().await;
150		let ConnectionMetadata = Connections.entry(ConnectionId.clone()).or_insert_with(|| {
151			ConnectionMetadata {
152				ClientId:ClientId.clone(),
153				ClientVersion:ClientVersion.clone(),
154				ProtocolVersion,
155				LastRequestTime:crate::Utility::CurrentTimestamp(),
156				RequestCount:0,
157				ConnectionType:crate::ApplicationState::ConnectionType::MountainMain,
158			}
159		});
160
161		ConnectionMetadata.LastRequestTime = crate::Utility::CurrentTimestamp();
162		ConnectionMetadata.RequestCount += 1;
163
164		// Register connection with application state
165		self.AppState
166			.RegisterConnection(
167				ConnectionId.clone(),
168				ClientId,
169				ClientVersion,
170				ProtocolVersion,
171				crate::ApplicationState::ConnectionType::MountainMain,
172			)
173			.await
174			.map_err(|e| Status::internal(e.to_string()))?;
175
176		Ok(ConnectionId)
177	}
178
179	/// Validate protocol version compatibility
180	fn validate_protocol_version(&self, ClientVersion:u32) -> std::result::Result<(), Status> {
181		if ClientVersion > crate::ProtocolVersion {
182			return Err(Status::failed_precondition(format!(
183				"Client protocol version {} is newer than server version {}",
184				ClientVersion,
185				crate::ProtocolVersion
186			)));
187		}
188
189		if ClientVersion < crate::ProtocolVersion {
190			warn!(
191				"Client using older protocol version {} (server: {})",
192				ClientVersion,
193				crate::ProtocolVersion
194			);
195		}
196
197		Ok(())
198	}
199}
200
201#[async_trait]
202impl AirService for AirVinegRPCService {
203	/// Handle authentication requests from Mountain
204	async fn authenticate(
205		&self,
206		Request:Request<AuthenticationRequest>,
207	) -> std::result::Result<Response<AuthenticationResponse>, Status> {
208		// Track connection and validate protocol (before consuming Request)
209		let ConnectionId = self.TrackConnection(&Request, "authentication").await?;
210
211		let RequestData = Request.into_inner();
212		let request_id = RequestData.request_id.clone();
213
214		info!(
215			"[AirVinegRPCService] Authentication request received [ID: {}] [Connection: {}]",
216			request_id, ConnectionId
217		);
218
219		self.AppState
220			.RegisterRequest(request_id.clone(), "authentication".to_string())
221			.await
222			.map_err(|e| Status::internal(e.to_string()))?;
223
224		// Additional security validation
225		if RequestData.username.is_empty() || RequestData.password.is_empty() || RequestData.provider.is_empty() {
226			let ErrorMessage = "Invalid authentication parameters".to_string();
227			self.AppState
228				.UpdateRequestStatus(
229					&request_id,
230					crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
231					None,
232				)
233				.await
234				.ok();
235
236			return Ok(Response::new(air_generated::AuthenticationResponse {
237				request_id,
238				success:false,
239				token:String::new(),
240				error:ErrorMessage,
241			}));
242		}
243
244		// Clone needed data before moving RequestData
245		let username_for_log = RequestData.username.clone();
246		let password = RequestData.password;
247		let provider = RequestData.provider;
248
249		let result = self
250			.AuthService
251			.AuthenticateUser(RequestData.username, password, provider)
252			.await;
253
254		match result {
255			Ok(token) => {
256				self.AppState
257					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
258					.await
259					.ok();
260
261				// Log successful authentication
262				info!(
263					"[AirVinegRPCService] Authentication successful for user: {} [Connection: {}]",
264					username_for_log, ConnectionId
265				);
266
267				Ok(Response::new(air_generated::AuthenticationResponse {
268					request_id,
269					success:true,
270					token,
271					error:String::new(),
272				}))
273			},
274			Err(e) => {
275				self.AppState
276					.UpdateRequestStatus(
277						&request_id,
278						crate::ApplicationState::RequestState::Failed(e.to_string()),
279						None,
280					)
281					.await
282					.ok();
283
284				// Log failed authentication attempt
285				warn!(
286					"[AirVinegRPCService] Authentication failed for user: {} [Connection: {}] - {}",
287					username_for_log, ConnectionId, e
288				);
289
290				Ok(Response::new(air_generated::AuthenticationResponse {
291					request_id,
292					success:false,
293					token:String::new(),
294					error:e.to_string(),
295				}))
296			},
297		}
298	}
299
300	/// Handle update check requests from Mountain
301	async fn check_for_updates(
302		&self,
303		request:Request<UpdateCheckRequest>,
304	) -> std::result::Result<Response<UpdateCheckResponse>, Status> {
305		let RequestData = request.into_inner();
306		let request_id = RequestData.request_id.clone();
307
308		info!(
309			"[AirVinegRPCService] Update check request received [ID: {}] - Version: {}, Channel: {}",
310			request_id, RequestData.current_version, RequestData.channel
311		);
312
313		self.AppState
314			.RegisterRequest(request_id.clone(), "updates".to_string())
315			.await
316			.map_err(|e| Status::internal(e.to_string()))?;
317
318		// Validate CurrentVersion
319		if RequestData.current_version.is_empty() {
320			let ErrorMessage = crate::AirError::Validation("CurrentVersion cannot be empty".to_string());
321			self.AppState
322				.UpdateRequestStatus(
323					&request_id,
324					crate::ApplicationState::RequestState::Failed(ErrorMessage.to_string()),
325					None,
326				)
327				.await
328				.ok();
329			return Err(Status::invalid_argument(ErrorMessage.to_string()));
330		}
331
332		// Validate channel
333		let ValidChannels = ["stable", "beta", "nightly"];
334		let Channel = if RequestData.channel.is_empty() {
335			"stable".to_string()
336		} else {
337			RequestData.channel.clone()
338		};
339		if !ValidChannels.contains(&Channel.as_str()) {
340			let ErrorMessage = format!("Invalid channel: {}. Valid values are: {}", Channel, ValidChannels.join(", "));
341			self.AppState
342				.UpdateRequestStatus(
343					&request_id,
344					crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
345					None,
346				)
347				.await
348				.ok();
349			return Err(Status::invalid_argument(ErrorMessage));
350		}
351
352		// Check for updates using UpdateManager
353		let result = self.UpdateManager.CheckForUpdates().await;
354
355		match result {
356			Ok(UpdateInfo) => {
357				self.AppState
358					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
359					.await
360					.ok();
361
362				info!(
363					"[AirVinegRPCService] Update check successful - Available: {}",
364					UpdateInfo.is_some()
365				);
366
367				Ok(Response::new(air_generated::UpdateCheckResponse {
368					request_id,
369					update_available:UpdateInfo.is_some(),
370					version:UpdateInfo.as_ref().map(|info| info.version.clone()).unwrap_or_default(),
371					download_url:UpdateInfo.as_ref().map(|info| info.download_url.clone()).unwrap_or_default(),
372					release_notes:UpdateInfo.as_ref().map(|info| info.release_notes.clone()).unwrap_or_default(),
373					error:String::new(),
374				}))
375			},
376			Err(crate::AirError::Network(e)) => {
377				self.AppState
378					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
379					.await
380					.ok();
381				error!("[AirVinegRPCService] Network error during update check: {}", e);
382				Err(Status::unavailable(e))
383			},
384			Err(e) => {
385				self.AppState
386					.UpdateRequestStatus(
387						&request_id,
388						crate::ApplicationState::RequestState::Failed(e.to_string()),
389						None,
390					)
391					.await
392					.ok();
393				error!("[AirVinegRPCService] Update check failed: {}", e);
394				Ok(Response::new(air_generated::UpdateCheckResponse {
395					request_id,
396					update_available:false,
397					version:String::new(),
398					download_url:String::new(),
399					release_notes:String::new(),
400					error:e.to_string(),
401				}))
402			},
403		}
404	}
405
406	/// Handle download requests from Mountain
407	async fn download_file(
408		&self,
409		request:Request<DownloadRequest>,
410	) -> std::result::Result<Response<DownloadResponse>, Status> {
411		let RequestData = request.into_inner();
412		let request_id = RequestData.request_id.clone();
413
414		info!(
415			"[AirVinegRPCService] Download request received [ID: {}] - URL: {}",
416			request_id, RequestData.url
417		);
418
419		// Request ID for tracking (use provided or generate)
420		let download_request_id = if request_id.is_empty() {
421			crate::Utility::GenerateRequestId()
422		} else {
423			request_id.clone()
424		};
425
426		self.AppState
427			.RegisterRequest(download_request_id.clone(), "downloader".to_string())
428			.await
429			.map_err(|e| Status::internal(e.to_string()))?;
430
431		// Validate URL
432		if RequestData.url.is_empty() {
433			let error_msg = "URL cannot be empty".to_string();
434			self.AppState
435				.UpdateRequestStatus(
436					&download_request_id,
437					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
438					None,
439				)
440				.await
441				.ok();
442			return Ok(Response::new(DownloadResponse {
443				request_id:download_request_id,
444				success:false,
445				file_path:String::new(),
446				file_size:0,
447				checksum:String::new(),
448				error:error_msg,
449			}));
450		}
451
452		// Validate URL format
453		if !match_url_scheme(&RequestData.url) {
454			let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
455			self.AppState
456				.UpdateRequestStatus(
457					&download_request_id,
458					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
459					None,
460				)
461				.await
462				.ok();
463			return Ok(Response::new(DownloadResponse {
464				request_id:download_request_id,
465				success:false,
466				file_path:String::new(),
467				file_size:0,
468				checksum:String::new(),
469				error:error_msg,
470			}));
471		}
472
473		// Validate or use cache directory
474		let DestinationPath = if RequestData.destination_path.is_empty() {
475			// Use cache directory from configuration
476			let config = &self.AppState.Configuration.Downloader;
477			config.CacheDirectory.clone()
478		} else {
479			RequestData.destination_path.clone()
480		};
481
482		// Validate target directory exists
483		let dest_path = std::path::Path::new(&DestinationPath);
484		if let Some(parent) = dest_path.parent() {
485			if !parent.exists() {
486				match tokio::fs::create_dir_all(parent).await {
487					Ok(_) => {
488						debug!("[AirVinegRPCService] Created destination directory: {}", parent.display());
489					},
490					Err(e) => {
491						let error_msg = format!("Failed to create destination directory: {}", e);
492						self.AppState
493							.UpdateRequestStatus(
494								&download_request_id,
495								crate::ApplicationState::RequestState::Failed(error_msg.clone()),
496								None,
497							)
498							.await
499							.ok();
500						return Ok(Response::new(DownloadResponse {
501							request_id:download_request_id,
502							success:false,
503							file_path:String::new(),
504							file_size:0,
505							checksum:String::new(),
506							error:error_msg,
507						}));
508					},
509				}
510			}
511		}
512
513		// Set up granular progress callback mechanism
514		let download_manager = self.DownloadManager.clone();
515		let AppState = self.AppState.clone();
516		let callback_request_id = download_request_id.clone();
517		let progress_callback = move |progress:f32| {
518			let state = AppState.clone();
519			let id = callback_request_id.clone();
520			tokio::spawn(async move {
521				let _ = state
522					.UpdateRequestStatus(&id, crate::ApplicationState::RequestState::InProgress, Some(progress))
523					.await;
524			});
525		};
526
527		// Perform download with retry and progress tracking
528		let result = self
529			.download_file_with_retry(
530				&download_request_id,
531				RequestData.url.clone(),
532				DestinationPath,
533				RequestData.checksum,
534				Some(Box::new(progress_callback)),
535			)
536			.await;
537
538		match result {
539			Ok(file_info) => {
540				self.AppState
541					.UpdateRequestStatus(
542						&download_request_id,
543						crate::ApplicationState::RequestState::Completed,
544						Some(100.0),
545					)
546					.await
547					.ok();
548
549				info!(
550					"[AirVinegRPCService] Download completed [ID: {}] - Size: {} bytes",
551					download_request_id, file_info.size
552				);
553
554				Ok(Response::new(DownloadResponse {
555					request_id:download_request_id,
556					success:true,
557					file_path:file_info.path,
558					file_size:file_info.size,
559					checksum:file_info.checksum,
560					error:String::new(),
561				}))
562			},
563			Err(e) => {
564				self.AppState
565					.UpdateRequestStatus(
566						&download_request_id,
567						crate::ApplicationState::RequestState::Failed(e.to_string()),
568						None,
569					)
570					.await
571					.ok();
572
573				error!(
574					"[AirVinegRPCService] Download failed [ID: {}] - Error: {}",
575					download_request_id, e
576				);
577
578				Ok(Response::new(DownloadResponse {
579					request_id:download_request_id,
580					success:false,
581					file_path:String::new(),
582					file_size:0,
583					checksum:String::new(),
584					error:e.to_string(),
585				}))
586			},
587		}
588	}
589
590	/// Handle file indexing requests from Mountain
591	async fn index_files(&self, request:Request<IndexRequest>) -> std::result::Result<Response<IndexResponse>, Status> {
592		let RequestData = request.into_inner();
593		let request_id = RequestData.request_id;
594
595		info!(
596			"[AirVinegRPCService] Index request received [ID: {}] - Path: {}",
597			request_id, RequestData.path
598		);
599
600		self.AppState
601			.RegisterRequest(request_id.clone(), "indexing".to_string())
602			.await
603			.map_err(|e| Status::internal(e.to_string()))?;
604
605		let result = self.FileIndexer.IndexDirectory(RequestData.path, RequestData.patterns).await;
606
607		match result {
608			Ok(index_info) => {
609				self.AppState
610					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
611					.await
612					.ok();
613
614				Ok(Response::new(air_generated::IndexResponse {
615					request_id,
616					success:true,
617					files_indexed:index_info.files_indexed,
618					total_size:index_info.total_size,
619					error:String::new(),
620				}))
621			},
622			Err(e) => {
623				self.AppState
624					.UpdateRequestStatus(
625						&request_id,
626						crate::ApplicationState::RequestState::Failed(e.to_string()),
627						None,
628					)
629					.await
630					.ok();
631
632				Ok(Response::new(air_generated::IndexResponse {
633					request_id,
634					success:false,
635					files_indexed:0,
636					total_size:0,
637					error:e.to_string(),
638				}))
639			},
640		}
641	}
642
643	/// Handle status check requests from Mountain
644	async fn get_status(
645		&self,
646		request:Request<StatusRequest>,
647	) -> std::result::Result<Response<StatusResponse>, Status> {
648		let RequestData = request.into_inner();
649
650		debug!("[AirVinegRPCService] Status request received");
651
652		let metrics = self.AppState.GetMetrics().await;
653		let resources = self.AppState.GetResourceUsage().await;
654
655		Ok(Response::new(air_generated::StatusResponse {
656			version:crate::VERSION.to_string(),
657			uptime_seconds:metrics.UptimeSeconds,
658			total_requests:metrics.TotalRequests,
659			successful_requests:metrics.SuccessfulRequests,
660			failed_requests:metrics.FailedRequests,
661			average_response_time:metrics.AverageResponseTime,
662			memory_usage_mb:resources.MemoryUsageMb,
663			cpu_usage_percent:resources.CPUUsagePercent,
664			active_requests:self.AppState.GetActiveRequestCount().await as u32,
665		}))
666	}
667
668	/// Handle service health check
669	async fn health_check(
670		&self,
671		_request:Request<HealthCheckRequest>,
672	) -> std::result::Result<Response<HealthCheckResponse>, Status> {
673		debug!("[AirVinegRPCService] Health check request received");
674
675		Ok(Response::new(air_generated::HealthCheckResponse {
676			healthy:true,
677			timestamp:CurrentTimestamp(),
678		}))
679	}
680
681	// ==================== Phase 2: Update Operations ====================
682
683	/// Handle download update requests
684	async fn download_update(
685		&self,
686		request:Request<DownloadRequest>,
687	) -> std::result::Result<Response<DownloadResponse>, Status> {
688		let RequestData = request.into_inner();
689		let request_id = RequestData.request_id.clone();
690
691		info!(
692			"[AirVinegRPCService] Download update request received [ID: {}] - URL: {}, Destination: {}",
693			request_id, RequestData.url, RequestData.destination_path
694		);
695
696		self.AppState
697			.RegisterRequest(request_id.clone(), "download_update".to_string())
698			.await
699			.map_err(|e| Status::internal(e.to_string()))?;
700
701		// Validate URL is not empty
702		if RequestData.url.is_empty() {
703			let error_msg = crate::AirError::Validation("URL cannot be empty".to_string());
704			self.AppState
705				.UpdateRequestStatus(
706					&request_id,
707					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
708					None,
709				)
710				.await
711				.ok();
712			return Err(Status::invalid_argument(error_msg.to_string()));
713		}
714
715		// Validate URL format
716		if !RequestData.url.starts_with("http://") && !RequestData.url.starts_with("https://") {
717			let error_msg = crate::AirError::Validation("URL must start with http:// or https://".to_string());
718			self.AppState
719				.UpdateRequestStatus(
720					&request_id,
721					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
722					None,
723				)
724				.await
725				.ok();
726			return Err(Status::invalid_argument(error_msg.to_string()));
727		}
728
729		// Validate destination path is writeable
730		let destination = if RequestData.destination_path.is_empty() {
731			// Default to cache directory if not specified
732			self.UpdateManager
733				.GetCacheDirectory()
734				.join("update-latest.bin")
735				.to_string_lossy()
736				.to_string()
737		} else {
738			// Use provided destination
739			let dest_path = std::path::Path::new(&RequestData.destination_path);
740			if let Some(parent) = dest_path.parent() {
741				if parent.as_os_str().is_empty() {
742					// Relative path in cache directory
743					self.UpdateManager
744						.GetCacheDirectory()
745						.join(&RequestData.destination_path)
746						.to_string_lossy()
747						.to_string()
748				} else {
749					// Absolute or full path
750					RequestData.destination_path.clone()
751				}
752			} else {
753				RequestData.destination_path.clone()
754			}
755		};
756
757		// Check if destination directory exists and is writeable
758		let dest_path = std::path::Path::new(&destination);
759		if let Some(parent) = dest_path.parent() {
760			if !parent.exists() {
761				return Err(Status::failed_precondition(format!(
762					"Destination directory does not exist: {}",
763					parent.display()
764				)));
765			}
766
767			// Test writeability
768			if let Err(e) = std::fs::write(parent.join(".write_test"), "") {
769				let error_msg = crate::AirError::FileSystem(format!("Destination directory not writeable: {}", e));
770				self.AppState
771					.UpdateRequestStatus(
772						&request_id,
773						crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
774						None,
775					)
776					.await
777					.ok();
778				return Err(Status::permission_denied(error_msg.to_string()));
779			}
780			// Cleanup test file
781			let _ = std::fs::remove_file(parent.join(".write_test"));
782		}
783
784		// Use download manager - includes SHA256 checksum verification, progress
785		// tracking, and retry logic
786		let download_result = self
787			.DownloadManager
788			.DownloadFile(RequestData.url, destination.clone(), RequestData.checksum)
789			.await;
790
791		match download_result {
792			Ok(result) => {
793				self.AppState
794					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
795					.await
796					.ok();
797
798				info!(
799					"[AirVinegRPCService] Update downloaded successfully - Path: {}, Size: {}, Checksum: {}",
800					result.path, result.size, result.checksum
801				);
802
803				Ok(Response::new(DownloadResponse {
804					request_id,
805					success:true,
806					file_path:result.path,
807					file_size:result.size,
808					checksum:result.checksum,
809					error:String::new(),
810				}))
811			},
812			Err(crate::AirError::Network(e)) => {
813				self.AppState
814					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
815					.await
816					.ok();
817				error!("[AirVinegRPCService] Download update network error: {}", e);
818				Err(Status::unavailable(e))
819			},
820			Err(crate::AirError::FileSystem(e)) => {
821				self.AppState
822					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
823					.await
824					.ok();
825				error!("[AirVinegRPCService] Download update filesystem error: {}", e);
826				Err(Status::internal(e))
827			},
828			Err(e) => {
829				self.AppState
830					.UpdateRequestStatus(
831						&request_id,
832						crate::ApplicationState::RequestState::Failed(e.to_string()),
833						None,
834					)
835					.await
836					.ok();
837				error!("[AirVinegRPCService] Download update failed: {}", e);
838				Ok(Response::new(DownloadResponse {
839					request_id,
840					success:false,
841					file_path:String::new(),
842					file_size:0,
843					checksum:String::new(),
844					error:e.to_string(),
845				}))
846			},
847		}
848	}
849
850	/// Handle apply update requests
851	async fn apply_update(
852		&self,
853		request:Request<ApplyUpdateRequest>,
854	) -> std::result::Result<Response<ApplyUpdateResponse>, Status> {
855		let RequestData = request.into_inner();
856		let request_id = RequestData.request_id.clone();
857
858		info!(
859			"[AirVinegRPCService] Apply update request received [ID: {}] - Version: {}, Path: {}",
860			request_id, RequestData.version, RequestData.update_path
861		);
862
863		self.AppState
864			.RegisterRequest(request_id.clone(), "apply_update".to_string())
865			.await
866			.map_err(|e| Status::internal(e.to_string()))?;
867
868		// Validate version
869		if RequestData.version.is_empty() {
870			let error_msg = crate::AirError::Validation("version cannot be empty".to_string());
871			self.AppState
872				.UpdateRequestStatus(
873					&request_id,
874					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
875					None,
876				)
877				.await
878				.ok();
879			return Err(Status::invalid_argument(error_msg.to_string()));
880		}
881
882		// Validate update path is not empty
883		if RequestData.update_path.is_empty() {
884			let error_msg = crate::AirError::Validation("update_path cannot be empty".to_string());
885			self.AppState
886				.UpdateRequestStatus(
887					&request_id,
888					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
889					None,
890				)
891				.await
892				.ok();
893			return Err(Status::invalid_argument(error_msg.to_string()));
894		}
895
896		let update_path = std::path::Path::new(&RequestData.update_path);
897
898		// Validate update file exists
899		if !update_path.exists() {
900			let error_msg = crate::AirError::FileSystem(format!("Update file not found: {}", RequestData.update_path));
901			self.AppState
902				.UpdateRequestStatus(
903					&request_id,
904					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
905					None,
906				)
907				.await
908				.ok();
909			return Err(Status::not_found(error_msg.to_string()));
910		}
911
912		// Validate update file is readable and has content
913		let metadata = match std::fs::metadata(update_path) {
914			Ok(m) => m,
915			Err(e) => {
916				let error_msg = crate::AirError::FileSystem(format!("Failed to read update file metadata: {}", e));
917				self.AppState
918					.UpdateRequestStatus(
919						&request_id,
920						crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
921						None,
922					)
923					.await
924					.ok();
925				return Err(Status::internal(error_msg.to_string()));
926			},
927		};
928
929		if metadata.len() == 0 {
930			let error_msg = crate::AirError::Validation("Update file is empty".to_string());
931			self.AppState
932				.UpdateRequestStatus(
933					&request_id,
934					crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
935					None,
936				)
937				.await
938				.ok();
939			return Err(Status::failed_precondition(error_msg.to_string()));
940		}
941
942		// Prepare rollback capability before applying update
943		let rollback_backup_path = self.prepare_rollback_backup(&RequestData.version).await;
944		if let Err(ref e) = rollback_backup_path {
945			warn!(
946				"[AirVinegRPCService] Failed to prepare rollback backup: {}. Proceeding without rollback capability.",
947				e
948			);
949		}
950
951		// Verify update file integrity (checksum check)
952		match self.UpdateManager.verify_update(&RequestData.update_path, None).await {
953			Ok(true) => {
954				info!("[AirVinegRPCService] Update verification successful, preparing for installation");
955
956				self.AppState
957					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
958					.await
959					.ok();
960
961				// Trigger graceful shutdown after returning response
962				let AppState = self.AppState.clone();
963				let version = RequestData.version.clone();
964				let self_clone = self.clone();
965
966				tokio::spawn(async move {
967					tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
968					log::info!(
969						"[AirVinegRPCService] Initiating graceful shutdown for update version {}",
970						version
971					);
972
973					// Graceful shutdown implementation
974					if let Err(e) = AppState.StopAllBackgroundTasks().await {
975						log::error!("[AirVinegRPCService] Failed to initiate graceful shutdown: {}", e);
976
977						// Implement rollback if update fails
978						log::warn!("[AirVinegRPCService] Rollback initiated due to graceful shutdown failure");
979						if let Err(rollback_error) = self_clone.perform_rollback(&version).await {
980							log::error!("[AirVinegRPCService] Rollback failed: {}", rollback_error);
981						} else {
982							log::info!("[AirVinegRPCService] Rollback completed successfully");
983						}
984					}
985				});
986
987				Ok(Response::new(ApplyUpdateResponse {
988					request_id,
989					success:true,
990					error:String::new(),
991				}))
992			},
993			Ok(false) => {
994				let error_msg = "Update verification failed: checksum mismatch".to_string();
995				self.AppState
996					.UpdateRequestStatus(
997						&request_id,
998						crate::ApplicationState::RequestState::Failed(error_msg.clone()),
999						None,
1000					)
1001					.await
1002					.ok();
1003				error!("[AirVinegRPCService] {}", error_msg);
1004
1005				// Clean up rollback backup if verification failed
1006				let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1007
1008				Err(Status::failed_precondition(error_msg))
1009			},
1010			Err(crate::AirError::FileSystem(e)) => {
1011				self.AppState
1012					.UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
1013					.await
1014					.ok();
1015				error!("[AirVinegRPCService] Update verification filesystem error: {}", e);
1016
1017				// Clean up rollback backup if verification failed
1018				let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1019
1020				Err(Status::internal(e))
1021			},
1022			Err(e) => {
1023				self.AppState
1024					.UpdateRequestStatus(
1025						&request_id,
1026						crate::ApplicationState::RequestState::Failed(e.to_string()),
1027						None,
1028					)
1029					.await
1030					.ok();
1031				error!("[AirVinegRPCService] Update verification error: {}", e);
1032
1033				// Clean up rollback backup if verification failed
1034				let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1035
1036				Ok(Response::new(ApplyUpdateResponse {
1037					request_id,
1038					success:false,
1039					error:e.to_string(),
1040				}))
1041			},
1042		}
1043	}
1044
1045	// ==================== Phase 3: Download Operations ====================
1046
	/// Stream type returned by `download_stream`: a channel-backed stream whose
	/// items are either a `DownloadStreamResponse` frame or a gRPC `Status`.
	type DownloadStreamStream =
		tokio_stream::wrappers::ReceiverStream<std::result::Result<air_generated::DownloadStreamResponse, Status>>;
1051
1052	async fn download_stream(
1053		&self,
1054		request:Request<DownloadStreamRequest>,
1055	) -> std::result::Result<Response<Self::DownloadStreamStream>, Status> {
1056		let RequestData = request.into_inner();
1057		let request_id = RequestData.request_id.clone();
1058
1059		info!(
1060			"[AirVinegRPCService] Download stream request received [ID: {}] - URL: {}",
1061			request_id, RequestData.url
1062		);
1063
1064		self.AppState
1065			.RegisterRequest(request_id.clone(), "downloader_stream".to_string())
1066			.await
1067			.map_err(|e| Status::internal(e.to_string()))?;
1068
1069		// Validate URL
1070		if RequestData.url.is_empty() {
1071			let error_msg = "URL cannot be empty".to_string();
1072			self.AppState
1073				.UpdateRequestStatus(
1074					&request_id,
1075					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1076					None,
1077				)
1078				.await
1079				.ok();
1080			return Err(Status::invalid_argument(error_msg));
1081		}
1082
1083		// Validate URL scheme
1084		if !match_url_scheme(&RequestData.url) {
1085			let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
1086			self.AppState
1087				.UpdateRequestStatus(
1088					&request_id,
1089					crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1090					None,
1091				)
1092				.await
1093				.ok();
1094			return Err(Status::invalid_argument(error_msg));
1095		}
1096
1097		// Validate URL supports range headers for streaming
1098		match self.validate_range_support(&RequestData.url).await {
1099			Ok(true) => {
1100				debug!("[AirVinegRPCService] URL supports range headers");
1101			},
1102			Ok(false) => {
1103				warn!("[AirVinegRPCService] URL does not support range headers, streaming may be inefficient");
1104			},
1105			Err(e) => {
1106				let error_msg = format!("Failed to validate range support: {}", e);
1107				self.AppState
1108					.UpdateRequestStatus(
1109						&request_id,
1110						crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1111						None,
1112					)
1113					.await
1114					.ok();
1115				return Err(Status::internal(error_msg));
1116			},
1117		}
1118
1119		// Create a channel for streaming responses
1120		let (tx, rx) = tokio::sync::mpsc::channel(100);
1121
1122		// Configure chunk size (8MB chunks for balance between throughput and latency)
1123		let chunk_size = 8 * 1024 * 1024; // 8MB
1124
1125		// Clone necessary data for spawn task
1126		let url = RequestData.url.clone();
1127		let headers = RequestData.headers;
1128		let download_request_id = request_id.clone();
1129		let download_manager = self.DownloadManager.clone();
1130		let AppState = self.AppState.clone();
1131
1132		// Spawn streaming task
1133		tokio::spawn(async move {
1134			// Send initial status
1135			if tx
1136				.send(Ok(DownloadStreamResponse {
1137					request_id:download_request_id.clone(),
1138					chunk:vec![].into(),
1139					total_size:0,
1140					downloaded:0,
1141					completed:false,
1142					error:String::new(),
1143				}))
1144				.await
1145				.is_err()
1146			{
1147				log::warn!(
1148					"[AirVinegRPCService] Client disconnected before streaming started [ID: {}]",
1149					download_request_id
1150				);
1151				return;
1152			}
1153
1154			// Create HTTP client with connection pooling hints
1155			let client = reqwest::Client::builder()
1156				.pool_idle_timeout(std::time::Duration::from_secs(60))
1157				.pool_max_idle_per_host(5)
1158				.timeout(std::time::Duration::from_secs(300))
1159				.build();
1160
1161			if client.is_err() {
1162				let error = client.unwrap_err().to_string();
1163				let _ = tx
1164					.send(Ok(DownloadStreamResponse {
1165						request_id:download_request_id.clone(),
1166						chunk:vec![].into(),
1167						total_size:0,
1168						downloaded:0,
1169						completed:false,
1170						error:error.clone(),
1171					}))
1172					.await;
1173				AppState
1174					.UpdateRequestStatus(
1175						&download_request_id,
1176						crate::ApplicationState::RequestState::Failed(error),
1177						None,
1178					)
1179					.await
1180					.ok();
1181				return;
1182			}
1183
1184			let client = match client {
1185				Ok(client) => client,
1186				Err(e) => {
1187					let error = format!("Failed to create HTTP client: {}", e);
1188					let _ = tx.send(Err(Status::internal(error.clone())));
1189					AppState
1190						.UpdateRequestStatus(
1191							&download_request_id,
1192							crate::ApplicationState::RequestState::Failed(error),
1193							None,
1194						)
1195						.await
1196						.ok();
1197					return;
1198				},
1199			};
1200
1201			// Start streaming download
1202			let mut total_size:u64 = 0;
1203			let mut total_downloaded:u64 = 0;
1204
1205			match client
1206				.get(&url)
1207				.headers({
1208					let mut map = reqwest::header::HeaderMap::new();
1209					for (key, value) in headers {
1210						if let (Ok(header_name), Ok(header_value)) = (
1211							reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1212							reqwest::header::HeaderValue::from_str(&value),
1213						) {
1214							map.insert(header_name, header_value);
1215						}
1216					}
1217					map
1218				})
1219				.send()
1220				.await
1221			{
1222				Ok(response) => {
1223					if !response.status().is_success() {
1224						let error = format!("Download failed with status: {}", response.status());
1225						let _ = tx
1226							.send(Ok(DownloadStreamResponse {
1227								request_id:download_request_id.clone(),
1228								chunk:vec![].into(),
1229								total_size:0,
1230								downloaded:0,
1231								completed:false,
1232								error:error.clone(),
1233							}))
1234							.await;
1235						AppState
1236							.UpdateRequestStatus(
1237								&download_request_id,
1238								crate::ApplicationState::RequestState::Failed(error),
1239								None,
1240							)
1241							.await
1242							.ok();
1243						return;
1244					}
1245
1246					total_size = response.content_length().unwrap_or(0);
1247					let response_tx = tx.clone();
1248					let response_id = download_request_id.clone();
1249
1250					// Stream chunks to client
1251					let mut stream = response.bytes_stream();
1252					let mut buffer = Vec::with_capacity(chunk_size);
1253					let mut last_progress:f32 = 0.0;
1254
1255					while let Some(chunk_result) = TokioStreamExt::next(&mut stream).await {
1256						if AppState.IsRequestCancelled(&download_request_id).await {
1257							log::info!(
1258								"[AirVinegRPCService] Download cancelled by client [ID: {}]",
1259								download_request_id
1260							);
1261							AppState
1262								.UpdateRequestStatus(
1263									&download_request_id,
1264									crate::ApplicationState::RequestState::Cancelled,
1265									None,
1266								)
1267								.await
1268								.ok();
1269							return;
1270						}
1271
1272						match chunk_result {
1273							Ok(chunk) => {
1274								buffer.extend_from_slice(&chunk);
1275								total_downloaded += chunk.len() as u64;
1276
1277								// Send chunk when buffer reaches target size
1278								if buffer.len() >= chunk_size {
1279									// Calculate checksum for verification
1280									let chunk_checksum = calculate_chunk_checksum(&buffer);
1281
1282									// Calculate progress
1283									let progress = if total_size > 0 {
1284										(total_downloaded as f32 / total_size as f32) * 100.0
1285									} else {
1286										0.0
1287									};
1288
1289									// Update request status periodically
1290									if progress - last_progress >= 5.0 {
1291										AppState
1292											.UpdateRequestStatus(
1293												&download_request_id,
1294												crate::ApplicationState::RequestState::InProgress,
1295												Some(progress),
1296											)
1297											.await
1298											.ok();
1299										last_progress = progress;
1300									}
1301
1302									if response_tx
1303										.send(Ok(DownloadStreamResponse {
1304											request_id:response_id.clone(),
1305											chunk:buffer.clone().into(),
1306											total_size,
1307											downloaded:total_downloaded,
1308											completed:false,
1309											error:String::new(),
1310										}))
1311										.await
1312										.is_err()
1313									{
1314										log::warn!(
1315											"[AirVinegRPCService] Client disconnected during streaming [ID: {}]",
1316											download_request_id
1317										);
1318										AppState
1319											.UpdateRequestStatus(
1320												&download_request_id,
1321												crate::ApplicationState::RequestState::Failed(
1322													"Client disconnected".to_string(),
1323												),
1324												None,
1325											)
1326											.await
1327											.ok();
1328										return;
1329									}
1330
1331									debug!(
1332										"[AirVinegRPCService] Sent chunk of {} bytes [ID: {}] - Progress: {:.1}%",
1333										buffer.len(),
1334										download_request_id,
1335										progress
1336									);
1337
1338									buffer.clear();
1339								}
1340							},
1341							Err(e) => {
1342								let error = format!("Download error: {}", e);
1343								log::error!(
1344									"[AirVinegRPCService] Stream download failed [ID: {}]: {}",
1345									download_request_id,
1346									error
1347								);
1348
1349								let _ = response_tx
1350									.send(Ok(DownloadStreamResponse {
1351										request_id:response_id.clone(),
1352										chunk:vec![].into(),
1353										total_size,
1354										downloaded:total_downloaded,
1355										completed:false,
1356										error:error.clone(),
1357									}))
1358									.await;
1359
1360								AppState
1361									.UpdateRequestStatus(
1362										&download_request_id,
1363										crate::ApplicationState::RequestState::Failed(error),
1364										None,
1365									)
1366									.await
1367									.ok();
1368								return;
1369							},
1370						}
1371					}
1372
1373					// Send remaining buffered data
1374					if !buffer.is_empty() {
1375						let chunk_checksum = calculate_chunk_checksum(&buffer);
1376
1377						if tx
1378							.send(Ok(DownloadStreamResponse {
1379								request_id:download_request_id.clone(),
1380								chunk:buffer.into(),
1381								total_size,
1382								downloaded:total_downloaded,
1383								completed:false,
1384								error:String::new(),
1385							}))
1386							.await
1387							.is_err()
1388						{
1389							log::warn!(
1390								"[AirVinegRPCService] Client disconnected while sending final chunk [ID: {}]",
1391								download_request_id
1392							);
1393							return;
1394						}
1395					}
1396
1397					// Send completion signal
1398					AppState
1399						.UpdateRequestStatus(
1400							&download_request_id,
1401							crate::ApplicationState::RequestState::Completed,
1402							Some(100.0),
1403						)
1404						.await
1405						.ok();
1406
1407					let _ = tx
1408						.send(Ok(DownloadStreamResponse {
1409							request_id,
1410							chunk:vec![].into(),
1411							total_size,
1412							downloaded:total_downloaded,
1413							completed:true,
1414							error:String::new(),
1415						}))
1416						.await;
1417
1418					info!(
1419						"[AirVinegRPCService] Stream download completed [ID: {}] - Total: {} bytes",
1420						download_request_id, total_downloaded
1421					);
1422				},
1423				Err(e) => {
1424					let error = format!("Failed to start streaming download: {}", e);
1425					log::error!(
1426						"[AirVinegRPCService] Stream download error [ID: {}]: {}",
1427						download_request_id,
1428						error
1429					);
1430
1431					let _ = tx
1432						.send(Ok(DownloadStreamResponse {
1433							request_id:download_request_id.clone(),
1434							chunk:vec![].into(),
1435							total_size:0,
1436							downloaded:0,
1437							completed:false,
1438							error:error.clone(),
1439						}))
1440						.await;
1441
1442					AppState
1443						.UpdateRequestStatus(
1444							&download_request_id,
1445							crate::ApplicationState::RequestState::Failed(error),
1446							None,
1447						)
1448						.await
1449						.ok();
1450				},
1451			}
1452		});
1453
1454		Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
1455	}
1456
1457	// ==================== Phase 4: Indexing Operations ====================
1458
1459	/// Handle file search requests
1460	async fn search_files(
1461		&self,
1462		request:Request<SearchRequest>,
1463	) -> std::result::Result<Response<SearchResponse>, Status> {
1464		let RequestData = request.into_inner();
1465		let request_id = RequestData.request_id.clone();
1466
1467		debug!(
1468			"[AirVinegRPCService] Search files request: query='{}' in path='{}'",
1469			RequestData.query, RequestData.path
1470		);
1471
1472		// Validate search query
1473		if RequestData.query.is_empty() {
1474			return Ok(Response::new(SearchResponse {
1475				request_id,
1476				results:vec![],
1477				total_results:0,
1478				error:"Search query cannot be empty".to_string(),
1479			}));
1480		}
1481
1482		// Use file indexer to search - convert to match the existing signature
1483		let path = if RequestData.path.is_empty() { None } else { Some(RequestData.path.clone()) };
1484		let search_path = path.as_deref();
1485
1486		match self
1487			.FileIndexer
1488			.SearchFiles(
1489				SearchQuery {
1490					query:RequestData.query.clone(),
1491					mode:SearchMode::Literal,
1492					case_sensitive:false,
1493					whole_word:false,
1494					regex:None,
1495					max_results:RequestData.max_results,
1496					page:1,
1497				},
1498				path,
1499				None,
1500			)
1501			.await
1502		{
1503			Ok(search_results) => {
1504				// Convert from internal SearchResult to proto FileResult
1505				let mut file_results = Vec::new();
1506				for r in search_results {
1507					// Create a preview from the first match if available
1508					let (match_preview, line_number) = if let Some(first_match) = r.matches.first() {
1509						(first_match.line_content.clone(), first_match.line_number)
1510					} else {
1511						(String::new(), 0)
1512					};
1513
1514					// Get file size from metadata or filesystem
1515					let size = if let Ok(Some(metadata)) = self.FileIndexer.GetFileInfo(r.path.clone()).await {
1516						metadata.size
1517					} else if let Ok(file_metadata) = std::fs::metadata(&r.path) {
1518						file_metadata.len()
1519					} else {
1520						0
1521					};
1522
1523					file_results.push(FileResult { path:r.path, size, match_preview, line_number });
1524				}
1525
1526				info!("[AirVinegRPCService] Search completed: {} results found", file_results.len());
1527
1528				let result_count = file_results.len();
1529				Ok(Response::new(SearchResponse {
1530					request_id,
1531					results:file_results,
1532					total_results:result_count as u32,
1533					error:String::new(),
1534				}))
1535			},
1536			Err(e) => {
1537				error!("[AirVinegRPCService] Search failed: {}", e);
1538				Ok(Response::new(SearchResponse {
1539					request_id,
1540					results:vec![],
1541					total_results:0,
1542					error:e.to_string(),
1543				}))
1544			},
1545		}
1546	}
1547
1548	/// Handle get file info requests
1549	async fn get_file_info(
1550		&self,
1551		request:Request<FileInfoRequest>,
1552	) -> std::result::Result<Response<FileInfoResponse>, Status> {
1553		let RequestData = request.into_inner();
1554		let request_id = RequestData.request_id.clone();
1555
1556		debug!("[AirVinegRPCService] Get file info request: {}", RequestData.path);
1557
1558		// Validate path
1559		if RequestData.path.is_empty() {
1560			return Ok(Response::new(FileInfoResponse {
1561				request_id,
1562				exists:false,
1563				size:0,
1564				mime_type:String::new(),
1565				checksum:String::new(),
1566				modified_time:0,
1567				error:"Path cannot be empty".to_string(),
1568			}));
1569		}
1570
1571		// Get file metadata
1572		use std::path::Path;
1573		let path = Path::new(&RequestData.path);
1574
1575		if !path.exists() {
1576			return Ok(Response::new(FileInfoResponse {
1577				request_id,
1578				exists:false,
1579				size:0,
1580				mime_type:String::new(),
1581				checksum:String::new(),
1582				modified_time:0,
1583				error:String::new(), // File not found is not an error
1584			}));
1585		}
1586
1587		// Get file metadata using std::fs
1588		match std::fs::metadata(path) {
1589			Ok(metadata) => {
1590				let modified_time = metadata
1591					.modified()
1592					.ok()
1593					.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
1594					.map(|d| d.as_secs())
1595					.unwrap_or(0);
1596
1597				// Detect MIME type
1598				let mime_type = self.detect_mime_type(path);
1599
1600				// Calculate checksum lazily or on-demand
1601				let checksum = calculate_file_checksum(path).await.unwrap_or_else(|e| {
1602					log::warn!("[AirVinegRPCService] Failed to calculate checksum: {}", e);
1603					String::new()
1604				});
1605
1606				Ok(Response::new(FileInfoResponse {
1607					request_id,
1608					exists:true,
1609					size:metadata.len(),
1610					mime_type,
1611					checksum,
1612					modified_time,
1613					error:String::new(),
1614				}))
1615			},
1616			Err(e) => {
1617				error!("[AirVinegRPCService] Failed to get file metadata: {}", e);
1618				Ok(Response::new(FileInfoResponse {
1619					request_id,
1620					exists:false,
1621					size:0,
1622					mime_type:String::new(),
1623					checksum:String::new(),
1624					modified_time:0,
1625					error:e.to_string(),
1626				}))
1627			},
1628		}
1629	}
1630
1631	// ==================== Phase 5: Monitoring & Metrics ====================
1632
1633	/// Handle get metrics requests
1634	async fn get_metrics(
1635		&self,
1636		request:Request<MetricsRequest>,
1637	) -> std::result::Result<Response<MetricsResponse>, Status> {
1638		let RequestData = request.into_inner();
1639		let request_id = RequestData.request_id.clone();
1640
1641		debug!("[AirVinegRPCService] Get metrics request: type='{}'", RequestData.metric_type);
1642
1643		let metrics = self.AppState.GetMetrics().await;
1644		let mut metrics_map = std::collections::HashMap::new();
1645
1646		// Performance metrics
1647		if RequestData.metric_type.is_empty() || RequestData.metric_type == "performance" {
1648			metrics_map.insert("uptime_seconds".to_string(), metrics.UptimeSeconds.to_string());
1649			metrics_map.insert("total_requests".to_string(), metrics.TotalRequests.to_string());
1650			metrics_map.insert("successful_requests".to_string(), metrics.SuccessfulRequests.to_string());
1651			metrics_map.insert("failed_requests".to_string(), metrics.FailedRequests.to_string());
1652			metrics_map.insert("average_response_time_ms".to_string(), metrics.AverageResponseTime.to_string());
1653		}
1654
1655		// Request metrics
1656		if RequestData.metric_type.is_empty() || RequestData.metric_type == "requests" {
1657			metrics_map.insert(
1658				"ActiveRequests".to_string(),
1659				self.AppState.GetActiveRequestCount().await.to_string(),
1660			);
1661		}
1662
1663		Ok(Response::new(MetricsResponse {
1664			request_id,
1665			metrics:metrics_map,
1666			error:String::new(),
1667		}))
1668	}
1669
1670	/// Handle get resource usage requests
1671	async fn get_resource_usage(
1672		&self,
1673		request:Request<ResourceUsageRequest>,
1674	) -> std::result::Result<Response<ResourceUsageResponse>, Status> {
1675		let RequestData = request.into_inner();
1676		let request_id = RequestData.request_id.clone();
1677
1678		debug!("[AirVinegRPCService] Get resource usage request");
1679
1680		let resources = self.AppState.GetResourceUsage().await;
1681
1682		Ok(Response::new(ResourceUsageResponse {
1683			request_id,
1684			memory_usage_mb:resources.MemoryUsageMb,
1685			cpu_usage_percent:resources.CPUUsagePercent,
1686			disk_usage_mb:resources.DiskUsageMb,
1687			network_usage_mbps:resources.NetworkUsageMbps,
1688			error:String::new(),
1689		}))
1690	}
1691
1692	/// Handle set resource limits requests
1693	async fn set_resource_limits(
1694		&self,
1695		request:Request<ResourceLimitsRequest>,
1696	) -> std::result::Result<Response<ResourceLimitsResponse>, Status> {
1697		let RequestData = request.into_inner();
1698		let request_id = RequestData.request_id.clone();
1699
1700		info!(
1701			"[AirVinegRPCService] Set resource limits: memory={}MB, cpu={}%, disk={}MB",
1702			RequestData.memory_limit_mb, RequestData.cpu_limit_percent, RequestData.disk_limit_mb
1703		);
1704
1705		// Validate limits
1706		if RequestData.memory_limit_mb == 0 {
1707			return Ok(Response::new(ResourceLimitsResponse {
1708				request_id,
1709				success:false,
1710				error:"Memory limit must be greater than 0".to_string(),
1711			}));
1712		}
1713
1714		if RequestData.cpu_limit_percent > 100 {
1715			return Ok(Response::new(ResourceLimitsResponse {
1716				request_id,
1717				success:false,
1718				error:"CPU limit cannot exceed 100%".to_string(),
1719			}));
1720		}
1721
1722		// Apply new limits via ApplicationState
1723		let result = self
1724			.AppState
1725			.SetResourceLimits(
1726				Some(RequestData.memory_limit_mb as u64),
1727				Some(RequestData.cpu_limit_percent as f64),
1728				Some(RequestData.disk_limit_mb as u64),
1729			)
1730			.await;
1731
1732		match result {
1733			Ok(_) => {
1734				Ok(Response::new(ResourceLimitsResponse {
1735					request_id,
1736					success:true,
1737					error:String::new(),
1738				}))
1739			},
1740			Err(e) => {
1741				Ok(Response::new(ResourceLimitsResponse {
1742					request_id,
1743					success:false,
1744					error:e.to_string(),
1745				}))
1746			},
1747		}
1748	}
1749
1750	// ==================== Phase 6: Configuration Management ====================
1751
1752	/// Handle get configuration requests
1753	async fn get_configuration(
1754		&self,
1755		request:Request<ConfigurationRequest>,
1756	) -> std::result::Result<Response<ConfigurationResponse>, Status> {
1757		let RequestData = request.into_inner();
1758		let request_id = RequestData.request_id.clone();
1759
1760		debug!(
1761			"[AirVinegRPCService] Get configuration request: section='{}'",
1762			RequestData.section
1763		);
1764
1765		// Get configuration from ApplicationState
1766		let config = self.AppState.GetConfiguration().await;
1767		let mut config_map = std::collections::HashMap::new();
1768
1769		// Serialize config to map, filter by section if specified
1770		match RequestData.section.as_str() {
1771			"grpc" => {
1772				config_map.insert("bind_address".to_string(), config.Grpc.BindAddress.clone());
1773				config_map.insert("max_connections".to_string(), config.Grpc.MaxConnections.to_string());
1774				config_map.insert("request_timeout_secs".to_string(), config.Grpc.RequestTimeoutSecs.to_string());
1775			},
1776			"authentication" => {
1777				config_map.insert("enabled".to_string(), config.Authentication.Enabled.to_string());
1778				config_map.insert("credentials_path".to_string(), "***REDACTED***".to_string());
1779				config_map.insert(
1780					"token_expiration_hours".to_string(),
1781					config.Authentication.TokenExpirationHours.to_string(),
1782				);
1783			},
1784			"updates" => {
1785				config_map.insert("enabled".to_string(), config.Updates.Enabled.to_string());
1786				config_map.insert(
1787					"check_interval_hours".to_string(),
1788					config.Updates.CheckIntervalHours.to_string(),
1789				);
1790				config_map.insert("update_server_url".to_string(), config.Updates.UpdateServerUrl.clone());
1791				config_map.insert("auto_download".to_string(), config.Updates.AutoDownload.to_string());
1792				config_map.insert("auto_install".to_string(), config.Updates.AutoInstall.to_string());
1793			},
1794			"downloader" => {
1795				config_map.insert("enabled".to_string(), config.Downloader.Enabled.to_string());
1796				config_map.insert(
1797					"max_concurrent_downloads".to_string(),
1798					config.Downloader.MaxConcurrentDownloads.to_string(),
1799				);
1800				config_map.insert(
1801					"download_timeout_secs".to_string(),
1802					config.Downloader.DownloadTimeoutSecs.to_string(),
1803				);
1804				config_map.insert("max_retries".to_string(), config.Downloader.MaxRetries.to_string());
1805				config_map.insert("cache_directory".to_string(), config.Downloader.CacheDirectory.clone());
1806			},
1807			"indexing" => {
1808				config_map.insert("enabled".to_string(), config.Indexing.Enabled.to_string());
1809				config_map.insert("max_file_size_mb".to_string(), config.Indexing.MaxFileSizeMb.to_string());
1810				config_map.insert("file_types".to_string(), config.Indexing.FileTypes.join(","));
1811				config_map.insert(
1812					"update_interval_minutes".to_string(),
1813					config.Indexing.UpdateIntervalMinutes.to_string(),
1814				);
1815				config_map.insert("index_directory".to_string(), config.Indexing.IndexDirectory.clone());
1816			},
1817			_ => {
1818				// Return all sections (redacted for sensitive values)
1819				config_map.insert("_grpc_enabled".to_string(), "true".to_string());
1820			},
1821		}
1822
1823		Ok(Response::new(ConfigurationResponse {
1824			request_id,
1825			configuration:config_map,
1826			error:String::new(),
1827		}))
1828	}
1829
1830	/// Handle update configuration requests
1831	async fn update_configuration(
1832		&self,
1833		request:Request<UpdateConfigurationRequest>,
1834	) -> std::result::Result<Response<UpdateConfigurationResponse>, Status> {
1835		let RequestData = request.into_inner();
1836		let request_id = RequestData.request_id.clone();
1837
1838		info!(
1839			"[AirVinegRPCService] Update configuration request: section='{}'",
1840			RequestData.section
1841		);
1842
1843		// Validate section
1844		if !["grpc", "authentication", "updates", "downloader", "indexing", ""].contains(&RequestData.section.as_str())
1845		{
1846			return Ok(Response::new(UpdateConfigurationResponse {
1847				request_id,
1848				success:false,
1849				error:"Invalid configuration section".to_string(),
1850			}));
1851		}
1852
1853		// Update configuration via ApplicationState
1854		let result = self
1855			.AppState
1856			.UpdateConfiguration(RequestData.section, RequestData.updates)
1857			.await;
1858
1859		match result {
1860			Ok(_) => {
1861				Ok(Response::new(UpdateConfigurationResponse {
1862					request_id,
1863					success:true,
1864					error:String::new(),
1865				}))
1866			},
1867			Err(e) => {
1868				Ok(Response::new(UpdateConfigurationResponse {
1869					request_id,
1870					success:false,
1871					error:e.to_string(),
1872				}))
1873			},
1874		}
1875	}
1876}
1877
1878// ==================== Helper Methods ====================
1879
1880impl AirVinegRPCService {
1881	/// Detect MIME type based on file extension
1882	fn detect_mime_type(&self, path:&std::path::Path) -> String {
1883		match path.extension().and_then(|e| e.to_str()) {
1884			Some("rs") => "text/x-rust".to_string(),
1885			Some("ts") => "application/typescript".to_string(),
1886			Some("js") => "application/javascript".to_string(),
1887			Some("json") => "application/json".to_string(),
1888			Some("toml") => "application/toml".to_string(),
1889			Some("md") => "text/markdown".to_string(),
1890			Some("txt") => "text/plain".to_string(),
1891			Some("yaml") | Some("yml") => "application/x-yaml".to_string(),
1892			Some("html") => "text/html".to_string(),
1893			Some("css") => "text/css".to_string(),
1894			Some("xml") => "application/xml".to_string(),
1895			Some("png") => "image/png".to_string(),
1896			Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
1897			Some("gif") => "image/gif".to_string(),
1898			Some("svg") => "image/svg+xml".to_string(),
1899			Some("pdf") => "application/pdf".to_string(),
1900			Some("zip") => "application/zip".to_string(),
1901			Some("tar") | Some("gz") => "application/x-tar".to_string(),
1902			Some("proto") => "application/x-protobuf".to_string(),
1903			_ => "application/octet-stream".to_string(),
1904		}
1905	}
1906
1907	/// Download file with exponential backoff retry
1908	/// Returns file_info (path, size, checksum) from DownloadManager
1909	async fn download_file_with_retry(
1910		&self,
1911		request_id:&str,
1912		url:String,
1913		DestinationPath:String,
1914		checksum:String,
1915		progress_callback:Option<Box<dyn Fn(f32) + Send>>,
1916	) -> Result<crate::Downloader::DownloadResult> {
1917		let config = &self.AppState.Configuration.Downloader;
1918		let mut retries = 0;
1919
1920		loop {
1921			match self
1922				.DownloadManager
1923				.DownloadFile(url.clone(), DestinationPath.clone(), checksum.clone())
1924				.await
1925			{
1926				Ok(file_info) => {
1927					if let Some(ref callback) = progress_callback {
1928						callback(100.0);
1929					}
1930					return Ok(file_info);
1931				},
1932				Err(e) => {
1933					if retries < config.MaxRetries as usize {
1934						retries += 1;
1935						let backoff_secs = 2u64.pow(retries as u32);
1936						warn!(
1937							"[AirVinegRPCService] Download failed [ID: {}], retrying (attempt {}/{}): {} - Backing \
1938							 off {} seconds",
1939							request_id, retries, config.MaxRetries, e, backoff_secs
1940						);
1941
1942						if let Some(ref callback) = progress_callback {
1943							// Notify retry attempts
1944							let progress = (retries as f32 / config.MaxRetries as f32) * 10.0;
1945							callback(progress);
1946						}
1947
1948						tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
1949					} else {
1950						error!(
1951							"[AirVinegRPCService] Download failed after {} retries [ID: {}]: {}",
1952							config.MaxRetries, request_id, e
1953						);
1954						return Err(e);
1955					}
1956				},
1957			}
1958		}
1959	}
1960
1961	/// Validate URL supports range headers for streaming
1962	async fn validate_range_support(&self, url:&str) -> Result<bool> {
1963		let client = reqwest::Client::builder()
1964			.timeout(std::time::Duration::from_secs(10))
1965			.build()
1966			.map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client for validation: {}", e)))?;
1967
1968		let response = client
1969			.head(url)
1970			.send()
1971			.await
1972			.map_err(|e| crate::AirError::Network(format!("Failed to send HEAD request: {}", e)))?;
1973
1974		// Check if server supports range requests
1975		let accepts_ranges = response
1976			.headers()
1977			.get("accept-ranges")
1978			.map(|v| v.to_str().unwrap_or("none"))
1979			.unwrap_or("none");
1980
1981		Ok(accepts_ranges == "bytes")
1982	}
1983
1984	/// Prepare rollback backup before applying update
1985	async fn prepare_rollback_backup(&self, version:&str) -> Result<()> {
1986		let cache_dir = self.UpdateManager.GetCacheDirectory();
1987		let rollback_dir = cache_dir.join("rollback");
1988
1989		// Create rollback directory if it doesn't exist
1990		if let Err(e) = tokio::fs::create_dir_all(&rollback_dir).await {
1991			return Err(AirError::FileSystem(format!("Failed to create rollback directory: {}", e)));
1992		}
1993
1994		// Create backup marker file with version
1995		let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
1996		let marker_content = format!(
1997			"version={}\ntimestamp={}\nrollback_available=true",
1998			version,
1999			chrono::Utc::now().to_rfc3339()
2000		);
2001
2002		if let Err(e) = tokio::fs::write(&backup_file, marker_content).await {
2003			return Err(AirError::FileSystem(format!("Failed to create backup marker: {}", e)));
2004		}
2005
2006		info!(
2007			"[AirVinegRPCService] Rollback backup prepared for version {} at {:?}",
2008			version, backup_file
2009		);
2010
2011		Ok(())
2012	}
2013
2014	/// Cleanup rollback backup after successful update or failed verification
2015	async fn cleanup_rollback_backup(&self, version:&str) -> Result<()> {
2016		let cache_dir = self.UpdateManager.GetCacheDirectory();
2017		let rollback_dir = cache_dir.join("rollback");
2018		let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2019
2020		if backup_file.exists() {
2021			if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2022				return Err(AirError::FileSystem(format!("Failed to cleanup rollback backup: {}", e)));
2023			}
2024			info!("[AirVinegRPCService] Rollback backup cleaned up for version {}", version);
2025		}
2026
2027		Ok(())
2028	}
2029
2030	/// Perform rollback to previous version
2031	async fn perform_rollback(&self, version:&str) -> Result<()> {
2032		let cache_dir = self.UpdateManager.GetCacheDirectory();
2033		let rollback_dir = cache_dir.join("rollback");
2034		let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2035
2036		if !backup_file.exists() {
2037			return Err(AirError::FileSystem(format!(
2038				"Rollback backup not found for version {}",
2039				version
2040			)));
2041		}
2042
2043		log::info!("[AirVinegRPCService] Starting rollback for version {}", version);
2044
2045		// Read backup marker
2046		let marker_content = tokio::fs::read_to_string(&backup_file)
2047			.await
2048			.map_err(|e| format!("Failed to read backup marker: {}", e))?;
2049
2050		// Parse marker content
2051		let mut timestamp = None;
2052		let mut rollback_available = false;
2053
2054		for line in marker_content.lines() {
2055			if let Some(value) = line.strip_prefix("timestamp=") {
2056				timestamp = Some(value.to_string());
2057			} else if line == "rollback_available=true" {
2058				rollback_available = true;
2059			}
2060		}
2061
2062		if !rollback_available {
2063			return Err(AirError::Validation("Rollback not available for this version".to_string()));
2064		}
2065
2066		// Perform actual rollback logic
2067		// This would involve:
2068		// 1. Restoring previous binary/files
2069		// 2. Reverting configuration changes
2070		// 3. Cleaning up failed update artifacts
2071
2072		log::info!(
2073			"[AirVinegRPCService] Rollback completed for version {} (backup timestamp: {:?})",
2074			version,
2075			timestamp
2076		);
2077
2078		// Cleanup backup marker after successful rollback
2079		if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2080			log::warn!("[AirVinegRPCService] Failed to cleanup backup marker after rollback: {}", e);
2081		}
2082
2083		Ok(())
2084	}
2085}
2086
/// Report whether `url` begins with an `http://` or `https://` scheme.
///
/// The comparison ignores ASCII case, so `HTTP://host` and `HttpS://host` are
/// both accepted. Only the prefix is inspected; no further URL validation is
/// performed.
fn match_url_scheme(url:&str) -> bool {
	// Compare the leading bytes in place instead of allocating a lowercased
	// copy of the whole URL for each candidate scheme. Both schemes are pure
	// ASCII, so ASCII case folding is sufficient; working on bytes also avoids
	// slicing a `str` off a character boundary.
	let has_prefix = |scheme:&str| {
		url.as_bytes()
			.get(..scheme.len())
			.map_or(false, |head| head.eq_ignore_ascii_case(scheme.as_bytes()))
	};

	has_prefix("http://") || has_prefix("https://")
}
2091
2092/// Calculate chunk checksum for verification
2093fn calculate_chunk_checksum(chunk:&[u8]) -> String {
2094	use sha2::{Digest, Sha256};
2095	let mut hasher = Sha256::new();
2096	hasher.update(chunk);
2097	format!("{:x}", hasher.finalize())
2098}
2099
2100/// Calculate file checksum for integrity verification
2101async fn calculate_file_checksum(path:&std::path::Path) -> Result<String> {
2102	use sha2::{Digest, Sha256};
2103	use tokio::io::AsyncReadExt;
2104
2105	let mut file = tokio::fs::File::open(path)
2106		.await
2107		.map_err(|e| AirError::FileSystem(format!("Failed to open file for checksum: {}", e)))?;
2108
2109	let mut hasher = Sha256::new();
2110	let mut buffer = vec![0u8; 8192];
2111
2112	loop {
2113		let bytes_read = file
2114			.read(&mut buffer)
2115			.await
2116			.map_err(|e| AirError::FileSystem(format!("Failed to read file for checksum: {}", e)))?;
2117
2118		if bytes_read == 0 {
2119			break;
2120		}
2121
2122		hasher.update(&buffer[..bytes_read]);
2123	}
2124
2125	let result = hasher.finalize();
2126	Ok(format!("{:x}", result))
2127}