Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/greenplum-abi-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ jobs:
./configure --with-quicklz --disable-gpperfmon --with-gssapi --enable-mapreduce --enable-orafce --enable-ic-proxy \
--enable-orca --with-libxml --with-pythonsrc-ext --with-uuid=e2fs --with-pgport=5432 --enable-tap-tests \
--enable-debug-extensions --with-perl --with-python --with-openssl --with-pam --with-ldap --with-includes="" \
--without-mdblocales \
--without-mdblocales --without-zstd\
--with-libraries="" --disable-rpath \
--prefix=/usr/local/greenplum-db-devel \
--mandir=/usr/local/greenplum-db-devel/man
Expand Down
46 changes: 23 additions & 23 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -11601,50 +11601,50 @@ fi
fi

if test "$with_zstd" = yes; then
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressCCtx in -lzstd" >&5
$as_echo_n "checking for ZSTD_compressCCtx in -lzstd... " >&6; }
if ${ac_cv_lib_zstd_ZSTD_compressCCtx+:} false; then :
$as_echo_n "(cached) " >&6
else
{ printf "%s\n" "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compressCCtx in -l:libzstd.a" >&5
printf %s "checking for ZSTD_compressCCtx in -l:libzstd.a... " >&6; }
if test ${ac_cv_lib__libzstd_a_ZSTD_compressCCtx+y}
then :
printf %s "(cached) " >&6
else $as_nop
ac_check_lib_save_LIBS=$LIBS
LIBS="-lzstd $LIBS"
LIBS="-l:libzstd.a $LIBS"
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
/* end confdefs.h. */

/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char ZSTD_compressCCtx ();
int
main ()
main (void)
{
return ZSTD_compressCCtx ();
;
return 0;
}
_ACEOF
if ac_fn_c_try_link "$LINENO"; then :
ac_cv_lib_zstd_ZSTD_compressCCtx=yes
else
ac_cv_lib_zstd_ZSTD_compressCCtx=no
if ac_fn_c_try_link "$LINENO"
then :
ac_cv_lib__libzstd_a_ZSTD_compressCCtx=yes
else $as_nop
ac_cv_lib__libzstd_a_ZSTD_compressCCtx=no
fi
rm -f core conftest.err conftest.$ac_objext \
rm -f core conftest.err conftest.$ac_objext conftest.beam \
conftest$ac_exeext conftest.$ac_ext
LIBS=$ac_check_lib_save_LIBS
fi
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compressCCtx" >&5
$as_echo "$ac_cv_lib_zstd_ZSTD_compressCCtx" >&6; }
if test "x$ac_cv_lib_zstd_ZSTD_compressCCtx" = xyes; then :
cat >>confdefs.h <<_ACEOF
#define HAVE_LIBZSTD 1
_ACEOF
{ printf "%s\n" "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib__libzstd_a_ZSTD_compressCCtx" >&5
printf "%s\n" "$ac_cv_lib__libzstd_a_ZSTD_compressCCtx" >&6; }
if test "x$ac_cv_lib__libzstd_a_ZSTD_compressCCtx" = xyes
then :

LIBS="-lzstd $LIBS"
LIBS="-l:libzstd.a $LIBS"

else
printf "%s\n" "#define HAVE_LIBZSTD 1" >>confdefs.h


else $as_nop
as_fn_error $? "zstd library not found
If you have libzstd already installed, see config.log for details on the
failure. It is possible the compiler isn't looking in the proper directory.
Expand Down
2 changes: 1 addition & 1 deletion configure.in
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,7 @@ Use --without-zlib to disable zlib support.])])
fi

if test "$with_zstd" = yes; then
AC_CHECK_LIB(zstd, ZSTD_compressCCtx, [],
AC_CHECK_LIB(:libzstd.a, ZSTD_compressCCtx, [],
[AC_MSG_ERROR([zstd library not found
If you have libzstd already installed, see config.log for details on the
failure. It is possible the compiler isn't looking in the proper directory.
Expand Down
1 change: 0 additions & 1 deletion docker/test/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ gpconfig -c shared_preload_libraries -v yezzey
if [ "${TEST_CGROUP}" = "true" ]; then
gpconfig -c gp_resource_manager -v "group"
fi

gpstop -a -i && gpstart -a

createdb $USER
Expand Down
42 changes: 39 additions & 3 deletions src/backend/storage/file/buffile.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "postgres.h"

#ifdef HAVE_LIBZSTD
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
#endif

Expand All @@ -72,10 +73,11 @@

#include "cdb/cdbvars.h"
#include "storage/gp_compress.h"
#include "utils/gp_alloc.h"
#include "utils/faultinjector.h"
#include "utils/memutils.h"
#include "utils/workfile_mgr.h"

#include "utils/guc.h"
/*
* This data structure represents a buffered file that consists of one
* physical file (accessed through a virtual file descriptor
Expand Down Expand Up @@ -121,6 +123,7 @@ struct BufFile

/* ZStandard compression support */
#ifdef HAVE_LIBZSTD
MemoryContext pg_zstd_context;
zstd_context *zstd_context; /* ZStandard library handles. */

/*
Expand Down Expand Up @@ -364,6 +367,8 @@ BufFileClose(BufFile *file)
#ifdef HAVE_LIBZSTD
if (file->zstd_context)
zstd_free_context(file->zstd_context);
if (file->pg_zstd_context)
MemoryContextDelete(file->pg_zstd_context);
#endif

pfree(file);
Expand Down Expand Up @@ -997,6 +1002,17 @@ BufFilePledgeSequential(BufFile *buffile)
#ifdef HAVE_LIBZSTD

#define BUFFILE_ZSTD_COMPRESSION_LEVEL 1
void *
customAlloc(void *opaque, size_t size)
{
return MemoryContextAlloc((MemoryContext)opaque, size);
}

void
customFree(void *opaque, void *address)
{
pfree(address);
}

/*
* Temporary buffer used during compresion. It's used only within the
Expand All @@ -1012,7 +1028,12 @@ BufFileStartCompression(BufFile *file)
{
ResourceOwner oldowner;
size_t ret;
ZSTD_customMem customMem;
file->pg_zstd_context = AllocSetContextCreate(TopMemoryContext,"zstd_context", ALLOCSET_DEFAULT_SIZES);

customMem.customAlloc = customAlloc;
customMem.customFree = customFree;
customMem.opaque = file->pg_zstd_context;
/*
* When working with compressed files, we rely on libzstd's buffer,
* and the BufFile's own buffer is unused. It's a bit silly that we
Expand All @@ -1038,7 +1059,13 @@ BufFileStartCompression(BufFile *file)
CurrentResourceOwner = file->resowner;

file->zstd_context = zstd_alloc_context();
file->zstd_context->cctx = ZSTD_createCStream();
if (gp_enable_zstd_memory_accounting)
{
file->zstd_context->cctx = ZSTD_createCStream_advanced(customMem);
}else{
file->zstd_context->cctx = ZSTD_createCStream();
}

if (!file->zstd_context->cctx)
elog(ERROR, "out of memory");
ret = ZSTD_initCStream(file->zstd_context->cctx, BUFFILE_ZSTD_COMPRESSION_LEVEL);
Expand Down Expand Up @@ -1097,6 +1124,11 @@ BufFileEndCompression(BufFile *file)
ZSTD_outBuffer output;
size_t ret;
int wrote;
ZSTD_customMem customMem;

customMem.customAlloc = customAlloc;
customMem.customFree = customFree;
customMem.opaque = file->pg_zstd_context;

Assert(file->state == BFS_COMPRESSED_WRITING);

Expand All @@ -1122,7 +1154,11 @@ BufFileEndCompression(BufFile *file)
file->uncompressed_bytes, file->maxoffset);

/* Done writing. Initialize for reading */
file->zstd_context->dctx = ZSTD_createDStream();
if (gp_enable_zstd_memory_accounting){
file->zstd_context->dctx = ZSTD_createDStream_advanced(customMem);
}else{
file->zstd_context->dctx = ZSTD_createDStream();
}
if (!file->zstd_context->dctx)
elog(ERROR, "out of memory");
ret = ZSTD_initDStream(file->zstd_context->dctx);
Expand Down
14 changes: 13 additions & 1 deletion src/backend/utils/misc/guc_gp.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ bool gp_enable_exchange_default_partition = false;
int dtx_phase2_retry_count = 0;
bool gp_log_suboverflow_statement = false;
bool gp_use_synchronize_seqscans_catalog_vacuum_full = false;

bool gp_enable_zstd_memory_accounting = true;
bool log_dispatch_stats = false;

int explain_memory_verbosity = 0;
Expand Down Expand Up @@ -3294,10 +3294,22 @@ struct config_bool ConfigureNamesBool_gp[] =
NULL
},
&gp_use_synchronize_seqscans_catalog_vacuum_full,
true,
NULL, NULL, NULL
},

{
{"gp_enable_zstd_memory_accounting", PGC_USERSET, COMPAT_OPTIONS_PREVIOUS,
gettext_noop("Enables normal memory counting in zstd (not using malloc)"),
NULL
},
&gp_enable_zstd_memory_accounting,
false,
NULL, NULL, NULL
},



{
{"gp_external_fail_on_eof", PGC_USERSET, EXTERNAL_TABLES,
gettext_noop("Fail with ERROR and rollback on 'unexpected end of file' error when used with a CUSTOM formatter."),
Expand Down
1 change: 1 addition & 0 deletions src/include/utils/guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ extern bool gp_enable_exchange_default_partition;
extern int dtx_phase2_retry_count;
extern bool gp_log_suboverflow_statement;
extern bool gp_use_synchronize_seqscans_catalog_vacuum_full;
extern bool gp_enable_zstd_memory_accounting;

/* WAL replication debug gucs */
extern bool debug_walrepl_snd;
Expand Down
1 change: 1 addition & 0 deletions src/include/utils/sync_guc_name.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,4 @@
"gp_resgroup_debug_wait_queue",
"gp_log_resqueue_priority_sleep_time",
"yc_allow_copy_to_program",
"gp_enable_zstd_memory_accounting",
Loading