9 changes: 5 additions & 4 deletions cli/src/main.rs
@@ -243,10 +243,11 @@ fn perform_export(args: ExportArgs) {
decimal_precision: args.schema_settings.decimal_precision,
array_handling: args.schema_settings.array_handling,
};
let query = args.query.unwrap_or_else(|| {
format!("SELECT * FROM {}", args.table.unwrap())
});
let result = postgres_cloner::execute_copy(&args.postgres, &query, &args.output_file, props, args.quiet, &settings);
let result = if let Some(table) = &args.table {
postgres_cloner::execute_copy_table(&args.postgres, table, &args.output_file, props, args.quiet, &settings)
} else {
postgres_cloner::execute_copy_query(&args.postgres, &args.query.unwrap(), &args.output_file, props, args.quiet, &settings)
};
let _stats = handle_result(result);

// eprintln!("Wrote {} rows, {} bytes of raw data in {} groups", stats.rows, stats.bytes, stats.groups);
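Side note on the CLI change above: the old path always synthesized "SELECT * FROM {table}" and went through the query machinery, while the new code dispatches table exports to a binary COPY path. A minimal sketch of that dispatch follows (not part of the diff; hypothetical helper name, error handling elided, imports as already present in main.rs). It assumes the argument parser guarantees that exactly one of args.table / args.query is set, which is what makes the unwrap in the diff safe.

use crate::postgres_cloner::{execute_copy_query, execute_copy_table};

// Hypothetical wrapper around the dispatch added in perform_export.
fn dispatch_export(args: &ExportArgs, props: WriterPropertiesPtr, settings: &SchemaSettings) -> Result<WriterStats, String> {
    match &args.table {
        // Plain table exports now stream through COPY ... (FORMAT BINARY).
        Some(table) => execute_copy_table(&args.postgres, table, &args.output_file, props, args.quiet, settings),
        // Explicit SQL queries keep the prepared-statement / query_raw path.
        None => execute_copy_query(&args.postgres, args.query.as_deref().expect("either a table or a query must be given"), &args.output_file, props, args.quiet, settings),
    }
}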
12 changes: 6 additions & 6 deletions cli/src/parquet_writer.rs
@@ -19,11 +19,11 @@ pub struct WriterSettings {
pub row_group_row_limit: usize
}

pub struct ParquetRowWriter<W: Write + Send> {
pub struct ParquetRowWriter<W: Write + Send, T: PgAbstractRow = postgres::Row> {
writer: SerializedFileWriter<W>,
schema: parquet::schema::types::TypePtr,
// row_group_writer: SerializedRowGroupWriter<'a, W>,
appender: DynColumnAppender<Arc<postgres::Row>>,
appender: DynColumnAppender<Arc<T>>,
stats: WriterStats,
last_timestep_stats: WriterStats,
last_timestep_time: std::time::Instant,
@@ -35,11 +35,11 @@ pub struct ParquetRowWriter<W: Write + Send> {
current_group_rows: usize
}

impl <W: Write + Send> ParquetRowWriter<W> {
impl<W: Write + Send, T: PgAbstractRow> ParquetRowWriter<W, T> {
pub fn new(
writer: SerializedFileWriter<W>,
schema: parquet::schema::types::TypePtr,
appender: DynColumnAppender<Arc<postgres::Row>>,
appender: DynColumnAppender<Arc<T>>,
quiet: bool,
settings: WriterSettings
) -> parquet::errors::Result<Self> {
@@ -83,10 +83,10 @@ impl <W: Write + Send> ParquetRowWriter<W> {
Ok(())
}

pub fn write_row(&mut self, row: Arc<postgres::Row>) -> Result<(), String> {
pub fn write_row(&mut self, row: Arc<T>) -> Result<(), String> {
let lvl = LevelIndexList::new_i(self.stats.rows);
let bytes = self.appender.copy_value(&lvl, Cow::Borrowed(&row))
.map_err(|e| format!("Could not copy Row[{}]:", identify_row(&row)) + &e)?;
.map_err(|e| format!("Could not copy row: {}", e))?;

self.current_group_bytes += bytes;
self.current_group_rows += 1;
12 changes: 12 additions & 0 deletions cli/src/pg_custom_types.rs
@@ -2,6 +2,7 @@ use std::{sync::Arc, any::TypeId, io::Read};

use byteorder::{ReadBytesExt, BigEndian};
use postgres::types::{FromSql, Kind, WrongType, Field};
use postgres::binary_copy::BinaryCopyOutRow;
use postgres_protocol::types as pgtypes;

fn read_pg_len(bytes: &[u8]) -> i32 {
@@ -315,6 +316,17 @@ impl<TRow: PgAbstractRow> PgAbstractRow for Arc<TRow> {
}
}

impl PgAbstractRow for BinaryCopyOutRow {
fn ab_get<'a, T: FromSql<'a>>(&'a self, index: usize) -> T {
self.get(index)
}

fn ab_len(&self) -> usize {
// ab_len is not used in the current implementation
0
}
}

pub struct UnclonableHack<T>(pub T);

impl<T> Clone for UnclonableHack<T> {
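A short illustration (not part of the diff) of what the new impl buys: code that is generic over PgAbstractRow can now read a binary-COPY row exactly like a postgres::Row, so the column appenders need no changes. The helper names below are hypothetical; the stubbed ab_len matches the comment above, since nothing in the current writer path calls it.

use postgres::binary_copy::BinaryCopyOutRow;
use crate::pg_custom_types::PgAbstractRow;

// Hypothetical helper: compiles for postgres::Row, BinaryCopyOutRow, or an Arc of
// either, because all of them implement PgAbstractRow.
fn first_two_columns<R: PgAbstractRow>(row: &R) -> (i32, Option<String>) {
    (row.ab_get::<i32>(0), row.ab_get::<Option<String>>(1))
}

// Instantiation for a binary-COPY row, enabled by the impl added above.
fn read_copy_row(row: &BinaryCopyOutRow) -> (i32, Option<String>) {
    first_two_columns(row)
}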
86 changes: 69 additions & 17 deletions cli/src/postgres_cloner.rs
@@ -19,6 +19,7 @@ use pg_bigdecimal::PgNumeric;
use postgres::error::SqlState;
use postgres::types::{Kind, Type as PgType, FromSql};
use postgres::{self, Client, RowIter, Row, Column, Statement, NoTls};
use postgres::binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow};
use postgres::fallible_iterator::FallibleIterator;
use parquet::schema::types::{Type as ParquetType, TypePtr, GroupTypeBuilder};

@@ -31,7 +32,7 @@ use crate::datatypes::money::PgMoney;
use crate::datatypes::numeric::{new_decimal_bytes_appender, new_decimal_int_appender};
use crate::myfrom::{MyFrom, self};
use crate::parquet_writer::{WriterStats, ParquetRowWriter, WriterSettings};
use crate::pg_custom_types::{PgEnum, PgRawRange, PgAbstractRow, PgRawRecord, PgAny, PgAnyRef, UnclonableHack};
use crate::pg_custom_types::{PgAbstractRow, PgAny, PgEnum, PgRawRecord, PgRawRange, PgAnyRef, UnclonableHack};

type ResolvedColumn<TRow> = (DynColumnAppender<TRow>, ParquetType);

@@ -216,33 +217,84 @@ fn pg_connect(args: &PostgresConnArgs) -> Result<Client, String> {
Ok(client)
}

pub fn execute_copy(pg_args: &PostgresConnArgs, query: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {

pub fn execute_copy_query(pg_args: &PostgresConnArgs, query: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
let mut client = pg_connect(pg_args)?;

let statement = client.prepare(query).map_err(|db_err| { db_err.to_string() })?;
let (row_appender, schema) = map_schema_root::<Row>(statement.columns(), schema_settings)?;

execute_with_writer(output_file, output_props, quiet, schema, row_appender, |row_writer| {
let rows: RowIter = client.query_raw::<Statement, &i32, &[i32]>(&statement, &[])
.map_err(|err| format!("Failed to execute the SQL query: {}", err))?;
for row in rows.iterator() {
let row = row.map_err(|err| err.to_string())?;
let row = Arc::new(row);
row_writer.write_row(row)?;
}
Ok(())
})
}

let (row_appender, schema) = map_schema_root(statement.columns(), schema_settings)?;
pub fn execute_copy_table(pg_args: &PostgresConnArgs, table_name: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
let mut client = pg_connect(pg_args)?;

if !quiet {
println!("Copying from table {} to {} using COPY with binary format...", table_name, output_file.display());
}

let schema_query = format!("SELECT * FROM {} LIMIT 0", table_name);
let statement = client.prepare(&schema_query)
.map_err(|err| format!("Failed to prepare schema query: {}", err))?;

let (row_appender, schema) = map_schema_root::<BinaryCopyOutRow>(statement.columns(), schema_settings)?;

execute_with_writer(output_file, output_props, quiet, schema, row_appender, |row_writer| {
let copy_query = format!("COPY {} TO STDOUT (FORMAT BINARY)", table_name);
let copy_reader = client.copy_out(&copy_query)
.map_err(|err| format!("Failed to execute COPY command: {}", err))?;

let column_types: Vec<postgres::types::Type> =
statement.columns().iter().map(|col| col.type_().clone()).collect();

let mut binary_iter = BinaryCopyOutIter::new(copy_reader, &column_types);
while let Some(binary_row) = binary_iter.next()
.map_err(|err| format!("Failed to read binary row: {}", err))? {

row_writer.write_row(Arc::new(binary_row))?;
}
Ok(())
})
}

fn execute_with_writer<T: PgAbstractRow, F>(
output_file: &PathBuf,
output_props: WriterPropertiesPtr,
quiet: bool,
schema: ParquetType,
row_appender: DynColumnAppender<Arc<T>>,
data_processor: F
) -> Result<WriterStats, String>
where
F: FnOnce(&mut ParquetRowWriter<std::fs::File, T>) -> Result<(), String>
{
if !quiet {
eprintln!("Schema: {}", format_schema(&schema, 0));
}
let schema = Arc::new(schema);

let settings = WriterSettings { row_group_byte_limit: 500 * 1024 * 1024, row_group_row_limit: output_props.max_row_group_size() };
let settings = WriterSettings {
row_group_byte_limit: 500 * 1024 * 1024,
row_group_row_limit: output_props.max_row_group_size()
};

let output_file_f = std::fs::File::create(output_file).unwrap();
let output_file_f = std::fs::File::create(output_file)
.map_err(|e| format!("Failed to create output file: {}", e))?;
let pq_writer = SerializedFileWriter::new(output_file_f, schema.clone(), output_props)
.map_err(|e| format!("Failed to create parquet writer: {}", e))?;
let mut row_writer = ParquetRowWriter::new(pq_writer, schema.clone(), row_appender, quiet, settings)
.map_err(|e| format!("Failed to create row writer: {}", e))?;

let rows: RowIter = client.query_raw::<Statement, &i32, &[i32]>(&statement, &[])
.map_err(|err| format!("Failed to execute the SQL query: {}", err))?;
for row in rows.iterator() {
let row = row.map_err(|err| err.to_string())?;
let row = Arc::new(row);

row_writer.write_row(row)?;
}
data_processor(&mut row_writer)?;

Ok(row_writer.close()?)
}
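For readers unfamiliar with the binary COPY API that execute_copy_table relies on, this is the core read loop in isolation: a minimal sketch with the table name and column types hard-coded, whereas the real code above derives the types from the prepared LIMIT 0 statement.

use postgres::binary_copy::BinaryCopyOutIter;
use postgres::fallible_iterator::FallibleIterator;
use postgres::types::Type;

// Assumes a table `my_table(id int4, name text)`; purely illustrative.
fn print_rows(client: &mut postgres::Client) -> Result<(), String> {
    // COPY ... TO STDOUT (FORMAT BINARY) streams rows in the binary wire format.
    let reader = client.copy_out("COPY my_table TO STDOUT (FORMAT BINARY)")
        .map_err(|e| e.to_string())?;
    // The iterator needs the column types up front so it can decode each field.
    let mut rows = BinaryCopyOutIter::new(reader, &[Type::INT4, Type::TEXT]);
    while let Some(row) = rows.next().map_err(|e| e.to_string())? {
        let id: i32 = row.get(0);
        let name: &str = row.get(1);
        println!("{}\t{}", id, name);
    }
    Ok(())
}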
@@ -329,8 +381,8 @@ fn count_columns(p: &ParquetType) -> usize {
}


fn map_schema_root<'a>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedColumn<Arc<Row>>, String> {
let mut fields: Vec<ResolvedColumn<Arc<Row>>> = vec![];
fn map_schema_root<TRow: PgAbstractRow + 'static>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedColumn<Arc<TRow>>, String> {
let mut fields: Vec<ResolvedColumn<Arc<TRow>>> = vec![];
for (col_i, c) in row.iter().enumerate() {

let t = c.type_();
@@ -342,7 +394,7 @@ fn map_schema_root<'a>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedCol

let (column_appenders, parquet_types): (Vec<_>, Vec<_>) = fields.into_iter().unzip();

let merged_appender: DynColumnAppender<Arc<Row>> = Box::new(DynamicMergedAppender::new(column_appenders, 0, 0));
let merged_appender: DynColumnAppender<Arc<TRow>> = Box::new(DynamicMergedAppender::new(column_appenders, 0, 0));
let struct_type = ParquetType::group_type_builder("root")
.with_fields(parquet_types.into_iter().map(Arc::new).collect())
.build()