@@ -16,6 +16,18 @@ use streamkit_core::types::Packet;
1616use streamkit_core:: NodeContext ;
1717use tokio:: sync:: mpsc;
1818
19+ /// Await a codec task and log if it panicked. Returns `true` when the
20+ /// task panicked (caller should skip draining).
21+ async fn finish_codec_task ( codec_task : tokio:: task:: JoinHandle < ( ) > , label : & str ) -> bool {
22+ match codec_task. await {
23+ Err ( e) if e. is_panic ( ) => {
24+ tracing:: error!( "{label} codec task panicked: {e:?}" ) ;
25+ true
26+ } ,
27+ _ => false ,
28+ }
29+ }
30+
1931/// Shared select-loop that forwards codec results to the output sender.
2032///
2133/// Handles three concurrent events:
@@ -87,10 +99,6 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>(
8799 Some ( control_msg) = context. control_rx. recv( ) => {
88100 if matches!( control_msg, streamkit_core:: control:: NodeControlMessage :: Shutdown ) {
89101 tracing:: info!( "{label} received shutdown signal" ) ;
90- // NOTE: Dropping codec_tx first signals the codec thread to
91- // exit/flush, then aborting ensures it doesn't linger.
92- // Because we break out here, flushed results are never sent
93- // downstream. Data loss on explicit shutdown is acceptable.
94102 input_task. abort( ) ;
95103 drop( codec_tx) ;
96104 codec_task. abort( ) ;
@@ -113,33 +121,22 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>(
113121 // may close their channels before all results are forwarded —
114122 // causing zero-byte output on fast pipelines.
115123 tracing:: debug!( "{label} waiting for codec task to finish before drain" ) ;
116- match codec_task. await {
117- Err ( e) if e. is_panic ( ) => {
118- tracing:: error!( "{label} codec task panicked: {e:?}" ) ;
119- } ,
120- _ => {
121- let mut drained = 0u64 ;
122- while let Some ( maybe_result) = result_rx. recv ( ) . await {
123- match maybe_result {
124- Ok ( item) => {
125- drained += 1 ;
126- if forward_one ( to_packet ( item) , context, counter, stats) . await {
127- break ;
128- }
129- } ,
130- Err ( err) => handle_error ( & err, counter, stats, label) ,
131- }
124+ if !finish_codec_task ( codec_task, label) . await {
125+ let mut drained = 0u64 ;
126+ while let Some ( maybe_result) = result_rx. recv ( ) . await {
127+ match maybe_result {
128+ Ok ( item) => {
129+ drained += 1 ;
130+ if forward_one ( to_packet ( item) , context, counter, stats) . await {
131+ break ;
132+ }
133+ } ,
134+ Err ( err) => handle_error ( & err, counter, stats, label) ,
132135 }
133- tracing :: debug! ( "{label} drain complete: forwarded {drained} result(s)" ) ;
134- } ,
136+ }
137+ tracing :: debug! ( "{label} drain complete: forwarded {drained} result(s)" ) ;
135138 }
136139 } else {
137- codec_task. abort ( ) ;
138- match codec_task. await {
139- Err ( e) if e. is_panic ( ) => {
140- tracing:: error!( "{label} codec task panicked: {e:?}" ) ;
141- } ,
142- _ => { } ,
143- }
140+ finish_codec_task ( codec_task, label) . await ;
144141 }
145142}
0 commit comments