diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
index 8adfe48..ffb682e 100644
--- a/crates/fluss/src/client/credentials.rs
+++ b/crates/fluss/src/client/credentials.rs
@@ -118,7 +118,9 @@ impl CredentialsCache {
 
     async fn refresh_from_server(&self) -> Result> {
         let cluster = self.metadata.get_cluster();
-        let server_node = cluster.get_one_available_server();
+        let server_node = cluster
+            .get_one_available_server()
+            .expect("no tablet server available");
         let conn = self.rpc_client.get_connection(server_node).await?;
 
         let request = GetSecurityTokenRequest::new();
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 3c3ba4b..a514422 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -16,38 +16,40 @@
 // under the License.
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
+use crate::error::Result;
 use crate::metadata::{TableBucket, TablePath};
+use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
+use log::info;
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
-use crate::error::Result;
-use crate::proto::MetadataResponse;
-
 #[derive(Default)]
 pub struct Metadata {
     cluster: RwLock<Arc<Cluster>>,
     connections: Arc<RpcClient>,
+    bootstrap: Arc<str>,
 }
 
 impl Metadata {
-    pub async fn new(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Self> {
-        let custer = Self::init_cluster(boot_strap, connections.clone()).await?;
+    pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
+        let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
         Ok(Metadata {
-            cluster: RwLock::new(Arc::new(custer)),
+            cluster: RwLock::new(Arc::new(cluster)),
             connections,
+            bootstrap: bootstrap.into(),
         })
     }
 
     async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
-        let socker_addrss = boot_strap.parse::<SocketAddr>().unwrap();
+        let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
         let server_node = ServerNode::new(
             -1,
-            socker_addrss.ip().to_string(),
-            socker_addrss.port() as u32,
+            socket_address.ip().to_string(),
+            socket_address.port() as u32,
             ServerType::CoordinatorServer,
         );
         let con = connections.get_connection(&server_node).await?;
@@ -55,6 +57,20 @@ impl Metadata {
         Cluster::from_metadata_response(response, None)
     }
 
+    async fn reinit_cluster(&self) -> Result<()> {
+        let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
+        *self.cluster.write() = cluster.into();
+        Ok(())
+    }
+
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
+        // Take a write lock for the entire operation to avoid races between
+        // reading the current cluster state and writing back the updated one.
+        let mut cluster_guard = self.cluster.write();
+        let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
+        *cluster_guard = Arc::new(updated_cluster);
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =
@@ -65,7 +81,22 @@ impl Metadata {
     }
 
     pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
-        let server = self.cluster.read().get_one_available_server().clone();
+        let maybe_server = {
+            let guard = self.cluster.read();
+            guard.get_one_available_server().cloned()
+        };
+
+        let server = match maybe_server {
+            Some(s) => s,
+            None => {
+                info!(
+                    "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
+                );
+                self.reinit_cluster().await?;
+                return Ok(());
+            }
+        };
+
         let conn = self.connections.get_connection(&server).await?;
 
         let update_table_paths: Vec<&TablePath> = table_paths.iter().copied().collect();
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 2246e2c..11bdfa3 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -24,7 +24,7 @@ use crate::client::table::log_fetch_buffer::{
 use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
-use crate::error::{Error, Result};
+use crate::error::{Error, Result, RpcError};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
@@ -271,6 +271,8 @@ struct LogFetcher {
    credentials_cache: Arc<CredentialsCache>,
    log_fetch_buffer: Arc<LogFetchBuffer>,
    nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
+    table_path: TablePath,
+    is_partitioned: bool,
 }
 
 impl LogFetcher {
@@ -299,6 +301,8 @@ impl LogFetcher {
             credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
             log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
             nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
+            table_path: table_info.table_path.clone(),
+            is_partitioned: table_info.is_partitioned(),
         })
     }
 
@@ -315,9 +319,45 @@ impl LogFetcher {
         }
     }
 
+    async fn check_and_update_metadata(&self) -> Result<()> {
+        if self.is_partitioned {
+            // TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
+            // The implementation will likely need to collect partition IDs for such buckets and
+            // perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
+            return Ok(());
+        }
+
+        let need_update = self
+            .fetchable_buckets()
+            .iter()
+            .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+
+        if !need_update {
+            return Ok(());
+        }
+
+        // TODO: Handle PartitionNotExist error
+        self.metadata
+            .update_tables_metadata(&HashSet::from([&self.table_path]))
+            .await
+            .or_else(|e| {
+                if let Error::RpcError { source, .. } = &e
+                    && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
+                {
+                    warn!(
+                        "Retrying after encountering error while updating table metadata: {}",
+                        e
+                    );
+                    Ok(())
+                } else {
+                    Err(e)
+                }
+            })
+    }
+
     /// Send fetch requests asynchronously without waiting for responses
     async fn send_fetches(&self) -> Result<()> {
-        // todo: check update metadata like fluss-java in case leader changes
+        self.check_and_update_metadata().await?;
         let fetch_request = self.prepare_fetch_log_requests().await;
 
         for (leader, fetch_request) in fetch_request {
@@ -337,6 +377,7 @@ impl LogFetcher {
             let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
             let creds_cache = self.credentials_cache.clone();
             let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+            let metadata = self.metadata.clone();
 
             // Spawn async task to handle the fetch request
             // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
@@ -351,27 +392,34 @@ impl LogFetcher {
                     nodes_with_pending.lock().remove(&leader);
                 });
 
-                let server_node = cluster
-                    .get_tablet_server(leader)
-                    .expect("todo: handle leader not exist.");
+                let server_node = match cluster.get_tablet_server(leader) {
+                    Some(node) => node,
+                    None => {
+                        warn!("No server node found for leader {}, retrying", leader);
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
+                        return;
+                    }
+                };
 
                 let con = match conns.get_connection(server_node).await {
                     Ok(con) => con,
                     Err(e) => {
-                        // todo: handle failed to get connection
-                        warn!("Failed to get connection to destination node: {e:?}");
+                        warn!("Retrying after error getting connection to destination node: {e:?}");
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                         return;
                     }
                 };
 
                 let fetch_response = match con
-                    .request(message::FetchLogRequest::new(fetch_request))
+                    .request(message::FetchLogRequest::new(fetch_request.clone()))
                     .await
                 {
                     Ok(resp) => resp,
                     Err(e) => {
-                        // todo: handle fetch log from destination node
-                        warn!("Failed to fetch log from destination node {server_node:?}: {e:?}");
+                        warn!(
+                            "Retrying after error fetching log from destination node {server_node:?}: {e:?}"
+                        );
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                         return;
                     }
                 };
@@ -387,7 +435,6 @@ impl LogFetcher {
                 )
                 .await
                 {
-                    // todo: handle fail to handle fetch response
                     error!("Fail to handle fetch response: {e:?}");
                 }
             });
@@ -396,6 +443,15 @@ impl LogFetcher {
         Ok(())
     }
 
+    async fn handle_fetch_failure(
+        metadata: Arc<Metadata>,
+        server_id: &i32,
+        request: &FetchLogRequest,
+    ) {
+        let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
+        metadata.invalidate_server(server_id, table_ids);
+    }
+
     /// Handle fetch response and add completed fetches to buffer
     async fn handle_fetch_response(
         fetch_response: crate::proto::FetchLogResponse,
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index a6f20a8..f14d055 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -22,7 +22,7 @@ use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TableP
 use crate::proto::MetadataResponse;
 use crate::rpc::{from_pb_server_node, from_pb_table_path};
 use rand::random_range;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 static EMPTY: Vec = Vec::new();
 
@@ -64,6 +64,43 @@ impl Cluster {
         }
     }
 
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+        let alive_tablet_servers_by_id = self
+            .alive_tablet_servers_by_id
+            .iter()
+            .filter(|&(id, _)| id != server_id)
+            .map(|(id, ts)| (*id, ts.clone()))
+            .collect();
+
+        let table_paths: HashSet<&TablePath> = table_ids
+            .iter()
+            .filter_map(|id| self.table_path_by_id.get(id))
+            .collect();
+
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
+            .collect();
+
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
+            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .collect();
+
+        Cluster::new(
+            self.coordinator_server.clone(),
+            alive_tablet_servers_by_id,
+            available_locations_by_path,
+            available_locations_by_bucket,
+            self.table_id_by_path.clone(),
+            self.table_info_by_path.clone(),
+        )
+    }
+
     pub fn update(&mut self, cluster: Cluster) {
         let Cluster {
             coordinator_server,
@@ -214,15 +251,12 @@ impl Cluster {
             .unwrap_or(&EMPTY)
     }
 
-    pub fn get_one_available_server(&self) -> &ServerNode {
-        assert!(
-            !self.alive_tablet_servers.is_empty(),
-            "no alive tablet server in cluster"
-        );
+    pub fn get_one_available_server(&self) -> Option<&ServerNode> {
+        if self.alive_tablet_servers.is_empty() {
+            return None;
+        }
         let offset = random_range(0..self.alive_tablet_servers.len());
-        self.alive_tablet_servers
-            .get(offset)
-            .unwrap_or_else(|| panic!("can't find alive tab server by offset {offset}"))
+        self.alive_tablet_servers.get(offset)
     }
 
     pub fn get_bucket_count(&self, table_path: &TablePath) -> i32 {
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index fdeb56f..441b175 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -66,13 +66,25 @@ impl RpcClient {
         server_node: &ServerNode,
     ) -> Result {
         let server_id = server_node.uid();
-        {
+        let connection = {
             let connections = self.connections.read();
-            if let Some(connection) = connections.get(server_id) {
-                return Ok(connection.clone());
+            connections.get(server_id).cloned()
+        };
+
+        if let Some(conn) = connection {
+            if !conn.is_poisoned() {
+                return Ok(conn);
             }
         }
-        let new_server = self.connect(server_node).await?;
+
+        let new_server = match self.connect(server_node).await {
+            Ok(new_server) => new_server,
+            Err(e) => {
+                self.connections.write().remove(server_id);
+                return Err(e);
+            }
+        };
+
         self.connections
             .write()
             .insert(server_id.clone(), new_server.clone());
@@ -231,6 +243,11 @@ where
         }
     }
 
+    fn is_poisoned(&self) -> bool {
+        let guard = self.state.lock();
+        matches!(*guard, ConnectionState::Poison(_))
+    }
+
     pub async fn request(&self, msg: R) -> Result
     where
         R: RequestBody + Send + WriteVersionedType>,