diff --git a/README.md b/README.md index a573f9f..ef5aa32 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,8 @@ options ( ); ``` +You can use [globbing](https://en.wikipedia.org/wiki/Glob_(programming)) to list all the Parquet files, like `options (filename '/mnt/userdata*.parquet')` and it will import all matching files. This can be usefull when you have a Hive directory structure, for instance organized by `year/month/day` and you can consider all Parquet files with `/mnt/userdata/**/*.parquet`. You can also use name enumerations using braces like `/mnt/userdata/data_{1,3}.parquet` that will consider only files `data_1.parquet` and `data_3.parquet`. + ## Advanced Currently `parquet_fdw` supports the following column [types](https://github.com/apache/arrow/blob/master/cpp/src/arrow/type.h): diff --git a/src/common.cpp b/src/common.cpp index 58fe2fc..ffea9fb 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -10,12 +10,32 @@ extern "C" #include "utils/memutils.h" #include "utils/memdebug.h" #include "utils/timestamp.h" +#include "utils/palloc.h" } +#ifndef PG_VERSION_NUM +#error "PG_VERSION_NUM is not defined" +#endif + #if PG_VERSION_NUM < 130000 #define MAXINT8LEN 25 #endif +/* --- PG version guards for MemoryContext alloc APIs (PG14+ uses Extended with flags) --- */ +#ifndef PARQUET_FDW_MCXT_GUARD +#define PARQUET_FDW_MCXT_GUARD + +#if PG_VERSION_NUM >= 140000 + #define PF_MCTX_ALLOC(ctx, sz) MemoryContextAllocExtended((ctx), (sz), 0) + #define PF_MCTX_REALLOC(ctx, p, sz) MemoryContextReallocExtended((ctx), (p), (sz), 0) +#else + #define PF_MCTX_ALLOC(ctx, sz) MemoryContextAlloc((ctx), (sz)) + /* repalloc does not require context in pre-14 */ + #define PF_MCTX_REALLOC(ctx, p, sz) repalloc((p), (sz)) +#endif + +#endif /* PARQUET_FDW_MCXT_GUARD */ + /* * exc_palloc * C++ specific memory allocator that utilizes postgres allocation sets. @@ -34,7 +54,7 @@ exc_palloc(std::size_t size) context->isReset = false; - ret = context->methods->alloc(context, size); + ret = PF_MCTX_ALLOC(context, size); if (unlikely(ret == NULL)) throw std::bad_alloc(); @@ -178,7 +198,7 @@ void datum_to_jsonb(Datum value, Oid typoid, bool isnull, FmgrInfo *outfunc, jb.val.string.val = strval; } else { - Datum numeric; + Datum numeric = (Datum) 0; switch (typoid) { diff --git a/src/exec_state.cpp b/src/exec_state.cpp index dbb2948..a92354a 100644 --- a/src/exec_state.cpp +++ b/src/exec_state.cpp @@ -45,6 +45,7 @@ class TrivialExecutionState : public ParquetFdwExecutionState Size estimate_coord_size() { Assert(false && "estimate_coord_size is not supported for TrivialExecutionState"); + return 0; } void init_coord() { diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 20a5695..7184594 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" @@ -90,6 +91,7 @@ bool enable_multifile_merge; static void find_cmp_func(FmgrInfo *finfo, Oid type1, Oid type2); static void destroy_parquet_state(void *arg); +static List * lappend_globbed_filenames(List *filenames, const char *filename); /* @@ -123,7 +125,7 @@ struct ParquetFdwPlanState static int get_strategy(Oid type, Oid opno, Oid am) { - Oid opclass; + Oid opclass; Oid opfamily; opclass = GetDefaultOpClass(type, am); @@ -496,7 +498,7 @@ parse_filenames_list(const char *str) { case ' ': *cur = '\0'; - filenames = lappend(filenames, makeString(f)); + filenames = lappend_globbed_filenames(filenames, f); state = PS_START; break; default: @@ -508,7 +510,7 @@ parse_filenames_list(const char *str) { case '"': *cur = '\0'; - filenames = lappend(filenames, makeString(f)); + filenames = lappend_globbed_filenames(filenames, f); state = PS_START; break; default: @@ -520,11 +522,64 @@ parse_filenames_list(const char *str) } cur++; } - filenames = lappend(filenames, makeString(f)); + filenames = lappend_globbed_filenames(filenames, f); return filenames; } + +/* + * lappend_globbed_filenames + * The filename can be a globbing pathname matching potentially multiple files. + * All the matched file names are added to the list. + */ +static List * +lappend_globbed_filenames(List *filenames, + const char *filename) +{ + glob_t globbuf; + + globbuf.gl_offs = 0; + int error = glob(filename, GLOB_ERR | GLOB_NOCHECK | GLOB_BRACE, NULL, &globbuf); + switch (error) { + case 0: + for (size_t i = globbuf.gl_offs; i < globbuf.gl_pathc; i++) + { + elog(DEBUG1, "parquet_fdw: adding globbed filename %s to list of files", globbuf.gl_pathv[i]); + filenames = lappend(filenames, makeString(pstrdup(globbuf.gl_pathv[i]))); + } + break; + case GLOB_NOSPACE: + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("parquet_fdw: running out of memory while globbing Parquet filename \"%s\"", + filename))); + break; + case GLOB_ABORTED: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg("parquet_fdw: read error while globbing Parquet filename \"%s\". Check file permissions.", + filename))); + break; + // Should not come here as we use GLOB_NOCHECK flag + case GLOB_NOMATCH: + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FILE), + errmsg("parquet_fdw: no Parquet filename matches \"%s\". Check path.", + filename))); + break; + default: + ereport(ERROR, + (errcode(ERRCODE_SYSTEM_ERROR), + errmsg("parquet_fdw: unknown error; no Parquet filename matches \"%s\". Check path and permissions.", + filename))); + } + globfree(&globbuf); + + return filenames; +} + + /* * extract_rowgroups_list * Analyze query predicates and using min/max statistics determine which @@ -1318,13 +1373,14 @@ parquetGetForeignPaths(PlannerInfo *root, } foreign_path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ + NULL, /* pathtarget (default) */ baserel->rows, startup_cost, total_cost, - NULL, /* no pathkeys */ - NULL, /* no outer rel either */ - NULL, /* no extra plan */ + pathkeys, + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ (List *) fdw_private); if (!enable_multifile && is_multi) foreign_path->total_cost += disable_cost; @@ -1344,13 +1400,14 @@ parquetGetForeignPaths(PlannerInfo *root, memcpy(private_sort, fdw_private, sizeof(ParquetFdwPlanState)); path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ + NULL, /* pathtarget (default) */ baserel->rows, startup_cost, total_cost, pathkeys, - NULL, /* no outer rel either */ - NULL, /* no extra plan */ + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ (List *) private_sort); /* For multifile case calculate the cost of merging files */ @@ -1385,14 +1442,15 @@ parquetGetForeignPaths(PlannerInfo *root, Path *path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ - rows_per_worker, - startup_cost, - startup_cost + run_cost / (num_workers + 1), - use_pathkeys ? pathkeys : NULL, - NULL, /* no outer rel either */ - NULL, /* no extra plan */ - (List *) private_parallel); + NULL, /* pathtarget (default) */ + rows_per_worker, + startup_cost, + startup_cost + run_cost / (num_workers + 1), + use_pathkeys ? pathkeys : NULL, + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ + (List *) private_parallel); path->parallel_workers = num_workers; path->parallel_aware = true; @@ -1416,14 +1474,15 @@ parquetGetForeignPaths(PlannerInfo *root, path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ - rows_per_worker, - startup_cost, - total_cost, - pathkeys, - NULL, /* no outer rel either */ - NULL, /* no extra plan */ - (List *) private_parallel_merge); + NULL, /* pathtarget (default) */ + rows_per_worker, + startup_cost, + total_cost, + pathkeys, + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ + (List *) private_parallel_merge); cost_merge(path, list_length(private_parallel_merge->filenames), startup_cost, total_cost, path->rows); @@ -1451,6 +1510,10 @@ parquetGetForeignPlan(PlannerInfo * /* root */, Plan *outer_plan) { ParquetFdwPlanState *fdw_private = (ParquetFdwPlanState *) best_path->fdw_private; + if (fdw_private == NULL) { + fdw_private = (ParquetFdwPlanState *) palloc0(sizeof(ParquetFdwPlanState)); + } + Index scan_relid = baserel->relid; List *attrs_used = NIL; List *attrs_sorted = NIL; @@ -1474,8 +1537,9 @@ parquetGetForeignPlan(PlannerInfo * /* root */, * Nodes. So we need to convert everything in nodes and store it in a List. */ attr = -1; - while ((attr = bms_next_member(fdw_private->attrs_used, attr)) >= 0) - attrs_used = lappend_int(attrs_used, attr); + if (fdw_private->attrs_used) + while ((attr = bms_next_member(fdw_private->attrs_used, attr)) >= 0) + attrs_used = lappend_int(attrs_used, attr); foreach (lc, fdw_private->attrs_sorted) attrs_sorted = lappend_int(attrs_sorted, lfirst_int(lc));