@@ -320,7 +320,7 @@ async fn upload_file_raw(
320320 bw_limit_kbps : u64 ,
321321 progress : & Progress ,
322322) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
323- use std :: io :: Read ;
323+
324324
325325 let meta = std:: fs:: metadata ( local_path) ?;
326326 let size = meta. len ( ) ;
@@ -347,7 +347,7 @@ async fn upload_file_raw(
347347 . map_err ( |e| format ! ( "failed to write header: {e}" ) ) ?;
348348
349349 // Stream raw file data
350- let mut file = std :: fs:: File :: open ( local_path) ?;
350+ let mut file = tokio :: fs:: File :: open ( local_path) . await ?;
351351 let mut buf = vec ! [ 0u8 ; RAW_CHUNK_SIZE ] ;
352352 let bytes_per_tick = if bw_limit_kbps > 0 {
353353 bw_limit_kbps * 1024 / 10
@@ -357,8 +357,9 @@ async fn upload_file_raw(
357357 let mut sent_this_tick: u64 = 0 ;
358358 let mut tick_start = Instant :: now ( ) ;
359359
360+ use tokio:: io:: AsyncReadExt ;
360361 loop {
361- let n = file. read ( & mut buf) ?;
362+ let n = file. read ( & mut buf) . await ?;
362363 if n == 0 {
363364 break ;
364365 }
@@ -555,51 +556,53 @@ async fn download(
555556 }
556557
557558 let file_entries: Vec < _ > = entries. iter ( ) . filter ( |e| !e. is_dir ) . cloned ( ) . collect ( ) ;
559+ let total_files = file_entries. len ( ) ;
558560 let total_bytes: u64 = file_entries. iter ( ) . map ( |e| e. size ) . sum ( ) ;
559- let progress = Arc :: new ( Progress :: new ( file_entries . len ( ) , cli. quiet ) ) ;
561+ let progress = Arc :: new ( Progress :: new ( total_files , cli. quiet ) ) ;
560562 progress. set_total_bytes ( total_bytes) ;
561563
562- // Server will send uni streams for each file
564+ // Accept streams as they arrive from server, process up to j concurrently
563565 let sem = Arc :: new ( Semaphore :: new ( cli. jobs ) ) ;
564566 let mut handles = Vec :: new ( ) ;
567+ let mut received = 0 ;
568+
569+ while received < total_files {
570+ let mut recv = conn. accept_uni ( ) . await
571+ . map_err ( |e| format ! ( "failed to accept download stream: {e}" ) ) ?;
572+
573+ // Read type byte
574+ let mut tb = [ 0u8 ; 1 ] ;
575+ recv. read_exact ( & mut tb) . await
576+ . map_err ( |e| format ! ( "failed to read stream type: {e}" ) ) ?;
577+
578+ if tb[ 0 ] != RAW_DOWNLOAD_DATA {
579+ return Err ( format ! ( "unexpected stream type: {:#x}" , tb[ 0 ] ) . into ( ) ) ;
580+ }
581+
582+ let header = RawFileHeader :: decode ( & mut recv) . await ?;
583+ let local_path = dest. join ( & header. path ) ;
584+ let file_path = header. path . clone ( ) ;
565585
566- for entry in file_entries. into_iter ( ) {
567586 let sem = sem. clone ( ) ;
568- let conn = conn. clone ( ) ;
569- let dest = dest. clone ( ) ;
570587 let progress = progress. clone ( ) ;
571588 let preserve = cli. preserve ;
572589 let bw_limit = cli. limit ;
573590
574591 let handle = tokio:: spawn ( async move {
575592 let _permit = sem. acquire ( ) . await . unwrap ( ) ;
576593
577- // Accept uni stream from server
578- let mut recv = conn. accept_uni ( ) . await
579- . map_err ( |e| format ! ( "failed to accept download stream: {e}" ) ) ?;
580-
581- // Read type byte
582- let mut tb = [ 0u8 ; 1 ] ;
583- recv. read_exact ( & mut tb) . await
584- . map_err ( |e| format ! ( "failed to read stream type: {e}" ) ) ?;
585-
586- if tb[ 0 ] != RAW_DOWNLOAD_DATA {
587- return Err ( format ! ( "unexpected stream type: {:#x}" , tb[ 0 ] ) . into ( ) ) ;
588- }
589-
590- let header = RawFileHeader :: decode ( & mut recv) . await ?;
591- let local_path = dest. join ( & header. path ) ;
592594 if let Some ( parent) = local_path. parent ( ) {
593595 std:: fs:: create_dir_all ( parent) . ok ( ) ;
594596 }
595597
596598 download_file_raw ( & mut recv, & local_path, & header, preserve, bw_limit, & progress) . await ?;
597- progress. file_done ( & entry . path ) ;
599+ progress. file_done ( & file_path ) ;
598600
599601 Ok :: < ( ) , Box < dyn std:: error:: Error + Send + Sync > > ( ( ) )
600602 } ) ;
601603
602604 handles. push ( handle) ;
605+ received += 1 ;
603606 }
604607
605608 let mut errors = 0 ;
@@ -763,14 +766,14 @@ async fn download_file_raw(
763766 bw_limit_kbps : u64 ,
764767 progress : & Progress ,
765768) -> Result < ( ) , Box < dyn std:: error:: Error + Send + Sync > > {
766- use std:: io:: Write ;
767769 use std:: os:: unix:: fs:: PermissionsExt ;
770+ use tokio:: io:: AsyncWriteExt ;
768771
769772 if let Some ( parent) = path. parent ( ) {
770- std :: fs:: create_dir_all ( parent) . ok ( ) ;
773+ tokio :: fs:: create_dir_all ( parent) . await . ok ( ) ;
771774 }
772775
773- let mut file = std :: fs:: File :: create ( path) ?;
776+ let mut file = tokio :: fs:: File :: create ( path) . await ?;
774777 let mut written: u64 = 0 ;
775778 let mut buf = vec ! [ 0u8 ; RAW_CHUNK_SIZE ] ;
776779 let bytes_per_tick = if bw_limit_kbps > 0 {
@@ -784,7 +787,7 @@ async fn download_file_raw(
784787 loop {
785788 match recv. read ( & mut buf) . await {
786789 Ok ( Some ( n) ) => {
787- file. write_all ( & buf[ ..n] ) ?;
790+ file. write_all ( & buf[ ..n] ) . await ?;
788791 written += n as u64 ;
789792 progress. add_transferred ( n as u64 ) ;
790793
@@ -806,7 +809,7 @@ async fn download_file_raw(
806809 }
807810 }
808811
809- file. flush ( ) ?;
812+ file. flush ( ) . await ?;
810813 drop ( file) ;
811814
812815 if written != header. size {
0 commit comments