Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ options (
);
```

You can use [globbing](https://en.wikipedia.org/wiki/Glob_(programming)) to list all the Parquet files, like `options (filename '/mnt/userdata*.parquet')`, and it will import all matching files. This can be useful when you have a Hive-style directory structure, for instance organized by `year/month/day`, where you can match all Parquet files with `/mnt/userdata/**/*.parquet`. You can also enumerate names using braces, like `/mnt/userdata/data_{1,3}.parquet`, which matches only the files `data_1.parquet` and `data_3.parquet`.

## Advanced

Currently `parquet_fdw` supports the following column [types](https://github.com/apache/arrow/blob/master/cpp/src/arrow/type.h):
Expand Down
24 changes: 22 additions & 2 deletions src/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,32 @@ extern "C"
#include "utils/memutils.h"
#include "utils/memdebug.h"
#include "utils/timestamp.h"
#include "utils/palloc.h"
}

#ifndef PG_VERSION_NUM
#error "PG_VERSION_NUM is not defined"
#endif

#if PG_VERSION_NUM < 130000
#define MAXINT8LEN 25
#endif

/* --- PG version guards for MemoryContext alloc APIs (PG14+ uses Extended with flags) --- */
#ifndef PARQUET_FDW_MCXT_GUARD
#define PARQUET_FDW_MCXT_GUARD

#if PG_VERSION_NUM >= 140000
/*
 * NOTE(review): MemoryContextAllocExtended() is a long-standing API, but a
 * "MemoryContextReallocExtended" counterpart is not part of the well-known
 * memutils API — confirm it exists in the targeted PG14+ headers before
 * relying on this branch.
 */
#define PF_MCTX_ALLOC(ctx, sz) MemoryContextAllocExtended((ctx), (sz), 0)
#define PF_MCTX_REALLOC(ctx, p, sz) MemoryContextReallocExtended((ctx), (p), (sz), 0)
#else
#define PF_MCTX_ALLOC(ctx, sz) MemoryContextAlloc((ctx), (sz))
/* repalloc does not require context in pre-14 */
/*
 * NOTE(review): this branch silently ignores 'ctx' — repalloc keeps the
 * chunk in whatever context originally allocated it, which may differ from
 * the context the caller passed. Confirm callers do not depend on the chunk
 * moving into 'ctx'.
 */
#define PF_MCTX_REALLOC(ctx, p, sz) repalloc((p), (sz))
#endif

#endif /* PARQUET_FDW_MCXT_GUARD */

/*
* exc_palloc
* C++ specific memory allocator that utilizes postgres allocation sets.
Expand All @@ -34,7 +54,7 @@ exc_palloc(std::size_t size)

context->isReset = false;

ret = context->methods->alloc(context, size);
ret = PF_MCTX_ALLOC(context, size);
if (unlikely(ret == NULL))
throw std::bad_alloc();

Expand Down Expand Up @@ -178,7 +198,7 @@ void datum_to_jsonb(Datum value, Oid typoid, bool isnull, FmgrInfo *outfunc,
jb.val.string.val = strval;
}
else {
Datum numeric;
Datum numeric = (Datum) 0;

switch (typoid)
{
Expand Down
1 change: 1 addition & 0 deletions src/exec_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class TrivialExecutionState : public ParquetFdwExecutionState
Size estimate_coord_size()
{
Assert(false && "estimate_coord_size is not supported for TrivialExecutionState");
return 0;
}
void init_coord()
{
Expand Down
122 changes: 93 additions & 29 deletions src/parquet_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <math.h>
#include <list>
#include <set>
#include <glob.h>

#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
Expand Down Expand Up @@ -90,6 +91,7 @@ bool enable_multifile_merge;

static void find_cmp_func(FmgrInfo *finfo, Oid type1, Oid type2);
static void destroy_parquet_state(void *arg);
static List * lappend_globbed_filenames(List *filenames, const char *filename);


/*
Expand Down Expand Up @@ -123,7 +125,7 @@ struct ParquetFdwPlanState
static int
get_strategy(Oid type, Oid opno, Oid am)
{
Oid opclass;
Oid opclass;
Oid opfamily;

opclass = GetDefaultOpClass(type, am);
Expand Down Expand Up @@ -496,7 +498,7 @@ parse_filenames_list(const char *str)
{
case ' ':
*cur = '\0';
filenames = lappend(filenames, makeString(f));
filenames = lappend_globbed_filenames(filenames, f);
state = PS_START;
break;
default:
Expand All @@ -508,7 +510,7 @@ parse_filenames_list(const char *str)
{
case '"':
*cur = '\0';
filenames = lappend(filenames, makeString(f));
filenames = lappend_globbed_filenames(filenames, f);
state = PS_START;
break;
default:
Expand All @@ -520,11 +522,64 @@ parse_filenames_list(const char *str)
}
cur++;
}
filenames = lappend(filenames, makeString(f));
filenames = lappend_globbed_filenames(filenames, f);

return filenames;
}


/*
 * lappend_globbed_filenames
 *		Expand 'filename' as a glob pattern (wildcards, '**', and brace
 *		enumerations via GLOB_BRACE) and append every matching path to
 *		'filenames' as a String node. Returns the (possibly extended) list.
 *
 * Because GLOB_NOCHECK is passed, a pattern that matches nothing is
 * returned verbatim by glob(), so a plain (non-wildcard) filename is
 * always appended as-is and GLOB_NOMATCH should never be reported.
 *
 * On any glob() failure this raises ERROR and does not return.
 */
static List *
lappend_globbed_filenames(List *filenames,
						  const char *filename)
{
	glob_t	globbuf;
	int		error;

	/*
	 * Note: glob() only honors gl_offs when GLOB_DOOFFS is passed, which we
	 * don't, so there is no need to initialize it (the previous code set it
	 * and then iterated from it, which only worked because it was zero).
	 */
	error = glob(filename, GLOB_ERR | GLOB_NOCHECK | GLOB_BRACE, NULL, &globbuf);

	if (error == 0)
	{
		for (size_t i = 0; i < globbuf.gl_pathc; i++)
		{
			elog(DEBUG1, "parquet_fdw: adding globbed filename %s to list of files", globbuf.gl_pathv[i]);
			/* pstrdup() copies into palloc'd memory before globfree() frees the source */
			filenames = lappend(filenames, makeString(pstrdup(globbuf.gl_pathv[i])));
		}
		globfree(&globbuf);
		return filenames;
	}

	/*
	 * ereport(ERROR) longjmps to the error handler and never returns, so the
	 * glob buffer (malloc'd, not palloc'd) must be released *before* we
	 * report; the previous code leaked it on every error path. globfree() is
	 * safe to call after a failed glob() — the buffer may be partially
	 * populated (e.g. on GLOB_NOSPACE).
	 */
	globfree(&globbuf);

	switch (error)
	{
		case GLOB_NOSPACE:
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("parquet_fdw: running out of memory while globbing Parquet filename \"%s\"",
							filename)));
			break;
		case GLOB_ABORTED:
			ereport(ERROR,
					(errcode(ERRCODE_IO_ERROR),
					 errmsg("parquet_fdw: read error while globbing Parquet filename \"%s\". Check file permissions.",
							filename)));
			break;
		/* Should not come here as we use GLOB_NOCHECK flag */
		case GLOB_NOMATCH:
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_FILE),
					 errmsg("parquet_fdw: no Parquet filename matches \"%s\". Check path.",
							filename)));
			break;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_SYSTEM_ERROR),
					 errmsg("parquet_fdw: unknown error; no Parquet filename matches \"%s\". Check path and permissions.",
							filename)));
	}

	return filenames;			/* not reached; keeps compilers quiet */
}


/*
* extract_rowgroups_list
* Analyze query predicates and using min/max statistics determine which
Expand Down Expand Up @@ -1318,13 +1373,14 @@ parquetGetForeignPaths(PlannerInfo *root,
}

foreign_path = (Path *) create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
NULL, /* pathtarget (default) */
baserel->rows,
startup_cost,
total_cost,
NULL, /* no pathkeys */
NULL, /* no outer rel either */
NULL, /* no extra plan */
pathkeys,
NULL, /* required_outer */
NULL, /* fdw_outerpath */
NULL, /* fdw_restrictinfo */
(List *) fdw_private);
if (!enable_multifile && is_multi)
foreign_path->total_cost += disable_cost;
Expand All @@ -1344,13 +1400,14 @@ parquetGetForeignPaths(PlannerInfo *root,
memcpy(private_sort, fdw_private, sizeof(ParquetFdwPlanState));

path = (Path *) create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
NULL, /* pathtarget (default) */
baserel->rows,
startup_cost,
total_cost,
pathkeys,
NULL, /* no outer rel either */
NULL, /* no extra plan */
NULL, /* required_outer */
NULL, /* fdw_outerpath */
NULL, /* fdw_restrictinfo */
(List *) private_sort);

/* For multifile case calculate the cost of merging files */
Expand Down Expand Up @@ -1385,14 +1442,15 @@ parquetGetForeignPaths(PlannerInfo *root,

Path *path = (Path *)
create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
rows_per_worker,
startup_cost,
startup_cost + run_cost / (num_workers + 1),
use_pathkeys ? pathkeys : NULL,
NULL, /* no outer rel either */
NULL, /* no extra plan */
(List *) private_parallel);
NULL, /* pathtarget (default) */
rows_per_worker,
startup_cost,
startup_cost + run_cost / (num_workers + 1),
use_pathkeys ? pathkeys : NULL,
NULL, /* required_outer */
NULL, /* fdw_outerpath */
NULL, /* fdw_restrictinfo */
(List *) private_parallel);

path->parallel_workers = num_workers;
path->parallel_aware = true;
Expand All @@ -1416,14 +1474,15 @@ parquetGetForeignPaths(PlannerInfo *root,

path = (Path *)
create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
rows_per_worker,
startup_cost,
total_cost,
pathkeys,
NULL, /* no outer rel either */
NULL, /* no extra plan */
(List *) private_parallel_merge);
NULL, /* pathtarget (default) */
rows_per_worker,
startup_cost,
total_cost,
pathkeys,
NULL, /* required_outer */
NULL, /* fdw_outerpath */
NULL, /* fdw_restrictinfo */
(List *) private_parallel_merge);

cost_merge(path, list_length(private_parallel_merge->filenames),
startup_cost, total_cost, path->rows);
Expand Down Expand Up @@ -1451,6 +1510,10 @@ parquetGetForeignPlan(PlannerInfo * /* root */,
Plan *outer_plan)
{
ParquetFdwPlanState *fdw_private = (ParquetFdwPlanState *) best_path->fdw_private;
if (fdw_private == NULL) {
fdw_private = (ParquetFdwPlanState *) palloc0(sizeof(ParquetFdwPlanState));
}

Index scan_relid = baserel->relid;
List *attrs_used = NIL;
List *attrs_sorted = NIL;
Expand All @@ -1474,8 +1537,9 @@ parquetGetForeignPlan(PlannerInfo * /* root */,
* Nodes. So we need to convert everything in nodes and store it in a List.
*/
attr = -1;
while ((attr = bms_next_member(fdw_private->attrs_used, attr)) >= 0)
attrs_used = lappend_int(attrs_used, attr);
if (fdw_private->attrs_used)
while ((attr = bms_next_member(fdw_private->attrs_used, attr)) >= 0)
attrs_used = lappend_int(attrs_used, attr);

foreach (lc, fdw_private->attrs_sorted)
attrs_sorted = lappend_int(attrs_sorted, lfirst_int(lc));
Expand Down