Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/backend/catalog/system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,25 @@ CREATE VIEW pg_resqueue_status AS
queueholders int4)
ON (s.queueid = q.oid);

-- Resource queue cumulative statistics view
CREATE VIEW pg_stat_resqueues AS
SELECT
q.oid AS queueid,
q.rsqname AS queuename,
s.queries_submitted,
s.queries_admitted,
s.queries_rejected,
s.queries_completed,
s.elapsed_wait_secs AS total_wait_time_secs,
s.max_wait_secs,
s.elapsed_exec_secs AS total_exec_time_secs,
s.max_exec_secs,
s.total_cost,
s.total_memory_kb,
s.stat_reset_timestamp
FROM pg_resqueue AS q,
pg_stat_get_resqueue_stats(q.oid) AS s;

-- External table views

CREATE VIEW pg_max_external_files AS
Expand Down
6 changes: 6 additions & 0 deletions src/backend/cdb/dispatcher/cdbdisp_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ cdbdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms,
ThrowErrorData(qeError);
}

/*
* GPDB: Merge relation stats sent by QEs so QD's mod_since_analyze
* stays up to date for autovacuum triggering.
*/
pgstat_combine_from_qe(pr);

cdbdisp_returnResults(pr, cdb_pgresults);

cdbdisp_destroyDispatcherState(ds);
Expand Down
5 changes: 5 additions & 0 deletions src/backend/commands/dbcommands.c
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,11 @@ dropdb(const char *dbname, bool missing_ok, bool force)
*/
dropDatabaseDependencies(db_id);

/*
* Tell the cumulative stats system to forget it immediately, too.
*/
pgstat_drop_database(db_id);

/*
* Drop db-specific replication slots.
*/
Expand Down
12 changes: 12 additions & 0 deletions src/backend/executor/execUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -2144,6 +2144,12 @@ void mppExecutorFinishup(QueryDesc *queryDesc)
if (ProcessDispatchResult_hook)
ProcessDispatchResult_hook(ds);

/*
* GPDB: Merge relation stats sent by QEs so QD's mod_since_analyze
* stays up to date for autovacuum triggering.
*/
pgstat_combine_from_qe(pr);

/* get num of rows processed from writer QEs. */
estate->es_processed +=
cdbdisp_sumCmdTuples(pr, primaryWriterSliceIndex);
Expand Down Expand Up @@ -2225,6 +2231,12 @@ uint64 mppExecutorWait(QueryDesc *queryDesc)
LocallyExecutingSliceIndex(queryDesc->estate),
estate->showstatctx);
}
/*
* GPDB: Merge relation stats sent by QEs so QD's mod_since_analyze
* stays up to date for autovacuum triggering.
*/
pgstat_combine_from_qe(pr);

/* get num of rows processed from writer QEs. */
es_processed +=
cdbdisp_sumCmdTuples(pr, primaryWriterSliceIndex);
Expand Down
7 changes: 4 additions & 3 deletions src/backend/postmaster/autovacuum.c
Original file line number Diff line number Diff line change
Expand Up @@ -676,16 +676,17 @@ AutoVacLauncherMain(int argc, char *argv[])
* the database chosen is connectable, the launcher will never select it and the
* worker will continue to signal for a new launcher.
*/
#if 0
/*
* Even when system is configured to use a different fetch consistency,
* for autovac we always want fresh stats.
*/
SetConfigOption("stats_fetch_consistency", "none", PGC_SUSET, PGC_S_OVERRIDE);

#if 0
/*
* In emergency mode, just start a worker (unless shutdown was requested)
* and go away.
* In GPDB, we only want an autovacuum worker to start once we know
* there is a database to vacuum. Therefore, we never want emergency mode
* to start a worker immediately.
*/
if (!AutoVacuumingActive())
{
Expand Down
9 changes: 9 additions & 0 deletions src/backend/tcop/postgres.c
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,15 @@ exec_mpp_query(const char *query_string,

(*receiver->rDestroy) (receiver);

/*
* GPDB: Send pending relation stats to QD before PortalDrop and
* finish_xact_command(). The stats are in pgStatXactStack
* (transaction-level counts); finish_xact_command() calls
* AtEOXact_PgStat() which NULLs pgStatXactStack. We also send
* before PortalDrop to avoid any subtransaction cleanup side effects.
*/
pgstat_send_qd_tabstats();

PortalDrop(portal, false);

/*
Expand Down
4 changes: 4 additions & 0 deletions src/backend/utils/activity/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ subdir = src/backend/utils/activity
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global

# GPDB: needed for libpq-int.h (PGExtraType, pg_result struct)
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)

OBJS = \
backend_progress.o \
backend_status.o \
Expand All @@ -25,6 +28,7 @@ OBJS = \
pgstat_io.o \
pgstat_relation.o \
pgstat_replslot.o \
pgstat_resqueue.o \
pgstat_shmem.o \
pgstat_slru.o \
pgstat_subscription.o \
Expand Down
153 changes: 153 additions & 0 deletions src/backend/utils/activity/pgstat.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
#include "access/transam.h"
#include "access/xact.h"
#include "lib/dshash.h"
#include "libpq/pqformat.h"
#include "libpq-int.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "storage/fd.h"
Expand All @@ -107,6 +109,8 @@
#include "utils/memutils.h"
#include "utils/pgstat_internal.h"
#include "utils/timestamp.h"
#include "catalog/gp_distribution_policy.h"
#include "cdb/cdbvars.h"


/* ----------
Expand Down Expand Up @@ -337,6 +341,22 @@ static const PgStat_KindInfo pgstat_kind_infos[PGSTAT_NUM_KINDS] = {
.reset_timestamp_cb = pgstat_subscription_reset_timestamp_cb,
},

[PGSTAT_KIND_RESQUEUE] = {
.name = "resqueue",

.fixed_amount = false,
/* resource queues are cluster-wide objects, visible across databases */
.accessed_across_databases = true,

.shared_size = sizeof(PgStatShared_ResQueue),
.shared_data_off = offsetof(PgStatShared_ResQueue, stats),
.shared_data_len = sizeof(((PgStatShared_ResQueue *) 0)->stats),
.pending_size = sizeof(PgStat_ResQueueCounts),

.flush_pending_cb = pgstat_resqueue_flush_cb,
.reset_timestamp_cb = pgstat_resqueue_reset_timestamp_cb,
},


/* stats for fixed-numbered (mostly 1) objects */

Expand Down Expand Up @@ -1728,3 +1748,136 @@ assign_stats_fetch_consistency(int newval, void *extra)
if (pgstat_fetch_consistency != newval)
force_stats_snapshot_clear = true;
}


/* -----------------------------------------------------------------------
* GPDB: QE→QD pgstat collection.
*
* After a DML statement completes on QE, send the accumulated pending
* relation stats (from pgStatPending) to the QD via a 'y' protocol message.
* The QD collects these in pgstat_combine_from_qe() and merges them into
* its own pending stats, so autovacuum can see modification counts.
* -----------------------------------------------------------------------
*/

/*
* pgstat_send_qd_tabstats -- QE side: send relation stats to QD.
*
* Must be called only on QE (Gp_role == GP_ROLE_EXECUTE), BEFORE
* finish_xact_command(). At call time the transaction-level per-table
* counts are still in pgStatXactStack. finish_xact_command() calls
* AtEOXact_PgStat() which NULLs pgStatXactStack, so we must read the
* stats before that happens.
*/
void
pgstat_send_qd_tabstats(void)
{
PgStat_SubXactStatus *xact_state;
StringInfoData buf;
PgStatTabRecordFromQE *records;
int nrecords = 0;
int capacity = 64;

if (Gp_role != GP_ROLE_EXECUTE || !Gp_is_writer)
return;

/*
* On QE inside a distributed transaction, stats for the current
* statement are in pgStatXactStack (not yet merged to pgStatPending,
* because the top-level transaction hasn't committed yet). Read the
* current nesting level's per-table insert/update/delete counts.
*/
xact_state = pgstat_get_current_xact_stack();

if (xact_state == NULL)
return;

records = (PgStatTabRecordFromQE *)
palloc(capacity * sizeof(PgStatTabRecordFromQE));

/*
* Send only the current nesting level's per-table insert/update/delete
* counts. QD will place these into its own transactional state (trans),
* letting PG's normal AtEOXact_PgStat_Relations / AtEOSubXact_PgStat_Relations
* machinery handle delta_live_tuples, delta_dead_tuples, changed_tuples,
* and subtransaction commit/abort correctly.
*/
{
PgStat_TableXactStatus *trans;

for (trans = xact_state->first; trans != NULL; trans = trans->next)
{
PgStat_TableStatus *tabstat = trans->parent;
PgStat_Counter ins, upd, del;

ins = trans->tuples_inserted;
upd = trans->tuples_updated;
del = trans->tuples_deleted;

if (ins == 0 && upd == 0 && del == 0 && !trans->truncdropped)
continue;

/*
* Filter by distribution policy: skip catalog tables (QD has
* the same updates) and deduplicate replicated tables (only
* one segment reports, to avoid overcounting).
*/
{
GpPolicy *gppolicy = GpPolicyFetch(tabstat->id);

switch (gppolicy->ptype)
{
case POLICYTYPE_ENTRY:
pfree(gppolicy);
continue;
case POLICYTYPE_REPLICATED:
if (GpIdentity.segindex != tabstat->id % gppolicy->numsegments)
{
pfree(gppolicy);
continue;
}
break;
case POLICYTYPE_PARTITIONED:
break;
default:
elog(ERROR, "unrecognized policy type %d",
gppolicy->ptype);
}
pfree(gppolicy);
}

/* New entry — each table appears at most once per nesting level */
if (nrecords >= capacity)
{
capacity *= 2;
records = (PgStatTabRecordFromQE *)
repalloc(records, capacity * sizeof(PgStatTabRecordFromQE));
}

records[nrecords].t_id = tabstat->id;
records[nrecords].t_shared = tabstat->shared;
records[nrecords].truncdropped = trans->truncdropped;
records[nrecords].tuples_inserted = ins;
records[nrecords].tuples_updated = upd;
records[nrecords].tuples_deleted = del;
nrecords++;
}
}

if (nrecords == 0)
{
pfree(records);
return;
}

pq_beginmessage(&buf, 'y');
pq_sendstring(&buf, "PGSTAT");
pq_sendbyte(&buf, false); /* result not complete yet */
pq_sendint(&buf, PGExtraTypeTableStats, sizeof(PGExtraType));
pq_sendint(&buf, nrecords * sizeof(PgStatTabRecordFromQE), sizeof(int));
pq_sendbytes(&buf, (char *) records, nrecords * sizeof(PgStatTabRecordFromQE));
pq_endmessage(&buf);

pfree(records);

}
Loading