diff --git a/cli/src/main.rs b/cli/src/main.rs
index 4330cc4..f137215 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -243,10 +243,11 @@ fn perform_export(args: ExportArgs) {
         decimal_precision: args.schema_settings.decimal_precision,
         array_handling: args.schema_settings.array_handling,
     };
-    let query = args.query.unwrap_or_else(|| {
-        format!("SELECT * FROM {}", args.table.unwrap())
-    });
-    let result = postgres_cloner::execute_copy(&args.postgres, &query, &args.output_file, props, args.quiet, &settings);
+    let result = if let Some(table) = &args.table {
+        postgres_cloner::execute_copy_table(&args.postgres, table, &args.output_file, props, args.quiet, &settings)
+    } else {
+        postgres_cloner::execute_copy_query(&args.postgres, &args.query.unwrap(), &args.output_file, props, args.quiet, &settings)
+    };
 
     let _stats = handle_result(result);
     // eprintln!("Wrote {} rows, {} bytes of raw data in {} groups", stats.rows, stats.bytes, stats.groups);
diff --git a/cli/src/parquet_writer.rs b/cli/src/parquet_writer.rs
index b9d5590..6c367ef 100644
--- a/cli/src/parquet_writer.rs
+++ b/cli/src/parquet_writer.rs
@@ -19,11 +19,11 @@ pub struct WriterSettings {
     pub row_group_row_limit: usize
 }
 
-pub struct ParquetRowWriter<W: Write> {
+pub struct ParquetRowWriter<W: Write, TRow: PgAbstractRow> {
     writer: SerializedFileWriter<W>,
     schema: parquet::schema::types::TypePtr,
     // row_group_writer: SerializedRowGroupWriter<'a, W>,
-    appender: DynColumnAppender<Arc<Row>>,
+    appender: DynColumnAppender<Arc<TRow>>,
     stats: WriterStats,
     last_timestep_stats: WriterStats,
     last_timestep_time: std::time::Instant,
@@ -35,11 +35,11 @@ pub struct ParquetRowWriter<W: Write> {
     current_group_rows: usize
 }
 
-impl<W: Write> ParquetRowWriter<W> {
+impl<W: Write, TRow: PgAbstractRow> ParquetRowWriter<W, TRow> {
     pub fn new(
         writer: SerializedFileWriter<W>,
         schema: parquet::schema::types::TypePtr,
-        appender: DynColumnAppender<Arc<Row>>,
+        appender: DynColumnAppender<Arc<TRow>>,
         quiet: bool,
         settings: WriterSettings
     ) -> parquet::errors::Result<Self> {
@@ -83,10 +83,10 @@ impl<W: Write> ParquetRowWriter<W> {
         Ok(())
     }
 
-    pub fn write_row(&mut self, row: Arc<Row>) -> Result<(), String> {
+    pub fn write_row(&mut self, row: Arc<TRow>) -> Result<(), String> {
         let lvl = LevelIndexList::new_i(self.stats.rows);
         let bytes = self.appender.copy_value(&lvl, Cow::Borrowed(&row))
-            .map_err(|e| format!("Could not copy Row[{}]:", identify_row(&row)) + &e)?;
+            .map_err(|e| format!("Could not copy row: {}", e))?;
 
         self.current_group_bytes += bytes;
         self.current_group_rows += 1;
diff --git a/cli/src/pg_custom_types.rs b/cli/src/pg_custom_types.rs
index 4419d82..1069654 100644
--- a/cli/src/pg_custom_types.rs
+++ b/cli/src/pg_custom_types.rs
@@ -2,6 +2,7 @@
 use std::{sync::Arc, any::TypeId, io::Read};
 use byteorder::{ReadBytesExt, BigEndian};
 use postgres::types::{FromSql, Kind, WrongType, Field};
+use postgres::binary_copy::BinaryCopyOutRow;
 use postgres_protocol::types as pgtypes;
 
 fn read_pg_len(bytes: &[u8]) -> i32 {
@@ -315,6 +316,17 @@ impl<T: PgAbstractRow> PgAbstractRow for Arc<T> {
     }
 }
 
+impl PgAbstractRow for BinaryCopyOutRow {
+    fn ab_get<'a, T: FromSql<'a>>(&'a self, index: usize) -> T {
+        self.get(index)
+    }
+
+    fn ab_len(&self) -> usize {
+        // ab_len is not used in the current implementation
+        0
+    }
+}
+
 pub struct UnclonableHack<T>(pub T);
 
 impl<T> Clone for UnclonableHack<T> {
diff --git a/cli/src/postgres_cloner.rs b/cli/src/postgres_cloner.rs
index a63adcd..ce08831 100644
--- a/cli/src/postgres_cloner.rs
+++ b/cli/src/postgres_cloner.rs
@@ -19,6 +19,7 @@
 use pg_bigdecimal::PgNumeric;
 use postgres::error::SqlState;
 use postgres::types::{Kind, Type as PgType, FromSql};
 use postgres::{self, Client, RowIter, Row, Column, Statement, NoTls};
+use postgres::binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow};
 use postgres::fallible_iterator::FallibleIterator;
 use parquet::schema::types::{Type as ParquetType, TypePtr, GroupTypeBuilder};
@@ -31,7 +32,7 @@
 use crate::datatypes::money::PgMoney;
 use crate::datatypes::numeric::{new_decimal_bytes_appender, new_decimal_int_appender};
 use crate::myfrom::{MyFrom, self};
 use crate::parquet_writer::{WriterStats, ParquetRowWriter, WriterSettings};
-use crate::pg_custom_types::{PgEnum, PgRawRange, PgAbstractRow, PgRawRecord, PgAny, PgAnyRef, UnclonableHack};
+use crate::pg_custom_types::{PgAbstractRow, PgAny, PgEnum, PgRawRecord, PgRawRange, PgAnyRef, UnclonableHack};
 
 type ResolvedColumn<TRow> = (DynColumnAppender<TRow>, ParquetType);
@@ -216,33 +217,84 @@ fn pg_connect(args: &PostgresConnArgs) -> Result<Client, String> {
     Ok(client)
 }
 
-pub fn execute_copy(pg_args: &PostgresConnArgs, query: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
-
+pub fn execute_copy_query(pg_args: &PostgresConnArgs, query: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
     let mut client = pg_connect(pg_args)?;
+
     let statement = client.prepare(query).map_err(|db_err| { db_err.to_string() })?;
+    let (row_appender, schema) = map_schema_root::<postgres::Row>(statement.columns(), schema_settings)?;
+
+    execute_with_writer(output_file, output_props, quiet, schema, row_appender, |row_writer| {
+        let rows: RowIter = client.query_raw::<Statement, &i32, &[i32; 0]>(&statement, &[])
+            .map_err(|err| format!("Failed to execute the SQL query: {}", err))?;
+        for row in rows.iterator() {
+            let row = row.map_err(|err| err.to_string())?;
+            let row = Arc::new(row);
+            row_writer.write_row(row)?;
+        }
+        Ok(())
+    })
+}
 
-    let (row_appender, schema) = map_schema_root(statement.columns(), schema_settings)?;
+pub fn execute_copy_table(pg_args: &PostgresConnArgs, table_name: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
+    let mut client = pg_connect(pg_args)?;
+
+    if !quiet {
+        println!("Copying from table {} to {} using COPY with binary format...", table_name, output_file.display());
+    }
+
+    let schema_query = format!("SELECT * FROM {} LIMIT 0", table_name);
+    let statement = client.prepare(&schema_query)
+        .map_err(|err| format!("Failed to prepare schema query: {}", err))?;
+
+    let (row_appender, schema) = map_schema_root::<BinaryCopyOutRow>(statement.columns(), schema_settings)?;
+
+    execute_with_writer(output_file, output_props, quiet, schema, row_appender, |row_writer| {
+        let copy_query = format!("COPY {} TO STDOUT (FORMAT BINARY)", table_name);
+        let copy_reader = client.copy_out(&copy_query)
+            .map_err(|err| format!("Failed to execute COPY command: {}", err))?;
+
+        let column_types: Vec<PgType> =
+            statement.columns().iter().map(|col| col.type_().clone()).collect();
+
+        let mut binary_iter = BinaryCopyOutIter::new(copy_reader, &column_types);
+        while let Some(binary_row) = binary_iter.next()
+            .map_err(|err| format!("Failed to read binary row: {}", err))? {
+
+            row_writer.write_row(Arc::new(binary_row))?;
+        }
+        Ok(())
+    })
+}
+
+fn execute_with_writer<TRow: PgAbstractRow, F>(
+    output_file: &PathBuf,
+    output_props: WriterPropertiesPtr,
+    quiet: bool,
+    schema: ParquetType,
+    row_appender: DynColumnAppender<Arc<TRow>>,
+    data_processor: F
+) -> Result<WriterStats, String>
+where
+    F: FnOnce(&mut ParquetRowWriter<std::fs::File, TRow>) -> Result<(), String>
+{
     if !quiet {
         eprintln!("Schema: {}", format_schema(&schema, 0));
     }
     let schema = Arc::new(schema);
 
-    let settings = WriterSettings { row_group_byte_limit: 500 * 1024 * 1024, row_group_row_limit: output_props.max_row_group_size() };
+    let settings = WriterSettings {
+        row_group_byte_limit: 500 * 1024 * 1024,
+        row_group_row_limit: output_props.max_row_group_size()
+    };
 
-    let output_file_f = std::fs::File::create(output_file).unwrap();
+    let output_file_f = std::fs::File::create(output_file)
+        .map_err(|e| format!("Failed to create output file: {}", e))?;
     let pq_writer = SerializedFileWriter::new(output_file_f, schema.clone(), output_props)
         .map_err(|e| format!("Failed to create parquet writer: {}", e))?;
     let mut row_writer = ParquetRowWriter::new(pq_writer, schema.clone(), row_appender, quiet, settings)
         .map_err(|e| format!("Failed to create row writer: {}", e))?;
 
-    let rows: RowIter = client.query_raw::<Statement, &i32, &[i32; 0]>(&statement, &[])
-        .map_err(|err| format!("Failed to execute the SQL query: {}", err))?;
-    for row in rows.iterator() {
-        let row = row.map_err(|err| err.to_string())?;
-        let row = Arc::new(row);
-
-        row_writer.write_row(row)?;
-    }
+    data_processor(&mut row_writer)?;
 
     Ok(row_writer.close()?)
 }
@@ -329,8 +381,8 @@ fn count_columns(p: &ParquetType) -> usize {
 }
 
-fn map_schema_root<'a>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedColumn<Arc<Row>>, String> {
-    let mut fields: Vec<ResolvedColumn<Arc<Row>>> = vec![];
+fn map_schema_root<TRow: PgAbstractRow + 'static>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedColumn<Arc<TRow>>, String> {
+    let mut fields: Vec<ResolvedColumn<Arc<TRow>>> = vec![];
     for (col_i, c) in row.iter().enumerate() {
 
         let t = c.type_();
@@ -342,7 +394,7 @@ fn map_schema_root<'a>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedColumn<Arc<Row>>, String> {
     let (column_appenders, parquet_types): (Vec<_>, Vec<_>) = fields.into_iter().unzip();
-    let merged_appender: DynColumnAppender<Arc<Row>> = Box::new(DynamicMergedAppender::new(column_appenders, 0, 0));
+    let merged_appender: DynColumnAppender<Arc<TRow>> = Box::new(DynamicMergedAppender::new(column_appenders, 0, 0));
     let struct_type = ParquetType::group_type_builder("root")
         .with_fields(parquet_types.into_iter().map(Arc::new).collect())
         .build()