From a6f312b23fda8b271aef3e08cb8392b627eab063 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre=20M=C3=A9tras?= Date: Tue, 26 Aug 2025 08:41:41 -0400 Subject: [PATCH 1/4] Add globbed filenames to list of filenames --- src/parquet_impl.cpp | 64 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 20a5695..388cb0e 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "parquet/arrow/reader.h" #include "parquet/arrow/schema.h" @@ -90,6 +91,7 @@ bool enable_multifile_merge; static void find_cmp_func(FmgrInfo *finfo, Oid type1, Oid type2); static void destroy_parquet_state(void *arg); +static List * lappend_glob(List *filenames, const char *filename); /* @@ -468,6 +470,7 @@ parse_filenames_list(const char *str) char *f = cur; ParserState state = PS_START; List *filenames = NIL; + bool wildcard_seen = false; while (*cur) { @@ -483,6 +486,11 @@ parse_filenames_list(const char *str) f = cur + 1; state = PS_QUOTE; break; + case '*': + case '?': + case '[': + wildcard_seen = true; + break; default: /* XXX we should check that *cur is a valid path symbol * but let's skip it for now */ @@ -496,8 +504,12 @@ parse_filenames_list(const char *str) { case ' ': *cur = '\0'; - filenames = lappend(filenames, makeString(f)); + if (wildcard_seen) + filenames = lappend_glob(filenames, f); + else + filenames = lappend(filenames, makeString(f)); state = PS_START; + wildcard_seen = false; break; default: break; @@ -508,8 +520,12 @@ parse_filenames_list(const char *str) { case '"': *cur = '\0'; - filenames = lappend(filenames, makeString(f)); + if (wildcard_seen) + filenames = lappend_glob(filenames, f); + else + filenames = lappend(filenames, makeString(f)); state = PS_START; + wildcard_seen = false; break; default: break; @@ -520,11 +536,53 @@ parse_filenames_list(const char *str) } cur++; } - filenames = lappend(filenames, makeString(f)); + if (wildcard_seen) + filenames = lappend_glob(filenames, f); + else + filenames = lappend(filenames, makeString(f)); return filenames; } + +/* + * lappend_glob + * The filename is a globbing pathname matching potentially multiple files. + * All the matched file names are added to the list. + */ +static List * +lappend_glob(List *filenames, + const char *filename) +{ + glob_t globbuf; + + int error = glob(filename, GLOB_NOCHECK | GLOB_ERR, NULL, &globbuf); + switch (error) { + case 0: + for (size_t i = 0; i < globbuf.gl_pathc; i++) + { + filenames = lappend(filenames, makeString(globbuf.gl_pathv[i])); + } + break; + case GLOB_NOSPACE: + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("running out of memory while globbing Parquet filename \"%s\"", + filename))); + break; + case GLOB_ABORTED: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg("read error while globbing Parquet filename \"%s\". Check file permissions.", + filename))); + break; + } + globfree(&globbuf); + + return filenames; +} + + /* * extract_rowgroups_list * Analyze query predicates and using min/max statistics determine which From a387d543a366fb0a2fd5c96d0653d74049afa808 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre=20M=C3=A9tras?= Date: Wed, 27 Aug 2025 15:07:02 -0400 Subject: [PATCH 2/4] Upgrade to PostgreSQL 17 and fix memory bug --- src/common.cpp | 24 ++++++++- src/exec_state.cpp | 1 + src/parquet_impl.cpp | 118 +++++++++++++++++++++++-------------------- 3 files changed, 85 insertions(+), 58 deletions(-) diff --git a/src/common.cpp b/src/common.cpp index 58fe2fc..ffea9fb 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -10,12 +10,32 @@ extern "C" #include "utils/memutils.h" #include "utils/memdebug.h" #include "utils/timestamp.h" +#include "utils/palloc.h" } +#ifndef PG_VERSION_NUM +#error "PG_VERSION_NUM is not defined" +#endif + #if PG_VERSION_NUM < 130000 #define MAXINT8LEN 25 #endif +/* --- PG version guards for MemoryContext alloc APIs (PG14+ uses Extended with flags) --- */ +#ifndef PARQUET_FDW_MCXT_GUARD +#define PARQUET_FDW_MCXT_GUARD + +#if PG_VERSION_NUM >= 140000 + #define PF_MCTX_ALLOC(ctx, sz) MemoryContextAllocExtended((ctx), (sz), 0) + #define PF_MCTX_REALLOC(ctx, p, sz) MemoryContextReallocExtended((ctx), (p), (sz), 0) +#else + #define PF_MCTX_ALLOC(ctx, sz) MemoryContextAlloc((ctx), (sz)) + /* repalloc does not require context in pre-14 */ + #define PF_MCTX_REALLOC(ctx, p, sz) repalloc((p), (sz)) +#endif + +#endif /* PARQUET_FDW_MCXT_GUARD */ + /* * exc_palloc * C++ specific memory allocator that utilizes postgres allocation sets. @@ -34,7 +54,7 @@ exc_palloc(std::size_t size) context->isReset = false; - ret = context->methods->alloc(context, size); + ret = PF_MCTX_ALLOC(context, size); if (unlikely(ret == NULL)) throw std::bad_alloc(); @@ -178,7 +198,7 @@ void datum_to_jsonb(Datum value, Oid typoid, bool isnull, FmgrInfo *outfunc, jb.val.string.val = strval; } else { - Datum numeric; + Datum numeric = (Datum) 0; switch (typoid) { diff --git a/src/exec_state.cpp b/src/exec_state.cpp index dbb2948..a92354a 100644 --- a/src/exec_state.cpp +++ b/src/exec_state.cpp @@ -45,6 +45,7 @@ class TrivialExecutionState : public ParquetFdwExecutionState Size estimate_coord_size() { Assert(false && "estimate_coord_size is not supported for TrivialExecutionState"); + return 0; } void init_coord() { diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 388cb0e..592c9db 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -91,7 +91,7 @@ bool enable_multifile_merge; static void find_cmp_func(FmgrInfo *finfo, Oid type1, Oid type2); static void destroy_parquet_state(void *arg); -static List * lappend_glob(List *filenames, const char *filename); +static List * lappend_globbed_filenames(List *filenames, const char *filename); /* @@ -125,7 +125,7 @@ struct ParquetFdwPlanState static int get_strategy(Oid type, Oid opno, Oid am) { - Oid opclass; + Oid opclass; Oid opfamily; opclass = GetDefaultOpClass(type, am); @@ -470,7 +470,6 @@ parse_filenames_list(const char *str) char *f = cur; ParserState state = PS_START; List *filenames = NIL; - bool wildcard_seen = false; while (*cur) { @@ -486,11 +485,6 @@ parse_filenames_list(const char *str) f = cur + 1; state = PS_QUOTE; break; - case '*': - case '?': - case '[': - wildcard_seen = true; - break; default: /* XXX we should check that *cur is a valid path symbol * but let's skip it for now */ @@ -504,12 +498,8 @@ parse_filenames_list(const char *str) { case ' ': *cur = '\0'; - if (wildcard_seen) - filenames = lappend_glob(filenames, f); - else - filenames = lappend(filenames, makeString(f)); + filenames = lappend_globbed_filenames(filenames, f); state = PS_START; - wildcard_seen = false; break; default: break; @@ -520,12 +510,8 @@ parse_filenames_list(const char *str) { case '"': *cur = '\0'; - if (wildcard_seen) - filenames = lappend_glob(filenames, f); - else - filenames = lappend(filenames, makeString(f)); + filenames = lappend_globbed_filenames(filenames, f); state = PS_START; - wildcard_seen = false; break; default: break; @@ -536,46 +522,57 @@ parse_filenames_list(const char *str) } cur++; } - if (wildcard_seen) - filenames = lappend_glob(filenames, f); - else - filenames = lappend(filenames, makeString(f)); + filenames = lappend_globbed_filenames(filenames, f); return filenames; } /* - * lappend_glob - * The filename is a globbing pathname matching potentially multiple files. + * lappend_globbed_filenames + * The filename can be a globbing pathname matching potentially multiple files. * All the matched file names are added to the list. */ static List * -lappend_glob(List *filenames, - const char *filename) +lappend_globbed_filenames(List *filenames, + const char *filename) { glob_t globbuf; - int error = glob(filename, GLOB_NOCHECK | GLOB_ERR, NULL, &globbuf); + globbuf.gl_offs = 0; + int error = glob(filename, GLOB_ERR | GLOB_NOCHECK, NULL, &globbuf); switch (error) { case 0: - for (size_t i = 0; i < globbuf.gl_pathc; i++) + for (size_t i = globbuf.gl_offs; i < globbuf.gl_pathc; i++) { - filenames = lappend(filenames, makeString(globbuf.gl_pathv[i])); + elog(DEBUG1, "parquet_fdw: adding globbed filename %s to list of files", globbuf.gl_pathv[i]); + filenames = lappend(filenames, makeString(pstrdup(globbuf.gl_pathv[i]))); } break; case GLOB_NOSPACE: ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("running out of memory while globbing Parquet filename \"%s\"", + errmsg("parquet_fdw: running out of memory while globbing Parquet filename \"%s\"", filename))); break; case GLOB_ABORTED: ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg("read error while globbing Parquet filename \"%s\". Check file permissions.", + errmsg("parquet_fdw: read error while globbing Parquet filename \"%s\". Check file permissions.", + filename))); + break; + // Should not come here as we use GLOB_NOCHECK flag + case GLOB_NOMATCH: + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FILE), + errmsg("parquet_fdw: no Parquet filename matches \"%s\". Check path.", filename))); break; + default: + ereport(ERROR, + (errcode(ERRCODE_SYSTEM_ERROR), + errmsg("parquet_fdw: unknown error; no Parquet filename matches \"%s\". Check path and permissions.", + filename))); } globfree(&globbuf); @@ -1376,13 +1373,14 @@ parquetGetForeignPaths(PlannerInfo *root, } foreign_path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ + NULL, /* pathtarget (default) */ baserel->rows, startup_cost, total_cost, - NULL, /* no pathkeys */ - NULL, /* no outer rel either */ - NULL, /* no extra plan */ + pathkeys, + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ (List *) fdw_private); if (!enable_multifile && is_multi) foreign_path->total_cost += disable_cost; @@ -1402,13 +1400,14 @@ parquetGetForeignPaths(PlannerInfo *root, memcpy(private_sort, fdw_private, sizeof(ParquetFdwPlanState)); path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ + NULL, /* pathtarget (default) */ baserel->rows, startup_cost, total_cost, pathkeys, - NULL, /* no outer rel either */ - NULL, /* no extra plan */ + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ (List *) private_sort); /* For multifile case calculate the cost of merging files */ @@ -1443,14 +1442,15 @@ parquetGetForeignPaths(PlannerInfo *root, Path *path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ - rows_per_worker, - startup_cost, - startup_cost + run_cost / (num_workers + 1), - use_pathkeys ? pathkeys : NULL, - NULL, /* no outer rel either */ - NULL, /* no extra plan */ - (List *) private_parallel); + NULL, /* pathtarget (default) */ + rows_per_worker, + startup_cost, + startup_cost + run_cost / (num_workers + 1), + use_pathkeys ? pathkeys : NULL, + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ + (List *) private_parallel); path->parallel_workers = num_workers; path->parallel_aware = true; @@ -1474,14 +1474,15 @@ parquetGetForeignPaths(PlannerInfo *root, path = (Path *) create_foreignscan_path(root, baserel, - NULL, /* default pathtarget */ - rows_per_worker, - startup_cost, - total_cost, - pathkeys, - NULL, /* no outer rel either */ - NULL, /* no extra plan */ - (List *) private_parallel_merge); + NULL, /* pathtarget (default) */ + rows_per_worker, + startup_cost, + total_cost, + pathkeys, + NULL, /* required_outer */ + NULL, /* fdw_outerpath */ + NULL, /* fdw_restrictinfo */ + (List *) private_parallel_merge); cost_merge(path, list_length(private_parallel_merge->filenames), startup_cost, total_cost, path->rows); @@ -1509,6 +1510,10 @@ parquetGetForeignPlan(PlannerInfo * /* root */, Plan *outer_plan) { ParquetFdwPlanState *fdw_private = (ParquetFdwPlanState *) best_path->fdw_private; + if (fdw_private == NULL) { + fdw_private = (ParquetFdwPlanState *) palloc0(sizeof(ParquetFdwPlanState)); + } + Index scan_relid = baserel->relid; List *attrs_used = NIL; List *attrs_sorted = NIL; @@ -1532,8 +1537,9 @@ parquetGetForeignPlan(PlannerInfo * /* root */, * Nodes. So we need to convert everything in nodes and store it in a List. */ attr = -1; - while ((attr = bms_next_member(fdw_private->attrs_used, attr)) >= 0) - attrs_used = lappend_int(attrs_used, attr); + if (fdw_private->attrs_used) + while ((attr = bms_next_member(fdw_private->attrs_used, attr)) >= 0) + attrs_used = lappend_int(attrs_used, attr); foreach (lc, fdw_private->attrs_sorted) attrs_sorted = lappend_int(attrs_sorted, lfirst_int(lc)); From e094be44f08dd8fac40ecb3a91ed545fc652f900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre=20M=C3=A9tras?= Date: Wed, 27 Aug 2025 15:43:07 -0400 Subject: [PATCH 3/4] Update documentation --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index a573f9f..27c3c17 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,8 @@ options ( ); ``` +You can use [globbing](https://en.wikipedia.org/wiki/Glob_(programming)) to list all the Parquet files, like `options (filename '/mnt/userdata*.parquet')` and it will import all matching files. This can be usefull when you have a Hive directory structure, for instance organized by `year/month/day` and you can consider all Parquet files with `/mnt/userdata/**/*.parquet`. + ## Advanced Currently `parquet_fdw` supports the following column [types](https://github.com/apache/arrow/blob/master/cpp/src/arrow/type.h): From a9a5d5b00a3978aef4a3bcbf563779494c68be49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre=20M=C3=A9tras?= Date: Tue, 2 Sep 2025 09:38:28 -0400 Subject: [PATCH 4/4] Support for filenames enumeration using braces --- README.md | 2 +- src/parquet_impl.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 27c3c17..ef5aa32 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ options ( ); ``` -You can use [globbing](https://en.wikipedia.org/wiki/Glob_(programming)) to list all the Parquet files, like `options (filename '/mnt/userdata*.parquet')` and it will import all matching files. This can be usefull when you have a Hive directory structure, for instance organized by `year/month/day` and you can consider all Parquet files with `/mnt/userdata/**/*.parquet`. +You can use [globbing](https://en.wikipedia.org/wiki/Glob_(programming)) to list all the Parquet files, like `options (filename '/mnt/userdata*.parquet')` and it will import all matching files. This can be usefull when you have a Hive directory structure, for instance organized by `year/month/day` and you can consider all Parquet files with `/mnt/userdata/**/*.parquet`. You can also use name enumerations using braces like `/mnt/userdata/data_{1,3}.parquet` that will consider only files `data_1.parquet` and `data_3.parquet`. ## Advanced diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 592c9db..7184594 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -540,7 +540,7 @@ lappend_globbed_filenames(List *filenames, glob_t globbuf; globbuf.gl_offs = 0; - int error = glob(filename, GLOB_ERR | GLOB_NOCHECK, NULL, &globbuf); + int error = glob(filename, GLOB_ERR | GLOB_NOCHECK | GLOB_BRACE, NULL, &globbuf); switch (error) { case 0: for (size_t i = globbuf.gl_offs; i < globbuf.gl_pathc; i++)