1use std::{
104 collections::{HashMap, VecDeque},
105 path::{Path, PathBuf},
106 sync::Arc,
107 time::{Duration, Instant},
108};
109
110use serde::{Deserialize, Serialize};
111use tokio::sync::{RwLock, Semaphore};
112
113use crate::{AirError, ApplicationState::ApplicationState, Configuration::ConfigurationManager, Result, Utility};
114
/// Coordinates file downloads: per-download status tracking, a priority
/// queue of pending requests, and aggregate statistics.
pub struct DownloadManager {
    /// Shared application state; `new` reads the downloader configuration
    /// from it and reports the "downloader" service status through it.
    AppState:Arc<ApplicationState>,

    /// Status records of registered downloads, keyed by download ID.
    ActiveDownloads:Arc<RwLock<HashMap<String, DownloadStatus>>>,

    /// Requests waiting to be started, kept sorted by priority by `QueueDownload`.
    DownloadQueue:Arc<RwLock<VecDeque<QueuedDownload>>>,

    /// Directory used for downloads that do not name a destination.
    CacheDirectory:PathBuf,

    /// HTTP client shared by all downloads.
    client:reqwest::Client,

    /// SHA-256 helper used for checksum calculation and verification.
    ChecksumVerifier:Arc<crate::Security::ChecksumVerifier>,

    /// Semaphore consulted once per received chunk; `new` creates it with 100 permits.
    BandwidthLimiter:Arc<Semaphore>,

    /// Caps simultaneous transfers; `new` creates it with 5 permits and each
    /// transfer holds one for its whole duration.
    ConcurrentLimiter:Arc<Semaphore>,

    /// Aggregate counters across all downloads.
    statistics:Arc<RwLock<DownloadStatistics>>,
}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct DownloadStatus {
148 pub DownloadId:String,
149 pub url:String,
150 pub destination:PathBuf,
151 pub TotalSize:u64,
152 pub downloaded:u64,
153 pub progress:f32,
154 pub status:DownloadState,
155 pub error:Option<String>,
156 pub StartedAt:Option<chrono::DateTime<chrono::Utc>>,
157 pub CompletedAt:Option<chrono::DateTime<chrono::Utc>>,
158 pub ChunksCompleted:usize,
159 pub TotalChunks:usize,
160 pub DownloadRateBytesPerSec:u64,
161 pub ExpectedChecksum:Option<String>,
162 pub ActualChecksum:Option<String>,
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
167pub enum DownloadState {
168 Pending,
169 Queued,
170 Downloading,
171 Verifying,
172 Completed,
173 Failed,
174 Cancelled,
175 Paused,
176 Resuming,
177}
178
179#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
181pub enum DownloadPriority {
182 High = 3,
183 Normal = 2,
184 Low = 1,
185 Background = 0,
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct QueuedDownload {
191 DownloadId:String,
192 url:String,
193 destination:PathBuf,
194 checksum:String,
195 priority:DownloadPriority,
196 AddedAt:chrono::DateTime<chrono::Utc>,
197 MaxFileSize:Option<u64>,
198 ValidateDiskSpace:bool,
199}
200
/// Outcome of a successful download.
#[derive(Clone, Debug)]
pub struct DownloadResult {
    /// Path of the downloaded file.
    pub path:String,
    /// Number of bytes in the file.
    pub size:u64,
    /// Checksum calculated from the downloaded file.
    pub checksum:String,
    /// Wall-clock time the download took.
    pub duration:Duration,
    /// Average transfer rate.
    pub AverageRate:u64,
}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct DownloadStatistics {
214 pub TotalDownloads:u64,
215 pub SuccessfulDownloads:u64,
216 pub FailedDownloads:u64,
217 pub CancelledDownloads:u64,
218 pub TotalBytesDownloaded:u64,
219 pub TotalDownloadTimeSecs:f64,
220 pub AverageDownloadRate:f64,
221 pub PeakDownloadRate:u64,
222 pub ActiveDownloads:usize,
223 pub QueuedDownloads:usize,
224}
225
/// Shareable, thread-safe callback that receives a `DownloadStatus` snapshot.
pub type ProgressCallback = Arc<dyn Fn(DownloadStatus) + Send + Sync>;
228
229#[derive(Debug, Clone)]
231pub struct DownloadConfig {
232 pub url:String,
233 pub destination:String,
234 pub checksum:String,
235 pub MaxFileSize:Option<u64>,
236 pub ChunkSize:usize,
237 pub MaxRetries:u32,
238 pub TimeoutSecs:u64,
239 pub priority:DownloadPriority,
240 pub ValidateDiskSpace:bool,
241}
242
243impl Default for DownloadConfig {
244 fn default() -> Self {
245 Self {
246 url:String::new(),
247 destination:String::new(),
248 checksum:String::new(),
249 MaxFileSize:None,
250 ChunkSize:8 * 1024 * 1024, MaxRetries:5,
252 TimeoutSecs:300,
253 priority:DownloadPriority::Normal,
254 ValidateDiskSpace:true,
255 }
256 }
257}
258
259impl DownloadManager {
260 pub async fn new(AppState:Arc<ApplicationState>) -> Result<Self> {
262 let config = &AppState.Configuration.Downloader;
263
264 let CacheDirectory = ConfigurationManager::ExpandPath(&config.CacheDirectory)?;
266
267 let CacheDirectoryClone = CacheDirectory.clone();
269
270 let CacheDirectoryCloneForInit = CacheDirectoryClone.clone();
272
273 tokio::fs::create_dir_all(&CacheDirectory)
275 .await
276 .map_err(|e| AirError::Configuration(format!("Failed to create cache directory: {}", e)))?;
277
278 let client = reqwest::Client::builder()
280 .timeout(Duration::from_secs(config.DownloadTimeoutSecs))
281 .connect_timeout(Duration::from_secs(30))
282 .pool_idle_timeout(Duration::from_secs(90))
283 .pool_max_idle_per_host(10)
284 .tcp_keepalive(Duration::from_secs(60))
285 .user_agent("Land-AirDownloader/0.1.0")
286 .build()
287 .map_err(|e| AirError::Network(format!("Failed to create HTTP client: {}", e)))?;
288
289 let BandwidthLimiter = Arc::new(Semaphore::new(100));
291
292 let ConcurrentLimiter = Arc::new(Semaphore::new(5));
294
295 let manager = Self {
296 AppState,
297 ActiveDownloads:Arc::new(RwLock::new(HashMap::new())),
298 DownloadQueue:Arc::new(RwLock::new(VecDeque::new())),
299 CacheDirectory:CacheDirectoryCloneForInit,
300 client,
301 ChecksumVerifier:Arc::new(crate::Security::ChecksumVerifier::New()),
302 BandwidthLimiter,
303 ConcurrentLimiter,
304 statistics:Arc::new(RwLock::new(DownloadStatistics::default())),
305 };
306
307 manager
309 .AppState
310 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Running)
311 .await
312 .map_err(|e| AirError::Internal(e.to_string()))?;
313
314 log::info!(
315 "[DownloadManager] Initialized with cache directory: {}",
316 CacheDirectory.display()
317 );
318
319 Ok(manager)
320 }
321
322 pub async fn DownloadFile(&self, url:String, DestinationPath:String, checksum:String) -> Result<DownloadResult> {
324 self.DownloadFileWithConfig(DownloadConfig { url, destination:DestinationPath, checksum, ..Default::default() })
325 .await
326 }
327
328 pub async fn DownloadFileWithConfig(&self, config:DownloadConfig) -> Result<DownloadResult> {
330 let SanitizedUrl = Self::ValidateAndSanitizeUrl(&config.url)?;
332
333 let DownloadId = Utility::GenerateRequestId();
335
336 log::info!(
337 "[DownloadManager] Starting download [ID: {}] - URL: {}",
338 DownloadId,
339 SanitizedUrl
340 );
341
342 if SanitizedUrl.is_empty() {
344 return Err(AirError::Network("URL cannot be empty".to_string()));
345 }
346
347 let Destination = if config.destination.is_empty() {
349 let Filename = SanitizedUrl
351 .split('/')
352 .last()
353 .and_then(|s| s.split('?').next())
354 .unwrap_or("download.bin");
355 self.CacheDirectory.join(Filename)
356 } else {
357 ConfigurationManager::ExpandPath(&config.destination)?
358 };
359
360 Utility::ValidateFilePath(
362 Destination
363 .to_str()
364 .ok_or_else(|| AirError::Configuration("Invalid destination path".to_string()))?,
365 )?;
366
367 let ExpectedChecksum = if config.checksum.is_empty() { None } else { Some(config.checksum.clone()) };
369
370 self.RegisterDownload(&DownloadId, &SanitizedUrl, &Destination, ExpectedChecksum.clone())
372 .await?;
373
374 if config.ValidateDiskSpace {
376 if let Some(MaxSize) = config.MaxFileSize {
377 self.ValidateDiskSpace(&SanitizedUrl, &Destination, MaxSize * 2).await?;
378 } else {
379 self.ValidateDiskSpace(&SanitizedUrl, &Destination, 1024 * 1024 * 1024).await?; }
381 }
382
383 if let Some(Parent) = Destination.parent() {
385 tokio::fs::create_dir_all(Parent)
386 .await
387 .map_err(|e| AirError::FileSystem(format!("Failed to create destination directory: {}", e)))?;
388 }
389
390 let StartTime = Instant::now();
391
392 let Result = self.DownloadWithRetry(&DownloadId, &SanitizedUrl, &Destination, &config).await;
394
395 let Duration = StartTime.elapsed();
396
397 match Result {
398 Ok(mut FileInfo) => {
399 FileInfo.duration = Duration;
400
401 self.UpdateStatistics(true, FileInfo.size, Duration).await;
403
404 self.UpdateDownloadStatus(&DownloadId, DownloadState::Completed, Some(100.0), None)
405 .await?;
406
407 log::info!(
408 "[DownloadManager] Download completed [ID: {}] - Size: {} bytes in {:.2}s ({:.2} MB/s)",
409 DownloadId,
410 FileInfo.size,
411 Duration.as_secs_f64(),
412 FileInfo.size as f64 / 1_048_576.0 / Duration.as_secs_f64()
413 );
414
415 Ok(FileInfo)
416 },
417 Err(E) => {
418 self.UpdateStatistics(false, 0, Duration).await;
420
421 self.UpdateDownloadStatus(&DownloadId, DownloadState::Failed, None, Some(E.to_string()))
422 .await?;
423
424 if Destination.exists() {
426 let _ = tokio::fs::remove_file(&Destination).await;
427 log::warn!("[DownloadManager] Cleaned up failed download: {}", Destination.display());
428 }
429
430 log::error!("[DownloadManager] Download failed [ID: {}] - Error: {}", DownloadId, E);
431
432 Err(E)
433 },
434 }
435 }
436
437 fn ValidateAndSanitizeUrl(url:&str) -> Result<String> {
439 let url = url.trim();
440
441 if url.is_empty() {
443 return Err(AirError::Network("URL cannot be empty".to_string()));
444 }
445
446 let parsed = url::Url::parse(url).map_err(|e| AirError::Network(format!("Invalid URL format: {}", e)))?;
448
449 match parsed.scheme() {
451 "http" | "https" => (),
452 scheme => {
453 return Err(AirError::Network(format!(
454 "Unsupported URL scheme: '{}'. Only http and https are allowed.",
455 scheme
456 )));
457 },
458 }
459
460 if parsed.host().is_none() {
462 return Err(AirError::Network("URL must have a valid host".to_string()));
463 }
464
465 #[cfg(debug_assertions)]
467 {
468 }
470 #[cfg(not(debug_assertions))]
471 {
472 if let Some(host) = parsed.host_str() {
473 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
474 return Err(AirError::Network("Localhost addresses are not allowed".to_string()));
475 }
476 if host.starts_with("192.168.") || host.starts_with("10.") || host.starts_with("172.16.") {
477 return Err(AirError::Network("Private network addresses are not allowed".to_string()));
478 }
479 }
480 }
481
482 let mut sanitized = parsed.clone();
484
485 if sanitized.password().is_some() {
487 sanitized.set_password(Some("")).ok();
488 }
489
490 Ok(sanitized.to_string())
491 }
492
493 async fn ValidateDiskSpace(&self, url:&str, destination:&Path, RequiredBytes:u64) -> Result<()> {
495 let DestPath = if destination.is_absolute() {
497 destination.to_path_buf()
498 } else {
499 std::env::current_dir()
500 .map_err(|e| AirError::FileSystem(format!("Failed to get current directory: {}", e)))?
501 .join(destination)
502 };
503
504 let MountPoint = self.FindMountPoint(&DestPath)?;
506
507 log::debug!(
510 "[DownloadManager] Validating disk space for URL {} (requires {} bytes) on mount point: {}",
511 url,
512 RequiredBytes,
513 MountPoint.display()
514 );
515
516 #[cfg(unix)]
517 {
518 match self.GetDiskStatvfs(&MountPoint) {
519 Ok((AvailableBytes, TotalBytes)) => {
520 if AvailableBytes < RequiredBytes {
521 log::warn!(
522 "[DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
523 AvailableBytes,
524 RequiredBytes
525 );
526 return Err(AirError::FileSystem(format!(
527 "Insufficient disk space: {} bytes available, {} bytes required",
528 AvailableBytes, RequiredBytes
529 )));
530 }
531
532 log::debug!(
533 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required (total: {})",
534 AvailableBytes,
535 RequiredBytes,
536 TotalBytes
537 );
538 },
539 Err(e) => {
540 log::warn!("[DownloadManager] Failed to check disk space: {}, proceeding anyway", e);
541 },
542 }
543 }
544
545 #[cfg(windows)]
546 {
547 match self.GetDiskSpaceWindows(&MountPoint) {
548 Ok(AvailableBytes) => {
549 if AvailableBytes < RequiredBytes {
550 log::warn!(
551 "[DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
552 AvailableBytes,
553 RequiredBytes
554 );
555 return Err(AirError::FileSystem(format!(
556 "Insufficient disk space: {} bytes available, {} bytes required",
557 available_bytes, RequiredBytes
558 )));
559 }
560 log::debug!(
561 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required",
562 available_bytes,
563 RequiredBytes
564 );
565 },
566 Err(e) => {
567 log::warn!("[DownloadManager] Failed to check disk space: {}, proceeding anyway", e);
568 },
569 }
570 }
571
572 #[cfg(not(any(unix, windows)))]
573 {
574 log::warn!("[DownloadManager] Disk space validation not available on this platform");
575 }
576
577 Ok(())
578 }
579
580 #[cfg(unix)]
582 fn GetDiskStatvfs(&self, path:&Path) -> Result<(u64, u64)> {
583 log::debug!("[DownloadManager] Checking disk space at: {}", path.display());
594 Ok((u64::MAX, u64::MAX))
595 }
596
/// Reports the available bytes for the volume at `path`.
///
/// Placeholder: no volume query is performed. The value is always `u64::MAX`,
/// so a caller comparing against it never sees a shortage.
#[cfg(windows)]
fn GetDiskSpaceWindows(&self, path:&Path) -> Result<u64> {
    log::debug!("[DownloadManager] Checking disk space at: {}", path.display());

    let AvailableBytes = u64::MAX;

    Ok(AvailableBytes)
}
613
614 fn FindMountPoint(&self, path:&Path) -> Result<PathBuf> {
616 #[cfg(unix)]
617 {
618 let mut current = path
619 .canonicalize()
620 .map_err(|e| AirError::FileSystem(format!("Failed to canonicalize path: {}", e)))?;
621
622 loop {
623 if current.as_os_str().is_empty() || current == Path::new("/") {
624 return Ok(PathBuf::from("/"));
625 }
626
627 let metadata = std::fs::metadata(¤t)
628 .map_err(|e| AirError::FileSystem(format!("Failed to get metadata: {}", e)))?;
629
630 #[cfg(unix)]
632 let CurrentDevice = {
633 use std::os::unix::fs::MetadataExt;
634 metadata.dev()
635 };
636 #[cfg(not(unix))]
637 let CurrentDevice = 0u64; let parent = current.parent();
640
641 if let Some(parent_path) = parent {
642 let ParentMetadata = std::fs::metadata(parent_path)
643 .map_err(|e| AirError::FileSystem(format!("Failed to get parent metadata: {}", e)))?;
644
645 #[cfg(unix)]
646 let ParentDevice = {
647 use std::os::unix::fs::MetadataExt;
648 ParentMetadata.dev()
649 };
650 #[cfg(not(unix))]
651 let ParentDevice = 0u64; if ParentDevice != CurrentDevice {
654 return Ok(current);
655 }
656 } else {
657 return Ok(current);
658 }
659
660 current.pop();
661 }
662 }
663
664 #[cfg(windows)]
665 {
666 let PathStr = path.to_string_lossy();
668 if PathStr.len() >= 3 && PathStr.chars().nth(1) == Some(':') {
669 return Ok(PathBuf::from(&PathStr[..3]));
670 }
671 Ok(PathBuf::from("C:\\"))
672 }
673
674 #[cfg(not(any(unix, windows)))]
675 {
676 Ok(path.to_path_buf())
677 }
678 }
679
680 async fn DownloadWithRetry(
682 &self,
683 DownloadId:&str,
684 url:&str,
685 destination:&PathBuf,
686 config:&DownloadConfig,
687 ) -> Result<DownloadResult> {
688 let RetryPolicy = crate::Resilience::RetryPolicy {
689 MaxRetries:config.MaxRetries,
690 InitialIntervalMs:1000,
691 MaxIntervalMs:32000,
692 BackoffMultiplier:2.0,
693 JitterFactor:0.1,
694 BudgetPerMinute:100,
695 ErrorClassification:std::collections::HashMap::new(),
696 };
697
698 let RetryManager = crate::Resilience::RetryManager::new(RetryPolicy.clone());
699 let CircuitBreaker = crate::Resilience::CircuitBreaker::new(
700 "downloader".to_string(),
701 crate::Resilience::CircuitBreakerConfig::default(),
702 );
703
704 let mut attempt = 0;
705
706 loop {
707 if CircuitBreaker.GetState().await == crate::Resilience::CircuitState::Open {
709 if !CircuitBreaker.AttemptRecovery().await {
710 return Err(AirError::Network(
711 "Circuit breaker is open, too many recent failures".to_string(),
712 ));
713 }
714 }
715
716 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
718 if status.status == DownloadState::Cancelled {
719 return Err(AirError::Network("Download cancelled".to_string()));
720 }
721 }
722
723 match self.PerformDownload(DownloadId, url, destination, config).await {
724 Ok(file_info) => {
725 if let Some(ref ExpectedChecksum) = ExpectedChecksumFromConfig(config) {
727 self.UpdateDownloadStatus(DownloadId, DownloadState::Verifying, Some(100.0), None)
728 .await?;
729
730 if let Err(e) = self.VerifyChecksum(destination, ExpectedChecksum).await {
731 log::warn!("[DownloadManager] Checksum verification failed [ID: {}]: {}", DownloadId, e);
732 CircuitBreaker.RecordFailure().await;
733
734 if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
735 attempt += 1;
736 let delay = RetryManager.CalculateRetryDelay(attempt);
737 log::info!(
738 "[DownloadManager] Retrying download [ID: {}] (attempt {}/{}) after {:?}",
739 DownloadId,
740 attempt + 1,
741 config.MaxRetries + 1,
742 delay
743 );
744 tokio::time::sleep(delay).await;
745 continue;
746 } else {
747 return Err(AirError::Network(format!(
748 "Checksum verification failed after {} retries: {}",
749 attempt, e
750 )));
751 }
752 }
753 }
754
755 CircuitBreaker.RecordSuccess().await;
756 return Ok(file_info);
757 },
758 Err(e) => {
759 CircuitBreaker.RecordFailure().await;
760
761 if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
762 attempt += 1;
763 log::warn!(
764 "[DownloadManager] Download failed [ID: {}], retrying (attempt {}/{}): {}",
765 DownloadId,
766 attempt + 1,
767 config.MaxRetries + 1,
768 e
769 );
770
771 let delay = RetryManager.CalculateRetryDelay(attempt);
772 tokio::time::sleep(delay).await;
773 } else {
774 return Err(e);
775 }
776 },
777 }
778 }
779 }
780
781 async fn PerformDownload(
783 &self,
784 DownloadId:&str,
785 url:&str,
786 destination:&PathBuf,
787 config:&DownloadConfig,
788 ) -> Result<DownloadResult> {
789 let _concurrent_permit = self
791 .ConcurrentLimiter
792 .acquire()
793 .await
794 .map_err(|e| AirError::Internal(format!("Failed to acquire download permit: {}", e)))?;
795
796 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(0.0), None)
797 .await?;
798
799 let TempDestination = destination.with_extension("tmp");
801
802 let mut ExistingSize:u64 = 0;
804 if TempDestination.exists() {
805 if let Ok(metadata) = tokio::fs::metadata(&TempDestination).await {
806 ExistingSize = metadata.len();
807 log::info!("[DownloadManager] Resuming download from {} bytes", ExistingSize);
808 }
809 }
810
811 let mut req = self.client.get(url).timeout(Duration::from_secs(config.TimeoutSecs));
813 if ExistingSize > 0 {
814 let RangeHeader = format!("bytes={}-", ExistingSize);
815 req = req.header(reqwest::header::RANGE, RangeHeader);
816 req = req.header(reqwest::header::IF_MATCH, "*"); }
818
819 let response = req
820 .send()
821 .await
822 .map_err(|e| AirError::Network(format!("Failed to start download: {}", e)))?;
823
824 let FinalUrl = response.url().clone();
826 let response = if FinalUrl.as_str() != url {
827 log::info!("[DownloadManager] Redirected to: {}", FinalUrl);
828 response
829 } else {
830 response
831 };
832
833 let StatusCode = response.status();
835 if !StatusCode.is_success() && StatusCode != reqwest::StatusCode::PARTIAL_CONTENT {
836 return Err(AirError::Network(format!("Download failed with status: {}", StatusCode)));
837 }
838
839 let TotalSize = if let Some(cl) = response.content_length() {
841 if StatusCode == reqwest::StatusCode::PARTIAL_CONTENT {
842 cl + ExistingSize
843 } else {
844 cl
845 }
846 } else {
847 0
848 };
849
850 if let Some(max_size) = config.MaxFileSize {
852 if TotalSize > 0 && TotalSize > max_size {
853 return Err(AirError::Network(format!(
854 "File too large: {} bytes exceeds maximum allowed size: {} bytes",
855 TotalSize, max_size
856 )));
857 }
858 }
859
860 let mut file = tokio::fs::OpenOptions::new()
862 .create(true)
863 .append(true)
864 .open(&TempDestination)
865 .await
866 .map_err(|e| AirError::FileSystem(format!("Failed to open destination file: {}", e)))?;
867
868 use tokio::io::AsyncWriteExt;
869 use futures_util::StreamExt;
870
871 let mut downloaded = ExistingSize;
872 let mut LastProgressUpdate = Instant::now();
873 let BytesStream = response.bytes_stream();
874
875 tokio::pin!(BytesStream);
876
877 while let Some(result) = BytesStream.next().await {
878 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
880 match status.status {
881 DownloadState::Cancelled => {
882 let _ = tokio::fs::remove_file(&TempDestination).await;
884 return Err(AirError::Network("Download cancelled".to_string()));
885 },
886 DownloadState::Paused => {
887 loop {
889 tokio::time::sleep(Duration::from_millis(250)).await;
890 if let Some(s) = self.GetDownloadStatus(DownloadId).await {
891 match s.status {
892 DownloadState::Paused => continue,
893 DownloadState::Cancelled => {
894 let _ = tokio::fs::remove_file(&TempDestination).await;
895 return Err(AirError::Network("Download cancelled".to_string()));
896 },
897 _ => {
898 log::info!("[DownloadManager] Resuming paused download [ID: {}]", DownloadId);
899 break;
900 },
901 }
902 } else {
903 break;
904 }
905 }
906 },
907 _ => {},
908 }
909 }
910
911 match result {
912 Ok(chunk) => {
913 let ChunkSize = chunk.len();
915 if let Ok(permit) = self.BandwidthLimiter.try_acquire_many((ChunkSize / (1024 * 1024) + 1) as u32) {
916 drop(permit);
917 } else {
918 tokio::time::sleep(Duration::from_millis(10)).await;
920 }
921
922 file.write_all(&chunk)
923 .await
924 .map_err(|e| AirError::FileSystem(format!("Failed to write file: {}", e)))?;
925
926 downloaded += ChunkSize as u64;
927
928 if LastProgressUpdate.elapsed() > Duration::from_millis(500) {
930 LastProgressUpdate = Instant::now();
931
932 if TotalSize > 0 {
933 let progress = (downloaded as f32 / TotalSize as f32) * 100.0;
934 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(progress), None)
935 .await?;
936 }
937
938 let rate = self.CalculateDownloadRate(DownloadId, downloaded).await;
940 self.UpdateDownloadRate(DownloadId, rate).await;
941 }
942 },
943 Err(e) => {
944 if e.is_timeout() || e.is_connect() {
946 log::warn!("[DownloadManager] Connection/timeout error, may retry: {}", e);
947 return Err(AirError::Network(format!("Network error: {}", e)));
948 }
949 return Err(AirError::Network(format!("Failed to read response: {}", e)));
950 },
951 }
952 }
953
954 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(100.0), None)
956 .await?;
957
958 file.flush()
960 .await
961 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
962
963 tokio::fs::rename(&TempDestination, destination)
965 .await
966 .map_err(|e| AirError::FileSystem(format!("Failed to commit download: {}", e)))?;
967
968 let checksum = self.CalculateChecksum(destination).await?;
970
971 self.UpdateActualChecksum(DownloadId, &checksum).await;
973
974 Ok(DownloadResult {
975 path:destination.to_string_lossy().to_string(),
976 size:downloaded,
977 checksum,
978 duration:Duration::from_secs(0),
979 AverageRate:0,
980 })
981 }
982
983 pub async fn VerifyChecksum(&self, FilePath:&PathBuf, ExpectedChecksum:&str) -> Result<()> {
985 if !FilePath.exists() {
987 return Err(AirError::FileSystem(format!(
988 "File not found for checksum verification: {}",
989 FilePath.display()
990 )));
991 }
992
993 let ActualChecksum = self.ChecksumVerifier.CalculateSha256(FilePath).await?;
994
995 let NormalizedExpected = ExpectedChecksum.trim().to_lowercase().replace("sha256:", "");
997 let NormalizedActual = ActualChecksum.trim().to_lowercase();
998
999 if NormalizedActual != NormalizedExpected {
1000 log::error!(
1001 "[DownloadManager] Checksum mismatch for {}: expected {}, got {}",
1002 FilePath.display(),
1003 NormalizedExpected,
1004 NormalizedActual
1005 );
1006 return Err(AirError::Network(format!(
1007 "Checksum verification failed: expected {}, got {}",
1008 NormalizedExpected, NormalizedActual
1009 )));
1010 }
1011
1012 log::info!("[DownloadManager] Checksum verified for file: {}", FilePath.display());
1013
1014 Ok(())
1015 }
1016
1017 pub async fn CalculateChecksum(&self, FilePath:&PathBuf) -> Result<String> {
1019 if !FilePath.exists() {
1021 return Err(AirError::FileSystem(format!(
1022 "File not found for checksum calculation: {}",
1023 FilePath.display()
1024 )));
1025 }
1026
1027 self.ChecksumVerifier.CalculateSha256(FilePath).await
1028 }
1029
1030 async fn RegisterDownload(
1032 &self,
1033 DownloadId:&str,
1034 url:&str,
1035 destination:&PathBuf,
1036 ExpectedChecksum:Option<String>,
1037 ) -> Result<()> {
1038 let mut downloads = self.ActiveDownloads.write().await;
1039 let mut stats = self.statistics.write().await;
1040
1041 stats.ActiveDownloads += 1;
1042
1043 downloads.insert(
1044 DownloadId.to_string(),
1045 DownloadStatus {
1046 DownloadId:DownloadId.to_string(),
1047 url:url.to_string(),
1048 destination:destination.clone(),
1049 TotalSize:0,
1050 downloaded:0,
1051 progress:0.0,
1052 status:DownloadState::Pending,
1053 error:None,
1054 StartedAt:Some(chrono::Utc::now()),
1055 CompletedAt:None,
1056 ChunksCompleted:0,
1057 TotalChunks:1,
1058 DownloadRateBytesPerSec:0,
1059 ExpectedChecksum:ExpectedChecksum.clone(),
1060 ActualChecksum:None,
1061 },
1062 );
1063
1064 Ok(())
1065 }
1066
1067 async fn UpdateDownloadStatus(
1069 &self,
1070 DownloadId:&str,
1071 status:DownloadState,
1072 progress:Option<f32>,
1073 error:Option<String>,
1074 ) -> Result<()> {
1075 let mut downloads = self.ActiveDownloads.write().await;
1076
1077 if let Some(download) = downloads.get_mut(DownloadId) {
1078 if status == DownloadState::Completed || status == DownloadState::Failed {
1079 download.CompletedAt = Some(chrono::Utc::now());
1080 }
1081 download.status = status;
1082 if let Some(progress) = progress {
1083 download.progress = progress;
1084 }
1085 download.error = error;
1086 }
1087
1088 Ok(())
1089 }
1090
1091 async fn UpdateDownloadRate(&self, DownloadId:&str, rate:u64) {
1093 let mut downloads = self.ActiveDownloads.write().await;
1094 if let Some(download) = downloads.get_mut(DownloadId) {
1095 download.DownloadRateBytesPerSec = rate;
1096 }
1097 }
1098
1099 async fn UpdateActualChecksum(&self, DownloadId:&str, checksum:&str) {
1101 let mut downloads = self.ActiveDownloads.write().await;
1102 if let Some(download) = downloads.get_mut(DownloadId) {
1103 download.ActualChecksum = Some(checksum.to_string());
1104 }
1105 }
1106
1107 async fn CalculateDownloadRate(&self, DownloadId:&str, CurrentBytes:u64) -> u64 {
1109 let downloads = self.ActiveDownloads.read().await;
1110 if let Some(download) = downloads.get(DownloadId) {
1111 if let Some(StartedAt) = download.StartedAt {
1112 let elapsed = chrono::Utc::now().signed_duration_since(StartedAt);
1113 let ElapsedSecs = elapsed.num_seconds() as u64;
1114 if ElapsedSecs > 0 {
1115 return CurrentBytes / ElapsedSecs;
1116 }
1117 }
1118 }
1119 0
1120 }
1121
1122 async fn UpdateStatistics(&self, success:bool, bytes:u64, duration:Duration) {
1124 let mut stats = self.statistics.write().await;
1125
1126 if success {
1127 stats.SuccessfulDownloads += 1;
1128 stats.TotalBytesDownloaded += bytes;
1129 stats.TotalDownloadTimeSecs += duration.as_secs_f64();
1130
1131 if stats.TotalDownloadTimeSecs > 0.0 {
1132 stats.AverageDownloadRate = stats.TotalBytesDownloaded as f64 / stats.TotalDownloadTimeSecs
1133 }
1134
1135 let CurrentRate = if duration.as_secs_f64() > 0.0 {
1137 (bytes as f64 / duration.as_secs_f64()) as u64
1138 } else {
1139 0
1140 };
1141 if CurrentRate > stats.PeakDownloadRate {
1142 stats.PeakDownloadRate = CurrentRate;
1143 }
1144 } else {
1145 stats.FailedDownloads += 1;
1146 }
1147
1148 stats.TotalDownloads += 1;
1149 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1150 }
1151
1152 pub async fn GetDownloadStatus(&self, DownloadId:&str) -> Option<DownloadStatus> {
1154 let downloads = self.ActiveDownloads.read().await;
1155 downloads.get(DownloadId).cloned()
1156 }
1157
1158 pub async fn GetAllDownloads(&self) -> Vec<DownloadStatus> {
1160 let downloads = self.ActiveDownloads.read().await;
1161 downloads.values().cloned().collect()
1162 }
1163
1164 pub async fn CancelDownload(&self, DownloadId:&str) -> Result<()> {
1166 log::info!("[DownloadManager] Cancelling download [ID: {}]", DownloadId);
1167
1168 self.UpdateDownloadStatus(DownloadId, DownloadState::Cancelled, None, None)
1169 .await?;
1170
1171 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1173 let TempPath = status.destination.with_extension("tmp");
1174 if TempPath.exists() {
1175 let _ = tokio::fs::remove_file(&TempPath).await;
1176 }
1177 }
1178
1179 {
1181 let mut stats = self.statistics.write().await;
1182 stats.CancelledDownloads += 1;
1183 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1184 }
1185
1186 Ok(())
1187 }
1188
1189 pub async fn PauseDownload(&self, DownloadId:&str) -> Result<()> {
1191 self.UpdateDownloadStatus(DownloadId, DownloadState::Paused, None, None).await?;
1192 log::info!("[DownloadManager] Download paused [ID: {}]", DownloadId);
1193 Ok(())
1194 }
1195
1196 pub async fn ResumeDownload(&self, DownloadId:&str) -> Result<()> {
1198 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1199 if status.status == DownloadState::Paused {
1200 self.UpdateDownloadStatus(DownloadId, DownloadState::Resuming, None, None)
1201 .await?;
1202 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, None, None)
1204 .await?;
1205 log::info!("[DownloadManager] Download resumed [ID: {}]", DownloadId);
1206 } else {
1207 return Err(AirError::Network("Can only resume paused downloads".to_string()));
1208 }
1209 } else {
1210 return Err(AirError::Network("Download not found".to_string()));
1211 }
1212 Ok(())
1213 }
1214
1215 pub async fn GetActiveDownloadCount(&self) -> usize {
1217 let downloads = self.ActiveDownloads.read().await;
1218 downloads
1219 .iter()
1220 .filter(|(_, s)| {
1221 matches!(
1222 s.status,
1223 DownloadState::Downloading | DownloadState::Verifying | DownloadState::Resuming
1224 )
1225 })
1226 .count()
1227 }
1228
1229 pub async fn GetStatistics(&self) -> DownloadStatistics {
1231 let stats = self.statistics.read().await;
1232 stats.clone()
1233 }
1234
1235 pub async fn QueueDownload(
1237 &self,
1238 url:String,
1239 destination:String,
1240 checksum:String,
1241 priority:DownloadPriority,
1242 ) -> Result<String> {
1243 let DownloadId = Utility::GenerateRequestId();
1244
1245 let destination = if destination.is_empty() {
1246 let filename = url.split('/').last().unwrap_or("download.bin");
1247 self.CacheDirectory.join(filename)
1248 } else {
1249 ConfigurationManager::ExpandPath(&destination)?
1250 };
1251
1252 let queued_download = QueuedDownload {
1253 DownloadId:DownloadId.clone(),
1254 url,
1255 destination,
1256 checksum,
1257 priority,
1258 AddedAt:chrono::Utc::now(),
1259 MaxFileSize:None,
1260 ValidateDiskSpace:true,
1261 };
1262
1263 let mut queue = self.DownloadQueue.write().await;
1264 queue.push_back(queued_download);
1265
1266 queue.make_contiguous().sort_by(|a, b| {
1268 match b.priority.cmp(&a.priority) {
1269 std::cmp::Ordering::Equal => {
1270 a.AddedAt.cmp(&b.AddedAt)
1272 },
1273 order => order,
1274 }
1275 });
1276
1277 {
1278 let mut stats = self.statistics.write().await;
1279 stats.QueuedDownloads += 1;
1280 }
1281
1282 log::info!(
1283 "[DownloadManager] Download queued [ID: {}] with priority {:?}",
1284 DownloadId,
1285 priority
1286 );
1287
1288 Ok(DownloadId)
1289 }
1290
1291 pub async fn ProcessQueue(&self) -> Result<Option<String>> {
1293 let mut queue = self.DownloadQueue.write().await;
1294
1295 if let Some(queued) = queue.pop_front() {
1296 let download_id = queued.DownloadId.clone();
1297 drop(queue); let config = DownloadConfig {
1300 url:queued.url.clone(),
1301 destination:queued.destination.to_string_lossy().to_string(),
1302 checksum:queued.checksum.clone(),
1303 priority:queued.priority,
1304 MaxFileSize:queued.MaxFileSize,
1305 ValidateDiskSpace:queued.ValidateDiskSpace,
1306 ..Default::default()
1307 };
1308
1309 {
1310 let mut stats = self.statistics.write().await;
1311 stats.QueuedDownloads = stats.QueuedDownloads.saturating_sub(1);
1312 }
1313
1314 let manager = self.clone();
1316 let download_id_clone = download_id.clone();
1317 tokio::spawn(async move {
1318 if let Err(e) = manager.DownloadFileWithConfig(config).await {
1319 log::error!("[DownloadManager] Queued download failed [ID: {}]: {}", download_id_clone, e);
1320 let _ = manager
1322 .UpdateDownloadStatus(&download_id_clone, DownloadState::Failed, None, Some(e.to_string()))
1323 .await;
1324 }
1325 });
1326
1327 Ok(Some(download_id))
1328 } else {
1329 Ok(None)
1330 }
1331 }
1332
1333 pub async fn StartBackgroundTasks(&self) -> Result<tokio::task::JoinHandle<()>> {
1335 let manager = self.clone();
1336
1337 let handle = tokio::spawn(async move {
1338 manager.BackgroundTaskLoop().await;
1339 });
1340
1341 log::info!("[DownloadManager] Background tasks started");
1342
1343 Ok(handle)
1344 }
1345
1346 async fn BackgroundTaskLoop(&self) {
1348 let mut interval = tokio::time::interval(Duration::from_secs(60));
1349
1350 loop {
1351 interval.tick().await;
1352
1353 if let Err(e) = self.ProcessQueue().await {
1355 log::error!("[DownloadManager] Queue processing error: {}", e);
1356 }
1357
1358 self.CleanupCompletedDownloads().await;
1360
1361 if let Err(e) = self.CleanupCache().await {
1363 log::error!("[DownloadManager] Cache cleanup failed: {}", e);
1364 }
1365 }
1366 }
1367
1368 async fn CleanupCompletedDownloads(&self) {
1370 let mut downloads = self.ActiveDownloads.write().await;
1371
1372 let mut cleaned_count = 0;
1373 downloads.retain(|_, download| {
1374 let is_final = matches!(
1375 download.status,
1376 DownloadState::Completed | DownloadState::Failed | DownloadState::Cancelled
1377 );
1378 if is_final {
1379 cleaned_count += 1;
1380 }
1381 !is_final
1382 });
1383
1384 if cleaned_count > 0 {
1385 log::debug!("[DownloadManager] Cleaned up {} completed downloads", cleaned_count);
1386 }
1387 }
1388
1389 async fn CleanupCache(&self) -> Result<()> {
1391 let max_age_days = 7;
1392 let now = chrono::Utc::now();
1393
1394 let mut entries = tokio::fs::read_dir(&self.CacheDirectory)
1395 .await
1396 .map_err(|e| AirError::FileSystem(format!("Failed to read cache directory: {}", e)))?;
1397
1398 let mut cleaned_count = 0;
1399
1400 while let Some(entry) = entries
1401 .next_entry()
1402 .await
1403 .map_err(|e| AirError::FileSystem(format!("Failed to read cache entry: {}", e)))?
1404 {
1405 let metadata = entry
1406 .metadata()
1407 .await
1408 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
1409
1410 if metadata.is_file() {
1411 let path = entry.path();
1412
1413 let IsActive = {
1415 let downloads = self.ActiveDownloads.read().await;
1416 downloads.values().any(|d| d.destination == path)
1417 };
1418
1419 if IsActive {
1420 continue;
1421 }
1422
1423 let modified = metadata
1424 .modified()
1425 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
1426
1427 let modified_time = chrono::DateTime::<chrono::Utc>::from(modified);
1428 let age = now.signed_duration_since(modified_time);
1429
1430 if age.num_days() > max_age_days {
1431 match tokio::fs::remove_file(&path).await {
1432 Ok(_) => {
1433 log::debug!(
1434 "[DownloadManager] Removed old cache file: {}",
1435 entry.file_name().to_string_lossy()
1436 );
1437 cleaned_count += 1;
1438 },
1439 Err(e) => {
1440 log::warn!(
1441 "[DownloadManager] Failed to remove cache file {}: {}",
1442 entry.file_name().to_string_lossy(),
1443 e
1444 );
1445 },
1446 }
1447 }
1448 }
1449 }
1450
1451 if cleaned_count > 0 {
1452 log::info!("[DownloadManager] Cleaned up {} old cache files", cleaned_count);
1453 }
1454
1455 Ok(())
1456 }
1457
1458 pub async fn StopBackgroundTasks(&self) {
1460 log::info!("[DownloadManager] Stopping background tasks");
1461
1462 let ids_to_cancel:Vec<String> = {
1464 let downloads = self.ActiveDownloads.read().await;
1465 downloads
1466 .iter()
1467 .filter(|(_, s)| matches!(s.status, DownloadState::Downloading))
1468 .map(|(id, _)| id.clone())
1469 .collect()
1470 };
1471
1472 for id in ids_to_cancel {
1474 let _ = self.CancelDownload(&id).await;
1475 }
1476
1477 let _ = self
1479 .AppState
1480 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Stopped)
1481 .await;
1482 }
1483
1484 pub async fn SetBandwidthLimit(&mut self, mb_per_sec:usize) {
1489 let permits = mb_per_sec.max(1).min(1000);
1491 self.BandwidthLimiter = Arc::new(Semaphore::new(permits));
1492 log::info!("[DownloadManager] Bandwidth limit set to {} MB/s", mb_per_sec);
1493 }
1494
1495 pub async fn SetMaxConcurrentDownloads(&mut self, max:usize) {
1499 let permits = max.max(1).min(20);
1500 self.ConcurrentLimiter = Arc::new(Semaphore::new(permits));
1501 log::info!("[DownloadManager] Max concurrent downloads set to {}", max);
1502 }
1503}
1504
1505impl Clone for DownloadManager {
1506 fn clone(&self) -> Self {
1507 Self {
1508 AppState:self.AppState.clone(),
1509 ActiveDownloads:self.ActiveDownloads.clone(),
1510 DownloadQueue:self.DownloadQueue.clone(),
1511 CacheDirectory:self.CacheDirectory.clone(),
1512 client:self.client.clone(),
1513 ChecksumVerifier:self.ChecksumVerifier.clone(),
1514 BandwidthLimiter:self.BandwidthLimiter.clone(),
1515 ConcurrentLimiter:self.ConcurrentLimiter.clone(),
1516 statistics:self.statistics.clone(),
1517 }
1518 }
1519}
1520
1521impl Default for DownloadStatistics {
1522 fn default() -> Self {
1523 Self {
1524 TotalDownloads:0,
1525 SuccessfulDownloads:0,
1526 FailedDownloads:0,
1527 CancelledDownloads:0,
1528 TotalBytesDownloaded:0,
1529 TotalDownloadTimeSecs:0.0,
1530 AverageDownloadRate:0.0,
1531 PeakDownloadRate:0,
1532 ActiveDownloads:0,
1533 QueuedDownloads:0,
1534 }
1535 }
1536}
1537
1538fn ExpectedChecksumFromConfig(config:&DownloadConfig) -> Option<&str> {
1540 if config.checksum.is_empty() { None } else { Some(&config.checksum) }
1541}
1542
/// One byte range of a chunked download together with its staging file.
#[derive(Debug, Clone)]
struct ChunkInfo {
	/// Offset of the first byte of the range.
	start:u64,

	/// Offset of the last byte of the range (inclusive: the range holds
	/// `end - start + 1` bytes).
	end:u64,

	/// Bytes downloaded so far for this chunk.
	downloaded:u64,

	/// File the chunk's bytes are written to before reassembly.
	temp_path:PathBuf,
}
1551
1552#[derive(Debug)]
1554struct ParallelDownloadResult {
1555 chunks:Vec<ChunkInfo>,
1556 total_size:u64,
1557}
1558
1559impl DownloadManager {
1596 pub async fn DownloadFileWithChunks(
1606 &self,
1607 url:String,
1608 destination:String,
1609 checksum:String,
1610 chunk_size_mb:usize,
1611 ) -> Result<DownloadResult> {
1612 log::info!(
1613 "[DownloadManager] Starting chunked download - URL: {}, Chunk size: {} MB",
1614 url,
1615 chunk_size_mb
1616 );
1617
1618 let sanitized_url = Self::ValidateAndSanitizeUrl(&url)?;
1620
1621 let total_size = self.GetRemoteFileSize(&sanitized_url).await?;
1623
1624 log::info!("[DownloadManager] Remote file size: {} bytes", total_size);
1625
1626 let chunk_threshold = 50 * 1024 * 1024; if total_size < chunk_threshold {
1629 log::info!("[DownloadManager] File too small for chunked download, using normal download");
1630 return self.DownloadFile(url, destination, checksum).await;
1631 }
1632
1633 let chunk_size = (chunk_size_mb * 1024 * 1024) as u64;
1635 let num_chunks = ((total_size + chunk_size - 1) / chunk_size) as usize;
1636 let num_concurrent = num_chunks.min(4); log::info!(
1639 "[DownloadManager] Downloading in {} chunks ({} concurrent)",
1640 num_chunks,
1641 num_concurrent
1642 );
1643
1644 let DownloadId = Utility::GenerateRequestId();
1645 let DestinationPath = if destination.is_empty() {
1646 let filename = sanitized_url.split('/').last().unwrap_or("download.bin");
1647 self.CacheDirectory.join(filename)
1648 } else {
1649 ConfigurationManager::ExpandPath(&destination)?
1650 };
1651
1652 let temp_dir = DestinationPath.with_extension("chunks");
1654 tokio::fs::create_dir_all(&temp_dir)
1655 .await
1656 .map_err(|e| AirError::FileSystem(format!("Failed to create temp directory: {}", e)))?;
1657
1658 let mut chunks = Vec::with_capacity(num_chunks);
1660 for i in 0..num_chunks {
1661 let start = (i as u64) * chunk_size;
1662 let end = std::cmp::min(start + chunk_size - 1, total_size - 1);
1663
1664 chunks.push(ChunkInfo { start, end, downloaded:0, temp_path:temp_dir.join(format!("chunk_{:04}", i)) });
1665 }
1666
1667 let downloaded_tracker = Arc::new(RwLock::new(0u64));
1669 let completed_tracker = Arc::new(RwLock::new(0usize));
1670
1671 let mut handles = Vec::new();
1673 for (i, chunk) in chunks.iter().enumerate() {
1674 let manager = self.clone();
1675 let url_clone = sanitized_url.clone();
1676 let chunk_clone = chunk.clone();
1677 let downloaded_tracker = downloaded_tracker.clone();
1678 let completed_tracker = completed_tracker.clone();
1679 let _Did = DownloadId.clone();
1680
1681 let handle = tokio::spawn(async move {
1682 manager.DownloadChunk(&url_clone, &chunk_clone, i).await?;
1683
1684 {
1686 let mut downloaded = downloaded_tracker.write().await;
1687 let mut completed = completed_tracker.write().await;
1688 *downloaded += chunk_clone.end - chunk_clone.start + 1;
1689 *completed += 1;
1690
1691 let progress = (*downloaded as f32 / total_size as f32) * 100.0;
1692 log::info!(
1693 "Chunk {} completed ({}/{}) - Progress: {:.1}%",
1694 i + 1,
1695 *completed,
1696 num_chunks,
1697 progress
1698 );
1699 }
1700
1701 Ok::<_, AirError>(())
1702 });
1703
1704 if (i + 1) % num_concurrent == 0 {
1706 for handle in handles.drain(..) {
1707 handle.await??;
1708 }
1709 }
1710
1711 handles.push(handle);
1712 }
1713
1714 for handle in handles {
1716 handle.await??;
1717 }
1718
1719 log::info!("[DownloadManager] Reassembling chunks into final file");
1721 self.ReassembleChunks(&chunks, &DestinationPath).await?;
1722
1723 tokio::fs::remove_dir_all(&temp_dir).await.map_err(|e| {
1725 log::warn!("[DownloadManager] Failed to clean up temp directory: {}", e);
1726 AirError::FileSystem(e.to_string())
1727 })?;
1728
1729 if !checksum.is_empty() {
1731 self.VerifyChecksum(&DestinationPath, &checksum).await?;
1732 }
1733
1734 let actual_checksum = self.CalculateChecksum(&DestinationPath).await?;
1735
1736 log::info!("[DownloadManager] Chunked download completed successfully");
1737
1738 Ok(DownloadResult {
1739 path:DestinationPath.to_string_lossy().to_string(),
1740 size:total_size,
1741 checksum:actual_checksum,
1742 duration:Duration::from_secs(0),
1743 AverageRate:0,
1744 })
1745 }
1746
1747 async fn GetRemoteFileSize(&self, url:&str) -> Result<u64> {
1749 let response = self
1750 .client
1751 .head(url)
1752 .timeout(Duration::from_secs(30))
1753 .send()
1754 .await
1755 .map_err(|e| AirError::Network(format!("Failed to get file size: {}", e)))?;
1756
1757 if !response.status().is_success() {
1758 return Err(AirError::Network(format!("Failed to get file size: {}", response.status())));
1759 }
1760
1761 response
1762 .content_length()
1763 .ok_or_else(|| AirError::Network("Content-Length header not found".to_string()))
1764 }
1765
1766 async fn DownloadChunk(&self, url:&str, chunk:&ChunkInfo, chunk_index:usize) -> Result<()> {
1768 log::debug!(
1769 "[DownloadManager] Downloading chunk {} (bytes {}-{})",
1770 chunk_index,
1771 chunk.start,
1772 chunk.end
1773 );
1774
1775 let range_header = format!("bytes={}-{}", chunk.start, chunk.end);
1776
1777 let response = self
1778 .client
1779 .get(url)
1780 .header(reqwest::header::RANGE, range_header)
1781 .timeout(Duration::from_secs(300))
1782 .send()
1783 .await
1784 .map_err(|e| AirError::Network(format!("Failed to start chunk download: {}", e)))?;
1785
1786 if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
1787 return Err(AirError::Network(format!(
1788 "Chunk download failed with status: {}",
1789 response.status()
1790 )));
1791 }
1792
1793 let bytes = response
1795 .bytes()
1796 .await
1797 .map_err(|e| AirError::Network(format!("Failed to read chunk bytes: {}", e)))?;
1798
1799 tokio::fs::write(&chunk.temp_path, &bytes)
1800 .await
1801 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk: {}", e)))?;
1802
1803 log::debug!("[DownloadManager] Chunk {} downloaded: {} bytes", chunk_index, bytes.len());
1804
1805 Ok(())
1806 }
1807
1808 async fn ReassembleChunks(&self, chunks:&[ChunkInfo], destination:&Path) -> Result<()> {
1810 use tokio::io::AsyncWriteExt;
1811
1812 let mut file = tokio::fs::File::create(destination)
1813 .await
1814 .map_err(|e| AirError::FileSystem(format!("Failed to create destination file: {}", e)))?;
1815
1816 let mut sorted_chunks:Vec<_> = chunks.iter().collect();
1818 sorted_chunks.sort_by_key(|c| c.start);
1819
1820 for chunk in sorted_chunks {
1821 let contents = tokio::fs::read(&chunk.temp_path)
1822 .await
1823 .map_err(|e| AirError::FileSystem(format!("Failed to read chunk: {}", e)))?;
1824
1825 file.write_all(&contents)
1826 .await
1827 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk to file: {}", e)))?;
1828
1829 log::debug!("[DownloadManager] Reassembled chunk (bytes {}-{})", chunk.start, chunk.end);
1830 }
1831
1832 file.flush()
1833 .await
1834 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
1835
1836 log::info!("[DownloadManager] All chunks reassembled successfully");
1837
1838 Ok(())
1839 }
1840}