From d80007130a145c502a61959f82bfbd5ed5673bb4 Mon Sep 17 00:00:00 2001
From: Klaus Ma
Date: Fri, 23 Jan 2026 10:57:29 +0000
Subject: [PATCH 1/2] chore: fix clippy.

Signed-off-by: Klaus Ma
---
 common/src/lib.rs                    |  12 +-
 cri/build.rs                         |  14 +++
 object_cache/src/cache.rs            | 170 +++++++++++++--
 sdk/rust/build.rs                    |   2 +
 session_manager/src/main.rs          |   2 +
 session_manager/src/scheduler/mod.rs |  17 +--
 stdng/src/collections/bin_heap.rs    |   2 +-
 7 files changed, 117 insertions(+), 102 deletions(-)

diff --git a/common/src/lib.rs b/common/src/lib.rs
index abf0ca0e..95fbdfff 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -65,9 +65,15 @@ impl From for FlameError {
 impl From<FlameError> for Status {
     fn from(value: FlameError) -> Self {
         match value {
-            FlameError::NotFound(s) => Status::not_found(s),
-            FlameError::Internal(s) => Status::internal(s),
-            _ => Status::unknown(value.to_string()),
+            FlameError::NotFound(msg) => Status::not_found(msg),
+            FlameError::InvalidConfig(msg) | FlameError::InvalidState(msg) => {
+                Status::invalid_argument(msg)
+            }
+            FlameError::Internal(msg)
+            | FlameError::Network(msg)
+            | FlameError::Uninitialized(msg)
+            | FlameError::Storage(msg)
+            | FlameError::VersionMismatch(msg) => Status::internal(msg),
         }
     }
 }
diff --git a/cri/build.rs b/cri/build.rs
index 51a291f6..92ca6416 100644
--- a/cri/build.rs
+++ b/cri/build.rs
@@ -13,6 +13,20 @@ limitations under the License.
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     tonic_build::configure()
+        .type_attribute(".", "#[allow(clippy::doc_overindented_list_items)]")
+        .type_attribute(".", "#[allow(clippy::doc_lazy_continuation)]")
+        .type_attribute(
+            "runtime.v1.MountPropagation",
+            "#[allow(clippy::enum_variant_names)]",
+        )
+        .type_attribute(
+            "runtime.v1.ContainerState",
+            "#[allow(clippy::enum_variant_names)]",
+        )
+        .type_attribute(
+            "runtime.v1.ContainerEventType",
+            "#[allow(clippy::enum_variant_names)]",
+        )
         .protoc_arg("--experimental_allow_proto3_optional")
         .compile_protos(&["protos/cri.proto"], &["protos"])?;
diff --git a/object_cache/src/cache.rs b/object_cache/src/cache.rs
index 842263a3..9fd50355 100644
--- a/object_cache/src/cache.rs
+++ b/object_cache/src/cache.rs
@@ -427,15 +427,17 @@ impl FlightCacheServer {
         }
     }
 
-    fn extract_schema_from_flight_data(flight_data: &FlightData) -> Result<Arc<Schema>, Status> {
+    fn extract_schema_from_flight_data(
+        flight_data: &FlightData,
+    ) -> Result<Arc<Schema>, FlameError> {
         use arrow::ipc::root_as_message;
 
         let message = root_as_message(&flight_data.data_header)
-            .map_err(|e| Status::internal(format!("Failed to parse IPC message: {}", e)))?;
+            .map_err(|e| FlameError::Internal(format!("Failed to parse IPC message: {}", e)))?;
 
         let ipc_schema = message
             .header_as_schema()
-            .ok_or_else(|| Status::internal("Message is not a schema"))?;
+            .ok_or_else(|| FlameError::Internal("Message is not a schema".to_string()))?;
 
         let decoded_schema = arrow::ipc::convert::fb_to_schema(ipc_schema);
         Ok(Arc::new(decoded_schema))
     }
@@ -444,24 +446,28 @@ impl FlightCacheServer {
     fn decode_batch_from_flight_data(
         flight_data: &FlightData,
         schema: &Arc<Schema>,
-    ) -> Result<RecordBatch, Status> {
+    ) -> Result<RecordBatch, FlameError> {
         arrow_flight::utils::flight_data_to_arrow_batch(
             flight_data,
             schema.clone(),
             &Default::default(),
         )
-        .map_err(|e| Status::internal(format!("Failed to decode batch: {}", e)))
+        .map_err(|e| FlameError::Internal(format!("Failed to decode batch: {}", e)))
     }
 
     async fn collect_batches_from_stream(
         mut stream: Streaming<FlightData>,
-    ) -> Result<(String, Option<String>, Vec<RecordBatch>), Status> {
+    ) -> Result<(String, Option<String>, Vec<RecordBatch>), FlameError> {
         let mut batches = Vec::new();
         let mut session_id: Option<String> = None;
         let mut object_id: Option<String> = None;
         let mut schema: Option<Arc<Schema>> = None;
 
-        while let Some(flight_data) = stream.message().await? {
+        while let Some(flight_data) = stream
+            .message()
+            .await
+            .map_err(|e| FlameError::Internal(format!("Stream error: {}", e)))?
+        {
             Self::extract_session_and_object_id(&flight_data, &mut session_id, &mut object_id);
 
             // Extract schema from data_header in first message
@@ -479,29 +485,29 @@ impl FlightCacheServer {
         }
 
         if batches.is_empty() {
-            return Err(Status::invalid_argument("No data received"));
+            return Err(FlameError::InvalidState("No data received".to_string()));
         }
 
         let session_id = session_id.ok_or_else(|| {
-            Status::invalid_argument(
-                "session_id must be provided in app_metadata as 'session_id:{id}'",
+            FlameError::InvalidState(
+                "session_id must be provided in app_metadata as 'session_id:{id}'".to_string(),
             )
         })?;
 
         Ok((session_id, object_id, batches))
     }
 
-    fn combine_batches(batches: Vec<RecordBatch>) -> Result<RecordBatch, Status> {
+    fn combine_batches(batches: Vec<RecordBatch>) -> Result<RecordBatch, FlameError> {
         if batches.len() == 1 {
             Ok(batches.into_iter().next().unwrap())
         } else {
             let schema = batches[0].schema();
             concat_batches(&schema, &batches)
-                .map_err(|e| Status::internal(format!("Failed to concatenate batches: {}", e)))
+                .map_err(|e| FlameError::Internal(format!("Failed to concatenate batches: {}", e)))
         }
     }
 
-    fn create_put_result(metadata: &ObjectMetadata) -> Result<PutResult, Status> {
+    fn create_put_result(metadata: &ObjectMetadata) -> Result<PutResult, FlameError> {
         let object_ref = bson::doc! {
             "endpoint": &metadata.endpoint,
             "key": &metadata.key,
@@ -510,7 +516,7 @@ impl FlightCacheServer {
 
         let mut bson_bytes = Vec::new();
         object_ref.to_writer(&mut bson_bytes).map_err(|e| {
-            Status::internal(format!("Failed to serialize ObjectRef to BSON: {}", e))
+            FlameError::Internal(format!("Failed to serialize ObjectRef to BSON: {}", e))
         })?;
 
         Ok(PutResult {
@@ -518,75 +524,65 @@ impl FlightCacheServer {
         })
     }
 
-    async fn handle_put_action(&self, action_body: &str) -> Result<String, Status> {
+    async fn handle_put_action(&self, action_body: &str) -> Result<String, FlameError> {
         let (session_id_str, data_b64) = action_body
             .split_once(':')
-            .ok_or_else(|| Status::invalid_argument("Invalid PUT action format"))?;
+            .ok_or_else(|| FlameError::InvalidState("Invalid PUT action format".to_string()))?;
 
         let session_id = session_id_str.to_string();
         let data = base64::engine::general_purpose::STANDARD
             .decode(data_b64)
-            .map_err(|e| Status::invalid_argument(format!("Invalid base64: {}", e)))?;
+            .map_err(|e| FlameError::InvalidState(format!("Invalid base64: {}", e)))?;
 
         let object = Object { version: 0, data };
-        let metadata = self
-            .cache
-            .put(session_id, object)
-            .await
-            .map_err(|e| Status::internal(format!("Failed to put: {}", e)))?;
+        let metadata = self.cache.put(session_id, object).await?;
 
         serde_json::to_string(&metadata)
-            .map_err(|e| Status::internal(format!("Failed to serialize: {}", e)))
+            .map_err(|e| FlameError::Internal(format!("Failed to serialize: {}", e)))
     }
 
-    async fn handle_update_action(&self, action_body: &str) -> Result<String, Status> {
+    async fn handle_update_action(&self, action_body: &str) -> Result<String, FlameError> {
         let (key_str, data_b64) = action_body
             .split_once(':')
-            .ok_or_else(|| Status::invalid_argument("Invalid UPDATE action format"))?;
+            .ok_or_else(|| FlameError::InvalidState("Invalid UPDATE action format".to_string()))?;
 
         let key = key_str.to_string();
         let data = base64::engine::general_purpose::STANDARD
             .decode(data_b64)
-            .map_err(|e| Status::invalid_argument(format!("Invalid base64: {}", e)))?;
+            .map_err(|e| FlameError::InvalidState(format!("Invalid base64: {}", e)))?;
 
         let object = Object { version: 0, data };
-        let metadata = self
-            .cache
-            .update(key, object)
-            .await
-            .map_err(|e| Status::internal(format!("Failed to update: {}", e)))?;
+        let metadata = self.cache.update(key, object).await?;
 
         serde_json::to_string(&metadata)
-            .map_err(|e| Status::internal(format!("Failed to serialize: {}", e)))
+            .map_err(|e| FlameError::Internal(format!("Failed to serialize: {}", e)))
     }
 
-    async fn handle_delete_action(&self, session_id: String) -> Result<String, Status> {
-        self.cache
-            .delete(session_id)
-            .await
-            .map_err(|e| Status::internal(format!("Failed to delete: {}", e)))?;
+    async fn handle_delete_action(&self, session_id: String) -> Result<String, FlameError> {
+        self.cache.delete(session_id).await?;
 
         Ok("OK".to_string())
     }
 }
 
 // Helper function to encode schema to IPC format for FlightInfo
-fn encode_schema(schema: &Schema) -> Result<Vec<u8>, Status> {
+fn encode_schema(schema: &Schema) -> Result<Vec<u8>, FlameError> {
     // Encode schema as IPC message using IpcDataGenerator
-    use arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};
+    use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
 
     let options = IpcWriteOptions::default();
     let data_gen = IpcDataGenerator::default();
+    let mut dict_tracker = DictionaryTracker::new(false);
 
     // Encode the schema
-    let encoded = data_gen.schema_to_bytes(schema, &options);
+    let encoded =
+        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dict_tracker, &options);
 
     Ok(encoded.ipc_message)
 }
 
 // Helper function to convert RecordBatch to FlightData
-fn batch_to_flight_data_vec(batch: &RecordBatch) -> Result<Vec<FlightData>, Status> {
-    use arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};
-    use arrow_flight::utils::flight_data_from_arrow_batch;
+fn batch_to_flight_data_vec(batch: &RecordBatch) -> Result<Vec<FlightData>, FlameError> {
+    use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
 
     tracing::debug!(
         "batch_to_flight_data_vec: batch rows={}, cols={}",
@@ -597,53 +593,51 @@ fn batch_to_flight_data_vec(batch: &RecordBatch) -> Result<Vec<FlightData>, Stat
     // Create IPC write options with alignment to ensure proper encoding
     let options = IpcWriteOptions::default()
         .try_with_compression(None)
-        .map_err(|e| Status::internal(format!("Failed to set compression: {}", e)))?;
+        .map_err(|e| FlameError::Internal(format!("Failed to set compression: {}", e)))?;
 
-    let (mut flight_data_vec, _) = flight_data_from_arrow_batch(batch, &options);
-    tracing::debug!(
-        "batch_to_flight_data_vec: generated {} FlightData messages",
-        flight_data_vec.len()
-    );
+    let mut flight_data_vec = Vec::new();
 
-    // If empty, manually encode the batch
-    if flight_data_vec.is_empty() {
-        tracing::warn!("flight_data_from_arrow_batch returned empty, using manual encoding");
-
-        // First, encode and send schema
-        let mut data_gen = IpcDataGenerator::default();
-        let encoded_schema = data_gen.schema_to_bytes(batch.schema().as_ref(), &options);
-
-        let schema_flight_data = FlightData {
-            flight_descriptor: None,
-            app_metadata: vec![].into(),
-            data_header: encoded_schema.ipc_message.into(),
-            data_body: vec![].into(),
-        };
-        flight_data_vec.push(schema_flight_data);
+    // Encode using IpcDataGenerator directly with DictionaryTracker
+    let data_gen = IpcDataGenerator::default();
+    let mut dict_tracker = DictionaryTracker::new(false);
 
-        // Then, send the batch data
-        let mut dictionary_tracker = arrow::ipc::writer::DictionaryTracker::new(false);
+    // First, encode and send schema
+    let encoded_schema = data_gen.schema_to_bytes_with_dictionary_tracker(
+        batch.schema().as_ref(),
+        &mut dict_tracker,
+        &options,
+    );
 
-        let (encoded_dictionaries, encoded_batch) = data_gen
-            .encoded_batch(batch, &mut dictionary_tracker, &options)
-            .map_err(|e| Status::internal(format!("Failed to encode batch: {}", e)))?;
+    let schema_flight_data = FlightData {
+        flight_descriptor: None,
+        app_metadata: vec![].into(),
+        data_header: encoded_schema.ipc_message.into(),
+        data_body: vec![].into(),
+    };
+    flight_data_vec.push(schema_flight_data);
 
-        // Add dictionary batches if any
-        for dict_batch in encoded_dictionaries {
-            flight_data_vec.push(dict_batch.into());
-        }
+    // Then, send the batch data
+    let (encoded_dictionaries, encoded_batch) = data_gen
+        .encoded_batch(batch, &mut dict_tracker, &options)
+        .map_err(|e| FlameError::Internal(format!("Failed to encode batch: {}", e)))?;
 
-        // Add the data batch
-        flight_data_vec.push(encoded_batch.into());
+    // Add dictionary batches if any
+    for dict_batch in encoded_dictionaries {
+        flight_data_vec.push(dict_batch.into());
     }
 
+    // Add the data batch
+    flight_data_vec.push(encoded_batch.into());
+
     tracing::debug!(
         "batch_to_flight_data_vec: final {} FlightData messages",
        flight_data_vec.len()
     );
 
     if flight_data_vec.is_empty() {
-        Err(Status::internal("No FlightData generated from batch"))
+        Err(FlameError::Internal(
+            "No FlightData generated from batch".to_string(),
+        ))
     } else {
         Ok(flight_data_vec)
     }
@@ -672,7 +666,7 @@ fn write_batch_to_file(path: &Path, batch: &RecordBatch) -> Result<(), FlameErro
 }
 
 // Helper function to create a RecordBatch from object data
-fn object_to_batch(object: &Object) -> Result<RecordBatch, Status> {
+fn object_to_batch(object: &Object) -> Result<RecordBatch, FlameError> {
     let schema = get_object_schema();
 
     let version_array = UInt64Array::from(vec![object.version]);
@@ -682,25 +676,27 @@ fn object_to_batch(object: &Object) -> Result<RecordBatch, Status> {
     RecordBatch::try_new(
         Arc::new(schema),
         vec![Arc::new(version_array), Arc::new(data_array)],
     )
-    .map_err(|e| Status::internal(format!("Failed to create RecordBatch: {}", e)))
+    .map_err(|e| FlameError::Internal(format!("Failed to create RecordBatch: {}", e)))
 }
 
 // Helper function to extract data from RecordBatch
-fn batch_to_object(batch: &RecordBatch) -> Result<Object, Status> {
+fn batch_to_object(batch: &RecordBatch) -> Result<Object, FlameError> {
     if batch.num_rows() != 1 {
-        return Err(Status::invalid_argument("Expected exactly one row"));
+        return Err(FlameError::InvalidState(
+            "Expected exactly one row".to_string(),
+        ));
     }
 
     let version_col = batch
         .column(0)
         .as_any()
         .downcast_ref::<UInt64Array>()
-        .ok_or_else(|| Status::internal("Invalid version column"))?;
+        .ok_or_else(|| FlameError::Internal("Invalid version column".to_string()))?;
 
     let data_col = batch
         .column(1)
         .as_any()
         .downcast_ref::<BinaryArray>()
-        .ok_or_else(|| Status::internal("Invalid data column"))?;
+        .ok_or_else(|| FlameError::Internal("Invalid data column".to_string()))?;
 
     let version = version_col.value(0);
     let data = data_col.value(0).to_vec();
@@ -732,7 +728,6 @@ impl FlightService for FlightCacheServer {
         };
 
         // Key format: "session_id/object_id"
-        let schema = get_object_schema();
 
         // Create endpoint with cache server's public endpoint
         let endpoint_uri = self.cache.endpoint.to_uri();
@@ -776,11 +771,7 @@ impl FlightService for FlightCacheServer {
             .map_err(|e| Status::invalid_argument(format!("Invalid ticket: {}", e)))?;
 
         // Key format: "session_id/object_id"
-        let object = self
-            .cache
-            .get(key.clone())
-            .await
-            .map_err(|e| Status::not_found(format!("Object not found: {}", e)))?;
+        let object = self.cache.get(key.clone()).await?;
 
         let batch = object_to_batch(&object)?;
         tracing::debug!(
@@ -812,8 +803,7 @@ impl FlightService for FlightCacheServer {
         let metadata = self
             .cache
             .put_with_id(session_id, object_id, object)
-            .await
-            .map_err(|e| Status::internal(format!("Failed to put object: {}", e)))?;
+            .await?;
 
         let result = Self::create_put_result(&metadata)?;
diff --git a/sdk/rust/build.rs b/sdk/rust/build.rs
index 9f23917a..4cb970da 100644
--- a/sdk/rust/build.rs
+++ b/sdk/rust/build.rs
@@ -19,6 +19,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
             "flame.ExecutorState",
             "#[allow(clippy::enum_variant_names)]",
         )
+        .type_attribute("flame.NodeSpec", "#[allow(dead_code)]")
+        .type_attribute("flame.Node", "#[allow(dead_code)]")
         .protoc_arg("--experimental_allow_proto3_optional")
         .compile_protos(
             &[
diff --git a/session_manager/src/main.rs b/session_manager/src/main.rs
index 5243ee4a..8d202920 100644
--- a/session_manager/src/main.rs
+++ b/session_manager/src/main.rs
@@ -69,6 +69,7 @@ async fn main() -> Result<(), FlameError> {
     let provider_rt = build_runtime("provider", 1)?;
 
     // Start provider thread.
+    #[allow(clippy::let_underscore_future)]
     {
         let controller = controller.clone();
         let ctx = ctx.clone();
@@ -115,6 +116,7 @@ async fn main() -> Result<(), FlameError> {
     tracing::info!("flame-session-manager started.");
 
     // Register default applications.
+    #[allow(clippy::let_underscore_future)]
     let _: JoinHandle<Result<(), FlameError>> = tokio::spawn(async move {
         for (name, attr) in common::default_applications() {
             controller.register_application(name, attr).await?;
diff --git a/session_manager/src/scheduler/mod.rs b/session_manager/src/scheduler/mod.rs
index 747ba265..99842b8b 100644
--- a/session_manager/src/scheduler/mod.rs
+++ b/session_manager/src/scheduler/mod.rs
@@ -169,14 +169,15 @@ mod tests {
         )?;
         tokio_test::block_on(controller.register_node(&new_test_node("node_1".to_string())))?;
         let ssn_1_id = format!("ssn-1-{}", Utc::now().timestamp());
-        let ssn_1 = tokio_test::block_on(controller.create_session(common::apis::SessionAttributes {
-            id: ssn_1_id.clone(),
-            application: "flmtest".to_string(),
-            slots: 1,
-            common_data: None,
-            min_instances: 0,
-            max_instances: None,
-        }))?;
+        let ssn_1 =
+            tokio_test::block_on(controller.create_session(common::apis::SessionAttributes {
+                id: ssn_1_id.clone(),
+                application: "flmtest".to_string(),
+                slots: 1,
+                common_data: None,
+                min_instances: 0,
+                max_instances: None,
+            }))?;
 
         for _ in 0..task_num {
             tokio_test::block_on(controller.create_task(ssn_1.id.clone(), None))?;
diff --git a/stdng/src/collections/bin_heap.rs b/stdng/src/collections/bin_heap.rs
index 2dac5755..fdbe4038 100644
--- a/stdng/src/collections/bin_heap.rs
+++ b/stdng/src/collections/bin_heap.rs
@@ -93,7 +93,7 @@ where
     C: Cmp,
 {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(Cmp::cmp(self, &self.data, &other.data))
+        Some(Ord::cmp(self, other))
     }
 }

From 23a3491dce4c0896002e1421ac0c94995d559715 Mon Sep 17 00:00:00 2001
From: Klaus Ma
Date: Fri, 23 Jan 2026 11:39:56 +0000
Subject: [PATCH 2/2] fix: fix UT for FlameError.

Signed-off-by: Klaus Ma
---
 common/src/lib.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/common/src/lib.rs b/common/src/lib.rs
index 95fbdfff..a36a4cf7 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -252,7 +252,17 @@ mod tests {
         let error = FlameError::Network("test".to_string());
         let status = Status::from(error);
 
-        assert_eq!(status.code(), Code::Unknown);
+        assert_eq!(status.code(), Code::Internal);
+        assert_eq!(status.message(), "test");
+
+        let error = FlameError::InvalidConfig("test".to_string());
+        let status = Status::from(error);
+        assert_eq!(status.code(), Code::InvalidArgument);
+        assert_eq!(status.message(), "test");
+
+        let error = FlameError::InvalidState("test".to_string());
+        let status = Status::from(error);
+        assert_eq!(status.code(), Code::InvalidArgument);
         assert_eq!(status.message(), "test");
     }
 }
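
Note (not part of the patches themselves): a minimal sketch of how the reworked FlameError -> Status mapping from PATCH 1/2 behaves at a call site. It assumes the `common` crate re-exports FlameError at its root and uses tonic's Status/Code exactly as the test in PATCH 2/2 does; the error messages below are illustrative only.

use common::FlameError;
use tonic::{Code, Status};

fn main() {
    // NotFound keeps its dedicated gRPC code.
    let status = Status::from(FlameError::NotFound("ssn-1".to_string()));
    assert_eq!(status.code(), Code::NotFound);

    // Configuration and state errors now surface as InvalidArgument.
    let status = Status::from(FlameError::InvalidConfig("bad slots".to_string()));
    assert_eq!(status.code(), Code::InvalidArgument);

    // Internal, Network, Uninitialized, Storage and VersionMismatch all map to
    // Internal instead of the old catch-all Unknown.
    let status = Status::from(FlameError::Storage("disk full".to_string()));
    assert_eq!(status.code(), Code::Internal);
}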