From 4bca308cae136173e34c3c6a0d1230762a140164 Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Tue, 18 Jul 2023 13:32:59 +0800 Subject: [PATCH 1/8] Add a syntax to create Incrementally Maintainable Materialized Views Allow to create Incrementally Maintainable Materialized View (IMMV) by using INCREMENTAL option in CREATE MATERIALIZED VIEW command as follow: CREATE [INCREMANTAL] MATERIALIZED VIEW xxxxx AS SELECT ....; --- src/backend/parser/gram.y | 34 ++++++++++++++++++++++------------ src/include/nodes/primnodes.h | 1 + src/include/parser/kwlist.h | 1 + 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 70cff912d51..0a8ec864028 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -488,6 +488,7 @@ static void check_expressions_in_partition_key(PartitionSpec *spec, core_yyscan_ %type OptTempTableName %type into_clause create_as_target create_mv_target +%type incremental %type createfunc_opt_item common_func_opt_item dostmt_opt_item %type func_arg func_arg_with_default table_func_column aggr_arg @@ -764,7 +765,7 @@ static void check_expressions_in_partition_key(PartitionSpec *spec, core_yyscan_ HANDLER HAVING HEADER_P HOLD HOUR_P IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE - INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P + INCLUDING INCREMENT INCREMENTAL INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION @@ -6694,31 +6695,33 @@ ext_opt_encoding_item: *****************************************************************************/ CreateMatViewStmt: - CREATE OptNoLog MATERIALIZED VIEW create_mv_target AS SelectStmt opt_with_data OptDistributedBy + CREATE OptNoLog incremental MATERIALIZED VIEW create_mv_target AS SelectStmt opt_with_data OptDistributedBy { CreateTableAsStmt *ctas = makeNode(CreateTableAsStmt); - ctas->query = $7; - ctas->into = $5; + ctas->query = $8; + ctas->into = $6; ctas->objtype = OBJECT_MATVIEW; ctas->is_select_into = false; ctas->if_not_exists = false; /* cram additional flags into the IntoClause */ - $5->rel->relpersistence = $2; - $5->skipData = !($8); - ctas->into->distributedBy = $9; + $6->rel->relpersistence = $2; + $6->skipData = !($9); + $6->ivm = $3; + ctas->into->distributedBy = $10; $$ = (Node *) ctas; } - | CREATE OptNoLog MATERIALIZED VIEW IF_P NOT EXISTS create_mv_target AS SelectStmt opt_with_data + | CREATE OptNoLog incremental MATERIALIZED VIEW IF_P NOT EXISTS create_mv_target AS SelectStmt opt_with_data { CreateTableAsStmt *ctas = makeNode(CreateTableAsStmt); - ctas->query = $10; - ctas->into = $8; + ctas->query = $11; + ctas->into = $9; ctas->objtype = OBJECT_MATVIEW; ctas->is_select_into = false; ctas->if_not_exists = true; /* cram additional flags into the IntoClause */ - $8->rel->relpersistence = $2; - $8->skipData = !($11); + $9->rel->relpersistence = $2; + $9->skipData = !($12); + $9->ivm = $3; $$ = (Node *) ctas; } ; @@ -6735,11 +6738,16 @@ create_mv_target: $$->tableSpaceName = $5; $$->viewQuery = NULL; /* filled at analysis time */ $$->skipData = false; /* might get changed later */ + $$->ivm = false; $$->accessMethod = greenplumLegacyAOoptions($$->accessMethod, &$$->options); } ; +incremental: INCREMENTAL { $$ = true; } + | /*EMPTY*/ { $$ = false; } + ; + OptNoLog: UNLOGGED { $$ = RELPERSISTENCE_UNLOGGED; } | /*EMPTY*/ { $$ = RELPERSISTENCE_PERMANENT; } ; @@ -18583,6 +18591,7 @@ unreserved_keyword: | INCLUDING | INCLUSIVE | INCREMENT + | INCREMENTAL | INDEX | INDEXES | INHERIT @@ -19511,6 +19520,7 @@ bare_label_keyword: | INCLUDING | INCLUSIVE | INCREMENT + | INCREMENTAL | INDEX | INDEXES | INHERIT diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 57fb269564e..00b5ab952ce 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -122,6 +122,7 @@ typedef struct IntoClause Node *viewQuery; /* materialized view's SELECT query */ bool skipData; /* true for WITH NO DATA */ Node *distributedBy; /* GPDB: columns to distribubte the data on. */ + bool ivm; /* true for WITH IVM */ } IntoClause; typedef struct CopyIntoClause diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index fc275952c71..363fa5e70ba 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -229,6 +229,7 @@ PG_KEYWORD("include", INCLUDE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("including", INCLUDING, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("inclusive", INCLUSIVE, UNRESERVED_KEYWORD, BARE_LABEL) /* GPDB */ PG_KEYWORD("increment", INCREMENT, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("incremental", INCREMENTAL, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("index", INDEX, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("indexes", INDEXES, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("inherit", INHERIT, UNRESERVED_KEYWORD, BARE_LABEL) From ce7deabcf34256908f75d5f9c6a9d7624c7db021 Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Tue, 18 Jul 2023 13:58:15 +0800 Subject: [PATCH 2/8] Add relisivm column to pg_class system catalog If this boolean column is true, a relations is Incrementally Maintainable Materialized View (IMMV). This is set when IMMV is created. --- src/backend/catalog/heap.c | 1 + src/backend/catalog/index.c | 1 + src/backend/utils/cache/lsyscache.c | 24 ++++++++++++++++++++++++ src/backend/utils/cache/relcache.c | 2 ++ src/include/catalog/pg_class.h | 3 +++ src/include/utils/lsyscache.h | 1 + src/include/utils/rel.h | 2 ++ 7 files changed, 34 insertions(+) diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 4b6f11597d1..6ab984a08cf 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1306,6 +1306,7 @@ InsertPgClassTuple(Relation pg_class_desc, values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite); values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid); values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid); + values[Anum_pg_class_relisivm - 1] = BoolGetDatum(rd_rel->relisivm); if (relacl != (Datum) 0) values[Anum_pg_class_relacl - 1] = relacl; else diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 715a3ddf4b7..cdb71d17c8a 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1002,6 +1002,7 @@ index_create(Relation heapRelation, indexRelation->rd_rel->relowner = heapRelation->rd_rel->relowner; indexRelation->rd_rel->relam = accessMethodObjectId; indexRelation->rd_rel->relispartition = OidIsValid(parentIndexRelid); + indexRelation->rd_rel->relisivm = false; /* * store index's pg_class entry diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 1d63212197a..f5c96d20ac2 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -2044,6 +2044,30 @@ is_agg_partial_capable(Oid aggid) return result; } +/* + * get_rel_relisivm + * + * Returns the relisivm flag associated with a given relation. + */ +bool +get_rel_relisivm(Oid relid) +{ + HeapTuple tp; + + tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); + if (HeapTupleIsValid(tp)) + { + Form_pg_class reltup = (Form_pg_class) GETSTRUCT(tp); + bool result; + + result = reltup->relisivm; + ReleaseSysCache(tp); + return result; + } + else + return false; +} + /* * get_rel_tablespace * diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index f8dd5c30368..deef5d3887d 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -1966,6 +1966,8 @@ formrdesc(const char *relationName, Oid relationReltype, /* ... and they're always populated, too */ relation->rd_rel->relispopulated = true; + /* ... and they're always no ivm, too */ + relation->rd_rel->relisivm = false; relation->rd_rel->relreplident = REPLICA_IDENTITY_NOTHING; relation->rd_rel->relpages = 0; diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h index 02e56aed583..813d47fee83 100644 --- a/src/include/catalog/pg_class.h +++ b/src/include/catalog/pg_class.h @@ -119,6 +119,9 @@ CATALOG(pg_class,1259,RelationRelationId) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83,Relat /* is relation a partition? */ bool relispartition BKI_DEFAULT(f); + /* is relation a matview with ivm? */ + bool relisivm BKI_DEFAULT(f); + /* link to original rel during table rewrite; otherwise 0 */ Oid relrewrite BKI_DEFAULT(0) BKI_LOOKUP_OPT(pg_class); diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 6369d4d36fd..aa2d4014de6 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -171,6 +171,7 @@ extern Oid get_rel_namespace(Oid relid); extern Oid get_rel_type_id(Oid relid); extern char get_rel_relkind(Oid relid); extern bool get_rel_relispartition(Oid relid); +extern bool get_rel_relisivm(Oid relid); extern Oid get_rel_tablespace(Oid relid); extern char get_rel_persistence(Oid relid); extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 6c3757dab00..eb1dc64726e 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -728,6 +728,8 @@ typedef struct ViewOptions */ #define RelationIsPopulated(relation) ((relation)->rd_rel->relispopulated) +#define RelationIsIVM(relation) ((relation)->rd_rel->relisivm) + /* * RelationIsAccessibleInLogicalDecoding * True if we need to log enough information to have access via From 6d102f1dcf69c65c8a3c4904ecdbb61829e91720 Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Tue, 18 Jul 2023 14:49:03 +0800 Subject: [PATCH 3/8] Allow to prolong life span of transition tables until transaction end Originally, tuplestores of AFTER trigger's transition tables were freed for each query depth. For our IVM implementation, we would like to prolong life of the tuplestores because we have to preserve them for a whole query assuming that some base tables might be changed in some trigger functions. --- src/backend/commands/trigger.c | 79 +++++++++++++++++++++++++++++++++- src/include/commands/trigger.h | 2 + 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 023624dbd22..c163a60dd58 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -3552,6 +3552,10 @@ typedef struct AfterTriggerEventList * end of the list, so it is relatively easy to discard them. The event * list chunks themselves are stored in event_cxt. * + * prolonged_tuplestored is a list of transition table tuplestores whose + * life are prolonged to the end of the outmost query instead of each nested + * query. + * * query_depth is the current depth of nested AfterTriggerBeginQuery calls * (-1 when the stack is empty). * @@ -3617,6 +3621,7 @@ typedef struct AfterTriggersData SetConstraintState state; /* the active S C state */ AfterTriggerEventList events; /* deferred-event list */ MemoryContext event_cxt; /* memory context for events, if any */ + List *prolonged_tuplestores; /* list of prolonged tuplestores */ /* per-query-level data: */ AfterTriggersQueryData *query_stack; /* array of structs shown below */ @@ -3652,6 +3657,7 @@ struct AfterTriggersTableData bool closed; /* true when no longer OK to add tuples */ bool before_trig_done; /* did we already queue BS triggers? */ bool after_trig_done; /* did we already queue AS triggers? */ + bool prolonged; /* are transition tables prolonged? */ AfterTriggerEventList after_trig_events; /* if so, saved list pointer */ Tuplestorestate *old_tuplestore; /* "old" transition table, if any */ Tuplestorestate *new_tuplestore; /* "new" transition table, if any */ @@ -3674,6 +3680,7 @@ static AfterTriggersTableData *GetAfterTriggersTableData(Oid relid, static TupleTableSlot *GetAfterTriggersStoreSlot(AfterTriggersTableData *table, TupleDesc tupdesc); static void AfterTriggerFreeQuery(AfterTriggersQueryData *qs); +static void release_or_prolong_tuplestore(Tuplestorestate *ts, bool prolonged); static SetConstraintState SetConstraintStateCreate(int numalloc); static SetConstraintState SetConstraintStateCopy(SetConstraintState state); static SetConstraintState SetConstraintStateAddItem(SetConstraintState state, @@ -4432,6 +4439,45 @@ afterTriggerInvokeEvents(AfterTriggerEventList *events, } +/* + * SetTransitionTablePreserved + * + * Prolong lifespan of transition tables corresponding specified relid and + * command type to the end of the outmost query instead of each nested query. + * This enables to use nested AFTER trigger's transition tables from outer + * query's triggers. Currently, only immediate incremental view maintenance + * uses this. + */ +void +SetTransitionTablePreserved(Oid relid, CmdType cmdType) +{ + AfterTriggersTableData *table; + AfterTriggersQueryData *qs; + bool found = false; + ListCell *lc; + + /* Check state, like AfterTriggerSaveEvent. */ + if (afterTriggers.query_depth < 0) + elog(ERROR, "SetTransitionTablePreserved() called outside of query"); + + qs = &afterTriggers.query_stack[afterTriggers.query_depth]; + + foreach(lc, qs->tables) + { + table = (AfterTriggersTableData *) lfirst(lc); + if (table->relid == relid && table->cmdType == cmdType && + table->closed) + { + table->prolonged = true; + found = true; + } + } + + if (!found) + elog(ERROR,"could not find table with OID %d and command type %d", relid, cmdType); +} + + /* * GetAfterTriggersTableData * @@ -4626,6 +4672,7 @@ AfterTriggerBeginXact(void) */ afterTriggers.firing_counter = (CommandId) 1; /* mustn't be 0 */ afterTriggers.query_depth = -1; + afterTriggers.prolonged_tuplestores = NIL; /* * Verify that there is no leftover state remaining. If these assertions @@ -4786,11 +4833,11 @@ AfterTriggerFreeQuery(AfterTriggersQueryData *qs) ts = table->old_tuplestore; table->old_tuplestore = NULL; if (ts) - tuplestore_end(ts); + release_or_prolong_tuplestore(ts, table->prolonged); ts = table->new_tuplestore; table->new_tuplestore = NULL; if (ts) - tuplestore_end(ts); + release_or_prolong_tuplestore(ts, table->prolonged); if (table->storeslot) ExecDropSingleTupleTableSlot(table->storeslot); } @@ -4802,6 +4849,34 @@ AfterTriggerFreeQuery(AfterTriggersQueryData *qs) */ qs->tables = NIL; list_free_deep(tables); + + /* Release prolonged tuplestores at the end of the outmost query */ + if (afterTriggers.query_depth == 0) + { + foreach(lc, afterTriggers.prolonged_tuplestores) + { + ts = (Tuplestorestate *) lfirst(lc); + if (ts) + tuplestore_end(ts); + } + afterTriggers.prolonged_tuplestores = NIL; + } +} + +/* + * Release the tuplestore, or append it to the prolonged tuplestores list. + */ +static void +release_or_prolong_tuplestore(Tuplestorestate *ts, bool prolonged) +{ + if (prolonged && afterTriggers.query_depth > 0) + { + MemoryContext oldcxt = MemoryContextSwitchTo(CurTransactionContext); + afterTriggers.prolonged_tuplestores = lappend(afterTriggers.prolonged_tuplestores, ts); + MemoryContextSwitchTo(oldcxt); + } + else + tuplestore_end(ts); } diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index fc826b80006..5d9aa063c62 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -254,6 +254,8 @@ extern void AfterTriggerEndSubXact(bool isCommit); extern void AfterTriggerSetState(ConstraintsSetStmt *stmt); extern bool AfterTriggerPendingOnRel(Oid relid); +extern void SetTransitionTablePreserved(Oid relid, CmdType cmdType); + /* * in utils/adt/ri_triggers.c From 77bebdd5ac9ecb946b0b86a8cfd50d48b99d03ad Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Tue, 18 Jul 2023 15:16:59 +0800 Subject: [PATCH 4/8] Add Incremental View Maintenance support to pg_dump --- src/bin/pg_dump/pg_dump.c | 10 ++++++++-- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 15 +++++++++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ef2a276c048..be9dd73c109 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -7386,6 +7386,7 @@ getTables(Archive *fout, int *numTables) int i_ispartition; int i_partbound; int i_amname; + int i_isivm; /* * Find all the tables and table-like objects. @@ -7504,7 +7505,8 @@ getTables(Archive *fout, int *numTables) "AS changed_acl, " "%s AS partkeydef, " "%s AS ispartition, " - "%s AS partbound " + "%s AS partbound, " + "c.relisivm AS isivm " "FROM pg_class c " "LEFT JOIN pg_depend d ON " "(c.relkind = '%c' AND " @@ -8073,6 +8075,7 @@ getTables(Archive *fout, int *numTables) i_ispartition = PQfnumber(res, "ispartition"); i_partbound = PQfnumber(res, "partbound"); i_amname = PQfnumber(res, "amname"); + i_isivm = PQfnumber(res, "isivm"); if (dopt->lockWaitTimeout) { @@ -8199,6 +8202,7 @@ getTables(Archive *fout, int *numTables) tblinfo[i].partkeydef = pg_strdup(PQgetvalue(res, i, i_partkeydef)); tblinfo[i].ispartition = (strcmp(PQgetvalue(res, i, i_ispartition), "t") == 0); tblinfo[i].partbound = pg_strdup(PQgetvalue(res, i, i_partbound)); + tblinfo[i].isivm = (strcmp(PQgetvalue(res, i, i_isivm), "t") == 0); /* foreign server */ tblinfo[i].foreign_server = atooid(PQgetvalue(res, i, i_foreignserver)); @@ -18115,9 +18119,11 @@ dumpTableSchema(Archive *fout, const TableInfo *tbinfo) } } - appendPQExpBuffer(q, "CREATE %s%s %s", + appendPQExpBuffer(q, "CREATE %s%s%s %s", tbinfo->relpersistence == RELPERSISTENCE_UNLOGGED ? "UNLOGGED " : "", + tbinfo->relkind == RELKIND_MATVIEW && tbinfo->isivm ? + "INCREMENTAL " : "", reltypename, qualrelname); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 009762223ac..c56c3439073 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -396,6 +396,7 @@ typedef struct _tableInfo struct _tableDataInfo *dataObj; /* TableDataInfo, if dumping its data */ int numTriggers; /* number of triggers for table */ struct _triggerInfo *triggers; /* array of TriggerInfo structs */ + bool isivm; /* is incrementally maintainable materialized view? */ } TableInfo; typedef struct _tableAttachInfo diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 03d076806d6..a06a994ed3e 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -2171,6 +2171,21 @@ { exclude_dump_test_schema => 1, no_toast_compression => 1, }, }, + 'CREATE MATERIALIZED VIEW matview_ivm' => { + create_order => 21, + create_sql => 'CREATE INCREMENTAL MATERIALIZED VIEW dump_test.matview_ivm (col1) AS + SELECT col1 FROM dump_test.test_table;', + regexp => qr/^ + \QCREATE INCREMENTAL MATERIALIZED VIEW dump_test.matview_ivm AS\E + \n\s+\QSELECT test_table.col1\E + \n\s+\QFROM dump_test.test_table\E + \n\s+\QWITH NO DATA;\E + /xm, + like => + { %full_runs, %dump_test_schema_runs, section_pre_data => 1, }, + unlike => { exclude_dump_test_schema => 1, }, + }, + 'CREATE POLICY p1 ON test_table' => { create_order => 22, create_sql => 'CREATE POLICY p1 ON dump_test.test_table From 8f43ca8fdbc089287b3f75441de55f272040c986 Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Tue, 18 Jul 2023 15:36:46 +0800 Subject: [PATCH 5/8] Add Incremental View Maintenance support to psql Add tab completion and meta-command output for IVM. --- src/bin/psql/describe.c | 11 ++++++++++- src/bin/psql/tab-complete.c | 14 +++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index f66e4535b8e..c1c23315ee9 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -1922,6 +1922,7 @@ describeOneTableDetails(const char *schemaname, char relpersistence; char relreplident; char *relam; + bool isivm; char *compressionType; char *compressionLevel; @@ -1953,7 +1954,8 @@ describeOneTableDetails(const char *schemaname, "c.relhastriggers, c.relrowsecurity, c.relforcerowsecurity, " "false AS relhasoids, c.relispartition, %s, c.reltablespace, " "CASE WHEN c.reloftype = 0 THEN '' ELSE c.reloftype::pg_catalog.regtype::pg_catalog.text END, " - "c.relpersistence, c.relreplident, am.amname\n" + "c.relpersistence, c.relreplident, am.amname, " + "c.relisivm\n" "FROM pg_catalog.pg_class c\n " "LEFT JOIN pg_catalog.pg_class tc ON (c.reltoastrelid = tc.oid)\n" "LEFT JOIN pg_catalog.pg_am am ON (c.relam = am.oid)\n" @@ -2148,6 +2150,7 @@ describeOneTableDetails(const char *schemaname, (char *) NULL : pg_strdup(PQgetvalue(res, 0, 14)); else tableinfo.relam = NULL; + tableinfo.isivm = strcmp(PQgetvalue(res, 0, 15), "t") == 0; /* GPDB Only: relstorage */ if (pset.sversion < 120000 && isGPDB()) @@ -4094,6 +4097,12 @@ describeOneTableDetails(const char *schemaname, printfPQExpBuffer(&buf, _("Access method: %s"), tableinfo.relam); printTableAddFooter(&cont, buf.data); } + + /* Incremental view maintance info */ + if (verbose && tableinfo.relkind == RELKIND_MATVIEW && tableinfo.isivm) + { + printTableAddFooter(&cont, _("Incremental view maintenance: yes")); + } } /* reloptions, if verbose */ diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 5cae4f6a677..2ba21b7eba9 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1101,6 +1101,7 @@ static const pgsql_thing_t words_after_create[] = { {"FOREIGN TABLE", NULL, NULL, NULL}, {"FUNCTION", NULL, NULL, Query_for_list_of_functions}, {"GROUP", Query_for_list_of_roles}, + {"INCREMENTAL MATERIALIZED VIEW", NULL, NULL, &Query_for_list_of_matviews, THING_NO_DROP | THING_NO_ALTER}, {"INDEX", NULL, NULL, &Query_for_list_of_indexes}, {"LANGUAGE", Query_for_list_of_languages}, {"LARGE OBJECT", NULL, NULL, NULL, THING_NO_CREATE | THING_NO_DROP}, @@ -2756,7 +2757,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("SEQUENCE", "TABLE", "VIEW"); /* Complete "CREATE UNLOGGED" with TABLE or MATVIEW */ else if (TailMatches("CREATE", "UNLOGGED")) - COMPLETE_WITH("TABLE", "MATERIALIZED VIEW"); + COMPLETE_WITH("TABLE", "MATERIALIZED VIEW", "INCREMENTAL MATERIALIZED VIEW"); /* Complete PARTITION BY with RANGE ( or LIST ( or ... */ else if (TailMatches("PARTITION", "BY")) COMPLETE_WITH("RANGE (", "LIST (", "HASH ("); @@ -3077,13 +3078,16 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("SELECT"); /* CREATE MATERIALIZED VIEW */ - else if (Matches("CREATE", "MATERIALIZED")) + else if (Matches("CREATE", "MATERIALIZED") || + Matches("CREATE", "INCREMENTAL", "MATERIALIZED")) COMPLETE_WITH("VIEW"); - /* Complete CREATE MATERIALIZED VIEW with AS */ - else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny)) + /* Complete CREATE MATERIALIZED VIEW with AS */ + else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny) || + Matches("CREATE", "INCREMENTAL", "MATERIALIZED", "VIEW", MatchAny)) COMPLETE_WITH("AS"); /* Complete "CREATE MATERIALIZED VIEW AS with "SELECT" */ - else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny, "AS")) + else if (Matches("CREATE", "MATERIALIZED", "VIEW", MatchAny, "AS") || + Matches("CREATE", "INCREMENTAL", "MATERIALIZED", "VIEW", MatchAny, "AS")) COMPLETE_WITH("SELECT"); /* CREATE EVENT TRIGGER */ From bd5cb3881fa73bc32a0a0de445ec8b96d8b63522 Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Wed, 19 Jul 2023 17:18:14 +0800 Subject: [PATCH 6/8] Add Incremental View Maintenance support In this implementation, AFTER triggers are used to collect tuplestores containing transition table contents. When multiple tables are changed, multiple AFTER triggers are invoked, then the final AFTER trigger performs actual update of the matview. In addition, BEFORE triggers are also used to handle global information for view maintenance. To calculate view deltas, we need both pre-state and post-state of base tables. Post-update states are available in AFTER trigger, and pre-update states can be calculated by removing inserted tuples and appending deleted tuples. Insterted tuples are filtered using the snapshot taken before table modiication, and deleted tuples are contained in the old transition table. Incrementally Maintainable Materialized Views (IMMV) can contain duplicated tuples. This patch also allows self-join, simultaneous updates of more than one base table, and multiple updates of the same base table. --- src/backend/access/transam/xact.c | 5 + src/backend/commands/createas.c | 679 ++++++++++++++ src/backend/commands/matview.c | 1436 ++++++++++++++++++++++++++++- src/include/catalog/pg_proc.dat | 10 + src/include/commands/createas.h | 4 + src/include/commands/matview.h | 8 + 6 files changed, 2140 insertions(+), 2 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c2b72a87e67..7b4864f529d 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -39,6 +39,7 @@ #include "catalog/storage_tablespace.h" #include "catalog/storage_database.h" #include "commands/async.h" +#include "commands/matview.h" #include "commands/dbcommands.h" #include "commands/extension.h" #include "commands/resgroupcmds.h" @@ -3565,6 +3566,7 @@ AbortTransaction(void) AtAbort_Notify(); AtEOXact_RelationMap(false, is_parallel_worker); AtAbort_Twophase(); + AtAbort_IVM(); /* * Advertise the fact that we aborted in pg_xact (assuming that we got as @@ -6152,6 +6154,9 @@ AbortSubTransaction(void) AbortBufferIO(); UnlockBuffers(); + /* Clean up hash entries for incremental view maintenance */ + AtAbort_IVM(); + /* Reset WAL record construction state */ XLogResetInsertion(); diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index c824f7da2a5..ef86cc6ca99 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -32,15 +32,26 @@ #include "access/xact.h" #include "access/xlog.h" #include "catalog/namespace.h" +#include "catalog/index.h" +#include "catalog/pg_constraint.h" +#include "catalog/pg_inherits.h" +#include "catalog/pg_trigger.h" #include "catalog/toasting.h" #include "commands/createas.h" +#include "commands/defrem.h" #include "commands/matview.h" #include "commands/prepare.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" +#include "commands/trigger.h" #include "commands/view.h" #include "miscadmin.h" +#include "optimizer/optimizer.h" +#include "optimizer/prep.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "parser/parser.h" +#include "parser/parsetree.h" #include "parser/parse_clause.h" #include "postmaster/autostats.h" #include "rewrite/rewriteHandler.h" @@ -86,6 +97,12 @@ static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self); static void intorel_shutdown(DestReceiver *self); static void intorel_destroy(DestReceiver *self); +static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, + Relids *relids, bool ex_lock); +static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock); +static void check_ivm_restriction(Node *node); +static bool check_ivm_restriction_walker(Node *node, void *context); +static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList); /* * create_ctas_internal @@ -418,6 +435,18 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, if (query_info_collect_hook) (*query_info_collect_hook)(METRICS_QUERY_SUBMIT, queryDesc); + if (is_matview && into->ivm) + { + /* check if the query is supported in IMMV definition */ + if (contain_mutable_functions((Node *) query)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("mutable function is not supported on incrementally maintainable materialized view"), + errhint("functions must be marked IMMUTABLE"))); + + check_ivm_restriction((Node *) query); + } + if (into->skipData) { /* @@ -483,6 +512,27 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, /* Restore userid and security context */ SetUserIdAndSecContext(save_userid, save_sec_context); + + if (into->ivm) + { + Oid matviewOid = address.objectId; + Relation matviewRel = table_open(matviewOid, NoLock); + + /* + * Mark relisivm field, if it's a matview and into->ivm is true. + */ + SetMatViewIVMState(matviewRel, true); + + if (!into->skipData) + { + /* Create an index on incremental maintainable materialized view, if possible */ + //CreateIndexOnIMMV((Query *) into->viewQuery, matviewRel); + + /* Create triggers on incremental maintainable materialized view */ + CreateIvmTriggersOnBaseTables((Query *) into->viewQuery, matviewOid); + } + table_close(matviewRel, NoLock); + } } return address; @@ -809,3 +859,632 @@ GetIntoRelOid(QueryDesc *queryDesc) else return InvalidOid; } + +/* + * CreateIvmTriggersOnBaseTables -- create IVM triggers on all base tables + */ +void +CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid) +{ + Relids relids = NULL; + bool ex_lock = false; + RangeTblEntry *rte; + + /* Immediately return if we don't have any base tables. */ + if (list_length(qry->rtable) < 1) + return; + + /* + * If the view has more than one base tables, we need an exclusive lock + * on the view so that the view would be maintained serially to avoid + * the inconsistency that occurs when two base tables are modified in + * concurrent transactions. However, if the view has only one table, + * we can use a weaker lock. + * + * The type of lock should be determined here, because if we check the + * view definition at maintenance time, we need to acquire a weaker lock, + * and upgrading the lock level after this increases probability of + * deadlock. + */ + + rte = list_nth(qry->rtable, 0); + if (list_length(qry->rtable) > 1 || rte->rtekind != RTE_RELATION) + ex_lock = true; + + CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)qry, matviewOid, &relids, ex_lock); + + bms_free(relids); +} + +static void +CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid matviewOid, + Relids *relids, bool ex_lock) +{ + if (node == NULL) + return; + + /* This can recurse, so check for excessive recursion */ + check_stack_depth(); + + switch (nodeTag(node)) + { + case T_Query: + { + Query *query = (Query *) node; + + CreateIvmTriggersOnBaseTablesRecurse(qry, (Node *)query->jointree, matviewOid, relids, ex_lock); + } + break; + + case T_RangeTblRef: + { + int rti = ((RangeTblRef *) node)->rtindex; + RangeTblEntry *rte = rt_fetch(rti, qry->rtable); + + if (rte->rtekind == RTE_RELATION && !bms_is_member(rte->relid, *relids)) + { + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_BEFORE, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_BEFORE, true); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_INSERT, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_DELETE, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_UPDATE, TRIGGER_TYPE_AFTER, ex_lock); + CreateIvmTrigger(rte->relid, matviewOid, TRIGGER_TYPE_TRUNCATE, TRIGGER_TYPE_AFTER, true); + + *relids = bms_add_member(*relids, rte->relid); + } + } + break; + + case T_FromExpr: + { + FromExpr *f = (FromExpr *) node; + ListCell *l; + + foreach(l, f->fromlist) + CreateIvmTriggersOnBaseTablesRecurse(qry, lfirst(l), matviewOid, relids, ex_lock); + } + break; + + case T_JoinExpr: + { + JoinExpr *j = (JoinExpr *) node; + + CreateIvmTriggersOnBaseTablesRecurse(qry, j->larg, matviewOid, relids, ex_lock); + CreateIvmTriggersOnBaseTablesRecurse(qry, j->rarg, matviewOid, relids, ex_lock); + } + break; + + default: + elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); + } +} + +/* + * CreateIvmTrigger -- create IVM trigger on a base table + */ +static void +CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock) +{ + ObjectAddress refaddr; + ObjectAddress address; + CreateTrigStmt *ivm_trigger; + List *transitionRels = NIL; + + Assert(timing == TRIGGER_TYPE_BEFORE || timing == TRIGGER_TYPE_AFTER); + + refaddr.classId = RelationRelationId; + refaddr.objectId = viewOid; + refaddr.objectSubId = 0; + + ivm_trigger = makeNode(CreateTrigStmt); + ivm_trigger->relation = NULL; + ivm_trigger->row = false; + + ivm_trigger->timing = timing; + ivm_trigger->events = type; + + switch (type) + { + case TRIGGER_TYPE_INSERT: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_ins_before" : "IVM_trigger_ins_after"); + break; + case TRIGGER_TYPE_DELETE: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_del_before" : "IVM_trigger_del_after"); + break; + case TRIGGER_TYPE_UPDATE: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_upd_before" : "IVM_trigger_upd_after"); + break; + case TRIGGER_TYPE_TRUNCATE: + ivm_trigger->trigname = (timing == TRIGGER_TYPE_BEFORE ? "IVM_trigger_truncate_before" : "IVM_trigger_truncate_after"); + break; + default: + elog(ERROR, "unsupported trigger type"); + } + + if (timing == TRIGGER_TYPE_AFTER) + { + if (type == TRIGGER_TYPE_INSERT || type == TRIGGER_TYPE_UPDATE) + { + TriggerTransition *n = makeNode(TriggerTransition); + n->name = "__ivm_newtable"; + n->isNew = true; + n->isTable = true; + + transitionRels = lappend(transitionRels, n); + } + if (type == TRIGGER_TYPE_DELETE || type == TRIGGER_TYPE_UPDATE) + { + TriggerTransition *n = makeNode(TriggerTransition); + n->name = "__ivm_oldtable"; + n->isNew = false; + n->isTable = true; + + transitionRels = lappend(transitionRels, n); + } + } + + /* + * XXX: When using DELETE or UPDATE, we must use exclusive lock for now + * because apply_old_delta(_with_count) uses ctid to identify the tuple + * to be deleted/deleted, but doesn't work in concurrent situations. + * + * If the view doesn't have aggregate, distinct, or tuple duplicate, + * then it would work even in concurrent situations. However, we don't have + * any way to guarantee the view has a unique key before opening the IMMV + * at the maintenance time because users may drop the unique index. + */ + + if (type == TRIGGER_TYPE_DELETE || type == TRIGGER_TYPE_UPDATE) + ex_lock = true; + + ivm_trigger->funcname = + (timing == TRIGGER_TYPE_BEFORE ? SystemFuncName("IVM_immediate_before") : SystemFuncName("IVM_immediate_maintenance")); + + ivm_trigger->columns = NIL; + ivm_trigger->transitionRels = transitionRels; + ivm_trigger->whenClause = NULL; + ivm_trigger->isconstraint = false; + ivm_trigger->deferrable = false; + ivm_trigger->initdeferred = false; + ivm_trigger->constrrel = NULL; + ivm_trigger->args = list_make2( + makeString(DatumGetPointer(DirectFunctionCall1(oidout, ObjectIdGetDatum(viewOid)))), + makeString(DatumGetPointer(DirectFunctionCall1(boolout, BoolGetDatum(ex_lock)))) + ); + + address = CreateTrigger(ivm_trigger, NULL, relOid, InvalidOid, InvalidOid, + InvalidOid, InvalidOid, InvalidOid, NULL, false, false); + + recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO); + + /* Make changes-so-far visible */ + CommandCounterIncrement(); +} + +/* + * check_ivm_restriction --- look for specify nodes in the query tree + */ +static void +check_ivm_restriction(Node *node) +{ + check_ivm_restriction_walker(node, NULL); +} + +static bool +check_ivm_restriction_walker(Node *node, void *context) +{ + if (node == NULL) + return false; + + /* + * We currently don't support Sub-Query. + */ + if (IsA(node, SubPlan) || IsA(node, SubLink)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("subquery is not supported on incrementally maintainable materialized view"))); + + /* This can recurse, so check for excessive recursion */ + check_stack_depth(); + + switch (nodeTag(node)) + { + case T_Query: + { + Query *qry = (Query *)node; + ListCell *lc; + List *vars; + + /* if contained CTE, return error */ + if (qry->cteList != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CTE is not supported on incrementally maintainable materialized view"))); + if (qry->havingQual != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg(" HAVING clause is not supported on incrementally maintainable materialized view"))); + if (qry->sortClause != NIL) /* There is a possibility that we don't need to return an error */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("ORDER BY clause is not supported on incrementally maintainable materialized view"))); + if (qry->limitOffset != NULL || qry->limitCount != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("LIMIT/OFFSET clause is not supported on incrementally maintainable materialized view"))); + if (qry->distinctClause) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("DISTINCT is not supported on incrementally maintainable materialized view"))); + if (qry->hasDistinctOn) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("DISTINCT ON is not supported on incrementally maintainable materialized view"))); + if (qry->hasWindowFuncs) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("window functions are not supported on incrementally maintainable materialized view"))); + if (qry->groupingSets != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("GROUPING SETS, ROLLUP, or CUBE clauses is not supported on incrementally maintainable materialized view"))); + if (qry->setOperations != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("UNION/INTERSECT/EXCEPT statements are not supported on incrementally maintainable materialized view"))); + if (list_length(qry->targetList) == 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("empty target list is not supported on incrementally maintainable materialized view"))); + if (qry->rowMarks != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("FOR UPDATE/SHARE clause is not supported on incrementally maintainable materialized view"))); + + /* system column restrictions */ + vars = pull_vars_of_level((Node *) qry, 0); + foreach(lc, vars) + { + if (IsA(lfirst(lc), Var)) + { + Var *var = (Var *) lfirst(lc); + /* if system column, return error */ + if (var->varattno < 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("system column is not supported on incrementally maintainable materialized view"))); + } + } + + /* restrictions for rtable */ + foreach(lc, qry->rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc); + + if (rte->subquery) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("subquery is not supported on incrementally maintainable materialized view"))); + + if (rte->tablesample != NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("TABLESAMPLE clause is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("partitioned table is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_RELATION && has_superclass(rte->relid)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("partitions is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_RELATION && find_inheritance_children(rte->relid, NoLock) != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("inheritance parent is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_FOREIGN_TABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("foreign table is not supported on incrementally maintainable materialized view"))); + + if (rte->relkind == RELKIND_VIEW || + rte->relkind == RELKIND_MATVIEW) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("VIEW or MATERIALIZED VIEW is not supported on incrementally maintainable materialized view"))); + + if (rte->rtekind == RTE_VALUES) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("VALUES is not supported on incrementally maintainable materialized view"))); + + } + + query_tree_walker(qry, check_ivm_restriction_walker, NULL, QTW_IGNORE_RANGE_TABLE); + + break; + } + case T_TargetEntry: + { + TargetEntry *tle = (TargetEntry *)node; + if (isIvmName(tle->resname)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("column name %s is not supported on incrementally maintainable materialized view", tle->resname))); + + expression_tree_walker(node, check_ivm_restriction_walker, NULL); + break; + } + case T_JoinExpr: + { + JoinExpr *joinexpr = (JoinExpr *)node; + + if (joinexpr->jointype > JOIN_INNER) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("OUTER JOIN is not supported on incrementally maintainable materialized view"))); + + expression_tree_walker(node, check_ivm_restriction_walker, NULL); + } + break; + case T_Aggref: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function is not supported on incrementally maintainable materialized view"))); + break; + default: + expression_tree_walker(node, check_ivm_restriction_walker, (void *) context); + break; + } + return false; +} + +/* + * CreateIndexOnIMMV + * + * Create a unique index on incremental maintainable materialized view. + * If the view definition query has a GROUP BY clause, the index is created + * on the columns of GROUP BY expressions. Otherwise, if the view contains + * all primary key attritubes of its base tables in the target list, the index + * is created on these attritubes. In other cases, no index is created. + */ +void +CreateIndexOnIMMV(Query *query, Relation matviewRel) +{ + ListCell *lc; + IndexStmt *index; + ObjectAddress address; + List *constraintList = NIL; + char idxname[NAMEDATALEN]; + List *indexoidlist = RelationGetIndexList(matviewRel); + ListCell *indexoidscan; + Bitmapset *key_attnos; + + snprintf(idxname, sizeof(idxname), "%s_index", RelationGetRelationName(matviewRel)); + + index = makeNode(IndexStmt); + + index->unique = true; + index->primary = false; + index->isconstraint = false; + index->deferrable = false; + index->initdeferred = false; + index->idxname = idxname; + index->relation = + makeRangeVar(get_namespace_name(RelationGetNamespace(matviewRel)), + pstrdup(RelationGetRelationName(matviewRel)), + -1); + index->accessMethod = DEFAULT_INDEX_TYPE; + index->options = NIL; + index->tableSpace = get_tablespace_name(matviewRel->rd_rel->reltablespace); + index->whereClause = NULL; + index->indexParams = NIL; + index->indexIncludingParams = NIL; + index->excludeOpNames = NIL; + index->idxcomment = NULL; + index->indexOid = InvalidOid; + index->oldNode = InvalidOid; + index->oldCreateSubid = InvalidSubTransactionId; + index->oldFirstRelfilenodeSubid = InvalidSubTransactionId; + index->transformed = true; + index->concurrent = false; + index->if_not_exists = false; + + /* create index on the base tables' primary key columns */ + key_attnos = get_primary_key_attnos_from_query(query, &constraintList); + if (key_attnos) + { + foreach(lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1); + + if (bms_is_member(tle->resno - FirstLowInvalidHeapAttributeNumber, key_attnos)) + { + IndexElem *iparam; + + iparam = makeNode(IndexElem); + iparam->name = pstrdup(NameStr(attr->attname)); + iparam->expr = NULL; + iparam->indexcolname = NULL; + iparam->collation = NIL; + iparam->opclass = NIL; + iparam->opclassopts = NIL; + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + index->indexParams = lappend(index->indexParams, iparam); + } + } + } + else + { + /* create no index, just notice that an appropriate index is necessary for efficient IVM */ + ereport(NOTICE, + (errmsg("could not create an index on materialized view \"%s\" automatically", + RelationGetRelationName(matviewRel)), + errdetail("This target list does not have all the primary key columns. "), + errhint("Create an index on the materialized view for efficient incremental maintenance."))); + return; + } + + /* If we have a compatible index, we don't need to create another. */ + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + Relation indexRel; + bool hasCompatibleIndex = false; + + indexRel = index_open(indexoid, AccessShareLock); + + if (CheckIndexCompatible(indexRel->rd_id, + index->accessMethod, + index->indexParams, + index->excludeOpNames)) + hasCompatibleIndex = true; + + index_close(indexRel, AccessShareLock); + + if (hasCompatibleIndex) + return; + } + + address = DefineIndex(RelationGetRelid(matviewRel), + index, + InvalidOid, + InvalidOid, + InvalidOid, + false, true, false, false, true); + + ereport(NOTICE, + (errmsg("created index \"%s\" on materialized view \"%s\"", + idxname, RelationGetRelationName(matviewRel)))); + + /* + * Make dependencies so that the index is dropped if any base tables's + * primary key is dropped. + */ + foreach(lc, constraintList) + { + Oid constraintOid = lfirst_oid(lc); + ObjectAddress refaddr; + + refaddr.classId = ConstraintRelationId; + refaddr.objectId = constraintOid; + refaddr.objectSubId = 0; + + recordDependencyOn(&address, &refaddr, DEPENDENCY_NORMAL); + } +} + + +/* + * get_primary_key_attnos_from_query + * + * Identify the columns in base tables' primary keys in the target list. + * + * Returns a Bitmapset of the column attnos of the primary key's columns of + * tables that used in the query. The attnos are offset by + * FirstLowInvalidHeapAttributeNumber as same as get_primary_key_attnos. + * + * If any table has no primary key or any primary key's columns is not in + * the target list, return NULL. We also return NULL if any pkey constraint + * is deferrable. + * + * constraintList is set to a list of the OIDs of the pkey constraints. + */ +static Bitmapset * +get_primary_key_attnos_from_query(Query *query, List **constraintList) +{ + List *key_attnos_list = NIL; + ListCell *lc; + int i; + Bitmapset *keys = NULL; + Relids rels_in_from; + + /* + * Collect primary key attributes from all tables used in query. The key attributes + * sets for each table are stored in key_attnos_list in order by RTE index. + */ + foreach(lc, query->rtable) + { + RangeTblEntry *r = (RangeTblEntry*) lfirst(lc); + Bitmapset *key_attnos; + bool has_pkey = true; + + /* for tables, call get_primary_key_attnos */ + if (r->rtekind == RTE_RELATION) + { + Oid constraintOid; + key_attnos = get_primary_key_attnos(r->relid, false, &constraintOid); + *constraintList = lappend_oid(*constraintList, constraintOid); + has_pkey = (key_attnos != NULL); + } + /* for other RTEs, store NULL into key_attnos_list */ + else + key_attnos = NULL; + + /* + * If any table or subquery has no primary key or its pkey constraint is deferrable, + * we cannot get key attributes for this query, so return NULL. + */ + if (!has_pkey) + return NULL; + + key_attnos_list = lappend(key_attnos_list, key_attnos); + } + + /* Collect key attributes appearing in the target list */ + i = 1; + foreach(lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) flatten_join_alias_vars(query, lfirst(lc)); + + if (IsA(tle->expr, Var)) + { + Var *var = (Var*) tle->expr; + Bitmapset *key_attnos = list_nth(key_attnos_list, var->varno - 1); + + /* check if this attribute is from a base table's primary key */ + if (bms_is_member(var->varattno - FirstLowInvalidHeapAttributeNumber, key_attnos)) + { + /* + * Remove found key attributes from key_attnos_list, and add this + * to the result list. + */ + key_attnos = bms_del_member(key_attnos, var->varattno - FirstLowInvalidHeapAttributeNumber); + if (bms_is_empty(key_attnos)) + { + key_attnos_list = list_delete_nth_cell(key_attnos_list, var->varno - 1); + key_attnos_list = list_insert_nth(key_attnos_list, var->varno - 1, NULL); + } + keys = bms_add_member(keys, i - FirstLowInvalidHeapAttributeNumber); + } + } + i++; + } + + /* Collect RTE indexes of relations appearing in the FROM clause */ + rels_in_from = get_relids_in_jointree((Node *) query->jointree, false); + + /* + * Check if all key attributes of relations in FROM are appearing in the target + * list. If an attribute remains in key_attnos_list in spite of the table is used + * in FROM clause, the target is missing this key attribute, so we return NULL. + */ + i = 1; + foreach(lc, key_attnos_list) + { + Bitmapset *bms = (Bitmapset *)lfirst(lc); + if (!bms_is_empty(bms) && bms_is_member(i, rels_in_from)) + return NULL; + i++; + } + + return keys; +} diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 61a580873b1..ad3abe078ec 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -26,21 +26,30 @@ #include "catalog/namespace.h" #include "catalog/oid_dispatch.h" #include "catalog/pg_am.h" +#include "catalog/pg_depend.h" +#include "catalog/pg_trigger.h" #include "catalog/pg_opclass.h" #include "catalog/pg_operator.h" #include "cdb/cdbaocsam.h" #include "cdb/cdbappendonlyam.h" #include "cdb/cdbvars.h" #include "commands/cluster.h" +#include "commands/createas.h" #include "commands/matview.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "executor/executor.h" #include "executor/spi.h" +#include "executor/tstoreReceiver.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "parser/analyze.h" +#include "parser/parse_clause.h" +#include "parser/parse_func.h" #include "parser/parse_relation.h" #include "pgstat.h" #include "rewrite/rewriteHandler.h" +#include "rewrite/rowsecurity.h" #include "storage/lmgr.h" #include "storage/smgr.h" #include "tcop/tcopprot.h" @@ -49,6 +58,7 @@ #include "utils/rel.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +#include "utils/typcache.h" typedef struct @@ -67,6 +77,52 @@ typedef struct uint64 processed; /* GPDB: number of tuples inserted */ } DR_transientrel; +#define MV_INIT_QUERYHASHSIZE 16 + +/* + * MV_TriggerHashEntry + * + * Hash entry for base tables on which IVM trigger is invoked + */ +typedef struct MV_TriggerHashEntry +{ + Oid matview_id; /* OID of the materialized view */ + int before_trig_count; /* count of before triggers invoked */ + int after_trig_count; /* count of after triggers invoked */ + + Snapshot snapshot; /* Snapshot just before table change */ + + List *tables; /* List of MV_TriggerTable */ + bool has_old; /* tuples are deleted from any table? */ + bool has_new; /* tuples are inserted into any table? */ +} MV_TriggerHashEntry; + +/* + * MV_TriggerTable + * + * IVM related data for tables on which the trigger is invoked. + */ +typedef struct MV_TriggerTable +{ + Oid table_id; /* OID of the modified table */ + List *old_tuplestores; /* tuplestores for deleted tuples */ + List *new_tuplestores; /* tuplestores for inserted tuples */ + + List *rte_indexes; /* List of RTE index of the modified table */ + RangeTblEntry *original_rte; /* the original RTE saved before rewriting query */ + + Relation rel; /* relation of the modified table */ + TupleTableSlot *slot; /* for checking visibility in the pre-state table */ +} MV_TriggerTable; + +static HTAB *mv_trigger_info = NULL; + +static bool in_delta_calculation = false; + +/* ENR name for materialized view delta */ +#define NEW_DELTA_ENRNAME "new_delta" +#define OLD_DELTA_ENRNAME "old_delta" + static int matview_maintenance_depth = 0; static RefreshClause* MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoAO); @@ -75,6 +131,8 @@ static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); static void transientrel_destroy(DestReceiver *self); static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, + QueryEnvironment *queryEnv, + TupleDesc *resultTupleDesc, const char *queryString, RefreshClause *refreshClause); static char *make_temptable_name_n(char *tempname, int n); static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, @@ -83,6 +141,37 @@ static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersist static bool is_usable_unique_index(Relation indexRel); static void OpenMatViewIncrementalMaintenance(void); static void CloseMatViewIncrementalMaintenance(void); +static Query *get_matview_query(Relation matviewRel); + +static Query *rewrite_query_for_preupdate_state(Query *query, List *tables, + ParseState *pstate, Oid matviewid); +static void register_delta_ENRs(ParseState *pstate, Query *query, List *tables); +static char *make_delta_enr_name(const char *prefix, Oid relid, int count); +static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table, + QueryEnvironment *queryEnv, Oid matviewid); +static RangeTblEntry *replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new, + QueryEnvironment *queryEnv); +static Query *rewrite_query_for_counting(Query *query, ParseState *pstate); + +static void calc_delta(MV_TriggerTable *table, int rte_index, Query *query, + DestReceiver *dest_old, DestReceiver *dest_new, + TupleDesc *tupdesc_old, TupleDesc *tupdesc_new, + QueryEnvironment *queryEnv); +static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index); + +static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores, + TupleDesc tupdesc_old, TupleDesc tupdesc_new, + Query *query); +static void apply_old_delta(const char *matviewname, const char *deltaname_old, + List *keys); +static void apply_new_delta(const char *matviewname, const char *deltaname_new, + StringInfo target_list); +static char *get_matching_condition_string(List *keys); +static void generate_equal(StringInfo querybuf, Oid opttype, + const char *leftop, const char *rightop); + +static void mv_InitHashTables(void); +static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort); /* * SetMatViewPopulatedState @@ -138,6 +227,46 @@ MakeRefreshClause(bool concurrent, bool skipData, RangeVar *relation, bool intoA return refreshClause; } +/* + * SetMatViewIVMState + * Mark a materialized view as IVM, or not. + * + * NOTE: caller must be holding an appropriate lock on the relation. + */ +void +SetMatViewIVMState(Relation relation, bool newstate) +{ + Relation pgrel; + HeapTuple tuple; + + Assert(relation->rd_rel->relkind == RELKIND_MATVIEW); + + /* + * Update relation's pg_class entry. Crucial side-effect: other backends + * (and this one too!) are sent SI message to make them rebuild relcache + * entries. + */ + pgrel = table_open(RelationRelationId, RowExclusiveLock); + tuple = SearchSysCacheCopy1(RELOID, + ObjectIdGetDatum(RelationGetRelid(relation))); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", + RelationGetRelid(relation)); + + ((Form_pg_class) GETSTRUCT(tuple))->relisivm = newstate; + + CatalogTupleUpdate(pgrel, &tuple->t_self, tuple); + + heap_freetuple(tuple); + table_close(pgrel, RowExclusiveLock); + + /* + * Advance command counter to make the updated pg_class row locally + * visible. + */ + CommandCounterIncrement(); +} + /* * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command * @@ -180,6 +309,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, int save_nestlevel; ObjectAddress address; RefreshClause *refreshClause; + bool oldPopulated; /* MATERIALIZED_VIEW_FIXME: Refresh MatView is not MPP-fied. */ @@ -205,6 +335,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, SetUserIdAndSecContext(relowner, save_sec_context | SECURITY_RESTRICTED_OPERATION); save_nestlevel = NewGUCNestLevel(); + oldPopulated = RelationIsPopulated(matviewRel); /* Make sure it is a materialized view. */ if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW) @@ -329,6 +460,74 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, relpersistence = matviewRel->rd_rel->relpersistence; } + /* delete IMMV triggers. */ + if (RelationIsIVM(matviewRel) && stmt->skipData) + { + Relation tgRel; + Relation depRel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tup; + ObjectAddresses *immv_triggers; + + immv_triggers = new_object_addresses(); + + tgRel = table_open(TriggerRelationId, RowExclusiveLock); + depRel = table_open(DependRelationId, RowExclusiveLock); + + /* search triggers that depends on IMMV. */ + ScanKeyInit(&key, + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(matviewOid)); + scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, 1, &key); + while ((tup = systable_getnext(scan)) != NULL) + { + ObjectAddress obj; + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(tup); + + if (foundDep->classid == TriggerRelationId) + { + HeapTuple tgtup; + ScanKeyData tgkey[1]; + SysScanDesc tgscan; + Form_pg_trigger tgform; + + /* Find the trigger name. */ + ScanKeyInit(&tgkey[0], + Anum_pg_trigger_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(foundDep->objid)); + + tgscan = systable_beginscan(tgRel, TriggerOidIndexId, true, + NULL, 1, tgkey); + tgtup = systable_getnext(tgscan); + if (!HeapTupleIsValid(tgtup)) + elog(ERROR, "could not find tuple for immv trigger %u", foundDep->objid); + + tgform = (Form_pg_trigger) GETSTRUCT(tgtup); + + /* If trigger is created by IMMV, delete it. */ + if (strncmp(NameStr(tgform->tgname), "IVM_trigger_", 12) == 0) + { + obj.classId = foundDep->classid; + obj.objectId = foundDep->objid; + obj.objectSubId = foundDep->refobjsubid; + add_exact_object_address(&obj, immv_triggers); + } + systable_endscan(tgscan); + } + } + systable_endscan(scan); + + performMultipleDeletions(immv_triggers, DROP_RESTRICT, PERFORM_DELETION_INTERNAL); + + table_close(depRel, RowExclusiveLock); + table_close(tgRel, RowExclusiveLock); + free_object_addresses(immv_triggers); + } + /* * Create the transient table that will receive the regenerated data. Lock * it against access by any other process until commit (by which time it @@ -355,7 +554,7 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, * In GPDB, we call refresh_matview_datafill() even when WITH NO DATA was * specified, because it will dispatch the operation to the segments. */ - processed = refresh_matview_datafill(dest, dataQuery, queryString, refreshClause); + processed = refresh_matview_datafill(dest, dataQuery, NULL, NULL, queryString, refreshClause); /* Make the matview match the newly generated data. */ if (concurrent) @@ -398,6 +597,12 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, // pgstat_count_heap_insert(matviewRel, processed); } + if (!stmt->skipData && RelationIsIVM(matviewRel) && !oldPopulated) + { + //CreateIndexOnIMMV(dataQuery, matviewRel); + CreateIvmTriggersOnBaseTables(dataQuery, matviewOid); + } + table_close(matviewRel, NoLock); /* Roll back any GUC changes */ @@ -432,6 +637,8 @@ ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, */ static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, + QueryEnvironment *queryEnv, + TupleDesc *resultTupleDesc, const char *queryString, RefreshClause *refreshClause) { List *rewritten; @@ -493,7 +700,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, /* Create a QueryDesc, redirecting output to our tuple receiver */ queryDesc = CreateQueryDesc(plan, queryString, GetActiveSnapshot(), InvalidSnapshot, - dest, NULL, NULL, 0); + dest, NULL, queryEnv ? queryEnv: NULL, 0); RestoreOidAssignments(saved_dispatch_oids); @@ -513,6 +720,9 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, */ processed = queryDesc->estate->es_processed; + if (resultTupleDesc) + *resultTupleDesc = CreateTupleDescCopy(queryDesc->tupDesc); + /* and clean up */ ExecutorFinish(queryDesc); ExecutorEnd(queryDesc); @@ -1133,3 +1343,1225 @@ CloseMatViewIncrementalMaintenance(void) matview_maintenance_depth--; Assert(matview_maintenance_depth >= 0); } + +/* + * get_matview_query - get the Query from a matview's _RETURN rule. + */ +static Query * +get_matview_query(Relation matviewRel) +{ + RewriteRule *rule; + List * actions; + + /* + * Check that everything is correct for a refresh. Problems at this point + * are internal errors, so elog is sufficient. + */ + if (matviewRel->rd_rel->relhasrules == false || + matviewRel->rd_rules->numLocks < 1) + elog(ERROR, + "materialized view \"%s\" is missing rewrite information", + RelationGetRelationName(matviewRel)); + + if (matviewRel->rd_rules->numLocks > 1) + elog(ERROR, + "materialized view \"%s\" has too many rules", + RelationGetRelationName(matviewRel)); + + rule = matviewRel->rd_rules->rules[0]; + if (rule->event != CMD_SELECT || !(rule->isInstead)) + elog(ERROR, + "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule", + RelationGetRelationName(matviewRel)); + + actions = rule->actions; + if (list_length(actions) != 1) + elog(ERROR, + "the rule for materialized view \"%s\" is not a single action", + RelationGetRelationName(matviewRel)); + + /* + * The stored query was rewritten at the time of the MV definition, but + * has not been scribbled on by the planner. + */ + return linitial_node(Query, actions); +} + + +/* ---------------------------------------------------- + * Incremental View Maintenance routines + * --------------------------------------------------- + */ + +/* + * IVM_immediate_before + * + * IVM trigger function invoked before base table is modified. If this is + * invoked firstly in the same statement, we save the transaction id and the + * command id at that time. + */ +Datum +IVM_immediate_before(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + char *matviewOid_text = trigdata->tg_trigger->tgargs[0]; + char *ex_lock_text = trigdata->tg_trigger->tgargs[1]; + Oid matviewOid; + MV_TriggerHashEntry *entry; + bool found; + bool ex_lock; + + matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text))); + ex_lock = DatumGetBool(DirectFunctionCall1(boolin, CStringGetDatum(ex_lock_text))); + + /* If the view has more than one tables, we have to use an exclusive lock. */ + if (ex_lock) + { + /* + * Wait for concurrent transactions which update this materialized view at + * READ COMMITED. This is needed to see changes committed in other + * transactions. No wait and raise an error at REPEATABLE READ or + * SERIALIZABLE to prevent update anomalies of matviews. + * XXX: dead-lock is possible here. + */ + if (!IsolationUsesXactSnapshot()) + LockRelationOid(matviewOid, ExclusiveLock); + else if (!ConditionalLockRelationOid(matviewOid, ExclusiveLock)) + { + /* try to throw error by name; relation could be deleted... */ + char *relname = get_rel_name(matviewOid); + + if (!relname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on materialized view during incremental maintenance"))); + + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on materialized view \"%s\" during incremental maintenance", + relname))); + } + } + else + LockRelationOid(matviewOid, RowExclusiveLock); + + /* + * On the first call initialize the hashtable + */ + if (!mv_trigger_info) + mv_InitHashTables(); + + entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info, + (void *) &matviewOid, + HASH_ENTER, &found); + + /* On the first BEFORE to update the view, initialize trigger data */ + if (!found) + { + /* + * Get a snapshot just before the table was modified for checking + * tuple visibility in the pre-update state of the table. + */ + Snapshot snapshot = GetActiveSnapshot(); + + entry->matview_id = matviewOid; + entry->before_trig_count = 0; + entry->after_trig_count = 0; + entry->snapshot = RegisterSnapshot(snapshot); + entry->tables = NIL; + entry->has_old = false; + entry->has_new = false; + } + + entry->before_trig_count++; + + return PointerGetDatum(NULL); +} + +/* + * IVM_immediate_maintenance + * + * IVM trigger function invoked after base table is modified. + * For each table, tuplestores of transition tables are collected. + * and after the last modification + */ +Datum +IVM_immediate_maintenance(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Relation rel; + Oid relid; + Oid matviewOid; + Query *query; + Query *rewritten = NULL; + char *matviewOid_text = trigdata->tg_trigger->tgargs[0]; + Relation matviewRel; + int old_depth = matview_maintenance_depth; + + Oid relowner; + Tuplestorestate *old_tuplestore = NULL; + Tuplestorestate *new_tuplestore = NULL; + DestReceiver *dest_new = NULL, *dest_old = NULL; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + + MV_TriggerHashEntry *entry; + MV_TriggerTable *table; + bool found; + + ParseState *pstate; + QueryEnvironment *queryEnv = create_queryEnv(); + MemoryContext oldcxt; + ListCell *lc; + int i; + + + /* Create a ParseState for rewriting the view definition query */ + pstate = make_parsestate(NULL); + pstate->p_queryEnv = queryEnv; + pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET; + + rel = trigdata->tg_relation; + relid = rel->rd_id; + + matviewOid = DatumGetObjectId(DirectFunctionCall1(oidin, CStringGetDatum(matviewOid_text))); + + /* + * On the first call initialize the hashtable + */ + if (!mv_trigger_info) + mv_InitHashTables(); + + /* get the entry for this materialized view */ + entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info, + (void *) &matviewOid, + HASH_FIND, &found); + Assert (found && entry != NULL); + entry->after_trig_count++; + + /* search the entry for the modified table and create new entry if not found */ + found = false; + foreach(lc, entry->tables) + { + table = (MV_TriggerTable *) lfirst(lc); + if (table->table_id == relid) + { + found = true; + break; + } + } + if (!found) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + table = (MV_TriggerTable *) palloc0(sizeof(MV_TriggerTable)); + table->table_id = relid; + table->old_tuplestores = NIL; + table->new_tuplestores = NIL; + table->rte_indexes = NIL; + table->slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), table_slot_callbacks(rel)); + table->rel = table_open(RelationGetRelid(rel), NoLock); + entry->tables = lappend(entry->tables, table); + + MemoryContextSwitchTo(oldcxt); + } + + /* Save the transition tables and make a request to not free immediately */ + if (trigdata->tg_oldtable) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + table->old_tuplestores = lappend(table->old_tuplestores, trigdata->tg_oldtable); + entry->has_old = true; + MemoryContextSwitchTo(oldcxt); + } + if (trigdata->tg_newtable) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + table->new_tuplestores = lappend(table->new_tuplestores, trigdata->tg_newtable); + entry->has_new = true; + MemoryContextSwitchTo(oldcxt); + } + if (entry->has_new || entry->has_old) + { + CmdType cmd; + + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + cmd = CMD_INSERT; + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + cmd = CMD_DELETE; + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + cmd = CMD_UPDATE; + else + elog(ERROR,"unsupported trigger type"); + + /* Prolong lifespan of transition tables to the end of the last AFTER trigger */ + SetTransitionTablePreserved(relid, cmd); + } + + + /* If this is not the last AFTER trigger call, immediately exit. */ + Assert (entry->before_trig_count >= entry->after_trig_count); + if (entry->before_trig_count != entry->after_trig_count) + return PointerGetDatum(NULL); + + /* + * If this is the last AFTER trigger call, continue and update the view. + */ + + /* + * Advance command counter to make the updated base table row locally + * visible. + */ + CommandCounterIncrement(); + + matviewRel = table_open(matviewOid, NoLock); + + /* Make sure it is a materialized view. */ + Assert(matviewRel->rd_rel->relkind == RELKIND_MATVIEW); + + /* + * Get and push the latast snapshot to see any changes which is committed + * during waiting in other transactions at READ COMMITTED level. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * Check for active uses of the relation in the current transaction, such + * as open scans. + * + * NB: We count on this to protect us against problems with refreshing the + * data using TABLE_INSERT_FROZEN. + */ + CheckTableNotInUse(matviewRel, "refresh a materialized view incrementally"); + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also arrange to make GUC variable changes local to this command. + * We will switch modes when we are about to execute user code. + */ + relowner = matviewRel->rd_rel->relowner; + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); + + /* get view query*/ + query = get_matview_query(matviewRel); + + /* + * When a base table is truncated, the view content will be empty if the + * view definition query does not contain an aggregate without a GROUP clause. + * Therefore, such views can be truncated. + */ + if (TRIGGER_FIRED_BY_TRUNCATE(trigdata->tg_event)) + { + ExecuteTruncateGuts(list_make1(matviewRel), list_make1_oid(matviewOid), + NIL, DROP_RESTRICT, false, false); + + /* Clean up hash entry and delete tuplestores */ + clean_up_IVM_hash_entry(entry, false); + + /* Pop the original snapshot. */ + PopActiveSnapshot(); + + table_close(matviewRel, NoLock); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + return PointerGetDatum(NULL); + } + + /* + * rewrite query for calculating deltas + */ + + rewritten = copyObject(query); + + /* Replace resnames in a target list with materialized view's attnames */ + i = 0; + foreach (lc, rewritten->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i); + char *resname = NameStr(attr->attname); + + tle->resname = pstrdup(resname); + i++; + } + + /* Set all tables in the query to pre-update state */ + rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables, + pstate, matviewOid); + /* Rewrite for counting duplicated tuples */ + rewritten = rewrite_query_for_counting(rewritten, pstate); + + /* Create tuplestores to store view deltas */ + if (entry->has_old) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + old_tuplestore = tuplestore_begin_heap(false, false, work_mem); + dest_old = CreateDestReceiver(DestTuplestore); + SetTuplestoreDestReceiverParams(dest_old, + old_tuplestore, + TopTransactionContext, + false, + NULL, + NULL); + + MemoryContextSwitchTo(oldcxt); + } + if (entry->has_new) + { + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + new_tuplestore = tuplestore_begin_heap(false, false, work_mem); + dest_new = CreateDestReceiver(DestTuplestore); + SetTuplestoreDestReceiverParams(dest_new, + new_tuplestore, + TopTransactionContext, + false, + NULL, + NULL); + MemoryContextSwitchTo(oldcxt); + } + + /* for all modified tables */ + foreach(lc, entry->tables) + { + ListCell *lc2; + + table = (MV_TriggerTable *) lfirst(lc); + + /* loop for self-join */ + foreach(lc2, table->rte_indexes) + { + int rte_index = lfirst_int(lc2); + TupleDesc tupdesc_old; + TupleDesc tupdesc_new; + + /* calculate delta tables */ + calc_delta(table, rte_index, rewritten, dest_old, dest_new, + &tupdesc_old, &tupdesc_new, queryEnv); + + /* Set the table in the query to post-update state */ + rewritten = rewrite_query_for_postupdate_state(rewritten, table, rte_index); + + PG_TRY(); + { + /* apply the delta tables to the materialized view */ + apply_delta(matviewOid, old_tuplestore, new_tuplestore, + tupdesc_old, tupdesc_new, query); + } + PG_CATCH(); + { + matview_maintenance_depth = old_depth; + PG_RE_THROW(); + } + PG_END_TRY(); + + /* clear view delta tuplestores */ + if (old_tuplestore) + tuplestore_clear(old_tuplestore); + if (new_tuplestore) + tuplestore_clear(new_tuplestore); + } + } + + /* Clean up hash entry and delete tuplestores */ + clean_up_IVM_hash_entry(entry, false); + if (old_tuplestore) + { + dest_old->rDestroy(dest_old); + tuplestore_end(old_tuplestore); + } + if (new_tuplestore) + { + dest_new->rDestroy(dest_new); + tuplestore_end(new_tuplestore); + } + + /* Pop the original snapshot. */ + PopActiveSnapshot(); + + table_close(matviewRel, NoLock); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + return PointerGetDatum(NULL); +} + +/* + * rewrite_query_for_preupdate_state + * + * Rewrite the query so that base tables' RTEs will represent "pre-update" + * state of tables. This is necessary to calculate view delta after multiple + * tables are modified. + */ +static Query* +rewrite_query_for_preupdate_state(Query *query, List *tables, + ParseState *pstate, Oid matviewid) +{ + ListCell *lc; + int num_rte = list_length(query->rtable); + int i; + + + /* register delta ENRs */ + register_delta_ENRs(pstate, query, tables); + + /* XXX: Is necessary? Is this right timing? */ + AcquireRewriteLocks(query, true, false); + + i = 1; + foreach(lc, query->rtable) + { + RangeTblEntry *r = (RangeTblEntry*) lfirst(lc); + + ListCell *lc2; + foreach(lc2, tables) + { + MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc2); + /* + * if the modified table is found then replace the original RTE with + * "pre-state" RTE and append its index to the list. + */ + if (r->relid == table->table_id) + { + List *securityQuals; + List *withCheckOptions; + bool hasRowSecurity; + bool hasSubLinks; + + RangeTblEntry *rte_pre = get_prestate_rte(r, table, pstate->p_queryEnv, matviewid); + + /* + * Set a row security poslicies of the modified table to the subquery RTE which + * represents the pre-update state of the table. + */ + get_row_security_policies(query, table->original_rte, i, + &securityQuals, &withCheckOptions, + &hasRowSecurity, &hasSubLinks); + + if (hasRowSecurity) + { + query->hasRowSecurity = true; + rte_pre->security_barrier = true; + } + if (hasSubLinks) + query->hasSubLinks = true; + + rte_pre->securityQuals = securityQuals; + lfirst(lc) = rte_pre; + + table->rte_indexes = lappend_int(table->rte_indexes, i); + break; + } + } + + /* finish the loop if we processed all RTE included in the original query */ + if (i++ >= num_rte) + break; + } + + return query; +} + +/* + * register_delta_ENRs + * + * For all modified tables, make ENRs for their transition tables + * and register them to the queryEnv. ENR's RTEs are also appended + * into the list in query tree. + */ +static void +register_delta_ENRs(ParseState *pstate, Query *query, List *tables) +{ + QueryEnvironment *queryEnv = pstate->p_queryEnv; + ListCell *lc; + RangeTblEntry *rte; + + foreach(lc, tables) + { + MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc); + ListCell *lc2; + int count; + + count = 0; + foreach(lc2, table->old_tuplestores) + { + Tuplestorestate *oldtable = (Tuplestorestate *) lfirst(lc2); + EphemeralNamedRelation enr = + palloc(sizeof(EphemeralNamedRelationData)); + ParseNamespaceItem *nsitem; + + enr->md.name = make_delta_enr_name("old", table->table_id, count); + enr->md.reliddesc = table->table_id; + enr->md.tupdesc = NULL; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(oldtable); + enr->reldata = oldtable; + register_ENR(queryEnv, enr); + + nsitem = addRangeTableEntryForENR(pstate, makeRangeVar(NULL, enr->md.name, -1), true); + rte = nsitem->p_rte; + + query->rtable = lappend(query->rtable, rte); + + count++; + } + + count = 0; + foreach(lc2, table->new_tuplestores) + { + Tuplestorestate *newtable = (Tuplestorestate *) lfirst(lc2); + EphemeralNamedRelation enr = + palloc(sizeof(EphemeralNamedRelationData)); + ParseNamespaceItem *nsitem; + + enr->md.name = make_delta_enr_name("new", table->table_id, count); + enr->md.reliddesc = table->table_id; + enr->md.tupdesc = NULL; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(newtable); + enr->reldata = newtable; + register_ENR(queryEnv, enr); + + nsitem = addRangeTableEntryForENR(pstate, makeRangeVar(NULL, enr->md.name, -1), true); + rte = nsitem->p_rte; + + query->rtable = lappend(query->rtable, rte); + + count++; + } + } +} + +#define DatumGetItemPointer(X) ((ItemPointer) DatumGetPointer(X)) +#define PG_GETARG_ITEMPOINTER(n) DatumGetItemPointer(PG_GETARG_DATUM(n)) + +/* + * ivm_visible_in_prestate + * + * Check visibility of a tuple specified by the tableoid and item pointer + * using the snapshot taken just before the table was modified. + */ +Datum +ivm_visible_in_prestate(PG_FUNCTION_ARGS) +{ + Oid tableoid = PG_GETARG_OID(0); + ItemPointer itemPtr = PG_GETARG_ITEMPOINTER(1); + Oid matviewOid = PG_GETARG_OID(2); + MV_TriggerHashEntry *entry; + MV_TriggerTable *table = NULL; + ListCell *lc; + bool found; + bool result; + + if (!in_delta_calculation) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("ivm_visible_in_prestate can be called only in delta calculation"))); + + entry = (MV_TriggerHashEntry *) hash_search(mv_trigger_info, + (void *) &matviewOid, + HASH_FIND, &found); + Assert (found && entry != NULL); + + foreach(lc, entry->tables) + { + table = (MV_TriggerTable *) lfirst(lc); + if (table->table_id == tableoid) + break; + } + + Assert (table != NULL); + + result = table_tuple_fetch_row_version(table->rel, itemPtr, entry->snapshot, table->slot); + + PG_RETURN_BOOL(result); +} + +/* + * get_prestate_rte + * + * Rewrite RTE of the modified table to a subquery which represents + * "pre-state" table. The original RTE is saved in table->rte_original. + */ +static RangeTblEntry* +get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *table, + QueryEnvironment *queryEnv, Oid matviewid) +{ + StringInfoData str; + RawStmt *raw; + Query *subquery; + Relation rel; + ParseState *pstate; + char *relname; + int i; + + pstate = make_parsestate(NULL); + pstate->p_queryEnv = queryEnv; + pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET; + + /* + * We can use NoLock here since AcquireRewriteLocks should + * have locked the relation already. + */ + rel = table_open(table->table_id, NoLock); + relname = quote_qualified_identifier( + get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel)); + table_close(rel, NoLock); + + /* + * Filtering inserted row using the snapshot taken before the table + * is modified. ctid is required for maintaining outer join views. + */ + initStringInfo(&str); + appendStringInfo(&str, + "SELECT t.* FROM %s t" + " WHERE pg_catalog.ivm_visible_in_prestate(t.tableoid, t.ctid ,%d::pg_catalog.oid)", + relname, matviewid); + + /* + * Append deleted rows contained in old transition tables. + */ + for (i = 0; i < list_length(table->old_tuplestores); i++) + { + appendStringInfo(&str, " UNION ALL "); + appendStringInfo(&str," SELECT * FROM %s", + make_delta_enr_name("old", table->table_id, i)); + } + + /* Get a subquery representing pre-state of the table */ + raw = (RawStmt*)linitial(raw_parser(str.data, RAW_PARSE_DEFAULT)); + subquery = transformStmt(pstate, raw->stmt); + + /* save the original RTE */ + table->original_rte = copyObject(rte); + + rte->rtekind = RTE_SUBQUERY; + rte->subquery = subquery; + rte->security_barrier = false; + + /* Clear fields that should not be set in a subquery RTE */ + rte->relid = InvalidOid; + rte->relkind = 0; + rte->rellockmode = 0; + rte->tablesample = NULL; + rte->inh = false; /* must not be set for a subquery */ + + return rte; +} + +/* + * make_delta_enr_name + * + * Make a name for ENR of a transition table from the base table's oid. + * prefix will be "new" or "old" depending on its transition table kind.. + */ +static char* +make_delta_enr_name(const char *prefix, Oid relid, int count) +{ + char buf[NAMEDATALEN]; + char *name; + + snprintf(buf, NAMEDATALEN, "__ivm_%s_%u_%u", prefix, relid, count); + name = pstrdup(buf); + + return name; +} + +/* + * replace_rte_with_delta + * + * Replace RTE of the modified table with a single table delta that combine its + * all transition tables. + */ +static RangeTblEntry* +replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new, + QueryEnvironment *queryEnv) +{ + Oid relid = table->table_id; + StringInfoData str; + ParseState *pstate; + RawStmt *raw; + Query *sub; + int num_tuplestores = list_length(is_new ? table->new_tuplestores : table->old_tuplestores); + int i; + + /* the previous RTE must be a subquery which represents "pre-state" table */ + Assert(rte->rtekind == RTE_SUBQUERY); + + /* Create a ParseState for rewriting the view definition query */ + pstate = make_parsestate(NULL); + pstate->p_queryEnv = queryEnv; + pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET; + + initStringInfo(&str); + + for (i = 0; i < num_tuplestores; i++) + { + if (i > 0) + appendStringInfo(&str, " UNION ALL "); + + appendStringInfo(&str, + " SELECT * FROM %s", + make_delta_enr_name(is_new ? "new" : "old", relid, i)); + } + + raw = (RawStmt*)linitial(raw_parser(str.data, RAW_PARSE_DEFAULT)); + sub = transformStmt(pstate, raw->stmt); + + /* + * Update the subquery so that it represent the combined transition + * table. Note that we leave the security_barrier and securityQuals + * fields so that the subquery relation can be protected by the RLS + * policy as same as the modified table. + */ + rte->rtekind = RTE_SUBQUERY; + rte->subquery = sub; + + return rte; +} + +/* + * rewrite_query_for_counting + * + * Rewrite query for counting duplicated tuples. + */ +static Query * +rewrite_query_for_counting(Query *query, ParseState *pstate) +{ + TargetEntry *tle_count; + FuncCall *fn; + Node *node; + + /* Add count(*) for counting distinct tuples in views */ + fn = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1); + fn->agg_star = true; + if (!query->groupClause && !query->hasAggs) + query->groupClause = transformDistinctClause(NULL, &query->targetList, query->sortClause, false); + + node = ParseFuncOrColumn(pstate, fn->funcname, NIL, NULL, fn, false, -1); + + tle_count = makeTargetEntry((Expr *) node, + list_length(query->targetList) + 1, + pstrdup("__ivm_count__"), + false); + query->targetList = lappend(query->targetList, tle_count); + query->hasAggs = true; + + return query; +} + +/* + * calc_delta + * + * Calculate view deltas generated under the modification of a table specified + * by the RTE index. + */ +static void +calc_delta(MV_TriggerTable *table, int rte_index, Query *query, + DestReceiver *dest_old, DestReceiver *dest_new, + TupleDesc *tupdesc_old, TupleDesc *tupdesc_new, + QueryEnvironment *queryEnv) +{ + ListCell *lc = list_nth_cell(query->rtable, rte_index - 1); + RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc); + RefreshClause *refreshClause; + in_delta_calculation = true; + + RangeVar *relation = makeRangeVar(get_namespace_name(RelationGetNamespace(table->rel)), + pstrdup(RelationGetRelationName(table->rel)), + -1); + + refreshClause = MakeRefreshClause(false, false, relation, + RelationIsAppendOptimized(table->rel)); + + /* Generate old delta */ + if (list_length(table->old_tuplestores) > 0) + { + /* Replace the modified table with the old delta table and calculate the old view delta. */ + replace_rte_with_delta(rte, table, false, queryEnv); + refresh_matview_datafill(dest_old, query, queryEnv, tupdesc_old, "", refreshClause); + } + + /* Generate new delta */ + if (list_length(table->new_tuplestores) > 0) + { + /* Replace the modified table with the new delta table and calculate the new view delta*/ + replace_rte_with_delta(rte, table, true, queryEnv); + refresh_matview_datafill(dest_new, query, queryEnv, tupdesc_new, "", refreshClause); + } + + in_delta_calculation = false; +} + +/* + * rewrite_query_for_postupdate_state + * + * Rewrite the query so that the specified base table's RTEs will represent + * "post-update" state of tables. This is called after the view delta + * calculation due to changes on this table finishes. + */ +static Query* +rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte_index) +{ + ListCell *lc = list_nth_cell(query->rtable, rte_index - 1); + + /* Retore the original RTE */ + lfirst(lc) = table->original_rte; + + return query; +} + +/* + * apply_delta + * + * Apply deltas to the materialized view. In outer join cases, this requires + * the view maintenance graph. + */ +static void +apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores, + TupleDesc tupdesc_old, TupleDesc tupdesc_new, + Query *query) +{ + StringInfoData querybuf; + StringInfoData target_list_buf; + Relation matviewRel; + char *matviewname; + ListCell *lc; + int i; + List *keys = NIL; + + + /* + * get names of the materialized view and delta tables + */ + + matviewRel = table_open(matviewOid, NoLock); + matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), + RelationGetRelationName(matviewRel)); + + /* + * Build parts of the maintenance queries + */ + + initStringInfo(&querybuf); + initStringInfo(&target_list_buf); + + /* build string of target list */ + for (i = 0; i < matviewRel->rd_att->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i); + char *resname = NameStr(attr->attname); + + if (i != 0) + appendStringInfo(&target_list_buf, ", "); + appendStringInfo(&target_list_buf, "%s", quote_qualified_identifier(NULL, resname)); + } + + i = 0; + foreach (lc, query->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i); + + i++; + + if (tle->resjunk) + continue; + + keys = lappend(keys, attr); + } + + /* Start maintaining the materialized view. */ + OpenMatViewIncrementalMaintenance(); + + /* Open SPI context. */ + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* For tuple deletion */ + if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0) + { + EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData)); + int rc; + + /* convert tuplestores to ENR, and register for SPI */ + enr->md.name = pstrdup(OLD_DELTA_ENRNAME); + enr->md.reliddesc = InvalidOid; + enr->md.tupdesc = tupdesc_old; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(old_tuplestores); + enr->reldata = old_tuplestores; + + rc = SPI_register_relation(enr); + if (rc != SPI_OK_REL_REGISTER) + elog(ERROR, "SPI_register failed"); + + apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys); + + } + /* For tuple insertion */ + if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0) + { + EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData)); + int rc; + + /* convert tuplestores to ENR, and register for SPI */ + enr->md.name = pstrdup(NEW_DELTA_ENRNAME); + enr->md.reliddesc = InvalidOid; + enr->md.tupdesc = tupdesc_new;; + enr->md.enrtype = ENR_NAMED_TUPLESTORE; + enr->md.enrtuples = tuplestore_tuple_count(new_tuplestores); + enr->reldata = new_tuplestores; + + rc = SPI_register_relation(enr); + if (rc != SPI_OK_REL_REGISTER) + elog(ERROR, "SPI_register failed"); + + /* apply new delta */ + apply_new_delta(matviewname, NEW_DELTA_ENRNAME, &target_list_buf); + } + + /* We're done maintaining the materialized view. */ + CloseMatViewIncrementalMaintenance(); + + table_close(matviewRel, NoLock); + + /* Close SPI context. */ + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + +/* + * apply_old_delta + * + * Execute a query for applying a delta table given by deltname_old + * which contains tuples to be deleted from to a materialized view given by + * matviewname. This is used when counting is not required. + */ +static void +apply_old_delta(const char *matviewname, const char *deltaname_old, + List *keys) +{ + StringInfoData querybuf; + StringInfoData keysbuf; + char *match_cond; + ListCell *lc; + + /* build WHERE condition for searching tuples to be deleted */ + match_cond = get_matching_condition_string(keys); + + /* build string of keys list */ + initStringInfo(&keysbuf); + foreach (lc, keys) + { + Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc); + char *resname = NameStr(attr->attname); + appendStringInfo(&keysbuf, "%s", quote_qualified_identifier("mv", resname)); + if (lnext(keys, lc)) + appendStringInfo(&keysbuf, ", "); + } + + /* Search for matching tuples from the view and update or delete if found. */ + initStringInfo(&querybuf); + appendStringInfo(&querybuf, + "DELETE FROM %s WHERE ctid IN (" + "SELECT tid FROM (SELECT pg_catalog.row_number() over (partition by %s) AS \"__ivm_row_number__\"," + "mv.ctid AS tid," + "diff.\"__ivm_count__\"" + "FROM %s AS mv, %s AS diff " + "WHERE %s) v " + "WHERE v.\"__ivm_row_number__\" OPERATOR(pg_catalog.<=) v.\"__ivm_count__\")", + matviewname, + keysbuf.data, + matviewname, deltaname_old, + match_cond); + + if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); +} + +/* + * apply_new_delta + * + * Execute a query for applying a delta table given by deltname_new + * which contains tuples to be inserted into a materialized view given by + * matviewname. This is used when counting is not required. + */ +static void +apply_new_delta(const char *matviewname, const char *deltaname_new, + StringInfo target_list) +{ + StringInfoData querybuf; + + /* Search for matching tuples from the view and update or delete if found. */ + initStringInfo(&querybuf); + appendStringInfo(&querybuf, + "INSERT INTO %s (%s) SELECT %s FROM (" + "SELECT diff.*, pg_catalog.generate_series(1, diff.\"__ivm_count__\")" + " AS __ivm_generate_series__ " + "FROM %s AS diff) AS v", + matviewname, target_list->data, target_list->data, + deltaname_new); + + if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); +} + +/* + * get_matching_condition_string + * + * Build a predicate string for looking for a tuple with given keys. + */ +static char * +get_matching_condition_string(List *keys) +{ + StringInfoData match_cond; + ListCell *lc; + + /* If there is no key columns, the condition is always true. */ + if (keys == NIL) + return "true"; + + initStringInfo(&match_cond); + foreach (lc, keys) + { + Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc); + char *resname = NameStr(attr->attname); + char *mv_resname = quote_qualified_identifier("mv", resname); + char *diff_resname = quote_qualified_identifier("diff", resname); + Oid typid = attr->atttypid; + + /* Considering NULL values, we can not use simple = operator. */ + appendStringInfo(&match_cond, "("); + generate_equal(&match_cond, typid, mv_resname, diff_resname); + appendStringInfo(&match_cond, " OR (%s IS NULL AND %s IS NULL))", + mv_resname, diff_resname); + + if (lnext(keys, lc)) + appendStringInfo(&match_cond, " AND "); + } + + return match_cond.data; +} + +/* + * generate_equals + * + * Generate an equality clause using given operands' default equality + * operator. + */ +static void +generate_equal(StringInfo querybuf, Oid opttype, + const char *leftop, const char *rightop) +{ + TypeCacheEntry *typentry; + + typentry = lookup_type_cache(opttype, TYPECACHE_EQ_OPR); + if (!OidIsValid(typentry->eq_opr)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an equality operator for type %s", + format_type_be_qualified(opttype)))); + + generate_operator_clause(querybuf, + leftop, opttype, + typentry->eq_opr, + rightop, opttype); +} + +/* + * mv_InitHashTables + */ +static void +mv_InitHashTables(void) +{ + HASHCTL ctl; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(MV_TriggerHashEntry); + mv_trigger_info = hash_create("MV trigger info", + MV_INIT_QUERYHASHSIZE, + &ctl, HASH_ELEM | HASH_BLOBS); +} + +/* + * AtAbort_IVM + * + * Clean up hash entries for all materialized views. This is called at + * transaction abort. + */ +void +AtAbort_IVM() +{ + HASH_SEQ_STATUS seq; + MV_TriggerHashEntry *entry; + + if (mv_trigger_info) + { + hash_seq_init(&seq, mv_trigger_info); + while ((entry = hash_seq_search(&seq)) != NULL) + clean_up_IVM_hash_entry(entry, true); + } + in_delta_calculation = false; +} + +/* + * clean_up_IVM_hash_entry + * + * Clean up tuple stores and hash entries for a materialized view after its + * maintenance finished. + */ +static void +clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort) +{ + bool found; + ListCell *lc; + + foreach(lc, entry->tables) + { + MV_TriggerTable *table = (MV_TriggerTable *) lfirst(lc); + + list_free(table->old_tuplestores); + list_free(table->new_tuplestores); + if (!is_abort) + { + ExecDropSingleTupleTableSlot(table->slot); + table_close(table->rel, NoLock); + } + } + list_free(entry->tables); + + if (!is_abort) + UnregisterSnapshot(entry->snapshot); + + hash_search(mv_trigger_info, (void *) &entry->matview_id, HASH_REMOVE, &found); +} + +/* + * isIvmName + * + * Check if this is a IVM hidden column from the name. + */ +bool +isIvmName(const char *s) +{ + if (s) + return (strncmp(s, "__ivm_", 6) == 0); + return false; +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d0833282b22..b184f32fa94 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12475,5 +12475,15 @@ { oid => 7077, descr => 'Update gp_segment_configuration mode and status by dbid', proname => 'gp_update_segment_configuration_mode_status', proisstrict => 'f', provolatile => 'v', proparallel => 'r', prorettype => 'int2', proargtypes => 'int4 char char', prosrc => 'gp_update_segment_configuration_mode_status'}, +# IVM +{ oid => '786', descr => 'ivm trigger (before)', + proname => 'IVM_immediate_before', provolatile => 'v', prorettype => 'trigger', + proargtypes => '', prosrc => 'IVM_immediate_before' }, +{ oid => '787', descr => 'ivm trigger (after)', + proname => 'IVM_immediate_maintenance', provolatile => 'v', prorettype => 'trigger', + proargtypes => '', prosrc => 'IVM_immediate_maintenance' }, +{ oid => '788', descr => 'ivm filetring ', + proname => 'ivm_visible_in_prestate', provolatile => 's', prorettype => 'bool', + proargtypes => 'oid tid oid', prosrc => 'ivm_visible_in_prestate' }, ] diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h index e346df3c636..ad4b2abf6ca 100644 --- a/src/include/commands/createas.h +++ b/src/include/commands/createas.h @@ -16,6 +16,7 @@ #include "catalog/objectaddress.h" #include "nodes/params.h" +#include "nodes/pathnodes.h" #include "parser/parse_node.h" #include "tcop/dest.h" #include "utils/queryenvironment.h" @@ -25,6 +26,9 @@ extern ObjectAddress ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *st ParamListInfo params, QueryEnvironment *queryEnv, QueryCompletion *qc); +extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid); +extern void CreateIndexOnIMMV(Query *query, Relation matviewRel); + extern int GetIntoRelEFlags(IntoClause *intoClause); extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause); diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h index 767232a6259..087a8b88746 100644 --- a/src/include/commands/matview.h +++ b/src/include/commands/matview.h @@ -24,6 +24,8 @@ extern void SetMatViewPopulatedState(Relation relation, bool newstate); +extern void SetMatViewIVMState(Relation relation, bool newstate); + extern ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, QueryCompletion *qc); @@ -34,4 +36,10 @@ extern bool MatViewIncrementalMaintenanceIsEnabled(void); extern void transientrel_init(QueryDesc *queryDesc); +extern Datum IVM_immediate_before(PG_FUNCTION_ARGS); +extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS); +extern Datum IVM_visible_in_prestate(PG_FUNCTION_ARGS); +extern void AtAbort_IVM(void); +extern bool isIvmName(const char *s); + #endif /* MATVIEW_H */ From 6b239f911335c3606b142ba4a7d9fe8fb8657205 Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Thu, 20 Jul 2023 10:55:43 +0800 Subject: [PATCH 7/8] Fix: dispatch ivm trigger --- src/backend/commands/createas.c | 12 +++++++++++- src/backend/commands/matview.c | 13 ++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index ef86cc6ca99..2ae22fe013c 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -979,7 +979,8 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock refaddr.objectSubId = 0; ivm_trigger = makeNode(CreateTrigStmt); - ivm_trigger->relation = NULL; + ivm_trigger->relation = makeRangeVar(get_namespace_name(get_rel_namespace(relOid)), get_rel_name(relOid), -1); + // FIXME(yang): use statment-level trigger ivm_trigger->row = false; ivm_trigger->timing = timing; @@ -1059,6 +1060,15 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock recordDependencyOn(&address, &refaddr, DEPENDENCY_AUTO); + if (Gp_role == GP_ROLE_DISPATCH && ENABLE_DISPATCH()) + { + CdbDispatchUtilityStatement((Node *) ivm_trigger, + DF_CANCEL_ON_ERROR| + DF_WITH_SNAPSHOT| + DF_NEED_TWO_PHASE, + GetAssignedOidsForDispatch(), + NULL); + } /* Make changes-so-far visible */ CommandCounterIncrement(); } diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index ad3abe078ec..fa565d96744 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -153,7 +153,7 @@ static RangeTblEntry *replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable QueryEnvironment *queryEnv); static Query *rewrite_query_for_counting(Query *query, ParseState *pstate); -static void calc_delta(MV_TriggerTable *table, int rte_index, Query *query, +static void calc_delta(Oid matviewOid,MV_TriggerTable *table, int rte_index, Query *query, DestReceiver *dest_old, DestReceiver *dest_new, TupleDesc *tupdesc_old, TupleDesc *tupdesc_new, QueryEnvironment *queryEnv); @@ -1474,6 +1474,7 @@ IVM_immediate_before(PG_FUNCTION_ARGS) } entry->before_trig_count++; + elog(INFO, "trigger IVM_immediate_before."); return PointerGetDatum(NULL); } @@ -1746,7 +1747,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) TupleDesc tupdesc_new; /* calculate delta tables */ - calc_delta(table, rte_index, rewritten, dest_old, dest_new, + calc_delta(matviewOid, table, rte_index, rewritten, dest_old, dest_new, &tupdesc_old, &tupdesc_new, queryEnv); /* Set the table in the query to post-update state */ @@ -1796,7 +1797,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) /* Restore userid and security context */ SetUserIdAndSecContext(save_userid, save_sec_context); - + elog(INFO, "trigger IVM_immediate_maintenance."); return PointerGetDatum(NULL); } @@ -2172,7 +2173,7 @@ rewrite_query_for_counting(Query *query, ParseState *pstate) * by the RTE index. */ static void -calc_delta(MV_TriggerTable *table, int rte_index, Query *query, +calc_delta(Oid matviewOid, MV_TriggerTable *table, int rte_index, Query *query, DestReceiver *dest_old, DestReceiver *dest_new, TupleDesc *tupdesc_old, TupleDesc *tupdesc_new, QueryEnvironment *queryEnv) @@ -2182,9 +2183,7 @@ calc_delta(MV_TriggerTable *table, int rte_index, Query *query, RefreshClause *refreshClause; in_delta_calculation = true; - RangeVar *relation = makeRangeVar(get_namespace_name(RelationGetNamespace(table->rel)), - pstrdup(RelationGetRelationName(table->rel)), - -1); + RangeVar *relation = makeRangeVar(get_namespace_name(get_rel_namespace(matviewOid)), get_rel_name(matviewOid), -1); refreshClause = MakeRefreshClause(false, false, relation, RelationIsAppendOptimized(table->rel)); From e30ffe91a3cc882536ed4b3c7012b42ade029fa2 Mon Sep 17 00:00:00 2001 From: yang jianghua Date: Sat, 22 Jul 2023 20:37:56 +0800 Subject: [PATCH 8/8] test ivm utility mode --- src/backend/commands/createas.c | 2 +- src/backend/commands/matview.c | 34 +++++++++++++++++++++++++++------ src/backend/commands/trigger.c | 21 +++++++++++--------- src/include/catalog/pg_proc.dat | 14 ++++++++++---- src/include/commands/matview.h | 8 +++++--- 5 files changed, 56 insertions(+), 23 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 2ae22fe013c..e0b54cac8a1 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -1041,7 +1041,7 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock ex_lock = true; ivm_trigger->funcname = - (timing == TRIGGER_TYPE_BEFORE ? SystemFuncName("IVM_immediate_before") : SystemFuncName("IVM_immediate_maintenance")); + (timing == TRIGGER_TYPE_BEFORE ? SystemFuncName("ivm_immediate_before") : SystemFuncName("ivm_immediate_maintenance")); ivm_trigger->columns = NIL; ivm_trigger->transitionRels = transitionRels; diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index fa565d96744..1b96bff58a8 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -1394,14 +1394,14 @@ get_matview_query(Relation matviewRel) */ /* - * IVM_immediate_before + * ivm_immediate_before * * IVM trigger function invoked before base table is modified. If this is * invoked firstly in the same statement, we save the transaction id and the * command id at that time. */ Datum -IVM_immediate_before(PG_FUNCTION_ARGS) +ivm_immediate_before(PG_FUNCTION_ARGS) { TriggerData *trigdata = (TriggerData *) fcinfo->context; char *matviewOid_text = trigdata->tg_trigger->tgargs[0]; @@ -1474,20 +1474,20 @@ IVM_immediate_before(PG_FUNCTION_ARGS) } entry->before_trig_count++; - elog(INFO, "trigger IVM_immediate_before."); + elog(INFO, "trigger ivm_immediate_before."); return PointerGetDatum(NULL); } /* - * IVM_immediate_maintenance + * ivm_immediate_maintenance * * IVM trigger function invoked after base table is modified. * For each table, tuplestores of transition tables are collected. * and after the last modification */ Datum -IVM_immediate_maintenance(PG_FUNCTION_ARGS) +ivm_immediate_maintenance(PG_FUNCTION_ARGS) { TriggerData *trigdata = (TriggerData *) fcinfo->context; Relation rel; @@ -1732,6 +1732,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcxt); } + // FIXME: This is a hack to avoid error in oid_dispatch.c + Gp_role = GP_ROLE_UTILITY; /* for all modified tables */ foreach(lc, entry->tables) { @@ -1773,6 +1775,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) tuplestore_clear(new_tuplestore); } } + Gp_role = GP_ROLE_EXECUTE; /* Clean up hash entry and delete tuplestores */ clean_up_IVM_hash_entry(entry, false); @@ -1797,7 +1800,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS) /* Restore userid and security context */ SetUserIdAndSecContext(save_userid, save_sec_context); - elog(INFO, "trigger IVM_immediate_maintenance."); + elog(INFO, "trigger ivm_immediate_maintenance."); return PointerGetDatum(NULL); } @@ -2564,3 +2567,22 @@ isIvmName(const char *s) return (strncmp(s, "__ivm_", 6) == 0); return false; } + + +Datum +ivm_rule_before(PG_FUNCTION_ARGS) +{ + Oid tableoid = PG_GETARG_OID(0); + bool result = true; + elog(INFO, "trigger ivm_rule_before %d", tableoid); + PG_RETURN_BOOL(result); +} + +Datum +ivm_rule_after(PG_FUNCTION_ARGS) +{ + Oid tableoid = PG_GETARG_OID(0); + bool result = true; + elog(INFO, "trigger ivm_rule_after %d", tableoid); + PG_RETURN_BOOL(result); +} diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index c163a60dd58..9007d1544f4 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2272,12 +2272,12 @@ ExecBSInsertTriggers(EState *estate, ResultRelInfo *relinfo) TriggerDesc *trigdesc; int i; TriggerData LocTriggerData = {0}; - - if (Gp_role == GP_ROLE_EXECUTE) - { - /* Don't fire statement-triggers in executor nodes. */ - return; - } + //FIXME: + // if (Gp_role == GP_ROLE_EXECUTE) + // { + // /* Don't fire statement-triggers in executor nodes. */ + // return; + // } trigdesc = relinfo->ri_TrigDesc; @@ -5688,10 +5688,13 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, */ if (afterTriggers.query_depth < 0) elog(ERROR, "AfterTriggerSaveEvent() called outside of query"); - + //FIXME: /* Don't fire statement-triggers in executor nodes. */ - if (!row_trigger && Gp_role == GP_ROLE_EXECUTE) - return; + // if (!row_trigger && Gp_role == GP_ROLE_EXECUTE) + // { + // elog(INFO, "AfterTriggerSaveEvent() Gp_role %d.", Gp_role); + // return; + // } /* Be sure we have enough space to record events at this query depth. */ if (afterTriggers.query_depth >= afterTriggers.maxquerydepth) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b184f32fa94..a972261b275 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12477,13 +12477,19 @@ # IVM { oid => '786', descr => 'ivm trigger (before)', - proname => 'IVM_immediate_before', provolatile => 'v', prorettype => 'trigger', - proargtypes => '', prosrc => 'IVM_immediate_before' }, + proname => 'ivm_immediate_before', provolatile => 'v', prorettype => 'trigger', + proargtypes => '', prosrc => 'ivm_immediate_before' }, { oid => '787', descr => 'ivm trigger (after)', - proname => 'IVM_immediate_maintenance', provolatile => 'v', prorettype => 'trigger', - proargtypes => '', prosrc => 'IVM_immediate_maintenance' }, + proname => 'ivm_immediate_maintenance', provolatile => 'v', prorettype => 'trigger', + proargtypes => '', prosrc => 'ivm_immediate_maintenance' }, { oid => '788', descr => 'ivm filetring ', proname => 'ivm_visible_in_prestate', provolatile => 's', prorettype => 'bool', proargtypes => 'oid tid oid', prosrc => 'ivm_visible_in_prestate' }, +{ oid => '789', descr => 'ivm rule before ', + proname => 'ivm_rule_before', provolatile => 'v', prorettype => 'bool', + proargtypes => 'oid', prosrc => 'ivm_rule_before' }, +{ oid => '7201', descr => 'ivm rule after ', + proname => 'ivm_rule_after', provolatile => 'v', prorettype => 'bool', + proargtypes => 'oid', prosrc => 'ivm_rule_after' }, ] diff --git a/src/include/commands/matview.h b/src/include/commands/matview.h index 087a8b88746..ed3ad127681 100644 --- a/src/include/commands/matview.h +++ b/src/include/commands/matview.h @@ -36,9 +36,11 @@ extern bool MatViewIncrementalMaintenanceIsEnabled(void); extern void transientrel_init(QueryDesc *queryDesc); -extern Datum IVM_immediate_before(PG_FUNCTION_ARGS); -extern Datum IVM_immediate_maintenance(PG_FUNCTION_ARGS); -extern Datum IVM_visible_in_prestate(PG_FUNCTION_ARGS); +extern Datum ivm_immediate_before(PG_FUNCTION_ARGS); +extern Datum ivm_immediate_maintenance(PG_FUNCTION_ARGS); +extern Datum ivm_visible_in_prestate(PG_FUNCTION_ARGS); +extern Datum ivm_rule_before(PG_FUNCTION_ARGS); +extern Datum ivm_rule_after(PG_FUNCTION_ARGS); extern void AtAbort_IVM(void); extern bool isIvmName(const char *s);