Skip to content

Commit 4601838

Browse files
lewiszlwalamb
andauthored
Make flight sql client generic (#8915)
# Which issue does this PR close? None. # Rationale for this change We may not directly use `Channel` (maybe a wrapper of `Channel`), make flight sql client more generic to receive any type which implements GrpcService trait. # What changes are included in this PR? Change `Channel` to generic type. # Are these changes tested? CI. # Are there any user-facing changes? No. --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 365a9ec commit 4601838

File tree

1 file changed

+39
-14
lines changed

1 file changed

+39
-14
lines changed

arrow-flight/src/sql/client.rs

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ use arrow_ipc::{MessageHeader, root_as_message};
5656
use arrow_schema::{ArrowError, Schema, SchemaRef};
5757
use futures::{Stream, TryStreamExt, stream};
5858
use prost::Message;
59-
use tonic::transport::Channel;
59+
use tonic::codegen::{Body, StdError};
6060
use tonic::{IntoRequest, IntoStreamingRequest, Streaming};
6161

6262
/// A FlightSQLServiceClient is an endpoint for retrieving or storing Arrow data
6363
/// by FlightSQL protocol.
64-
#[derive(Debug, Clone)]
64+
#[derive(Debug)]
6565
pub struct FlightSqlServiceClient<T> {
6666
token: Option<String>,
6767
headers: HashMap<String, String>,
@@ -71,14 +71,20 @@ pub struct FlightSqlServiceClient<T> {
7171
/// A FlightSql protocol client that can run queries against FlightSql servers
7272
/// This client is in the "experimental" stage. It is not guaranteed to follow the spec in all instances.
7373
/// Github issues are welcomed.
74-
impl FlightSqlServiceClient<Channel> {
74+
impl<T> FlightSqlServiceClient<T>
75+
where
76+
T: tonic::client::GrpcService<tonic::body::Body>,
77+
T::Error: Into<StdError>,
78+
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
79+
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
80+
{
7581
/// Creates a new FlightSql client that connects to a server over an arbitrary tonic `Channel`
76-
pub fn new(channel: Channel) -> Self {
82+
pub fn new(channel: T) -> Self {
7783
Self::new_from_inner(FlightServiceClient::new(channel))
7884
}
7985

8086
/// Creates a new higher level client with the provided lower level client
81-
pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self {
87+
pub fn new_from_inner(inner: FlightServiceClient<T>) -> Self {
8288
Self {
8389
token: None,
8490
flight_client: inner,
@@ -87,17 +93,17 @@ impl FlightSqlServiceClient<Channel> {
8793
}
8894

8995
/// Return a reference to the underlying [`FlightServiceClient`]
90-
pub fn inner(&self) -> &FlightServiceClient<Channel> {
96+
pub fn inner(&self) -> &FlightServiceClient<T> {
9197
&self.flight_client
9298
}
9399

94100
/// Return a mutable reference to the underlying [`FlightServiceClient`]
95-
pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel> {
101+
pub fn inner_mut(&mut self) -> &mut FlightServiceClient<T> {
96102
&mut self.flight_client
97103
}
98104

99105
/// Consume this client and return the underlying [`FlightServiceClient`]
100-
pub fn into_inner(self) -> FlightServiceClient<Channel> {
106+
pub fn into_inner(self) -> FlightServiceClient<T> {
101107
self.flight_client
102108
}
103109

@@ -416,7 +422,10 @@ impl FlightSqlServiceClient<Channel> {
416422
&mut self,
417423
query: String,
418424
transaction_id: Option<Bytes>,
419-
) -> Result<PreparedStatement<Channel>, ArrowError> {
425+
) -> Result<PreparedStatement<T>, ArrowError>
426+
where
427+
T: Clone,
428+
{
420429
let cmd = ActionCreatePreparedStatementRequest {
421430
query,
422431
transaction_id,
@@ -509,10 +518,10 @@ impl FlightSqlServiceClient<Channel> {
509518
Ok(())
510519
}
511520

512-
fn set_request_headers<T>(
521+
fn set_request_headers<M>(
513522
&self,
514-
mut req: tonic::Request<T>,
515-
) -> Result<tonic::Request<T>, ArrowError> {
523+
mut req: tonic::Request<M>,
524+
) -> Result<tonic::Request<M>, ArrowError> {
516525
for (k, v) in &self.headers {
517526
let k = AsciiMetadataKey::from_str(k.as_str()).map_err(|e| {
518527
ArrowError::ParseError(format!("Cannot convert header key \"{k}\": {e}"))
@@ -532,6 +541,16 @@ impl FlightSqlServiceClient<Channel> {
532541
}
533542
}
534543

544+
impl<T: Clone> Clone for FlightSqlServiceClient<T> {
545+
fn clone(&self) -> Self {
546+
Self {
547+
headers: self.headers.clone(),
548+
token: self.token.clone(),
549+
flight_client: self.flight_client.clone(),
550+
}
551+
}
552+
}
553+
535554
/// A PreparedStatement
536555
#[derive(Debug, Clone)]
537556
pub struct PreparedStatement<T> {
@@ -542,9 +561,15 @@ pub struct PreparedStatement<T> {
542561
parameter_schema: Schema,
543562
}
544563

545-
impl PreparedStatement<Channel> {
564+
impl<T> PreparedStatement<T>
565+
where
566+
T: tonic::client::GrpcService<tonic::body::Body>,
567+
T::Error: Into<StdError>,
568+
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
569+
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
570+
{
546571
pub(crate) fn new(
547-
flight_client: FlightSqlServiceClient<Channel>,
572+
flight_client: FlightSqlServiceClient<T>,
548573
handle: impl Into<Bytes>,
549574
dataset_schema: Schema,
550575
parameter_schema: Schema,

0 commit comments

Comments
 (0)