crates/fluss/src/client/credentials.rs (3 additions, 1 deletion)

@@ -118,7 +118,9 @@ impl CredentialsCache {

     async fn refresh_from_server(&self) -> Result<HashMap<String, String>> {
         let cluster = self.metadata.get_cluster();
-        let server_node = cluster.get_one_available_server();
+        let server_node = cluster
+            .get_one_available_server()
+            .expect("no tablet server available");
         let conn = self.rpc_client.get_connection(server_node).await?;

         let request = GetSecurityTokenRequest::new();
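
The caller above keeps the old panic-on-empty behavior by unwrapping the new Option with expect. A minimal sketch of the two ways a caller can consume such an Option-returning picker, with stand-in types rather than the crate's API (pick uses first() where the real code picks at random):

    #[derive(Debug)]
    struct NoServerAvailable;

    // Stand-in for Cluster::get_one_available_server; servers are bare ids.
    fn pick(servers: &[i32]) -> Option<i32> {
        servers.first().copied()
    }

    // Panicking caller, as in refresh_from_server above.
    fn must_pick(servers: &[i32]) -> i32 {
        pick(servers).expect("no tablet server available")
    }

    // Propagating caller, turning the empty case into a typed error.
    fn try_pick(servers: &[i32]) -> Result<i32, NoServerAvailable> {
        pick(servers).ok_or(NoServerAvailable)
    }

    fn main() {
        assert_eq!(must_pick(&[1, 2]), 1);
        assert!(try_pick(&[]).is_err());
    }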
crates/fluss/src/client/metadata.rs (41 additions, 10 deletions)

@@ -16,45 +16,61 @@
 // under the License.

 use crate::cluster::{Cluster, ServerNode, ServerType};
+use crate::error::Result;
 use crate::metadata::{TableBucket, TablePath};
+use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
 use crate::rpc::{RpcClient, ServerConnection};
+use log::info;
 use parking_lot::RwLock;
 use std::collections::HashSet;
 use std::net::SocketAddr;
 use std::sync::Arc;

-use crate::error::Result;
-use crate::proto::MetadataResponse;
-
 #[derive(Default)]
 pub struct Metadata {
     cluster: RwLock<Arc<Cluster>>,
     connections: Arc<RpcClient>,
+    bootstrap: Arc<str>,
 }

 impl Metadata {
-    pub async fn new(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Self> {
-        let custer = Self::init_cluster(boot_strap, connections.clone()).await?;
+    pub async fn new(bootstrap: &str, connections: Arc<RpcClient>) -> Result<Self> {
+        let cluster = Self::init_cluster(bootstrap, connections.clone()).await?;
         Ok(Metadata {
-            cluster: RwLock::new(Arc::new(custer)),
+            cluster: RwLock::new(Arc::new(cluster)),
             connections,
+            bootstrap: bootstrap.into(),
         })
     }

     async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
-        let socker_addrss = boot_strap.parse::<SocketAddr>().unwrap();
+        let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
         let server_node = ServerNode::new(
             -1,
-            socker_addrss.ip().to_string(),
-            socker_addrss.port() as u32,
+            socket_address.ip().to_string(),
+            socket_address.port() as u32,
             ServerType::CoordinatorServer,
         );
         let con = connections.get_connection(&server_node).await?;
         let response = con.request(UpdateMetadataRequest::new(&[])).await?;
         Cluster::from_metadata_response(response, None)
     }

+    async fn reinit_cluster(&self) -> Result<()> {
+        let cluster = Self::init_cluster(&self.bootstrap, self.connections.clone()).await?;
+        *self.cluster.write() = cluster.into();
+        Ok(())
+    }
+
+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) {
+        // Take a write lock for the entire operation to avoid races between
+        // reading the current cluster state and writing back the updated one.
+        let mut cluster_guard = self.cluster.write();
+        let updated_cluster = cluster_guard.invalidate_server(server_id, table_ids);
+        *cluster_guard = Arc::new(updated_cluster);
+    }
+
     pub async fn update(&self, metadata_response: MetadataResponse) -> Result<()> {
         let origin_cluster = self.cluster.read().clone();
         let new_cluster =

@@ -65,7 +81,22 @@ impl Metadata {
     }

     pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
-        let server = self.cluster.read().get_one_available_server().clone();
+        let maybe_server = {
+            let guard = self.cluster.read();
+            guard.get_one_available_server().cloned()
+        };
+
+        let server = match maybe_server {
+            Some(s) => s,
+            None => {
+                info!(
+                    "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
+                );
+                self.reinit_cluster().await?;
+                return Ok(());
+            }
+        };
+
         let conn = self.connections.get_connection(&server).await?;

         let update_table_paths: Vec<&TablePath> = table_paths.iter().copied().collect();
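
The new invalidate_server holds the write lock for the whole read-modify-write instead of reading under one lock and writing back under another. A minimal sketch of why that matters for the RwLock<Arc<T>> copy-on-write pattern, using the parking_lot crate this file already imports; Registry and Snapshot are hypothetical stand-ins for Metadata and Cluster:

    use parking_lot::RwLock;
    use std::sync::Arc;

    #[derive(Default)]
    struct Snapshot {
        version: u64,
    }

    struct Registry {
        // Readers clone the Arc cheaply; writers swap in a whole new snapshot.
        snapshot: RwLock<Arc<Snapshot>>,
    }

    impl Registry {
        // Racy variant: the read lock is released before the write lock is
        // taken, so a concurrent writer can slip in between and be overwritten.
        fn bump_racy(&self) {
            let current = self.snapshot.read().clone();
            let next = Snapshot { version: current.version + 1 };
            *self.snapshot.write() = Arc::new(next);
        }

        // Safe variant, mirroring invalidate_server: the guard is held across
        // the read and the write, so the transition is atomic.
        fn bump(&self) {
            let mut guard = self.snapshot.write();
            let next = Snapshot { version: guard.version + 1 };
            *guard = Arc::new(next);
        }
    }

    fn main() {
        let registry = Registry {
            snapshot: RwLock::new(Arc::new(Snapshot::default())),
        };
        registry.bump_racy();
        registry.bump();
        assert_eq!(registry.snapshot.read().version, 2);
    }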
crates/fluss/src/client/table/scanner.rs (67 additions, 11 deletions)

@@ -24,7 +24,7 @@ use crate::client::table::log_fetch_buffer::{
 use crate::client::table::remote_log::{
     RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
 };
-use crate::error::{Error, Result};
+use crate::error::{Error, Result, RpcError};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};

@@ -271,6 +271,8 @@ struct LogFetcher {
     credentials_cache: Arc<CredentialsCache>,
     log_fetch_buffer: Arc<LogFetchBuffer>,
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
+    table_path: TablePath,
+    is_partitioned: bool,
 }

 impl LogFetcher {

@@ -299,6 +301,8 @@
             credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
             log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
             nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
+            table_path: table_info.table_path.clone(),
+            is_partitioned: table_info.is_partitioned(),
         })
     }

@@ -315,9 +319,45 @@
         }
     }

+    async fn check_and_update_metadata(&self) -> Result<()> {
+        if self.is_partitioned {
+            // TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
+            // The implementation will likely need to collect partition IDs for such buckets and
+            // perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
+            return Ok(());
+        }
+
+        let need_update = self
+            .fetchable_buckets()
+            .iter()
+            .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+
+        if !need_update {
+            return Ok(());
+        }
+
+        // TODO: Handle PartitionNotExist error
+        self.metadata
+            .update_tables_metadata(&HashSet::from([&self.table_path]))
+            .await
+            .or_else(|e| {
+                if let Error::RpcError { source, .. } = &e
+                    && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
+                {
+                    warn!(
+                        "Retrying after encountering error while updating table metadata: {}",
+                        e
+                    );
+                    Ok(())
+                } else {
+                    Err(e)
+                }
+            })
+    }
+
     /// Send fetch requests asynchronously without waiting for responses
     async fn send_fetches(&self) -> Result<()> {
-        // todo: check update metadata like fluss-java in case leader changes
+        self.check_and_update_metadata().await?;
         let fetch_request = self.prepare_fetch_log_requests().await;

         for (leader, fetch_request) in fetch_request {

@@ -337,6 +377,7 @@
             let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
             let creds_cache = self.credentials_cache.clone();
             let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+            let metadata = self.metadata.clone();

             // Spawn async task to handle the fetch request
             // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.

@@ -351,27 +392,34 @@
                     nodes_with_pending.lock().remove(&leader);
                 });

-                let server_node = cluster
-                    .get_tablet_server(leader)
-                    .expect("todo: handle leader not exist.");
+                let server_node = match cluster.get_tablet_server(leader) {
+                    Some(node) => node,
+                    None => {
+                        warn!("No server node found for leader {}, retrying", leader);
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
+                        return;
+                    }
+                };

                 let con = match conns.get_connection(server_node).await {
                     Ok(con) => con,
                     Err(e) => {
-                        // todo: handle failed to get connection
-                        warn!("Failed to get connection to destination node: {e:?}");
+                        warn!("Retrying after error getting connection to destination node: {e:?}");
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                         return;
                     }
                 };

                 let fetch_response = match con
-                    .request(message::FetchLogRequest::new(fetch_request))
+                    .request(message::FetchLogRequest::new(fetch_request.clone()))
                     .await
                 {
                     Ok(resp) => resp,
                     Err(e) => {
-                        // todo: handle fetch log from destination node
-                        warn!("Failed to fetch log from destination node {server_node:?}: {e:?}");
+                        warn!(
+                            "Retrying after error fetching log from destination node {server_node:?}: {e:?}"
+                        );
+                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
                         return;
                     }
                 };

@@ -387,7 +435,6 @@
                 )
                 .await
                 {
-                    // todo: handle fail to handle fetch response
                     error!("Fail to handle fetch response: {e:?}");
                 }
             });

@@ -396,6 +443,15 @@
         Ok(())
     }

+    async fn handle_fetch_failure(
+        metadata: Arc<Metadata>,
+        server_id: &i32,
+        request: &FetchLogRequest,
+    ) {
+        let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
+        metadata.invalidate_server(server_id, table_ids);
+    }
+
     /// Handle fetch response and add completed fetches to buffer
     async fn handle_fetch_response(
         fetch_response: crate::proto::FetchLogResponse,
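
check_and_update_metadata above swallows only connection-level failures, on the theory that the next send_fetches cycle will retry, and propagates every other error. A self-contained sketch of that or_else pattern; the error enum is a simplified stand-in for the crate's Error and RpcError types:

    #[derive(Debug)]
    #[allow(dead_code)]
    enum RpcError {
        ConnectionError(String),
        Poisoned(String),
        Protocol(String),
    }

    // Stand-in for Metadata::update_tables_metadata; pretend the server is
    // temporarily unreachable.
    fn refresh_metadata() -> Result<(), RpcError> {
        Err(RpcError::ConnectionError("connection refused".into()))
    }

    fn check_and_update_metadata() -> Result<(), RpcError> {
        refresh_metadata().or_else(|e| {
            if matches!(e, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) {
                // Transient: log it and let the next fetch cycle retry.
                eprintln!("retrying after metadata refresh error: {e:?}");
                Ok(())
            } else {
                // Anything else is a real failure the caller must see.
                Err(e)
            }
        })
    }

    fn main() {
        assert!(check_and_update_metadata().is_ok());
    }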
crates/fluss/src/cluster/cluster.rs (43 additions, 9 deletions)

@@ -22,7 +22,7 @@ use crate::metadata::{JsonSerde, TableBucket, TableDescriptor, TableInfo, TableP
 use crate::proto::MetadataResponse;
 use crate::rpc::{from_pb_server_node, from_pb_table_path};
 use rand::random_range;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};

 static EMPTY: Vec<BucketLocation> = Vec::new();


@@ -64,6 +64,43 @@
         }
     }

+    pub fn invalidate_server(&self, server_id: &i32, table_ids: Vec<i64>) -> Self {
+        let alive_tablet_servers_by_id = self
+            .alive_tablet_servers_by_id
+            .iter()
+            .filter(|&(id, _)| id != server_id)
+            .map(|(id, ts)| (*id, ts.clone()))
+            .collect();
+
+        let table_paths: HashSet<&TablePath> = table_ids
+            .iter()
+            .filter_map(|id| self.table_path_by_id.get(id))
+            .collect();
+
+        let available_locations_by_path = self
+            .available_locations_by_path
+            .iter()
+            .filter(|&(path, _)| !table_paths.contains(path))
+            .map(|(path, locations)| (path.clone(), locations.clone()))
+            .collect();
+
+        let available_locations_by_bucket = self
+            .available_locations_by_bucket
+            .iter()
+            .filter(|&(_bucket, location)| !table_paths.contains(&location.table_path))
+            .map(|(bucket, location)| (bucket.clone(), location.clone()))
+            .collect();
+
+        Cluster::new(
+            self.coordinator_server.clone(),
+            alive_tablet_servers_by_id,
+            available_locations_by_path,
+            available_locations_by_bucket,
+            self.table_id_by_path.clone(),
+            self.table_info_by_path.clone(),
+        )
+    }
+
     pub fn update(&mut self, cluster: Cluster) {
         let Cluster {
             coordinator_server,

@@ -214,15 +251,12 @@
         .unwrap_or(&EMPTY)
     }

-    pub fn get_one_available_server(&self) -> &ServerNode {
-        assert!(
-            !self.alive_tablet_servers.is_empty(),
-            "no alive tablet server in cluster"
-        );
+    pub fn get_one_available_server(&self) -> Option<&ServerNode> {
+        if self.alive_tablet_servers.is_empty() {
+            return None;
+        }
         let offset = random_range(0..self.alive_tablet_servers.len());
-        self.alive_tablet_servers
-            .get(offset)
-            .unwrap_or_else(|| panic!("can't find alive tab server by offset {offset}"))
+        self.alive_tablet_servers.get(offset)
     }

     pub fn get_bucket_count(&self, table_path: &TablePath) -> i32 {
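
get_one_available_server now reports an empty server list as None instead of asserting. A minimal sketch of the selection logic plus a caller handling both cases, using rand::random_range as the file itself does; ServerNode is reduced to a bare id:

    use rand::random_range;

    struct ServerNode {
        id: i32,
    }

    fn get_one_available_server(alive: &[ServerNode]) -> Option<&ServerNode> {
        if alive.is_empty() {
            return None; // previously this case was an assert! panic
        }
        let offset = random_range(0..alive.len());
        alive.get(offset)
    }

    fn main() {
        let servers = vec![ServerNode { id: 1 }, ServerNode { id: 2 }];
        match get_one_available_server(&servers) {
            Some(node) => println!("picked tablet server {}", node.id),
            None => println!("no alive tablet server; fall back to bootstrap"),
        }
    }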
crates/fluss/src/rpc/server_connection.rs (21 additions, 4 deletions)

@@ -66,13 +66,25 @@ impl RpcClient {
         server_node: &ServerNode,
     ) -> Result<ServerConnection, RpcError> {
         let server_id = server_node.uid();
-        {
+        let connection = {
             let connections = self.connections.read();
-            if let Some(connection) = connections.get(server_id) {
-                return Ok(connection.clone());
+            connections.get(server_id).cloned()
+        };
+
+        if let Some(conn) = connection {
+            if !conn.is_poisoned() {
+                return Ok(conn);
             }
         }
-        let new_server = self.connect(server_node).await?;
+
+        let new_server = match self.connect(server_node).await {
+            Ok(new_server) => new_server,
+            Err(e) => {
+                self.connections.write().remove(server_id);
+                return Err(e);
+            }
+        };
+
         self.connections
             .write()
             .insert(server_id.clone(), new_server.clone());

@@ -231,6 +243,11 @@
         }
     }

+    fn is_poisoned(&self) -> bool {
+        let guard = self.state.lock();
+        matches!(*guard, ConnectionState::Poison(_))
+    }
+
     pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, Error>
     where
         R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
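
The reworked get_connection reuses a cached connection only while it is healthy: a poisoned entry falls through to a fresh connect, and a failed connect evicts the stale entry. A runnable sketch of that flow with simplified stand-in types (the real code is async and uses typed errors):

    use parking_lot::{Mutex, RwLock};
    use std::collections::HashMap;
    use std::sync::Arc;

    #[derive(Clone)]
    struct Connection {
        poisoned: Arc<Mutex<bool>>,
    }

    impl Connection {
        fn is_poisoned(&self) -> bool {
            *self.poisoned.lock()
        }
    }

    struct Pool {
        connections: RwLock<HashMap<String, Connection>>,
    }

    impl Pool {
        // Stub that always succeeds; the real connect performs network I/O.
        fn connect(&self, _id: &str) -> Result<Connection, String> {
            Ok(Connection { poisoned: Arc::new(Mutex::new(false)) })
        }

        fn get_connection(&self, id: &str) -> Result<Connection, String> {
            // Hold the read lock only long enough to clone the cached entry.
            let cached = self.connections.read().get(id).cloned();

            if let Some(conn) = cached {
                if !conn.is_poisoned() {
                    return Ok(conn);
                }
            }

            // Cache miss or poisoned entry: reconnect, evicting on failure so
            // the next caller does not retry a known-bad entry.
            let fresh = match self.connect(id) {
                Ok(conn) => conn,
                Err(e) => {
                    self.connections.write().remove(id);
                    return Err(e);
                }
            };
            self.connections.write().insert(id.to_string(), fresh.clone());
            Ok(fresh)
        }
    }

    fn main() {
        let pool = Pool { connections: RwLock::new(HashMap::new()) };
        let conn = pool.get_connection("ts-1").unwrap();
        *conn.poisoned.lock() = true; // simulate a broken connection
        assert!(pool.get_connection("ts-1").is_ok()); // transparently reconnects
    }

Two tasks can both observe a poisoned entry and reconnect concurrently; the last insert wins, which matches the behavior of the change above.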