Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ RWOBJS = \
$(FETOOLS)/pg_rewind/local_source.o \
$(FETOOLS)/pg_rewind/parsexlog.o \
$(FETOOLS)/pg_rewind/pg_rewind.o \
$(FETOOLS)/pg_rewind/tde_file.o \
$(FETOOLS)/pg_rewind/timeline.o

RMGRDESCSOURCES = $(sort $(wildcard $(FETOOLS)/rmgrdesc/*desc*.c))
Expand Down
87 changes: 52 additions & 35 deletions fetools/pg18/pg_rewind/filemap.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ action_to_str(file_action_t action)
return "CREATE";
case FILE_ACTION_REMOVE:
return "REMOVE";
case FILE_ACTION_ENSURE_TDE_KEY:
return "ENSURE_KEY";

default:
return "unknown";
Expand Down Expand Up @@ -572,9 +574,33 @@ isRelDataFile(const char *path)
{
RelFileLocator rlocator;
unsigned int segNo;
int nmatch;
bool matched;

matched = path_rlocator(path, &rlocator, &segNo);
if (matched)
{
char *check_path = datasegpath(rlocator, MAIN_FORKNUM, segNo);

if (strcmp(check_path, path) != 0)
matched = false;

pfree(check_path);
}

return matched;
}

/*
* Sets rlocator and segNo based on given path. Returns false if didn't find
* a match.
*
* Only concerned with files belonging to the main fork.
*/
bool
path_rlocator(const char *path, RelFileLocator *rlocator, unsigned int *segNo)
{
int nmatch;

/*----
* Relation data files can be in one of the following directories:
*
Expand All @@ -594,55 +620,38 @@ isRelDataFile(const char *path)
*
*----
*/
rlocator.spcOid = InvalidOid;
rlocator.dbOid = InvalidOid;
rlocator.relNumber = InvalidRelFileNumber;
segNo = 0;
matched = false;
rlocator->spcOid = InvalidOid;
rlocator->dbOid = InvalidOid;
rlocator->relNumber = InvalidRelFileNumber;
*segNo = 0;

nmatch = sscanf(path, "global/%u.%u", &rlocator.relNumber, &segNo);
nmatch = sscanf(path, "global/%u.%u", &rlocator->relNumber, segNo);
if (nmatch == 1 || nmatch == 2)
{
rlocator.spcOid = GLOBALTABLESPACE_OID;
rlocator.dbOid = 0;
matched = true;
rlocator->spcOid = GLOBALTABLESPACE_OID;
rlocator->dbOid = 0;
return true;
}
else
{
nmatch = sscanf(path, "base/%u/%u.%u",
&rlocator.dbOid, &rlocator.relNumber, &segNo);
&rlocator->dbOid, &rlocator->relNumber, segNo);
if (nmatch == 2 || nmatch == 3)
{
rlocator.spcOid = DEFAULTTABLESPACE_OID;
matched = true;
rlocator->spcOid = DEFAULTTABLESPACE_OID;
return true;
}
else
{
nmatch = sscanf(path, "pg_tblspc/%u/" TABLESPACE_VERSION_DIRECTORY "/%u/%u.%u",
&rlocator.spcOid, &rlocator.dbOid, &rlocator.relNumber,
&segNo);
&rlocator->spcOid, &rlocator->dbOid, &rlocator->relNumber,
segNo);
if (nmatch == 3 || nmatch == 4)
matched = true;
return true;
}
}

/*
* The sscanf tests above can match files that have extra characters at
* the end. To eliminate such cases, cross-check that GetRelationPath
* creates the exact same filename, when passed the RelFileLocator
* information we extracted from the filename.
*/
if (matched)
{
char *check_path = datasegpath(rlocator, MAIN_FORKNUM, segNo);

if (strcmp(check_path, path) != 0)
matched = false;

pfree(check_path);
}

return matched;
return false;
}

/*
Expand Down Expand Up @@ -712,6 +721,13 @@ decide_file_action(file_entry_t *entry)
if (strstr(path, ".DS_Store") != NULL)
return FILE_ACTION_NONE;

/*
* Skip pg_tde key data but WAL-related stuff as WAL being replaced by
* source's. We will handle the rest while re-encrypting data.
*/
if (strstr(path, "pg_tde/") != NULL)
return FILE_ACTION_NONE;

/*
* Remove all files matching the exclusion filters in the target.
*/
Expand Down Expand Up @@ -831,14 +847,15 @@ decide_file_action(file_entry_t *entry)
* in the target will be copied based on parsing the target
* system's WAL, and any blocks modified in the source will be
* updated after rewinding, when the source system's WAL is
* replayed.
* replayed. But we still have to sync source/target keys in
* case it is encrypted.
*/
if (entry->target_size < entry->source_size)
return FILE_ACTION_COPY_TAIL;
else if (entry->target_size > entry->source_size)
return FILE_ACTION_TRUNCATE;
else
return FILE_ACTION_NONE;
return FILE_ACTION_ENSURE_TDE_KEY;
}
break;

Expand Down
5 changes: 5 additions & 0 deletions fetools/pg18/pg_rewind/filemap.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ typedef enum
* blocks based on the parsed WAL) */
FILE_ACTION_TRUNCATE, /* truncate local file to 'newsize' bytes */
FILE_ACTION_REMOVE, /* remove local file / directory / symlink */
FILE_ACTION_ENSURE_TDE_KEY, /* data file with no action, but we to check
* if it is encrypted and sync source/target
* keys */
} file_action_t;

typedef enum
Expand Down Expand Up @@ -113,4 +116,6 @@ extern void print_filemap(filemap_t *filemap);
extern void keepwal_init(void);
extern void keepwal_add_entry(const char *path);

extern bool path_rlocator(const char *path, RelFileLocator *rlocator, unsigned int *segNo);

#endif /* FILEMAP_H */
70 changes: 69 additions & 1 deletion fetools/pg18/pg_rewind/libpq_source.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "pg_rewind.h"
#include "port/pg_bswap.h"
#include "rewind_source.h"
#include "tde_file.h"

#include "pg_tde.h"

/*
* Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
Expand All @@ -31,6 +34,7 @@ typedef struct
const char *path; /* path relative to data directory root */
off_t offset;
size_t length;
bool encrypt;
} fetch_range_request;

typedef struct
Expand Down Expand Up @@ -71,6 +75,10 @@ static char *libpq_fetch_file(rewind_source *source, const char *path,
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
static void libpq_destroy(rewind_source *source);

static void libpq_queue_fetch_range_do(rewind_source *source, const char *path,
bool encrypt, off_t off, size_t len);
static void libpq_fetch_tde_keys(rewind_source *source);

/*
* Create a new libpq source.
*
Expand Down Expand Up @@ -100,6 +108,8 @@ init_libpq_source(PGconn *conn)
initStringInfo(&src->offsets);
initStringInfo(&src->lengths);

libpq_fetch_tde_keys(&src->common);

return &src->common;
}

Expand Down Expand Up @@ -345,7 +355,7 @@ libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
* fetch-requests are for a whole file.
*/
open_target_file(path, true);
libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE));
libpq_queue_fetch_range_do(source, path, false, 0, Max(len, MAX_CHUNK_SIZE));
}

/*
Expand All @@ -354,6 +364,16 @@ libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
static void
libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
size_t len)
{
libpq_queue_fetch_range_do(source, path, true, off, len);
}

/*
* Queue up a request to fetch a piece of a file from remote system.
*/
static void
libpq_queue_fetch_range_do(rewind_source *source, const char *path, bool encrypt, off_t off,
size_t len)
{
libpq_source *src = (libpq_source *) source;

Expand Down Expand Up @@ -406,6 +426,7 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
src->request_queue[src->num_requests].path = path;
src->request_queue[src->num_requests].offset = off;
src->request_queue[src->num_requests].length = thislen;
src->request_queue[src->num_requests].encrypt = encrypt;
src->num_requests++;

off += thislen;
Expand Down Expand Up @@ -592,6 +613,19 @@ process_queued_fetch_requests(libpq_source *src)

open_target_file(filename, false);

if (rq->encrypt)
{
Assert(chunksize % BLCKSZ == 0);

ensure_tde_keys(filename);

for (int i = 0; i < chunksize / BLCKSZ; i++)
{
unsigned char *data = (unsigned char *) chunk + BLCKSZ * i;

encrypt_block(data, chunkoff + BLCKSZ * i, MAIN_FORKNUM);
}
}
write_target_range(chunk, chunkoff, chunksize);
}

Expand Down Expand Up @@ -682,3 +716,37 @@ libpq_destroy(rewind_source *source)

/* NOTE: we don't close the connection here, as it was not opened by us. */
}

static void
libpq_fetch_tde_keys(rewind_source *source)
{
PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res;

res = PQexec(conn, "SELECT pg_ls_dir('"PG_TDE_DATA_DIR"', true, false)");

if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not fetch file list: %s",
PQresultErrorMessage(res));

/* no tde dir, nothing to do */
if (PQnfields(res) == 0)
return;

init_tde();

for (int i = 0; i < PQntuples(res); i++)
{
char *path;
char *tde_file_buf;
size_t size;
char target_path[MAXPGPATH];

path = PQgetvalue(res, i, 0);

snprintf(target_path, MAXPGPATH, "%s/%s", PG_TDE_DATA_DIR, path);
tde_file_buf = libpq_fetch_file(source, target_path, &size);

write_tmp_source_file(path, tde_file_buf, size);
}
}
48 changes: 45 additions & 3 deletions fetools/pg18/pg_rewind/local_source.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@
#include "postgres_fe.h"

#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include "catalog/pg_tablespace_d.h"
#include "common/logging.h"
#include "file_ops.h"
#include "pg_rewind.h"
#include "rewind_source.h"
#include "tde_file.h"

#include "pg_tde.h"
#include "common/pg_tde_utils.h"
#include "access/pg_tde_tdemap.h"

typedef struct
{
Expand All @@ -34,6 +42,8 @@ static void local_queue_fetch_range(rewind_source *source, const char *path,
static void local_finish_fetch(rewind_source *source);
static void local_destroy(rewind_source *source);

static void local_fetch_tde_keys(rewind_source *source);

rewind_source *
init_local_source(const char *datadir)
{
Expand All @@ -51,6 +61,8 @@ init_local_source(const char *datadir)

src->datadir = datadir;

local_fetch_tde_keys(&src->common);

return &src->common;
}

Expand Down Expand Up @@ -145,6 +157,8 @@ local_queue_fetch_range(rewind_source *source, const char *path, off_t off,

open_target_file(path, false);

ensure_tde_keys(path);

while (end - begin > 0)
{
ssize_t readlen;
Expand All @@ -162,6 +176,9 @@ local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
else if (readlen == 0)
pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);

/* Re-encrypt blocks with a proper key if neeed. */
encrypt_block((unsigned char *) buf.data, begin, MAIN_FORKNUM);

write_target_range(buf.data, begin, readlen);
begin += readlen;
}
Expand All @@ -170,12 +187,37 @@ local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
pg_fatal("could not close file \"%s\": %m", srcpath);
}

static bool
directory_exists(const char *dir)
{
struct stat st;

if (stat(dir, &st) != 0)
return false;
if (S_ISDIR(st.st_mode))
return true;
return false;
}

static void
local_fetch_tde_keys(rewind_source *source)
{
char tde_source_dir[MAXPGPATH];
const char *datadir = ((local_source *) source)->datadir;

snprintf(tde_source_dir, sizeof(tde_source_dir), "%s/%s", datadir, PG_TDE_DATA_DIR);

if (!directory_exists(tde_source_dir))
return;

init_tde();
copy_tmp_tde_files(tde_source_dir);
}

static void
local_finish_fetch(rewind_source *source)
{
/*
* Nothing to do, local_queue_fetch_range() copies the ranges immediately.
*/
flush_current_key();
}

static void
Expand Down
Loading
Loading