From a7c063ca95351f09299288314e8fdeed8097433d Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Mon, 22 Dec 2025 22:53:40 +0000
Subject: [PATCH 01/15] LogScanner refreshes cluster metadata when server is not available

---
 crates/fluss/src/client/credentials.rs   |  2 +-
 crates/fluss/src/client/metadata.rs      | 37 ++++++++++++++----
 crates/fluss/src/client/table/scanner.rs | 50 +++++++++++++++++++++++-
 crates/fluss/src/cluster/cluster.rs      | 13 +++---
 4 files changed, 84 insertions(+), 18 deletions(-)

diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
index 8adfe48..9fcdc80 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -118,7 +118,7 @@ impl CredentialsCache {
 
     async fn refresh_from_server(&self) -> Result> {
         let cluster = self.metadata.get_cluster();
-        let server_node = cluster.get_one_available_server();
+        let server_node = cluster.get_one_available_server().expect("no available tablet server");
         let conn = self.rpc_client.get_connection(server_node).await?;
 
         let request = GetSecurityTokenRequest::new();
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 3c3ba4b..87b5677 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -23,7 +23,7 @@ use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::sync::Arc;
-
+use log::info;
 use crate::error::Result;
 use crate::proto::MetadataResponse;
 
@@ -31,23 +31,25 @@ pub struct Metadata {
     cluster: RwLock>,
     connections: Arc,
+    bootstrap: Arc,
 }
 
 impl Metadata {
-    pub async fn new(boot_strap: &str, connections: Arc) -> Result {
-        let custer = Self::init_cluster(boot_strap, connections.clone()).await?;
+    pub async fn new(bootstrap: &str, connections: Arc) -> Result {
+        let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
         Ok(Metadata {
-            cluster: RwLock::new(Arc::new(custer)),
+            cluster: RwLock::new(Arc::new(cluster)),
             connections,
+            bootstrap: bootstrap.into(),
         })
     }
 
     async fn init_cluster(boot_strap: &str, connections: Arc) -> Result {
-        let socker_addrss = boot_strap.parse::().unwrap();
+        let socket_address = boot_strap.parse::().unwrap();
        let server_node = ServerNode::new(
             -1,
-            socker_addrss.ip().to_string(),
-            socker_addrss.port() as u32,
+            socket_address.ip().to_string(),
+            socket_address.port() as u32,
             ServerType::CoordinatorServer,
         );
         let con = connections.get_connection(&server_node).await?;
@@ -55,6 +57,12 @@ impl Metadata {
         Cluster::from_metadata_response(response, None)
     }
 
+    async fn reinit_cluster(&self) -> Result<()> {
+        let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
+        *self.cluster.write() = cluster.into();
+        Ok(())
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =
@@ -65,7 +73,20 @@ impl Metadata {
     }
 
     pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
-        let server = self.cluster.read().get_one_available_server().clone();
+        let maybe_server = {
+            let guard = self.cluster.read();
+            guard.get_one_available_server().cloned()
+        };
+
+        let server = match maybe_server {
+            Some(s) => s,
+            None => {
+                info!("No available tablet server to update metadata, try to re-initialize cluster using bootstrap server.");
+                self.reinit_cluster().await?;
+                return Ok(());
+            },
+        };
+
         let conn = self.connections.get_connection(&server).await?;
 
         let update_table_paths: Vec<&TablePath> = table_paths.iter().copied().collect();
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 2246e2c..7cb943a 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -28,6 +28,7 @@ use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
+use crate::rpc::FlussError::PartitionNotExists;
 use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
 use arrow_schema::SchemaRef;
@@ -271,6 +272,8 @@ struct LogFetcher {
     credentials_cache: Arc,
     log_fetch_buffer: Arc,
     nodes_with_pending_fetch_requests: Arc>>,
+    table_path: TablePath,
+    is_partitioned: bool,
 }
 
 impl LogFetcher {
@@ -299,6 +302,8 @@ impl LogFetcher {
             credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
             log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
             nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
+            table_path: table_info.table_path.clone(),
+            is_partitioned: table_info.is_partitioned(),
         })
     }
 
@@ -315,9 +320,52 @@ impl LogFetcher {
         }
     }
 
+    async fn check_and_update_metadata(&self) -> Result<()> {
+        if self.is_partitioned {
+            let partition_ids: Vec = self
+                .fetchable_buckets()
+                .iter()
+                .filter(|b| self.get_table_bucket_leader(b).is_none())
+                .map(|b| b.partition_id().unwrap())
+                .collect();
+
+            if !partition_ids.is_empty() {
+                // TODO: Implement once LogFetcher is partition aware
+            }
+            return Ok(());
+        }
+
+        let need_update = self
+            .fetchable_buckets()
+            .iter()
+            .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+
+        if !need_update {
+            return Ok(());
+        }
+
+        match self
+            .metadata
+            .update_tables_metadata(&HashSet::from([&self.table_path]))
+            .await
+        {
+            Err(Error::FlussAPIError { api_error })
+                if api_error.code == PartitionNotExists.code() =>
+            {
+                warn!(
+                    "Received PartitionNotExistException when updating metadata, ignoring: {}",
+                    api_error
+                );
+                Ok(())
+            }
+            Err(other) => Err(other),
+            Ok(()) => Ok(()),
+        }
+    }
+
     /// Send fetch requests asynchronously without waiting for responses
     async fn send_fetches(&self) -> Result<()> {
-        // todo: check update metadata like fluss-java in case leader changes
+        self.check_and_update_metadata().await?;
         let fetch_request = self.prepare_fetch_log_requests().await;
 
         for (leader, fetch_request) in fetch_request {
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index a6f20a8..b880f0c 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -214,15 +214,12 @@ impl Cluster {
             .unwrap_or(&EMPTY)
     }
 
-    pub fn get_one_available_server(&self) -> &ServerNode {
-        assert!(
-            !self.alive_tablet_servers.is_empty(),
-            "no alive tablet server in cluster"
-        );
+    pub fn get_one_available_server(&self) -> Option<&ServerNode> {
+        if self.alive_tablet_servers.is_empty() {
+            return None;
+        }
         let offset = random_range(0..self.alive_tablet_servers.len());
-        self.alive_tablet_servers
-            .get(offset)
-            .unwrap_or_else(|| panic!("can't find alive tab server by offset {offset}"))
+        self.alive_tablet_servers.get(offset)
     }
 
     pub fn get_bucket_count(&self, table_path: &TablePath) -> i32 {

From 916fd50aca4e42898c862f7cf43735d4fe26f3a2 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Mon, 22 Dec 2025 23:00:23 +0000
Subject: [PATCH 02/15] Formatting

---
 crates/fluss/src/client/credentials.rs |  4 +++-
 crates/fluss/src/client/metadata.rs    | 12 +++++++-----
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
index 9fcdc80..e8bd865 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -118,7 +118,9 @@ impl CredentialsCache {
 
     async fn refresh_from_server(&self) -> Result> {
         let cluster = self.metadata.get_cluster();
-        let server_node = cluster.get_one_available_server().expect("no available tablet server");
+        let server_node = cluster
+            .get_one_available_server()
+            .expect("no available tablet server");
         let conn = self.rpc_client.get_connection(server_node).await?;
 
         let request = GetSecurityTokenRequest::new();
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 87b5677..7bdc1ad 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -16,16 +16,16 @@
 // under the License.
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
+use crate::error::Result;
 use crate::metadata::{TableBucket, TablePath};
+use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
+use log::info;
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::sync::Arc;
-use log::info;
-use crate::error::Result;
-use crate::proto::MetadataResponse;
 
 #[derive(Default)]
 pub struct Metadata {
@@ -81,10 +81,12 @@ impl Metadata {
         let server = match maybe_server {
             Some(s) => s,
             None => {
-                info!("No available tablet server to update metadata, try to re-initialize cluster using bootstrap server.");
+                info!(
+                    "No available tablet server to update metadata, try to re-initialize cluster using bootstrap server."
+                );
                 self.reinit_cluster().await?;
                 return Ok(());
-            },
+            }
         };
 
         let conn = self.connections.get_connection(&server).await?;

From 6df9efb7c86464f600df7d8a384f4e62e8191242 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Tue, 23 Dec 2025 18:44:28 +0000
Subject: [PATCH 03/15] Working server node invalidation / re-registering on server unavailability and recovery.

---
 crates/fluss/src/client/metadata.rs      |  5 ++++
 crates/fluss/src/client/table/scanner.rs | 19 ++++++++++++----
 crates/fluss/src/cluster/cluster.rs      | 29 +++++++++++++++++++++++-
 3 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 7bdc1ad..8cfabc7 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -63,6 +63,11 @@ impl Metadata {
         Ok(())
     }
 
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec) {
+        let cluster = self.cluster.read().invalidate_server(server_id, table_ids);
+        *self.cluster.write() = cluster.into();
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 7cb943a..d2d5242 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -385,6 +385,7 @@ impl LogFetcher {
         let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
         let creds_cache = self.credentials_cache.clone();
         let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+        let metadata = self.metadata.clone();
 
         // Spawn async task to handle the fetch request
         // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
@@ -399,9 +400,13 @@ impl LogFetcher {
                 nodes_with_pending.lock().remove(&leader);
             });
 
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
+            let server_node = match cluster.get_tablet_server(leader) {
+                Some(node) => node,
+                None => {
+                    Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
+                    return;
+                }
+            };
 
             let con = match conns.get_connection(server_node).await {
                 Ok(con) => con,
@@ -413,13 +418,14 @@ impl LogFetcher {
             };
 
             let fetch_response = match con
-                .request(message::FetchLogRequest::new(fetch_request))
+                .request(message::FetchLogRequest::new(fetch_request.clone()))
                 .await
             {
                 Ok(resp) => resp,
                 Err(e) => {
                     // todo: handle fetch log from destination node
                     warn!("Failed to fetch log from destination node {server_node:?}: {e:?}");
+                    Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                     return;
                 }
             };
@@ -444,6 +450,11 @@ impl LogFetcher {
         Ok(())
     }
 
+    async fn handle_fetch_failure(metadata: Arc, server_id: &i32, request: &FetchLogRequest) {
+        let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
+        metadata.invalidate_server(server_id, table_ids);
+    }
+
     /// Handle fetch response and add completed fetches to buffer
     async fn handle_fetch_response(
         fetch_response: crate::proto::FetchLogResponse,
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index b880f0c..81e9a26 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -22,7 +22,7 @@ use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TableP
 use crate::proto::MetadataResponse;
 use crate::rpc::{from_pb_server_node, from_pb_table_path};
 use rand::random_range;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 static EMPTY: Vec = Vec::new();
 
@@ -64,6 +64,33 @@ impl Cluster {
         }
     }
 
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec) -> Self {
+        let alive_tablet_servers_by_id = self
+            .alive_tablet_servers_by_id
+            .iter()
+            .filter_map(|(id, ts)| (id != server_id).then(|| (id.clone(), ts.clone())))
+            .collect();
+
+        let table_paths: HashSet<&TablePath> = table_ids.iter().filter_map(|id| { self.table_path_by_id.get(id) }).collect();
+
+        let available_locations_by_path = self.available_locations_by_path.iter()
+            .filter_map(|(path, locations)| { (!table_paths.contains(path)).then(|| (path.clone(), locations.clone())) })
+            .collect();
+
+        let available_locations_by_bucket = self.available_locations_by_bucket.iter()
+            .filter_map(|(bucket, location)| { (!table_paths.contains(&location.table_path)).then(|| (bucket.clone(), location.clone())) })
+            .collect();
+
+        Cluster::new(
+            self.coordinator_server.clone(),
+            alive_tablet_servers_by_id,
+            available_locations_by_path,
+            available_locations_by_bucket,
+            self.table_id_by_path.clone(),
+            self.table_info_by_path.clone(),
+        )
+    }
+
     pub fn update(&mut self, cluster: Cluster) {
         let Cluster {
             coordinator_server,

From cbee4b676ed7d7c42d21fa8184bc27a00b1e36de Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Tue, 23 Dec 2025 23:10:04 +0000
Subject: [PATCH 04/15] Working server node invalidation / re-registering on server unavailability and recovery.

---
 crates/fluss/src/client/table/scanner.rs  | 48 ++++++++++++-----------
 crates/fluss/src/cluster/cluster.rs       | 19 ++++++---
 crates/fluss/src/rpc/server_connection.rs | 16 ++++++--
 3 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index d2d5242..3ae000d 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -24,11 +24,10 @@ use crate::client::table::log_fetch_buffer::{
 use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
-use crate::error::{Error, Result};
+use crate::error::{Error, Result, RpcError};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
-use crate::rpc::FlussError::PartitionNotExists;
 use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
 use arrow_schema::SchemaRef;
@@ -344,23 +343,23 @@ impl LogFetcher {
             return Ok(());
         }
 
-        match self
-            .metadata
+        // TODO: Handle PartitionNotExist error
+        self.metadata
             .update_tables_metadata(&HashSet::from([&self.table_path]))
             .await
-        {
-            Err(Error::FlussAPIError { api_error })
-                if api_error.code == PartitionNotExists.code() =>
-            {
-                warn!(
-                    "Received PartitionNotExistException when updating metadata, ignoring: {}",
-                    api_error
-                );
-                Ok(())
-            }
-            Err(other) => Err(other),
-            Ok(()) => Ok(()),
-        }
+            .or_else(|e| match &e {
+                Error::RpcError { source, .. } => match source {
+                    RpcError::ConnectionError(_) | RpcError::Poisoned(_) => {
+                        warn!(
+                            "Retrying after encountering error while updating table metadata: {}",
+                            e
+                        );
+                        Ok(())
+                    }
+                    _ => Err(e),
+                },
+                _ => Err(e),
+            })
     }
 
     /// Send fetch requests asynchronously without waiting for responses
@@ -411,8 +410,7 @@ impl LogFetcher {
             let con = match conns.get_connection(server_node).await {
                 Ok(con) => con,
                 Err(e) => {
-                    // todo: handle failed to get connection
-                    warn!("Failed to get connection to destination node: {e:?}");
+                    warn!("Retrying after error getting connection to destination node: {e:?}");
                     return;
                 }
             };
@@ -423,8 +421,9 @@ impl LogFetcher {
                 Ok(resp) => resp,
                 Err(e) => {
-                    // todo: handle fetch log from destination node
-                    warn!("Failed to fetch log from destination node {server_node:?}: {e:?}");
+                    warn!(
+                        "Retrying after error fetching log from destination node {server_node:?}: {e:?}"
+                    );
                     Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                     return;
                 }
             };
@@ -441,7 +440,6 @@ impl LogFetcher {
             )
             .await
             {
-                // todo: handle fail to handle fetch response
                 error!("Fail to handle fetch response: {e:?}");
             }
         });
@@ -450,7 +448,11 @@ impl LogFetcher {
         Ok(())
     }
 
-    async fn handle_fetch_failure(metadata: Arc, server_id: &i32, request: &FetchLogRequest) {
+    async fn handle_fetch_failure(
+        metadata: Arc,
+        server_id: &i32,
+        request: &FetchLogRequest,
+    ) {
         let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
         metadata.invalidate_server(server_id, table_ids);
     }
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 81e9a26..8c6d89a 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -68,17 +68,24 @@ impl Cluster {
         let alive_tablet_servers_by_id = self
             .alive_tablet_servers_by_id
             .iter()
-            .filter_map(|(id, ts)| (id != server_id).then(|| (id.clone(), ts.clone())))
+            .filter(|&(id, ts)| id != server_id).map(|(id, ts)| (*id, ts.clone()))
             .collect();
 
-        let table_paths: HashSet<&TablePath> = table_ids.iter().filter_map(|id| { self.table_path_by_id.get(id) }).collect();
+        let table_paths: HashSet<&TablePath> = table_ids
+            .iter()
+            .filter_map(|id| self.table_path_by_id.get(id))
+            .collect();
 
-        let available_locations_by_path = self.available_locations_by_path.iter()
-            .filter_map(|(path, locations)| { (!table_paths.contains(path)).then(|| (path.clone(), locations.clone())) })
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, locations)| !table_paths.contains(path)).map(|(path, locations)| (path.clone(), locations.clone()))
             .collect();
 
-        let available_locations_by_bucket = self.available_locations_by_bucket.iter()
-            .filter_map(|(bucket, location)| { (!table_paths.contains(&location.table_path)).then(|| (bucket.clone(), location.clone())) })
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(bucket, location)| !table_paths.contains(&location.table_path)).map(|(bucket, location)| (bucket.clone(), location.clone()))
             .collect();
 
         Cluster::new(
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index fdeb56f..379dced 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -66,12 +66,17 @@ impl RpcClient {
         server_node: &ServerNode,
     ) -> Result {
         let server_id = server_node.uid();
-        {
+        let connection = {
             let connections = self.connections.read();
-            if let Some(connection) = connections.get(server_id) {
-                return Ok(connection.clone());
+            connections.get(server_id).cloned()
+        };
+
+        if let Some(conn) = connection {
+            if !conn.is_poisoned().await {
+                return Ok(conn.clone());
             }
         }
+
         let new_server = self.connect(server_node).await?;
         self.connections
             .write()
             .insert(server_id.clone(), new_server.clone());
@@ -231,6 +236,11 @@ where
         }
     }
 
+    async fn is_poisoned(&self) -> bool {
+        let guard = self.state.lock();
+        matches!(*guard, ConnectionState::Poison(_))
+    }
+
     pub async fn request(&self, msg: R) -> Result
     where
         R: RequestBody + Send + WriteVersionedType>,

From 6625773c6c14f17d0154e6baabb6344fe4be5c9e Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Tue, 23 Dec 2025 23:16:19 +0000
Subject: [PATCH 05/15] Working server node invalidation / re-registering on server unavailability and recovery.

---
 crates/fluss/src/client/credentials.rs | 2 +-
 crates/fluss/src/client/metadata.rs    | 2 +-
 crates/fluss/src/cluster/cluster.rs    | 9 ++++++---
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
index e8bd865..ffb682e 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -120,7 +120,7 @@ impl CredentialsCache {
         let cluster = self.metadata.get_cluster();
         let server_node = cluster
             .get_one_available_server()
-            .expect("no available tablet server");
+            .expect("no tablet server available");
         let conn = self.rpc_client.get_connection(server_node).await?;
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 8cfabc7..85641ca 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -87,7 +87,7 @@ impl Metadata {
             Some(s) => s,
             None => {
                 info!(
-                    "No available tablet server to update metadata, try to re-initialize cluster using bootstrap server."
+                    "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
                 );
                 self.reinit_cluster().await?;
                 return Ok(());
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 8c6d89a..06f0b14 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -68,7 +68,8 @@ impl Cluster {
         let alive_tablet_servers_by_id = self
             .alive_tablet_servers_by_id
             .iter()
-            .filter(|&(id, ts)| id != server_id).map(|(id, ts)| (*id, ts.clone()))
+            .filter(|&(id, ts)| id != server_id)
+            .map(|(id, ts)| (*id, ts.clone()))
             .collect();
 
         let table_paths: HashSet<&TablePath> = table_ids
@@ -79,13 +80,15 @@ impl Cluster {
         let available_locations_by_path = self
             .available_locations_by_path
             .iter()
-            .filter(|&(path, locations)| !table_paths.contains(path)).map(|(path, locations)| (path.clone(), locations.clone()))
+            .filter(|&(path, locations)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
             .collect();
 
         let available_locations_by_bucket = self
             .available_locations_by_bucket
             .iter()
-            .filter(|&(bucket, location)| !table_paths.contains(&location.table_path)).map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .filter(|&(bucket, location)| !table_paths.contains(&location.table_path))
+            .map(|(bucket, location)| (bucket.clone(), location.clone()))
             .collect();
 
         Cluster::new(

From 0aecad1b58dfbd4edfa50287ae823c2ef3e3c64f Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Tue, 23 Dec 2025 23:32:33 +0000
Subject: [PATCH 06/15] Working server node invalidation / re-registering on server unavailability and recovery.

---
 crates/fluss/src/client/table/scanner.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 3ae000d..1d8747b 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -402,6 +402,7 @@ impl LogFetcher {
             let server_node = match cluster.get_tablet_server(leader) {
                 Some(node) => node,
                 None => {
+                    warn!("No server node found for leader {}, retrying", leader);
                     Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                     return;
                 }
@@ -411,6 +412,7 @@ impl LogFetcher {
                 Ok(con) => con,
                 Err(e) => {
                     warn!("Retrying after error getting connection to destination node: {e:?}");
+                    Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                     return;
                 }
             };

From e06cf6dfbf1572233548e7c42282a70d60e1d1d6 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 10:11:15 +0000
Subject: [PATCH 07/15] Remove server connection when there's connection error to trigger metadata update

---
 crates/fluss/src/rpc/server_connection.rs | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index 379dced..b65dbb9 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -77,7 +77,14 @@ impl RpcClient {
             }
         }
 
-        let new_server = self.connect(server_node).await?;
+        let new_server = match self.connect(server_node).await {
+            Ok(new_server) => new_server,
+            Err(e) => {
+                self.connections.write().remove(server_id);
+                return Err(e);
+            }
+        };
+
         self.connections
             .write()
             .insert(server_id.clone(), new_server.clone());

From b66f9569a2a0c0e22aa48913a25a8322bf9b0014 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 12:14:12 +0000
Subject: [PATCH 08/15] Make clippy happy

---
 crates/fluss/src/client/table/scanner.rs | 24 ++++++++++++------------
 crates/fluss/src/cluster/cluster.rs      |  6 +++---
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 1d8747b..2410b66 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -347,18 +347,18 @@ impl LogFetcher {
         self.metadata
             .update_tables_metadata(&HashSet::from([&self.table_path]))
             .await
-            .or_else(|e| match &e {
-                Error::RpcError { source, .. } => match source {
-                    RpcError::ConnectionError(_) | RpcError::Poisoned(_) => {
-                        warn!(
-                            "Retrying after encountering error while updating table metadata: {}",
-                            e
-                        );
-                        Ok(())
-                    }
-                    _ => Err(e),
-                },
-                _ => Err(e),
+            .or_else(|e| {
+                if let Error::RpcError { source, .. } = &e
+                    && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
+                {
+                    warn!(
+                        "Retrying after encountering error while updating table metadata: {}",
+                        e
+                    );
+                    Ok(())
+                } else {
+                    Err(e)
+                }
             })
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 06f0b14..a83214c 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -68,7 +68,7 @@ impl Cluster {
         let alive_tablet_servers_by_id = self
             .alive_tablet_servers_by_id
             .iter()
-            .filter(|&(id, ts)| id != server_id)
+            .filter(|&(id, _ts)| id != server_id)
             .map(|(id, ts)| (*id, ts.clone()))
             .collect();
 
@@ -80,14 +80,14 @@ impl Cluster {
         let available_locations_by_path = self
             .available_locations_by_path
             .iter()
-            .filter(|&(path, locations)| !table_paths.contains(path))
+            .filter(|&(path, _locations)| !table_paths.contains(path))
             .map(|(path, locations)| (path.clone(), locations.clone()))
             .collect();
 
         let available_locations_by_bucket = self
             .available_locations_by_bucket
             .iter()
-            .filter(|&(bucket, location)| !table_paths.contains(&location.table_path))
+            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
             .map(|(bucket, location)| (bucket.clone(), location.clone()))
             .collect();

From 8a608f8df37618e994e677ed498a0cef4fd802fc Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 12:36:51 +0000
Subject: [PATCH 09/15] Update crates/fluss/src/cluster/cluster.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/fluss/src/cluster/cluster.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index a83214c..6133dc4 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -68,7 +68,7 @@ impl Cluster {
         let alive_tablet_servers_by_id = self
             .alive_tablet_servers_by_id
             .iter()
-            .filter(|&(id, _ts)| id != server_id)
+            .filter(|&(id, _)| id != server_id)
             .map(|(id, ts)| (*id, ts.clone()))
             .collect();
 

From fa1d8ab539898010da3056bc39d6be250076d613 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 12:37:35 +0000
Subject: [PATCH 10/15] Update crates/fluss/src/cluster/cluster.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/fluss/src/cluster/cluster.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 6133dc4..f14d055 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -80,7 +80,7 @@ impl Cluster {
         let available_locations_by_path = self
             .available_locations_by_path
             .iter()
-            .filter(|&(path, _locations)| !table_paths.contains(path))
+            .filter(|&(path, _)| !table_paths.contains(path))
             .map(|(path, locations)| (path.clone(), locations.clone()))
             .collect();
 

From b7950c1ffa771810a8daf1c1272b368cc49b1f5a Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 12:42:11 +0000
Subject: [PATCH 11/15] Update crates/fluss/src/rpc/server_connection.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/fluss/src/rpc/server_connection.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index b65dbb9..a34b24c 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -73,7 +73,7 @@ impl RpcClient {
 
         if let Some(conn) = connection {
             if !conn.is_poisoned().await {
-                return Ok(conn.clone());
+                return Ok(conn);
             }
         }

From aeae2ffe0e5b7ffa0dbdfd00fdbb9dd7d5913442 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 12:42:47 +0000
Subject: [PATCH 12/15] Update crates/fluss/src/client/table/scanner.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/fluss/src/client/table/scanner.rs | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 2410b66..11bdfa3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -321,16 +321,9 @@ impl LogFetcher {
 
     async fn check_and_update_metadata(&self) -> Result<()> {
         if self.is_partitioned {
-            let partition_ids: Vec = self
-                .fetchable_buckets()
-                .iter()
-                .filter(|b| self.get_table_bucket_leader(b).is_none())
-                .map(|b| b.partition_id().unwrap())
-                .collect();
-
-            if !partition_ids.is_empty() {
-                // TODO: Implement once LogFetcher is partition aware
-            }
+            // TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
+            // The implementation will likely need to collect partition IDs for such buckets and
+            // perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
             return Ok(());
         }

From 7dbf4630d57d77940545f78634b391c186731529 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 12:43:05 +0000
Subject: [PATCH 13/15] Update crates/fluss/src/rpc/server_connection.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/fluss/src/rpc/server_connection.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index a34b24c..40d4805 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -243,7 +243,7 @@ where
         }
     }
 
-    async fn is_poisoned(&self) -> bool {
+    fn is_poisoned(&self) -> bool {
         let guard = self.state.lock();
         matches!(*guard, ConnectionState::Poison(_))
     }

From 14413cd8ababc47efa76c9fc697125fc9d5ff781 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 12:46:02 +0000
Subject: [PATCH 14/15] Update crates/fluss/src/client/metadata.rs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---
 crates/fluss/src/client/metadata.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 85641ca..a514422 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -64,8 +64,11 @@ impl Metadata {
     }
 
     pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec) {
-        let cluster = self.cluster.read().invalidate_server(server_id, table_ids);
-        *self.cluster.write() = cluster.into();
+        // Take a write lock for the entire operation to avoid races between
+        // reading the current cluster state and writing back the updated one.
+        let mut cluster_guard = self.cluster.write();
+        let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
+        *cluster_guard = Arc::new(updated_cluster);
     }

From e886b4c804e9606df87227f9d950b836a994a20d Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 24 Dec 2025 13:01:30 +0000
Subject: [PATCH 15/15] Minor fix

---
 crates/fluss/src/rpc/server_connection.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index 40d4805..441b175 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -72,7 +72,7 @@ impl RpcClient {
         };
 
         if let Some(conn) = connection {
-            if !conn.is_poisoned().await {
+            if !conn.is_poisoned() {
                 return Ok(conn);
             }
         }
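
Taken together, the series makes the loss of a tablet server non-fatal for the log scanner: `get_one_available_server` now returns an `Option`, a failed fetch invalidates the dead server's metadata via `invalidate_server`, and the next `send_fetches` pass re-resolves leaders, falling back to the bootstrap server when no tablet server is alive. Below is a minimal sketch of the resulting retry pattern from a caller's perspective. It reuses the `Metadata`, `Error`, and `RpcError` types shown in the diffs above, but the helper name, retry bound, and the `fluss::...` import paths are illustrative assumptions, not part of the patches.

```rust
use std::collections::HashSet;

// Assumed re-export paths for the types from the diffs above; adjust to the
// actual crate layout.
use fluss::client::Metadata;
use fluss::error::{Error, Result, RpcError};
use fluss::metadata::TablePath;

// Hypothetical helper: retry a metadata refresh a bounded number of times,
// treating connection-level failures as retryable -- the same classification
// PATCH 08 applies inside LogFetcher::check_and_update_metadata.
async fn refresh_with_retry(metadata: &Metadata, path: &TablePath) -> Result<()> {
    let mut result = Ok(());
    for attempt in 1..=3 {
        result = metadata
            .update_tables_metadata(&HashSet::from([path]))
            .await;
        match &result {
            Ok(()) => return Ok(()),
            Err(Error::RpcError { source, .. })
                if matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) =>
            {
                // Retryable: PATCH 01 makes update_tables_metadata fall back to
                // the bootstrap server when no tablet server is alive, so a
                // later attempt can succeed once the cluster recovers.
                log::warn!("metadata refresh attempt {attempt} failed; retrying");
            }
            // Anything else (e.g. an API-level error) is not transient here.
            Err(_) => return result,
        }
    }
    result
}
```

The design hangs together because `invalidate_server` drops the bucket locations of the affected tables along with the dead server: the next `check_and_update_metadata` call then sees leaderless buckets and triggers exactly this refresh path, re-registering the server once it comes back.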