From d1114d6340d392bf4cbdb26fe763b26f66a786a9 Mon Sep 17 00:00:00 2001
From: "eden.lee"
Date: Mon, 22 Dec 2025 18:37:39 +0900
Subject: [PATCH 1/7] Add AWS Aurora PostgreSQL auto-discovery support

This commit implements Aurora PostgreSQL auto-discovery functionality,
mirroring the existing Aurora MySQL implementation. The feature enables
automatic detection and management of Aurora PostgreSQL cluster topology.

Key features:
- Auto-discovery using aurora_replica_status() function
- Writer detection via session_id = 'MASTER_SESSION_ID'
- Automatic failover handling with hostgroup reconfiguration
- Replication lag-based server shunning
- Dynamic server addition when new nodes join the cluster

New tables:
- pgsql_aws_aurora_hostgroups (configuration)
- runtime_pgsql_aws_aurora_hostgroups (runtime)

Configuration parameters (same as MySQL Aurora):
- writer_hostgroup, reader_hostgroup
- aurora_port (default: 5432)
- domain_name, max_lag_ms, check_interval_ms, check_timeout_ms
- writer_is_also_reader, new_reader_weight
- add_lag_ms, min_lag_ms, lag_num_checks

Files modified:
- include/ProxySQL_Admin_Tables_Definitions.h: Aurora table definitions
- include/proxysql_admin.h: incoming_aurora_hostgroups field
- lib/Admin_Bootstrap.cpp: Admin table registration
- lib/ProxySQL_Config.cpp: Config file parsing
- lib/ProxySQL_Admin.cpp: Runtime load logic
- include/PgSQL_HostGroups_Manager.h: PgSQL_AWS_Aurora_Info class
- lib/PgSQL_HostGroups_Manager.cpp: Hostgroup management implementation
- include/PgSQL_Monitor.hpp: Monitoring class definitions
- lib/PgSQL_Monitor.cpp: Monitoring thread implementation
---
 include/PgSQL_HostGroups_Manager.h          |  54 ++
 include/PgSQL_Monitor.hpp                   | 103 +++
 include/ProxySQL_Admin_Tables_Definitions.h |   5 +
 include/proxysql_admin.h                    |   3 +-
 lib/Admin_Bootstrap.cpp                     |   3 +
 lib/PgSQL_HostGroups_Manager.cpp            | 619 ++++++++++++++++++
 lib/PgSQL_Monitor.cpp                       | 690 ++++++++++++++++++++
 lib/ProxySQL_Admin.cpp                      |  43 +-
 lib/ProxySQL_Config.cpp                     |  56 ++
 9 files changed,
1574 insertions(+), 2 deletions(-) diff --git a/include/PgSQL_HostGroups_Manager.h b/include/PgSQL_HostGroups_Manager.h index 4f66c7ef80..b9d19e722c 100644 --- a/include/PgSQL_HostGroups_Manager.h +++ b/include/PgSQL_HostGroups_Manager.h @@ -48,6 +48,15 @@ #endif /* DEBUG */ #define MYHGM_PgSQL_REPLICATION_HOSTGROUPS "CREATE TABLE pgsql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>=0) , check_type VARCHAR CHECK (LOWER(check_type) IN ('read_only')) NOT NULL DEFAULT 'read_only' , comment VARCHAR NOT NULL DEFAULT '' , UNIQUE (reader_hostgroup))" +// AWS Aurora PostgreSQL hostgroups table definition +#define MYHGM_PgSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE pgsql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , " \ + "active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NULL DEFAULT 5432 , domain_name VARCHAR NOT NULL DEFAULT '' , " \ + "max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , " \ + "check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , " \ + "writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , " \ + "add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , " \ + "lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR NOT NULL DEFAULT '' , UNIQUE 
(reader_hostgroup))" + #define PGHGM_GEN_ADMIN_RUNTIME_SERVERS "SELECT hostgroup_id, hostname, port, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" WHEN 4 THEN \"SHUNNED\" END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers ORDER BY hostgroup_id, hostname, port" #define MYHGM_PgSQL_HOSTGROUP_ATTRIBUTES "CREATE TABLE pgsql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , hostgroup_settings VARCHAR CHECK (JSON_VALID(hostgroup_settings) OR hostgroup_settings = '') NOT NULL DEFAULT '' , servers_defaults VARCHAR CHECK (JSON_VALID(servers_defaults) OR servers_defaults = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')" @@ -379,6 +388,33 @@ struct PgSQL_srv_opts_t { int32_t use_ssl; }; +/** + * @brief AWS Aurora PostgreSQL configuration info + * @details Stores configuration for each Aurora PostgreSQL hostgroup pair + */ +class PgSQL_AWS_Aurora_Info { +public: + int writer_hostgroup; + int reader_hostgroup; + int aurora_port; + int max_lag_ms; + int add_lag_ms; + int min_lag_ms; + int lag_num_checks; + int check_interval_ms; + int 
check_timeout_ms; + bool active; + bool __active; // temporary flag for tracking during regeneration + int writer_is_also_reader; + int new_reader_weight; + char *domain_name; + char *comment; + + PgSQL_AWS_Aurora_Info(int w, int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c); + bool update(int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c); + ~PgSQL_AWS_Aurora_Info(); +}; + class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { #if 0 SQLite3DB *admindb; @@ -547,6 +583,13 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { */ SQLite3_result *incoming_replication_hostgroups; + // AWS Aurora PostgreSQL + void generate_pgsql_aws_aurora_hostgroups_table(); + SQLite3_result *incoming_aws_aurora_hostgroups; + + pthread_mutex_t AWS_Aurora_Info_mutex; + std::map AWS_Aurora_Info_Map; + void generate_pgsql_hostgroup_attributes_table(); SQLite3_result *incoming_hostgroup_attributes; @@ -839,6 +882,17 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager { void unshun_server_all_hostgroups(const char * address, uint16_t port, time_t t, int max_wait_sec, unsigned int *skip_hid); PgSQL_SrvC* find_server_in_hg(unsigned int _hid, const std::string& addr, int port); + // AWS Aurora PostgreSQL methods + bool aws_aurora_replication_lag_action(int _whid, int _rhid, char *server_id, float current_replication_lag_ms, bool enable, bool is_writer, bool verbose=true); + void update_aws_aurora_set_writer(int _whid, int _rhid, char *server_id, bool verbose=true); + void update_aws_aurora_set_reader(int _whid, int _rhid, char *server_id); + /** + * @brief Updates the resultset and corresponding checksum used by Monitor for AWS Aurora PostgreSQL. + * @param lock Whether if both 'AWS_Aurora_Info_mutex' and 'PgSQL_Monitor::aws_aurora_mutex' mutexes should + * be acquired before the update takes place or not. 
+ */ + void update_aws_aurora_hosts_monitor_resultset(bool lock=false); + private: void update_hostgroup_manager_mappings(); uint64_t get_pgsql_servers_checksum(SQLite3_result* runtime_pgsql_servers = nullptr); diff --git a/include/PgSQL_Monitor.hpp b/include/PgSQL_Monitor.hpp index bd5a3b7b78..4ef4b3d4cd 100644 --- a/include/PgSQL_Monitor.hpp +++ b/include/PgSQL_Monitor.hpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include #define MONITOR_SQLITE_TABLE_PGSQL_SERVER_CONNECT_LOG "CREATE TABLE pgsql_server_connect_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , connect_success_time_us INT DEFAULT 0 , connect_error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))" @@ -20,6 +22,19 @@ #define MONITOR_SQLITE_TABLE_PROXYSQL_SERVERS "CREATE TABLE proxysql_servers (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 6032 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostname, port) )" +// AWS Aurora PostgreSQL monitoring tables +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_LOG "CREATE TABLE pgsql_server_aws_aurora_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , error VARCHAR , server_id VARCHAR NOT NULL DEFAULT '' , session_id VARCHAR , last_update_timestamp VARCHAR , replica_lag_in_msec INT NOT NULL DEFAULT 0 , estimated_lag_ms INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostname, port, time_start_us, server_id))" + +#define MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_CHECK_STATUS "CREATE TABLE pgsql_server_aws_aurora_check_status (writer_hostgroup INT NOT NULL , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , last_checked_at VARCHAR , checks_tot INT NOT NULL DEFAULT 0 , checks_ok INT NOT NULL DEFAULT 0 , last_error VARCHAR , PRIMARY KEY (writer_hostgroup, hostname, port))" + +#define 
MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_FAILOVERS "CREATE TABLE pgsql_server_aws_aurora_failovers (writer_hostgroup INT NOT NULL , hostname VARCHAR NOT NULL , inserted_at VARCHAR NOT NULL)" + +#define PGSQL_AWS_Aurora_Nentries 150 + +// Forward declarations +class PgSQL_AWS_Aurora_monitor_node; +class PgSQL_AWS_Aurora_status_entry; + struct PgSQL_Monitor { // @brief Flags if monitoring threads should be shutdown. bool shutdown = false; @@ -54,6 +69,18 @@ struct PgSQL_Monitor { const_cast("pgsql_server_read_only_log"), const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_READ_ONLY_LOG) }, + { + const_cast("pgsql_server_aws_aurora_log"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_LOG) + }, + { + const_cast("pgsql_server_aws_aurora_check_status"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_CHECK_STATUS) + }, + { + const_cast("pgsql_server_aws_aurora_failovers"), + const_cast(MONITOR_SQLITE_TABLE_PGSQL_SERVER_AWS_AURORA_FAILOVERS) + }, }; std::vector tables_defs_monitor_internal { @@ -63,7 +90,24 @@ struct PgSQL_Monitor { } }; + // AWS Aurora PostgreSQL monitoring members - placed at end to avoid initialization issues + /////////////////////////////////////////////////////////////////////////// + pthread_mutex_t aws_aurora_mutex; // initialized in constructor like MySQL + SQLite3_result* AWS_Aurora_Hosts_resultset; + uint64_t AWS_Aurora_Hosts_resultset_checksum; + std::map AWS_Aurora_Hosts_Map; + /////////////////////////////////////////////////////////////////////////// + PgSQL_Monitor(); + ~PgSQL_Monitor(); + + // AWS Aurora PostgreSQL methods + unsigned int estimate_lag(char* server_id, PgSQL_AWS_Aurora_status_entry** aase, unsigned int idx, + unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks); + void evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned int rHG, + PgSQL_AWS_Aurora_status_entry** lasts_ase, unsigned int ase_idx, + unsigned int max_latency_ms, unsigned int add_lag_ms, unsigned int 
min_lag_ms, unsigned int lag_num_checks); + bool server_responds_to_ping(const char* addr, int port); }; struct pgsql_conn_t { @@ -74,6 +118,65 @@ struct pgsql_conn_t { mf_unique_ptr err {}; }; +/** + * @brief Represents a single row from aurora_replica_status() function + * @details PostgreSQL Aurora equivalent of AWS_Aurora_replica_host_status_entry + */ +class PgSQL_AWS_Aurora_replica_host_status_entry { +public: + char* server_id = nullptr; + char* session_id = nullptr; + char* last_update_timestamp = nullptr; + float replica_lag_ms = 0.0; + unsigned int estimated_lag_ms = 0; + bool is_current_master = false; + PgSQL_AWS_Aurora_replica_host_status_entry(char* serid, char* sessid, char* lut, float rlm, bool is_master); + PgSQL_AWS_Aurora_replica_host_status_entry(char* serid, char* sessid, char* lut, const char* rlm, bool is_master); + ~PgSQL_AWS_Aurora_replica_host_status_entry(); +}; + +/** + * @brief Represents a single check executed against a single Aurora node + * @details Can contain several PgSQL_AWS_Aurora_replica_host_status_entry + */ +class PgSQL_AWS_Aurora_status_entry { +public: + unsigned long long start_time; + unsigned long long check_time; + char* error; + std::vector* host_statuses; + PgSQL_AWS_Aurora_status_entry(unsigned long long st, unsigned long long ct, char* e); + void add_host_status(PgSQL_AWS_Aurora_replica_host_status_entry* hs); + ~PgSQL_AWS_Aurora_status_entry(); +}; + +/** + * @brief Represents a single Aurora node where checks are executed + * @details A single node will have a PgSQL_AWS_Aurora_status_entry per check + */ +class PgSQL_AWS_Aurora_monitor_node { +private: + int idx_last_entry; +public: + char* addr; + int port; + unsigned int writer_hostgroup; + uint64_t num_checks_tot; + uint64_t num_checks_ok; + time_t last_checked_at; + PgSQL_AWS_Aurora_status_entry* last_entries[PGSQL_AWS_Aurora_Nentries]; + PgSQL_AWS_Aurora_monitor_node(char* _a, int _p, int _whg); + ~PgSQL_AWS_Aurora_monitor_node(); + bool 
add_entry(PgSQL_AWS_Aurora_status_entry* ase); + PgSQL_AWS_Aurora_status_entry* last_entry() { + if (idx_last_entry == -1) return nullptr; + return last_entries[idx_last_entry]; + } +}; + void* PgSQL_monitor_scheduler_thread(); +void* PgSQL_monitor_AWS_Aurora_thread(void* arg); +void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg); +void* PgSQL_monitor_aws_aurora(void* arg); #endif diff --git a/include/ProxySQL_Admin_Tables_Definitions.h b/include/ProxySQL_Admin_Tables_Definitions.h index 6305c6026f..cfb3a4b345 100644 --- a/include/ProxySQL_Admin_Tables_Definitions.h +++ b/include/ProxySQL_Admin_Tables_Definitions.h @@ -221,6 +221,11 @@ #define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE runtime_mysql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NUlL DEFAULT 3306 , domain_name VARCHAR NOT NULL CHECK (SUBSTR(domain_name,1,1) = '.') , max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR , UNIQUE (reader_hostgroup))" +// AWS Aurora PostgreSQL +#define ADMIN_SQLITE_TABLE_PGSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE 
pgsql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NULL DEFAULT 5432 , domain_name VARCHAR NOT NULL CHECK (SUBSTR(domain_name,1,1) = '.') , max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR , UNIQUE (reader_hostgroup))" + +#define ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_AWS_AURORA_HOSTGROUPS "CREATE TABLE runtime_pgsql_aws_aurora_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , aurora_port INT NOT NULL DEFAULT 5432 , domain_name VARCHAR NOT NULL CHECK (SUBSTR(domain_name,1,1) = '.') , max_lag_ms INT NOT NULL CHECK (max_lag_ms>= 10 AND max_lag_ms <= 600000) DEFAULT 600000 , check_interval_ms INT NOT NULL CHECK (check_interval_ms >= 100 AND check_interval_ms <= 600000) DEFAULT 1000 , check_timeout_ms INT NOT NULL CHECK (check_timeout_ms >= 80 AND check_timeout_ms <= 3000) DEFAULT 800 , writer_is_also_reader INT CHECK (writer_is_also_reader IN (0,1)) NOT 
NULL DEFAULT 0 , new_reader_weight INT CHECK (new_reader_weight >= 0 AND new_reader_weight <=10000000) NOT NULL DEFAULT 1 , add_lag_ms INT NOT NULL CHECK (add_lag_ms >= 0 AND add_lag_ms <= 600000) DEFAULT 30 , min_lag_ms INT NOT NULL CHECK (min_lag_ms >= 0 AND min_lag_ms <= 600000) DEFAULT 30 , lag_num_checks INT NOT NULL CHECK (lag_num_checks >= 1 AND lag_num_checks <= 16) DEFAULT 1 , comment VARCHAR , UNIQUE (reader_hostgroup))" + #define ADMIN_SQLITE_TABLE_MYSQL_HOSTGROUP_ATTRIBUTES_V2_5_0 "CREATE TABLE mysql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')" #define ADMIN_SQLITE_TABLE_MYSQL_HOSTGROUP_ATTRIBUTES_V2_5_2 "CREATE TABLE mysql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming 
IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , servers_defaults VARCHAR CHECK (JSON_VALID(servers_defaults) OR servers_defaults = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')" diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index bc8f35675b..01d553a454 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -200,11 +200,12 @@ struct peer_mysql_servers_v2_t { struct incoming_pgsql_servers_t { SQLite3_result* incoming_pgsql_servers_v2 = NULL; SQLite3_result* incoming_replication_hostgroups = NULL; + SQLite3_result* incoming_aurora_hostgroups = NULL; SQLite3_result* incoming_hostgroup_attributes = NULL; SQLite3_result* runtime_pgsql_servers = NULL; incoming_pgsql_servers_t(); - incoming_pgsql_servers_t(SQLite3_result*, SQLite3_result*, SQLite3_result*, SQLite3_result*); + incoming_pgsql_servers_t(SQLite3_result*, SQLite3_result*, SQLite3_result*, SQLite3_result*, SQLite3_result*); }; // Separate structs for runtime pgsql server and pgsql server v2 to avoid human error diff --git a/lib/Admin_Bootstrap.cpp b/lib/Admin_Bootstrap.cpp index 6e3ad7e9ba..0811484377 100644 --- a/lib/Admin_Bootstrap.cpp +++ b/lib/Admin_Bootstrap.cpp @@ -612,6 +612,8 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_hostgroup_attributes", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_HOSTGROUP_ATTRIBUTES); insert_into_tables_defs(tables_defs_admin, "pgsql_replication_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_REPLICATION_HOSTGROUPS); insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_replication_hostgroups", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_REPLICATION_HOSTGROUPS); + insert_into_tables_defs(tables_defs_admin, 
"pgsql_aws_aurora_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_AWS_AURORA_HOSTGROUPS); + insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_aws_aurora_hostgroups", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_AWS_AURORA_HOSTGROUPS); insert_into_tables_defs(tables_defs_admin, "pgsql_firewall_whitelist_users", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_USERS); insert_into_tables_defs(tables_defs_admin, "runtime_pgsql_firewall_whitelist_users", ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_FIREWALL_WHITELIST_USERS); @@ -627,6 +629,7 @@ bool ProxySQL_Admin::init(const bootstrap_info_t& bootstrap_info) { insert_into_tables_defs(tables_defs_config, "pgsql_query_rules_fast_routing", ADMIN_SQLITE_TABLE_PGSQL_QUERY_RULES_FAST_ROUTING); insert_into_tables_defs(tables_defs_config, "pgsql_hostgroup_attributes", ADMIN_SQLITE_TABLE_PGSQL_HOSTGROUP_ATTRIBUTES); insert_into_tables_defs(tables_defs_config, "pgsql_replication_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_REPLICATION_HOSTGROUPS); + insert_into_tables_defs(tables_defs_config, "pgsql_aws_aurora_hostgroups", ADMIN_SQLITE_TABLE_PGSQL_AWS_AURORA_HOSTGROUPS); insert_into_tables_defs(tables_defs_config, "pgsql_firewall_whitelist_users", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_USERS); insert_into_tables_defs(tables_defs_config, "pgsql_firewall_whitelist_rules", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_RULES); insert_into_tables_defs(tables_defs_config, "pgsql_firewall_whitelist_sqli_fingerprints", ADMIN_SQLITE_TABLE_PGSQL_FIREWALL_WHITELIST_SQLI_FINGERPRINTS); diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 8576d066ce..55239fe778 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -8,6 +8,9 @@ using json = nlohmann::json; #include "PgSQL_PreparedStatement.h" #include "PgSQL_Data_Stream.h" +#include "PgSQL_Monitor.hpp" + +extern PgSQL_Monitor* GloPgMon; #include #include @@ -807,14 +810,17 @@ PgSQL_HostGroups_Manager::PgSQL_HostGroups_Manager() { 
mydb->execute(MYHGM_PgSQL_SERVERS); mydb->execute(MYHGM_PgSQL_SERVERS_INCOMING); mydb->execute(MYHGM_PgSQL_REPLICATION_HOSTGROUPS); + mydb->execute(MYHGM_PgSQL_AWS_AURORA_HOSTGROUPS); mydb->execute(MYHGM_PgSQL_HOSTGROUP_ATTRIBUTES); mydb->execute("CREATE INDEX IF NOT EXISTS idx_pgsql_servers_hostname_port ON pgsql_servers (hostname,port)"); MyHostGroups=new PtrArray(); runtime_pgsql_servers=NULL; incoming_replication_hostgroups=NULL; + incoming_aws_aurora_hostgroups=NULL; incoming_hostgroup_attributes = NULL; incoming_pgsql_servers_v2 = NULL; pgsql_servers_to_monitor = NULL; + pthread_mutex_init(&AWS_Aurora_Info_mutex, NULL); { static const char alphanum[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; @@ -1498,6 +1504,14 @@ bool PgSQL_HostGroups_Manager::commit( generate_pgsql_hostgroup_attributes_table(); } + // Aurora PostgreSQL hostgroups + if (incoming_aws_aurora_hostgroups) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM pgsql_aws_aurora_hostgroups\n"); + mydb->execute("DELETE FROM pgsql_aws_aurora_hostgroups"); + generate_pgsql_aws_aurora_hostgroups_table(); + // Note: generate_pgsql_aws_aurora_hostgroups_table() already calls update_aws_aurora_hosts_monitor_resultset() + } + uint64_t new_hash = commit_update_checksum_from_pgsql_servers_v2(peer_pgsql_servers_v2.resultset); { @@ -2912,6 +2926,8 @@ void PgSQL_HostGroups_Manager::save_incoming_pgsql_table(SQLite3_result *s, cons SQLite3_result ** inc = NULL; if (name == "pgsql_replication_hostgroups") { inc = &incoming_replication_hostgroups; + } else if (name == "pgsql_aws_aurora_hostgroups") { + inc = &incoming_aws_aurora_hostgroups; } else if (name == "pgsql_hostgroup_attributes") { inc = &incoming_hostgroup_attributes; } else { @@ -2943,6 +2959,8 @@ void PgSQL_HostGroups_Manager::save_pgsql_servers_v2(SQLite3_result* s) { SQLite3_result* PgSQL_HostGroups_Manager::get_current_pgsql_table(const string& name) { if (name == "pgsql_replication_hostgroups") { return 
this->incoming_replication_hostgroups; + } else if (name == "pgsql_aws_aurora_hostgroups") { + return this->incoming_aws_aurora_hostgroups; } else if (name == "pgsql_hostgroup_attributes") { return this->incoming_hostgroup_attributes; } else if (name == "cluster_pgsql_servers") { @@ -4555,3 +4573,604 @@ void PgSQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(PgSQL_SrvC* srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD; srv->ConnectionsFree->drop_all_connections(); } + +// AWS Aurora PostgreSQL implementations + +PgSQL_AWS_Aurora_Info::PgSQL_AWS_Aurora_Info(int w, int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c) { + writer_hostgroup = w; + reader_hostgroup = r; + aurora_port = _port; + domain_name = _domain ? strdup(_domain) : strdup(""); + max_lag_ms = maxl; + add_lag_ms = al; + min_lag_ms = minl; + lag_num_checks = lnc; + check_interval_ms = ci; + check_timeout_ms = ct; + active = _a; + __active = true; + writer_is_also_reader = wiar; + new_reader_weight = nrw; + comment = c ? 
strdup(c) : strdup(""); +} + +bool PgSQL_AWS_Aurora_Info::update(int r, int _port, char *_domain, int maxl, int al, int minl, int lnc, int ci, int ct, bool _a, int wiar, int nrw, char *c) { + bool ret = false; + __active = true; + if (reader_hostgroup != r) { + reader_hostgroup = r; + ret = true; + } + if (aurora_port != _port) { + aurora_port = _port; + ret = true; + } + if (strcmp(domain_name, _domain)) { + free(domain_name); + domain_name = strdup(_domain); + ret = true; + } + if (max_lag_ms != maxl) { + max_lag_ms = maxl; + ret = true; + } + if (add_lag_ms != al) { + add_lag_ms = al; + ret = true; + } + if (min_lag_ms != minl) { + min_lag_ms = minl; + ret = true; + } + if (lag_num_checks != lnc) { + lag_num_checks = lnc; + ret = true; + } + if (check_interval_ms != ci) { + check_interval_ms = ci; + ret = true; + } + if (check_timeout_ms != ct) { + check_timeout_ms = ct; + ret = true; + } + if (active != _a) { + active = _a; + ret = true; + } + if (writer_is_also_reader != wiar) { + writer_is_also_reader = wiar; + ret = true; + } + if (new_reader_weight != nrw) { + new_reader_weight = nrw; + ret = true; + } + if (strcmp(comment, c)) { + free(comment); + comment = strdup(c); + ret = true; + } + return ret; +} + +PgSQL_AWS_Aurora_Info::~PgSQL_AWS_Aurora_Info() { + if (domain_name) free(domain_name); + if (comment) free(comment); +} + +void PgSQL_HostGroups_Manager::generate_pgsql_aws_aurora_hostgroups_table() { + if (incoming_aws_aurora_hostgroups == nullptr) { + return; + } + int rc; + sqlite3_stmt *statement = nullptr; + char *query = (char *)"INSERT INTO pgsql_aws_aurora_hostgroups(writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms,check_interval_ms," + "check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment) VALUES " + "(?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + rc = mydb->prepare_v2(query, &statement); + ASSERT_SQLITE_OK(rc, mydb); + proxy_info("New 
pgsql_aws_aurora_hostgroups table\n"); + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + // Mark all existing entries as inactive + for (std::map::iterator it1 = AWS_Aurora_Info_Map.begin(); it1 != AWS_Aurora_Info_Map.end(); ++it1) { + PgSQL_AWS_Aurora_Info *info = nullptr; + info = it1->second; + info->__active = false; + } + // Process incoming entries + for (std::vector::iterator it = incoming_aws_aurora_hostgroups->rows.begin(); it != incoming_aws_aurora_hostgroups->rows.end(); ++it) { + SQLite3_row *r = *it; + int writer_hostgroup = atoi(r->fields[0]); + int reader_hostgroup = atoi(r->fields[1]); + int active = atoi(r->fields[2]); + int aurora_port = atoi(r->fields[3]); + int max_lag_ms = atoi(r->fields[5]); + int check_interval_ms = atoi(r->fields[6]); + int check_timeout_ms = atoi(r->fields[7]); + int writer_is_also_reader = atoi(r->fields[8]); + int new_reader_weight = atoi(r->fields[9]); + int add_lag_ms = atoi(r->fields[10]); + int min_lag_ms = atoi(r->fields[11]); + int lag_num_checks = atoi(r->fields[12]); + proxy_info("Loading AWS Aurora PostgreSQL info for (%d,%d,%s,%d,\"%s\",%d,%d,%d,%d,%d,%d,\"%s\")\n", writer_hostgroup, reader_hostgroup, (active ? 
"on" : "off"), aurora_port, + r->fields[4], max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, r->fields[13]); + rc = (*proxy_sqlite3_bind_int64)(statement, 1, writer_hostgroup); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 2, reader_hostgroup); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 3, active); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 4, aurora_port); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_text)(statement, 5, r->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 6, max_lag_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 7, check_interval_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 8, check_timeout_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 9, writer_is_also_reader); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 10, new_reader_weight); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 11, add_lag_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 12, min_lag_ms); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_int64)(statement, 13, lag_num_checks); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_bind_text)(statement, 14, r->fields[13], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); + + SAFE_SQLITE3_STEP2(statement); + rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mydb); + rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mydb); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(writer_hostgroup); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + bool changed = false; + changed = info->update(reader_hostgroup, aurora_port, r->fields[4], max_lag_ms, add_lag_ms, 
min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, (bool)active, writer_is_also_reader, new_reader_weight, r->fields[13]); + if (changed) { + // info->need_converge = true; + } + } else { + info = new PgSQL_AWS_Aurora_Info(writer_hostgroup, reader_hostgroup, aurora_port, r->fields[4], max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks, check_interval_ms, check_timeout_ms, (bool)active, writer_is_also_reader, new_reader_weight, r->fields[13]); + AWS_Aurora_Info_Map.insert(AWS_Aurora_Info_Map.begin(), std::pair(writer_hostgroup, info)); + } + } + (*proxy_sqlite3_finalize)(statement); + delete incoming_aws_aurora_hostgroups; + incoming_aws_aurora_hostgroups = nullptr; + + // Remove inactive entries + for (auto it3 = AWS_Aurora_Info_Map.begin(); it3 != AWS_Aurora_Info_Map.end(); ) { + PgSQL_AWS_Aurora_Info *info = it3->second; + if (info->__active == false) { + delete info; + it3 = AWS_Aurora_Info_Map.erase(it3); + } else { + it3++; + } + } + + // Update monitor resultset + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + update_aws_aurora_hosts_monitor_resultset(false); + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); +} + +bool PgSQL_HostGroups_Manager::aws_aurora_replication_lag_action(int _whid, int _rhid, char *_server_id, float current_replication_lag_ms, bool enable, bool is_writer, bool verbose) { + bool ret = false; + bool reader_found_in_whg = false; + if (is_writer) { + ret = false; + } + unsigned port = 5432; + char *domain_name = strdup((char *)""); + { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(_whid); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + if (info->domain_name) { + free(domain_name); + domain_name = strdup(info->domain_name); + } + port = info->aurora_port; + } + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } + char *address = (char 
*)malloc(strlen(_server_id) + strlen(domain_name) + 1); + sprintf(address, "%s%s", _server_id, domain_name); + + GloAdmin->pgsql_servers_wrlock(); + wrlock(); + int i, j; + for (i = 0; i < (int)MyHostGroups->len; i++) { + PgSQL_HGC *myhgc = (PgSQL_HGC *)MyHostGroups->index(i); + if (_whid != (int)myhgc->hid && _rhid != (int)myhgc->hid) continue; + for (j = 0; j < (int)myhgc->mysrvs->cnt(); j++) { + PgSQL_SrvC *mysrvc = (PgSQL_SrvC *)myhgc->mysrvs->servers->index(j); + if (strcmp(mysrvc->address, address) == 0 && mysrvc->port == (int)port) { + // Found the server + if (enable == false) { + if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE) { + if (verbose) { + proxy_warning("Aurora PostgreSQL: Shunning server %s:%d from HG %u with replication lag of %f ms\n", + address, port, myhgc->hid, current_replication_lag_ms); + } + mysrvc->status = MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG; + } + } else { + if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { + if (verbose) { + proxy_warning("Aurora PostgreSQL: Re-enabling server %s:%d from HG %u with replication lag of %f ms\n", + address, port, myhgc->hid, current_replication_lag_ms); + } + mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; + } + } + mysrvc->aws_aurora_current_lag_us = current_replication_lag_ms * 1000; + if (mysrvc->status == MYSQL_SERVER_STATUS_ONLINE || mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { + if (ret) { + if (_whid == (int)myhgc->hid && is_writer == false) { + // Server should be a reader but is in writer hostgroup + ret = false; + reader_found_in_whg = true; + } + } else { + if (is_writer == true) { + if (_whid == (int)myhgc->hid) { + // Server is a writer and found in writer hostgroup + ret = true; + } + } else { + if (_rhid == (int)myhgc->hid) { + // Server is a reader and found in reader hostgroup + ret = true; + } + } + } + } + if (ret == false) + if (is_writer == true) + if (enable == true) + if (_whid == (int)myhgc->hid) + if (mysrvc->status == 
MYSQL_SERVER_STATUS_OFFLINE_HARD) { + mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; + proxy_warning("Aurora PostgreSQL: Re-enabling server %s:%d from HG %u because it is a writer\n", + address, port, myhgc->hid); + ret = true; + } + } + } + } + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + if (ret == true) { + if (reader_found_in_whg == true) { + ret = false; + } + } + free(address); + free(domain_name); + return ret; +} + +void PgSQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid, char *_server_id, bool verbose) { + int cols = 0; + int affected_rows = 0; + SQLite3_result *resultset = nullptr; + char *query = nullptr; + char *q = nullptr; + char *error = nullptr; + + int writer_is_also_reader = 0; + int new_reader_weight = 1; + bool found_writer = false; + bool found_reader = false; + int _writer_hostgroup = _whid; + int aurora_port = 5432; + char *domain_name = strdup((char *)""); + int read_HG = -1; + { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(_writer_hostgroup); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + writer_is_also_reader = info->writer_is_also_reader; + new_reader_weight = info->new_reader_weight; + read_HG = info->reader_hostgroup; + if (info->domain_name) { + free(domain_name); + domain_name = strdup(info->domain_name); + } + aurora_port = info->aurora_port; + } + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } + + q = (char *)"SELECT hostgroup_id FROM pgsql_servers JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3 AND hostgroup_id IN (%d, %d)"; + query = (char *)malloc(strlen(q) + strlen(_server_id) + strlen(domain_name) + 1024); + sprintf(query, q, _server_id, domain_name, aurora_port, _whid, _rhid); + mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + if (error) { + 
free(error); + error = nullptr; + } + + if (resultset) { + if (resultset->rows_count) { + for (auto it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { + SQLite3_row *r = *it; + int hostgroup = atoi(r->fields[0]); + if (hostgroup == _writer_hostgroup) { + found_writer = true; + } + if (read_HG >= 0) { + if (hostgroup == read_HG) { + found_reader = true; + } + } + } + } + + if (found_writer) { + if ( + (writer_is_also_reader == 0 && found_reader == false) + || + (writer_is_also_reader > 0 && found_reader == true) + ) { + delete resultset; + resultset = nullptr; + } + } + } + + if (resultset) { + if (resultset->rows_count) { + GloAdmin->pgsql_servers_wrlock(); + mydb->execute("DELETE FROM pgsql_servers_incoming"); + q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id=%d"; + sprintf(query, q, _rhid); + mydb->execute(query); + q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; + sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port); + mydb->execute(query); + q = (char *)"UPDATE OR IGNORE pgsql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; + sprintf(query, q, _writer_hostgroup, _server_id, domain_name, aurora_port, _writer_hostgroup); + mydb->execute(query); + q = (char *)"DELETE FROM pgsql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id<>%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _writer_hostgroup); + mydb->execute(query); + q = (char *)"UPDATE pgsql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + sprintf(query, q, _server_id, 
domain_name, aurora_port, _writer_hostgroup); + mydb->execute(query); + + // Move the old writer into the reader HG + q = (char *)"DELETE FROM pgsql_servers_incoming WHERE status=3 AND hostgroup_id=%d"; + sprintf(query, q, _rhid); + mydb->execute(query); + q = (char *)"INSERT OR IGNORE INTO pgsql_servers_incoming SELECT %d, hostname, port, %d, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id=%d AND status=0"; + sprintf(query, q, _rhid, new_reader_weight, _whid); + mydb->execute(query); + + if (writer_is_also_reader && read_HG >= 0) { + q = (char *)"INSERT OR IGNORE INTO pgsql_servers_incoming (hostgroup_id,hostname,port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment) SELECT %d,hostname,port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM pgsql_servers_incoming WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; + sprintf(query, q, read_HG, _writer_hostgroup, _server_id, domain_name, aurora_port); + mydb->execute(query); + q = (char *)"UPDATE pgsql_servers_incoming SET weight=%d WHERE hostgroup_id=%d AND hostname='%s%s' AND port=%d"; + sprintf(query, q, new_reader_weight, read_HG, _server_id, domain_name, aurora_port); + mydb->execute(query); + } + + proxy_warning("Aurora PostgreSQL: setting host %s%s:%d as writer\n", _server_id, domain_name, aurora_port); + q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id NOT IN (%d, %d)"; + sprintf(query, q, _rhid, _whid); + mydb->execute(query); + commit(); + wrlock(); + q = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; + sprintf(query, q, _whid, _rhid); + mydb->execute(query); + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + 
wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + } else { + // Auto-discovery: server not found, create new entry (matching MySQL approach) + std::string full_hostname = std::string(_server_id) + std::string(domain_name); + + GloAdmin->pgsql_servers_wrlock(); + wrlock(); + + // Use create_new_server_in_hg to create the server in memory + PgSQL_srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora PG" }; + PgSQL_srv_opts_t wr_srv_opts { -1, -1, -1 }; + + int wr_res = create_new_server_in_hg(_writer_hostgroup, srv_info, wr_srv_opts); + int rd_res = -1; + + // WRITER can also be placed as READER, or could previously be one + if (writer_is_also_reader && read_HG >= 0) { + PgSQL_srv_opts_t rd_srv_opts { new_reader_weight, -1, -1 }; + rd_res = create_new_server_in_hg(read_HG, srv_info, rd_srv_opts); + } + + // A new server has been created, or an OFFLINE_HARD brought back as ONLINE + if (wr_res == 0 || rd_res == 0) { + proxy_info("Aurora PostgreSQL: setting new auto-discovered host %s:%d as writer\n", + full_hostname.c_str(), aurora_port); + + purge_pgsql_servers_table(); + + // Delete servers from the table before regenerating + char del_query[256]; + snprintf(del_query, sizeof(del_query), + "DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)", + _writer_hostgroup, _rhid); + mydb->execute(del_query); + + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + + update_aws_aurora_hosts_monitor_resultset(true); + } + + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + } + } + if (resultset) { + delete resultset; + resultset = nullptr; + } + if (query) { + free(query); + } + free(domain_name); +} + +void PgSQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid, char *_server_id) { + int cols = 0; + int affected_rows = 0; + SQLite3_result *resultset = nullptr; + char *query = nullptr; + char *q = nullptr; + char *error = nullptr; + int _writer_hostgroup = _whid; + int aurora_port = 5432; + int 
new_reader_weight = 1; + char *domain_name = strdup((char *)""); + { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + std::map::iterator it2; + it2 = AWS_Aurora_Info_Map.find(_writer_hostgroup); + PgSQL_AWS_Aurora_Info *info = nullptr; + if (it2 != AWS_Aurora_Info_Map.end()) { + info = it2->second; + if (info->domain_name) { + free(domain_name); + domain_name = strdup(info->domain_name); + } + aurora_port = info->aurora_port; + new_reader_weight = info->new_reader_weight; + } + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } + + q = (char *)"SELECT hostgroup_id FROM pgsql_servers JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE hostname='%s%s' AND port=%d AND status<>3 AND hostgroup_id IN (%d,%d)"; + query = (char *)malloc(strlen(q) + strlen(_server_id) + strlen(domain_name) + 64); + sprintf(query, q, _server_id, domain_name, aurora_port, _whid, _rhid); + mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + if (error) { + free(error); + error = nullptr; + } + free(query); + + if (resultset) { + if (resultset->rows_count) { + proxy_warning("Aurora PostgreSQL: setting host %s%s:%d (part of cluster with writer_hostgroup=%d) as a reader, moving from writer_hostgroup %d to reader_hostgroup %d\n", + _server_id, domain_name, aurora_port, _whid, _whid, _rhid); + GloAdmin->pgsql_servers_wrlock(); + mydb->execute("DELETE FROM pgsql_servers_incoming"); + mydb->execute("INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers"); + // If server present as WRITER try moving it to reader_hostgroup + q = (char *)"UPDATE OR IGNORE pgsql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + query = (char *)malloc(strlen(q) + strlen(_server_id) + strlen(domain_name) + 512); + sprintf(query, q, _rhid, _server_id, domain_name, 
aurora_port, _whid); + mydb->execute(query); + // If server is still in writer_hostgroup, remove it + q = (char *)"DELETE FROM pgsql_servers_incoming WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _whid); + mydb->execute(query); + q = (char *)"UPDATE pgsql_servers_incoming SET status=0 WHERE hostname='%s%s' AND port=%d AND hostgroup_id=%d"; + sprintf(query, q, _server_id, domain_name, aurora_port, _rhid); + mydb->execute(query); + commit(); + wrlock(); + + q = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; + sprintf(query, q, _whid, _rhid); + mydb->execute(query); + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + free(query); + } else { + // Auto-discovery: server not found, create new entry in reader hostgroup + // Following MySQL's pattern using create_new_server_in_hg + std::string full_hostname = std::string(_server_id) + std::string(domain_name); + GloAdmin->pgsql_servers_wrlock(); + wrlock(); + + PgSQL_srv_info_t srv_info { full_hostname, static_cast(aurora_port), "Aurora PG" }; + PgSQL_srv_opts_t srv_opts { new_reader_weight, -1, -1 }; + int wr_res = create_new_server_in_hg(_rhid, srv_info, srv_opts); + + // A new server has been created, or an OFFLINE_HARD brought back as ONLINE + if (wr_res == 0) { + purge_pgsql_servers_table(); + + char *q1 = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; + char *q2 = (char *)malloc(strlen(q1) + 64); + sprintf(q2, q1, _whid, _rhid); + mydb->execute(q2); + free(q2); + + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); + + // Update AWS Aurora resultset used for monitoring + update_aws_aurora_hosts_monitor_resultset(true); + } + + wrunlock(); + GloAdmin->pgsql_servers_wrunlock(); + } + } + if (resultset) { + delete resultset; + resultset = nullptr; + } + free(domain_name); +} + +const char 
SELECT_PGSQL_AWS_AURORA_SERVERS_FOR_MONITOR[] { + "SELECT writer_hostgroup, reader_hostgroup, hostname, port, MAX(use_ssl) use_ssl, max_lag_ms, check_interval_ms," + " check_timeout_ms, add_lag_ms, min_lag_ms, lag_num_checks FROM pgsql_servers" + " JOIN pgsql_aws_aurora_hostgroups ON" + " hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE active=1 AND status NOT IN (2,3)" + " GROUP BY writer_hostgroup, hostname, port" +}; + +void PgSQL_HostGroups_Manager::update_aws_aurora_hosts_monitor_resultset(bool lock) { + if (lock) { + pthread_mutex_lock(&AWS_Aurora_Info_mutex); + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + } + + SQLite3_result* resultset = nullptr; + { + char* error = nullptr; + int cols = 0; + int affected_rows = 0; + mydb->execute_statement(SELECT_PGSQL_AWS_AURORA_SERVERS_FOR_MONITOR, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_error("Aurora PostgreSQL: Error executing monitor query: %s\n", error); + free(error); + } + } + + if (resultset) { + if (GloPgMon->AWS_Aurora_Hosts_resultset) { + delete GloPgMon->AWS_Aurora_Hosts_resultset; + } + GloPgMon->AWS_Aurora_Hosts_resultset = resultset; + GloPgMon->AWS_Aurora_Hosts_resultset_checksum = resultset->raw_checksum(); + } + + if (lock) { + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + pthread_mutex_unlock(&AWS_Aurora_Info_mutex); + } +} diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 4db02cf77a..27654e24d5 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -101,6 +101,11 @@ void check_and_build_standard_tables(SQLite3DB& db, const vector& t } PgSQL_Monitor::PgSQL_Monitor() { + // Initialize Aurora mutex and members like MySQL does + pthread_mutex_init(&aws_aurora_mutex, NULL); + AWS_Aurora_Hosts_resultset = nullptr; + AWS_Aurora_Hosts_resultset_checksum = 0; + int rc = monitordb.open( const_cast("file:mem_monitordb?mode=memory&cache=shared"), SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX @@ -125,6 
+130,153 @@ PgSQL_Monitor::PgSQL_Monitor() { monitordb.execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON pgsql_server_connect_log (time_start_us)"); monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON pgsql_server_ping_log (time_start_us)"); monitordb.execute("CREATE INDEX IF NOT EXISTS idx_ping_2 ON pgsql_server_ping_log (hostname, port, time_start_us)"); + // Aurora specific indexes + monitordb.execute("CREATE INDEX IF NOT EXISTS idx_aurora_log_time_start ON pgsql_server_aws_aurora_log (time_start_us)"); +} + +PgSQL_Monitor::~PgSQL_Monitor() { + if (AWS_Aurora_Hosts_resultset) { + delete AWS_Aurora_Hosts_resultset; + AWS_Aurora_Hosts_resultset = nullptr; + } + // Clean up Aurora hosts map + for (auto& it : AWS_Aurora_Hosts_Map) { + delete it.second; + } + AWS_Aurora_Hosts_Map.clear(); +} + +bool PgSQL_Monitor::server_responds_to_ping(const char* addr, int port) { + int max_fails = 3; // Could be made configurable + cfmt_t q_fmt { cstr_format(RESP_SERVERS_QUERY_T, addr, port, max_fails, max_fails) }; + + char* err { nullptr }; + unique_ptr result { monitordb.execute_statement(q_fmt.str.c_str(), &err) }; + + if (err || result == nullptr) { + free(err); + return false; + } + return !result->rows_count; +} + +unsigned int PgSQL_Monitor::estimate_lag(char* server_id, PgSQL_AWS_Aurora_status_entry** aase, unsigned int idx, + unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks) { + // Safety checks - return 0 if invalid input + // Use N_L_ASE (16) for array bounds, not PGSQL_AWS_Aurora_Nentries (150) + if (!aase || !server_id) { + return 0; + } + if (idx >= N_L_ASE) { + return 0; + } + + if (lag_num_checks > N_L_ASE) lag_num_checks = N_L_ASE; + if (lag_num_checks <= 0) lag_num_checks = 1; + + unsigned int mlag = 0; + unsigned int lag = 0; + + for (unsigned int i = 1; i <= lag_num_checks; i++) { + if (!aase[idx] || !aase[idx]->host_statuses) + break; + for (auto hse : *(aase[idx]->host_statuses)) { + // NULL 
check for hse->server_id + if (hse && hse->server_id && strcmp(server_id, hse->server_id) == 0 && (unsigned int)hse->replica_lag_ms != 0) { + unsigned int ms = std::max(((unsigned int)hse->replica_lag_ms + add_lag_ms), min_lag_ms); + if (ms > mlag) mlag = ms; + if (!lag) lag = ms; + } + } + if (idx == 0) idx = N_L_ASE; + idx--; + } + + return mlag; +} + +// AWS Aurora PostgreSQL class implementations + +PgSQL_AWS_Aurora_replica_host_status_entry::PgSQL_AWS_Aurora_replica_host_status_entry( + char* serid, char* sessid, char* lut, float rlm, bool is_master +) { + server_id = serid ? strdup(serid) : nullptr; + session_id = sessid ? strdup(sessid) : nullptr; + last_update_timestamp = lut ? strdup(lut) : nullptr; + replica_lag_ms = rlm; + is_current_master = is_master; +} + +PgSQL_AWS_Aurora_replica_host_status_entry::PgSQL_AWS_Aurora_replica_host_status_entry( + char* serid, char* sessid, char* lut, const char* rlm, bool is_master +) { + server_id = serid ? strdup(serid) : nullptr; + session_id = sessid ? strdup(sessid) : nullptr; + last_update_timestamp = lut ? strdup(lut) : nullptr; + replica_lag_ms = rlm ? 
atof(rlm) : 0.0f; + is_current_master = is_master; +} + +PgSQL_AWS_Aurora_replica_host_status_entry::~PgSQL_AWS_Aurora_replica_host_status_entry() { + if (server_id) free(server_id); + if (session_id) free(session_id); + if (last_update_timestamp) free(last_update_timestamp); +} + +PgSQL_AWS_Aurora_status_entry::PgSQL_AWS_Aurora_status_entry( + unsigned long long st, unsigned long long ct, char* e +) : start_time(st), check_time(ct), error(nullptr) { + if (e) error = strdup(e); + host_statuses = new std::vector(); +} + +void PgSQL_AWS_Aurora_status_entry::add_host_status(PgSQL_AWS_Aurora_replica_host_status_entry* hs) { + host_statuses->push_back(hs); +} + +PgSQL_AWS_Aurora_status_entry::~PgSQL_AWS_Aurora_status_entry() { + if (error) free(error); + for (auto hs : *host_statuses) { + delete hs; + } + delete host_statuses; +} + +PgSQL_AWS_Aurora_monitor_node::PgSQL_AWS_Aurora_monitor_node(char* _a, int _p, int _whg) { + addr = strdup(_a); + port = _p; + writer_hostgroup = _whg; + idx_last_entry = -1; + num_checks_tot = 0; + num_checks_ok = 0; + last_checked_at = 0; + for (int i = 0; i < PGSQL_AWS_Aurora_Nentries; i++) { + last_entries[i] = nullptr; + } +} + +PgSQL_AWS_Aurora_monitor_node::~PgSQL_AWS_Aurora_monitor_node() { + if (addr) free(addr); + for (int i = 0; i < PGSQL_AWS_Aurora_Nentries; i++) { + if (last_entries[i]) delete last_entries[i]; + } +} + +bool PgSQL_AWS_Aurora_monitor_node::add_entry(PgSQL_AWS_Aurora_status_entry* ase) { + num_checks_tot++; + if (ase->error == nullptr) { + num_checks_ok++; + } + last_checked_at = time(nullptr); + idx_last_entry++; + if (idx_last_entry >= PGSQL_AWS_Aurora_Nentries) { + idx_last_entry = 0; + } + if (last_entries[idx_last_entry]) { + delete last_entries[idx_last_entry]; + } + last_entries[idx_last_entry] = ase; + return true; } /** @@ -2088,6 +2240,18 @@ void* PgSQL_monitor_scheduler_thread() { workers.emplace_back(worker_thread_t { std::move(th), std::move(worker_queue) }); } + // Start Aurora PostgreSQL monitoring 
thread + pthread_t pgsql_monitor_aws_aurora_thread; + pthread_attr_t aurora_attr; + pthread_attr_init(&aurora_attr); + pthread_attr_setstacksize(&aurora_attr, 2048 * 1024); + if (pthread_create(&pgsql_monitor_aws_aurora_thread, &aurora_attr, PgSQL_monitor_aws_aurora, NULL) != 0) { + proxy_error("Failed to create Aurora PostgreSQL monitor thread\n"); + } else { + proxy_info("Started Aurora PostgreSQL monitor thread\n"); + } + pthread_attr_destroy(&aurora_attr); + uint64_t cur_intv_start = 0; tasks_intvs_t next_intvs {}; vector tasks_batches {}; @@ -2222,6 +2386,10 @@ void* PgSQL_monitor_scheduler_thread() { pthread_join(worker.first, NULL); } + // Wait for Aurora thread to exit + pthread_join(pgsql_monitor_aws_aurora_thread, NULL); + proxy_info("Aurora PostgreSQL monitor thread joined\n"); + // Cleanup the global connection pool; no mutex, threads joined for (auto& entry : mon_conn_pool.conn_map) { for (auto& conn : entry.second) { @@ -2233,3 +2401,525 @@ void* PgSQL_monitor_scheduler_thread() { return nullptr; } + +// ========================================================================= +// AWS Aurora PostgreSQL Monitoring Implementation +// ========================================================================= + +extern PgSQL_HostGroups_Manager* PgHGM; + +// Number of last Aurora status entries to keep +#define N_L_ASE 16 + +// Structure to hold host definitions for Aurora monitoring +struct pgsql_host_def_t { + char* host; + int port; + int use_ssl; +}; + +// Helper function to shuffle hosts array +static void shuffle_pgsql_hosts(pgsql_host_def_t* arr, unsigned int n) { + if (n <= 1) return; + for (unsigned int i = n - 1; i > 0; i--) { + unsigned int j = rand() % (i + 1); + if (i != j) { + pgsql_host_def_t tmp; + size_t stride = sizeof(pgsql_host_def_t); + memcpy(&tmp, arr + i * stride / sizeof(pgsql_host_def_t), sizeof(pgsql_host_def_t)); + memcpy(arr + i * stride / sizeof(pgsql_host_def_t), arr + j * stride / sizeof(pgsql_host_def_t), 
sizeof(pgsql_host_def_t)); + memcpy(arr + j * stride / sizeof(pgsql_host_def_t), &tmp, sizeof(pgsql_host_def_t)); + } + } +} + +void PgSQL_Monitor::evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned int rHG, + PgSQL_AWS_Aurora_status_entry** lasts_ase, unsigned int ase_idx, + unsigned int max_latency_ms, unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks) { + + unsigned int prev_ase_idx = ase_idx; + if (prev_ase_idx == 0) prev_ase_idx = N_L_ASE; + prev_ase_idx--; + + PgSQL_AWS_Aurora_status_entry* aase = lasts_ase[ase_idx]; + PgSQL_AWS_Aurora_status_entry* prev_aase = lasts_ase[prev_ase_idx]; + + if (aase && aase->start_time) { + if (aase->host_statuses->size()) { + for (auto it3 = aase->host_statuses->begin(); it3 != aase->host_statuses->end(); ++it3) { + PgSQL_AWS_Aurora_replica_host_status_entry* hse = *it3; + if (!hse) continue; // Skip NULL entries + + bool run_action = true; + bool enable = true; + bool is_writer = false; + bool rla_rc = true; + + // Skip if server_id is NULL + if (!hse->server_id) { + proxy_warning("Aurora PostgreSQL: Skipping entry with NULL server_id\n"); + continue; + } + + unsigned int current_lag_ms = estimate_lag(hse->server_id, lasts_ase, ase_idx, add_lag_ms, min_lag_ms, lag_num_checks); + hse->estimated_lag_ms = current_lag_ms; + + if (current_lag_ms > max_latency_ms) { + enable = false; + } + + // PostgreSQL Aurora uses is_current_master instead of MASTER_SESSION_ID + if (hse->is_current_master) { + is_writer = true; + } + + // Determine if a change needs to be made by comparing with previous check + if (prev_aase && prev_aase->start_time) { + if (prev_aase->host_statuses->size()) { + for (auto it4 = prev_aase->host_statuses->begin(); it4 != prev_aase->host_statuses->end(); ++it4) { + PgSQL_AWS_Aurora_replica_host_status_entry* prev_hse = *it4; + if (!prev_hse || !prev_hse->server_id) continue; // Skip NULL entries + if (strcmp(prev_hse->server_id, hse->server_id) == 0) { + bool prev_enabled = 
true; + unsigned int prev_lag_ms = estimate_lag(hse->server_id, lasts_ase, prev_ase_idx, add_lag_ms, min_lag_ms, lag_num_checks); + if (prev_lag_ms > max_latency_ms) { + prev_enabled = false; + } + if (prev_enabled == enable) { + // Previous status is the same, no action needed + run_action = false; + } + } + } + } + } + + if (run_action) { + rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer); + } else { + if (is_writer) { + // If the server is a writer we run it anyway for sanity check + rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer); + } + } + + if (rla_rc == false) { + if (is_writer) { + // The server should be a writer but is not configured as one + proxy_info("Aurora PostgreSQL: Calling update_aws_aurora_set_writer for %s\n", hse->server_id); + PgHGM->update_aws_aurora_set_writer(wHG, rHG, hse->server_id); + + // Log failover event + time_t __timer; + char lut[30]; + struct tm __tm_info; + time(&__timer); + localtime_r(&__timer, &__tm_info); + strftime(lut, 25, "%Y-%m-%d %H:%M:%S", &__tm_info); + + char* q1 = (char*)"INSERT INTO pgsql_server_aws_aurora_failovers VALUES (%d, '%s', '%s')"; + char* q2 = (char*)malloc(strlen(q1) + strlen(lut) + strlen(hse->server_id) + 32); + sprintf(q2, q1, wHG, hse->server_id, lut); + monitordb.execute(q2); + free(q2); + } else { + proxy_info("Aurora PostgreSQL: Calling update_aws_aurora_set_reader for %s\n", hse->server_id); + PgHGM->update_aws_aurora_set_reader(wHG, rHG, hse->server_id); + } + } + } + } + } +} + +/** + * @brief Aurora PostgreSQL monitoring thread for a specific hostgroup + * @details This thread periodically queries aurora_replica_status() to discover cluster topology + */ +void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { + unsigned int wHG = *(unsigned int*)arg; + unsigned int rHG = 0; + unsigned int num_hosts = 0; + unsigned int cur_host_idx = 0; + unsigned int max_lag_ms = 0; + 
unsigned int check_interval_ms = 0; + unsigned int check_timeout_ms = 0; + unsigned int add_lag_ms = 0; + unsigned int min_lag_ms = 0; + unsigned int lag_num_checks = 1; + + proxy_info("Started Aurora PostgreSQL Monitor thread for writer HG %u\n", wHG); + + // Quick exit checks + if (!GloPTH) return nullptr; + if (!GloPgMon) return nullptr; + + // Get monitor credentials from GloPTH + char* monitor_user = GloPTH->get_variable_string((char*)"monitor_username"); + char* monitor_pass = GloPTH->get_variable_string((char*)"monitor_password"); + + uint64_t initial_raw_checksum = 0; + + // Static array of the latest reads + unsigned int ase_idx = 0; + PgSQL_AWS_Aurora_status_entry* lasts_ase[N_L_ASE]; + for (unsigned int i = 0; i < N_L_ASE; i++) { + lasts_ase[i] = nullptr; + } + + // Initialize hpa to NULL for proper cleanup + pgsql_host_def_t* hpa = nullptr; + + // Initial data load + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + initial_raw_checksum = GloPgMon->AWS_Aurora_Hosts_resultset_checksum; + + // Count the number of hosts + for (auto it = GloPgMon->AWS_Aurora_Hosts_resultset->rows.begin(); + it != GloPgMon->AWS_Aurora_Hosts_resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + if (atoi(r->fields[0]) == (int)wHG) { + num_hosts++; + if (max_lag_ms == 0) { + max_lag_ms = atoi(r->fields[5]); + } + if (check_interval_ms == 0) { + check_interval_ms = atoi(r->fields[6]); + } + if (check_timeout_ms == 0) { + check_timeout_ms = atoi(r->fields[7]); + } + if (rHG == 0) { + rHG = atoi(r->fields[1]); + } + add_lag_ms = atoi(r->fields[8]); + min_lag_ms = atoi(r->fields[9]); + lag_num_checks = atoi(r->fields[10]); + } + } + + if (num_hosts == 0) { + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + proxy_warning("Aurora PostgreSQL Monitor: No hosts found for writer HG %u\n", wHG); + // Cleanup before early return + if (monitor_user) free(monitor_user); + if (monitor_pass) free(monitor_pass); + return nullptr; + } + + hpa = 
(pgsql_host_def_t*)malloc(sizeof(pgsql_host_def_t) * num_hosts); + cur_host_idx = 0; + for (auto it = GloPgMon->AWS_Aurora_Hosts_resultset->rows.begin(); + it != GloPgMon->AWS_Aurora_Hosts_resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + if (atoi(r->fields[0]) == (int)wHG) { + hpa[cur_host_idx].host = strdup(r->fields[2]); + hpa[cur_host_idx].port = atoi(r->fields[3]); + hpa[cur_host_idx].use_ssl = atoi(r->fields[4]); + cur_host_idx++; + } + } + // NOTE: 'cur_host_idx' should never be higher than 'num_hosts' otherwise an invalid memory access + // can take place later when accessing 'hpa[cur_host_idx]'. + if (cur_host_idx >= num_hosts) { + cur_host_idx = num_hosts - 1; + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + bool exit_now = false; + unsigned long long t1 = 0; + unsigned long long next_loop_at = 0; + + uint64_t current_raw_checksum = 0; + bool found_pingable_host = false; + + t1 = monotonic_time(); + unsigned long long start_time = t1; + + while (GloPgMon->shutdown == false && exit_now == false) { + t1 = monotonic_time(); + + if (!GloPTH) { + goto __exit_pgsql_monitor_AWS_Aurora_thread_HG_now; + } + + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + current_raw_checksum = GloPgMon->AWS_Aurora_Hosts_resultset_checksum; + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + if (current_raw_checksum != initial_raw_checksum) { + // Content has changed, exit + exit_now = true; + break; + } + + if (t1 < next_loop_at) { + unsigned long long st = next_loop_at - t1; + if (st > 50000) { + st = 50000; + } + usleep(st); + continue; + } + + found_pingable_host = false; + + // Pick a random host + size_t rnd = (size_t)rand(); + rnd %= num_hosts; + if (GloPgMon->server_responds_to_ping(hpa[rnd].host, hpa[rnd].port)) { + found_pingable_host = true; + cur_host_idx = rnd; + } else { + // Try all hosts + shuffle_pgsql_hosts(hpa, num_hosts); + for (unsigned int i = 0; found_pingable_host == false && i < num_hosts; i++) { + if 
(GloPgMon->server_responds_to_ping(hpa[i].host, hpa[i].port)) { + found_pingable_host = true; + cur_host_idx = i; + } + } + } + + if (found_pingable_host == false) { + proxy_error("No node is pingable for AWS Aurora PostgreSQL cluster with writer HG %u\n", wHG); + next_loop_at = t1 + check_interval_ms * 1000; + continue; + } + + // Execute Aurora replica status query + start_time = t1; + char* error_msg = nullptr; + + // Build connection string with monitor credentials + // Note: dbname=postgres is used because aurora_replica_status() is a system function + char conninfo[1024]; + snprintf(conninfo, sizeof(conninfo), "host=%s port=%d dbname=postgres user=%s password=%s connect_timeout=%d", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, + monitor_user ? monitor_user : "", + monitor_pass ? monitor_pass : "", + check_timeout_ms / 1000); + + PGconn* conn = PQconnectdb(conninfo); + + unsigned long long t2 = monotonic_time(); + PgSQL_AWS_Aurora_status_entry* ase = nullptr; + PgSQL_AWS_Aurora_status_entry* ase_l = nullptr; + + if (PQstatus(conn) != CONNECTION_OK) { + error_msg = strdup(PQerrorMessage(conn)); + proxy_error("Aurora PostgreSQL: Connection failed for %s:%d - %s\n", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, error_msg); + ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + free(error_msg); + } else { + // Execute the aurora_replica_status() query + // Aurora PostgreSQL provides: server_id, session_id, replica_lag_in_msec + // Writer is identified by session_id = 'MASTER_SESSION_ID' + const char* query = "SELECT server_id, session_id, replica_lag_in_msec, " + "CASE WHEN session_id = 'MASTER_SESSION_ID' THEN true ELSE false END as is_writer " + "FROM aurora_replica_status()"; + + PGresult* res = PQexec(conn, query); + t2 = monotonic_time(); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) { + error_msg = strdup(PQerrorMessage(conn)); + 
proxy_error("Aurora PostgreSQL: Query failed for %s:%d - %s\n", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, error_msg); + ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); + free(error_msg); + } else { + unsigned long long time_now = realtime_time(); + time_now = time_now - (t2 - start_time); + ase = new PgSQL_AWS_Aurora_status_entry(time_now, t2 - start_time, nullptr); + ase_l = new PgSQL_AWS_Aurora_status_entry(time_now, t2 - start_time, nullptr); + + int nrows = PQntuples(res); + for (int i = 0; i < nrows; i++) { + char* server_id = PQgetvalue(res, i, 0); + char* session_id = PQgetvalue(res, i, 1); + char* replica_lag_str = PQgetvalue(res, i, 2); + char* is_writer_str = PQgetvalue(res, i, 3); + + float replica_lag = replica_lag_str ? atof(replica_lag_str) : 0.0f; + bool is_writer = (is_writer_str && (strcmp(is_writer_str, "t") == 0 || strcmp(is_writer_str, "true") == 0 || strcmp(is_writer_str, "1") == 0)); + + // Use session_id as last_update placeholder (not available in aurora_replica_status()) + PgSQL_AWS_Aurora_replica_host_status_entry* arhse = + new PgSQL_AWS_Aurora_replica_host_status_entry(server_id, session_id, session_id, replica_lag, is_writer); + ase->add_host_status(arhse); + + PgSQL_AWS_Aurora_replica_host_status_entry* arhse_l = + new PgSQL_AWS_Aurora_replica_host_status_entry(server_id, session_id, session_id, replica_lag, is_writer); + ase_l->add_host_status(arhse_l); + } + } + PQclear(res); + } + PQfinish(conn); + + // Process results + if (lasts_ase[ase_idx]) { + delete lasts_ase[ase_idx]; + } + lasts_ase[ase_idx] = ase_l; + + GloPgMon->evaluate_pgsql_aws_aurora_results(wHG, rHG, &lasts_ase[0], ase_idx, max_lag_ms, add_lag_ms, min_lag_ms, lag_num_checks); + + // Copy estimated_lag_ms from ase_l to ase + for (auto h : *(ase_l->host_statuses)) { + for (auto h2 : *(ase->host_statuses)) { + if (strcmp(h2->server_id, 
h->server_id) == 0) { + h2->estimated_lag_ms = h->estimated_lag_ms; + } + } + } + + ase_idx++; + if (ase_idx == N_L_ASE) { + ase_idx = 0; + } + + // Store in Aurora hosts map for monitoring statistics + if (GloPgMon && ase && hpa && cur_host_idx < num_hosts && hpa[cur_host_idx].host) { + std::string key = std::string(hpa[cur_host_idx].host) + ":" + std::to_string(hpa[cur_host_idx].port); + + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + auto it2 = GloPgMon->AWS_Aurora_Hosts_Map.find(key); + PgSQL_AWS_Aurora_monitor_node* node = nullptr; + if (it2 != GloPgMon->AWS_Aurora_Hosts_Map.end()) { + node = it2->second; + node->add_entry(ase); + } else { + node = new PgSQL_AWS_Aurora_monitor_node(hpa[cur_host_idx].host, hpa[cur_host_idx].port, wHG); + node->add_entry(ase); + GloPgMon->AWS_Aurora_Hosts_Map.insert(std::make_pair(key, node)); + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + } else if (ase) { + // If we can't store it, delete to prevent memory leak + delete ase; + ase = nullptr; + } + + next_loop_at = t1 + (check_interval_ms * 1000); + } + +__exit_pgsql_monitor_AWS_Aurora_thread_HG_now: + // Cleanup + if (monitor_user) free(monitor_user); + if (monitor_pass) free(monitor_pass); + + if (hpa) { + for (unsigned int i = 0; i < num_hosts; i++) { + if (hpa[i].host) { + free(hpa[i].host); + } + } + free(hpa); + } + + for (unsigned int i = 0; i < N_L_ASE; i++) { + if (lasts_ase[i]) { + delete lasts_ase[i]; + } + } + + proxy_info("Stopping Aurora PostgreSQL Monitor thread for writer HG %u\n", wHG); + return nullptr; +} + +/** + * @brief Main Aurora PostgreSQL monitoring function + * @details Spawns per-hostgroup monitoring threads when Aurora hostgroups are configured + */ +void* PgSQL_monitor_aws_aurora(void* arg) { + (void)arg; // unused + if (!GloPgMon) return nullptr; + + uint64_t last_raw_checksum = 0; + unsigned int* hgs_array = nullptr; + pthread_t* pthreads_array = nullptr; + unsigned int hgs_num = 0; + + proxy_info("Started Aurora PostgreSQL 
Monitor main thread\n"); + + while (GloPgMon->shutdown == false) { + if (!GloPTH) return nullptr; + + // Check if list of servers or HG or options has changed + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + uint64_t new_raw_checksum = 0; + if (GloPgMon->AWS_Aurora_Hosts_resultset) { + new_raw_checksum = GloPgMon->AWS_Aurora_Hosts_resultset->raw_checksum(); + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + if (new_raw_checksum != last_raw_checksum) { + proxy_info("Aurora PostgreSQL: Detected new/changed definition for monitoring\n"); + last_raw_checksum = new_raw_checksum; + + if (pthreads_array) { + // Wait for all threads to terminate + for (unsigned int i = 0; i < hgs_num; i++) { + pthread_join(pthreads_array[i], nullptr); + proxy_info("Stopped Aurora PostgreSQL Monitor thread for writer HG %u\n", hgs_array[i]); + } + free(pthreads_array); + free(hgs_array); + pthreads_array = nullptr; + hgs_array = nullptr; + hgs_num = 0; + } + + // Count unique writer hostgroups + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + if (GloPgMon->AWS_Aurora_Hosts_resultset && GloPgMon->AWS_Aurora_Hosts_resultset->rows_count) { + std::map unique_whgs; + for (auto it = GloPgMon->AWS_Aurora_Hosts_resultset->rows.begin(); + it != GloPgMon->AWS_Aurora_Hosts_resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + unsigned int whg = atoi(r->fields[0]); + unique_whgs[whg] = true; + } + hgs_num = unique_whgs.size(); + if (hgs_num) { + proxy_info("Activating Monitoring of %u AWS Aurora PostgreSQL clusters\n", hgs_num); + hgs_array = (unsigned int*)malloc(sizeof(unsigned int) * hgs_num); + pthreads_array = (pthread_t*)malloc(sizeof(pthread_t) * hgs_num); + unsigned int idx = 0; + for (auto& it : unique_whgs) { + hgs_array[idx] = it.first; + idx++; + } + } + } + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); + + // Start threads for each writer hostgroup + for (unsigned int i = 0; i < hgs_num; i++) { + proxy_info("Starting Monitor thread for AWS Aurora PostgreSQL writer 
HG %u\n", hgs_array[i]); + if (pthread_create(&pthreads_array[i], nullptr, PgSQL_monitor_AWS_Aurora_thread_HG, &hgs_array[i]) != 0) { + proxy_error("Thread creation failed for AWS Aurora PostgreSQL writer HG %u\n", hgs_array[i]); + } + } + } + + usleep(500000); // 500ms + } + + // Cleanup on shutdown + if (pthreads_array) { + for (unsigned int i = 0; i < hgs_num; i++) { + pthread_join(pthreads_array[i], nullptr); + } + free(pthreads_array); + free(hgs_array); + } + + proxy_info("Stopping Aurora PostgreSQL Monitor main thread\n"); + return nullptr; +} diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index daebe79ced..fa2bff8c13 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -147,7 +147,7 @@ static const vector pgsql_servers_tablenames = { "pgsql_replication_hostgroups", // "pgsql_group_replication_hostgroups", // "pgsql_galera_hostgroups", -// "pgsql_aws_aurora_hostgroups", + "pgsql_aws_aurora_hostgroups", "pgsql_hostgroup_attributes", }; @@ -872,11 +872,13 @@ incoming_pgsql_servers_t::incoming_pgsql_servers_t() {} incoming_pgsql_servers_t::incoming_pgsql_servers_t( SQLite3_result* incoming_pgsql_servers_v2, SQLite3_result* incoming_replication_hostgroups, + SQLite3_result* incoming_aurora_hostgroups, SQLite3_result* incoming_hostgroup_attributes, SQLite3_result* runtime_pgsql_servers ) : incoming_pgsql_servers_v2(incoming_pgsql_servers_v2), incoming_replication_hostgroups(incoming_replication_hostgroups), + incoming_aurora_hostgroups(incoming_aurora_hostgroups), incoming_hostgroup_attributes(incoming_hostgroup_attributes), runtime_pgsql_servers(runtime_pgsql_servers) {} @@ -7435,10 +7437,12 @@ void ProxySQL_Admin::load_pgsql_servers_to_runtime(const incoming_pgsql_servers_ SQLite3_result* resultset = NULL; SQLite3_result* resultset_servers = NULL; SQLite3_result* resultset_replication = NULL; + SQLite3_result* resultset_aws_aurora = NULL; SQLite3_result* resultset_hostgroup_attributes = NULL; SQLite3_result* runtime_pgsql_servers = 
incoming_pgsql_servers.runtime_pgsql_servers; SQLite3_result* incoming_replication_hostgroups = incoming_pgsql_servers.incoming_replication_hostgroups; + SQLite3_result* incoming_aurora_hostgroups = incoming_pgsql_servers.incoming_aurora_hostgroups; SQLite3_result* incoming_hostgroup_attributes = incoming_pgsql_servers.incoming_hostgroup_attributes; SQLite3_result* incoming_pgsql_servers_v2 = incoming_pgsql_servers.incoming_pgsql_servers_v2; @@ -7500,6 +7504,39 @@ void ProxySQL_Admin::load_pgsql_servers_to_runtime(const incoming_pgsql_servers_ //if (resultset) delete resultset; //resultset=NULL; + // support for AWS Aurora PostgreSQL, table pgsql_aws_aurora_hostgroups + + // look for invalid combinations + query = (char*)"SELECT a.* FROM pgsql_aws_aurora_hostgroups a JOIN pgsql_aws_aurora_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } + else { + for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + proxy_error("Incompatible entry in pgsql_aws_aurora_hostgroups will be ignored : ( %s , %s , %s , %s )\n", r->fields[0], r->fields[1], r->fields[2], r->fields[3]); + } + } + if (resultset) delete resultset; + resultset = NULL; + + query = (char*)"SELECT a.* FROM pgsql_aws_aurora_hostgroups a LEFT JOIN pgsql_aws_aurora_hostgroups b ON (a.writer_hostgroup=b.reader_hostgroup) WHERE b.reader_hostgroup IS NULL ORDER BY writer_hostgroup"; + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + if (incoming_aurora_hostgroups == nullptr) { + admindb->execute_statement(query, &error, &cols, &affected_rows, &resultset_aws_aurora); + } + else { + resultset_aws_aurora = incoming_aurora_hostgroups; + } + if (error) { + proxy_error("Error on %s : %s\n", query, error); + } + else { + // Pass 
the resultset to PgHGM + PgHGM->save_incoming_pgsql_table(resultset_aws_aurora, "pgsql_aws_aurora_hostgroups"); + } // support for hostgroup attributes, table pgsql_hostgroup_attributes query = (char*)"SELECT * FROM pgsql_hostgroup_attributes ORDER BY hostgroup_id"; @@ -7535,6 +7572,10 @@ void ProxySQL_Admin::load_pgsql_servers_to_runtime(const incoming_pgsql_servers_ delete resultset_replication; resultset_replication = NULL; } + if (resultset_aws_aurora) { + //delete resultset_aws_aurora; // do not delete, resultset is stored in PgHGM + resultset_aws_aurora = NULL; + } if (resultset_hostgroup_attributes) { resultset_hostgroup_attributes = NULL; } diff --git a/lib/ProxySQL_Config.cpp b/lib/ProxySQL_Config.cpp index 1a0aecdc60..c19debd958 100644 --- a/lib/ProxySQL_Config.cpp +++ b/lib/ProxySQL_Config.cpp @@ -1786,6 +1786,62 @@ int ProxySQL_Config::Read_PgSQL_Servers_from_configfile(std::string& error) { rows++; } } + // AWS Aurora PostgreSQL + if (root.exists("pgsql_aws_aurora_hostgroups")==true) { + const Setting &pgsql_aws_aurora_hostgroups = root["pgsql_aws_aurora_hostgroups"]; + int count = pgsql_aws_aurora_hostgroups.getLength(); + char *q=(char *)"INSERT OR REPLACE INTO pgsql_aws_aurora_hostgroups (writer_hostgroup, reader_hostgroup, active, aurora_port, domain_name, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment ) VALUES (%d, %d, %d, %d, '%s', %d, %d, %d, %d, %d, %d, %d, %d, '%s')"; + for (i=0; i< count; i++) { + const Setting &line = pgsql_aws_aurora_hostgroups[i]; + int writer_hostgroup; + int reader_hostgroup; + int active=1; // default + int aurora_port; + int max_lag_ms; + int add_lag_ms; + int min_lag_ms; + int lag_num_checks; + int check_interval_ms; + int check_timeout_ms; + int writer_is_also_reader; + int new_reader_weight; + std::string comment=""; + std::string domain_name=""; + if (line.lookupValue("writer_hostgroup", writer_hostgroup)==false) { + 
proxy_error("Admin: detected a pgsql_aws_aurora_hostgroups in config file without a mandatory writer_hostgroup\n"); + continue; + } + if (line.lookupValue("reader_hostgroup", reader_hostgroup)==false) { + proxy_error("Admin: detected a pgsql_aws_aurora_hostgroups in config file without a mandatory reader_hostgroup\n"); + continue; + } + if (line.lookupValue("aurora_port", aurora_port)==false) aurora_port=5432; + if (line.lookupValue("max_lag_ms", max_lag_ms)==false) max_lag_ms=600000; + if (line.lookupValue("check_interval_ms", check_interval_ms)==false) check_interval_ms=1000; + if (line.lookupValue("check_timeout_ms", check_timeout_ms)==false) check_timeout_ms=800; + if (line.lookupValue("writer_is_also_reader", writer_is_also_reader)==false) writer_is_also_reader=0; + if (line.lookupValue("new_reader_weight", new_reader_weight)==false) new_reader_weight=1; + if (line.lookupValue("add_lag_ms", add_lag_ms)==false) add_lag_ms=30; + if (line.lookupValue("min_lag_ms", min_lag_ms)==false) min_lag_ms=30; + if (line.lookupValue("lag_num_checks", lag_num_checks)==false) lag_num_checks=1; + line.lookupValue("active", active); + line.lookupValue("comment", comment); + line.lookupValue("domain_name", domain_name); + char *o1=strdup(comment.c_str()); + char *o=escape_string_single_quotes(o1, false); + char *p1=strdup(domain_name.c_str()); + char *p=escape_string_single_quotes(p1, false); + char *query=(char *)malloc(strlen(q)+strlen(o)+strlen(p)+256); + sprintf(query,q, writer_hostgroup, reader_hostgroup, active, aurora_port, p, max_lag_ms, check_interval_ms, check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, o); + admindb->execute(query); + if (o!=o1) free(o); + free(o1); + if (p!=p1) free(p); + free(p1); + free(query); + rows++; + } + } admindb->execute("PRAGMA foreign_keys = ON"); return rows; } From 9b04130b706509d553f299016f1cbd209d46e319 Mon Sep 17 00:00:00 2001 From: "eden.lee" Date: Wed, 24 Dec 2025 15:50:20 +0900 
Subject: [PATCH 2/7] Add monitoring table population and runtime table dump for Aurora PostgreSQL - Add populate_monitor_pgsql_server_aws_aurora_log() to display aurora log entries - Add populate_monitor_pgsql_server_aws_aurora_check_status() to display check status - Add pgsql_aws_aurora_hostgroups support in dump_table_pgsql() - Add runtime_pgsql_aws_aurora_hostgroups dump in save_pgsql_servers_runtime_to_database() - Remove verbose error logging to match MySQL Aurora behavior pattern --- include/PgSQL_Monitor.hpp | 4 ++ lib/PgSQL_HostGroups_Manager.cpp | 5 +- lib/PgSQL_Monitor.cpp | 115 +++++++++++++++++++++++++++++-- lib/ProxySQL_Admin.cpp | 73 ++++++++++++++++++++ 4 files changed, 192 insertions(+), 5 deletions(-) diff --git a/include/PgSQL_Monitor.hpp b/include/PgSQL_Monitor.hpp index 4ef4b3d4cd..e71fa30d90 100644 --- a/include/PgSQL_Monitor.hpp +++ b/include/PgSQL_Monitor.hpp @@ -108,6 +108,10 @@ struct PgSQL_Monitor { PgSQL_AWS_Aurora_status_entry** lasts_ase, unsigned int ase_idx, unsigned int max_latency_ms, unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks); bool server_responds_to_ping(const char* addr, int port); + + // Populate AWS Aurora monitoring tables + void populate_monitor_pgsql_server_aws_aurora_log(); + void populate_monitor_pgsql_server_aws_aurora_check_status(); }; struct pgsql_conn_t { diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 55239fe778..4205522d76 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -1831,7 +1831,10 @@ void PgSQL_HostGroups_Manager::update_table_pgsql_servers_for_monitor(bool lock) SQLite3_result * PgSQL_HostGroups_Manager::dump_table_pgsql(const string& name) { char * query = (char *)""; - if (name == "pgsql_replication_hostgroups") { + if (name == "pgsql_aws_aurora_hostgroups") { + query=(char *)"SELECT writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms," + 
"check_interval_ms,check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment FROM pgsql_aws_aurora_hostgroups"; + } else if (name == "pgsql_replication_hostgroups") { query=(char *)"SELECT writer_hostgroup, reader_hostgroup, check_type, comment FROM pgsql_replication_hostgroups"; } else if (name == "pgsql_hostgroup_attributes") { query=(char *)"SELECT hostgroup_id, max_num_online_servers, autocommit, free_connections_pct, init_connect, multiplex, connection_warming, throttle_connections_per_sec, ignore_session_variables, hostgroup_settings, servers_defaults, comment FROM pgsql_hostgroup_attributes ORDER BY hostgroup_id"; diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 27654e24d5..304b7ae039 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -2708,8 +2708,7 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { if (PQstatus(conn) != CONNECTION_OK) { error_msg = strdup(PQerrorMessage(conn)); - proxy_error("Aurora PostgreSQL: Connection failed for %s:%d - %s\n", - hpa[cur_host_idx].host, hpa[cur_host_idx].port, error_msg); + // Note: Not logging here to match MySQL behavior - errors are stored in status entry ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); free(error_msg); @@ -2726,8 +2725,7 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { if (PQresultStatus(res) != PGRES_TUPLES_OK) { error_msg = strdup(PQerrorMessage(conn)); - proxy_error("Aurora PostgreSQL: Query failed for %s:%d - %s\n", - hpa[cur_host_idx].host, hpa[cur_host_idx].port, error_msg); + // Note: Not logging here to match MySQL behavior - errors are stored in status entry ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); free(error_msg); @@ -2923,3 +2921,112 @@ void* 
PgSQL_monitor_aws_aurora(void* arg) { proxy_info("Stopping Aurora PostgreSQL Monitor main thread\n"); return nullptr; } + +void PgSQL_Monitor::populate_monitor_pgsql_server_aws_aurora_log() { + SQLite3DB* db = &monitordb; + int rc; + char *query1 = nullptr; + query1 = (char *)"INSERT OR IGNORE INTO pgsql_server_aws_aurora_log VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)"; + sqlite3_stmt *statement1 = nullptr; + char *query2 = nullptr; + query2 = (char *)"INSERT OR IGNORE INTO pgsql_server_aws_aurora_log (hostname, port, time_start_us, success_time_us, error) VALUES (?1, ?2, ?3, ?4, ?5)"; + sqlite3_stmt *statement2 = nullptr; + rc = db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, db); + rc = db->prepare_v2(query2, &statement2); + ASSERT_SQLITE_OK(rc, db); + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + db->execute((char *)"DELETE FROM pgsql_server_aws_aurora_log"); + std::map::iterator it2; + PgSQL_AWS_Aurora_monitor_node *node = nullptr; + for (it2 = GloPgMon->AWS_Aurora_Hosts_Map.begin(); it2 != GloPgMon->AWS_Aurora_Hosts_Map.end(); ++it2) { + std::string s = it2->first; + node = it2->second; + std::size_t found = s.find_last_of(":"); + std::string host = s.substr(0, found); + std::string port = s.substr(found + 1); + int i; + for (i = 0; i < PGSQL_AWS_Aurora_Nentries; i++) { + PgSQL_AWS_Aurora_status_entry *aase = node->last_entries[i]; + if (aase && aase->start_time) { + if (aase->host_statuses->size()) { + for (std::vector::iterator it3 = aase->host_statuses->begin(); it3 != aase->host_statuses->end(); ++it3) { + PgSQL_AWS_Aurora_replica_host_status_entry *hse = *it3; + if (hse) { + rc = (*proxy_sqlite3_bind_text)(statement1, 1, host.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 2, atoi(port.c_str())); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 3, aase->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 4, 
aase->check_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 5, aase->error, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 6, hse->server_id, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 7, hse->session_id, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 8, hse->last_update_timestamp, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_double)(statement1, 9, hse->replica_lag_ms); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 10, hse->estimated_lag_ms); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + } + } else { + rc = (*proxy_sqlite3_bind_text)(statement2, 1, host.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement2, 2, atoi(port.c_str())); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement2, 3, aase->start_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement2, 4, aase->check_time); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement2, 5, aase->error, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement2); + rc = (*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, db); + } + } + } + } + (*proxy_sqlite3_finalize)(statement1); + (*proxy_sqlite3_finalize)(statement2); + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); +} + +void PgSQL_Monitor::populate_monitor_pgsql_server_aws_aurora_check_status() { + SQLite3DB* db = &monitordb; + int rc; + char *query1 = nullptr; + query1 = (char *)"INSERT OR IGNORE INTO pgsql_server_aws_aurora_check_status VALUES (?1, ?2, 
?3, ?4, ?5, ?6, ?7)"; + sqlite3_stmt *statement1 = nullptr; + rc = db->prepare_v2(query1, &statement1); + ASSERT_SQLITE_OK(rc, db); + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); + db->execute((char *)"DELETE FROM pgsql_server_aws_aurora_check_status"); + std::map::iterator it2; + PgSQL_AWS_Aurora_monitor_node *node = nullptr; + for (it2 = GloPgMon->AWS_Aurora_Hosts_Map.begin(); it2 != GloPgMon->AWS_Aurora_Hosts_Map.end(); ++it2) { + std::string s = it2->first; + node = it2->second; + std::size_t found = s.find_last_of(":"); + std::string host = s.substr(0, found); + std::string port = s.substr(found + 1); + PgSQL_AWS_Aurora_status_entry *aase = node->last_entry(); + char *error_msg = nullptr; + if (aase && aase->start_time) { + if (aase->error) { + error_msg = aase->error; + } + } + char lut[30]; + struct tm __tm_info; + localtime_r(&node->last_checked_at, &__tm_info); + strftime(lut, 25, "%Y-%m-%d %H:%M:%S", &__tm_info); + + rc = (*proxy_sqlite3_bind_int64)(statement1, 1, node->writer_hostgroup); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 2, host.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 3, atoi(port.c_str())); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 4, lut, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 5, node->num_checks_tot); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_int64)(statement1, 6, node->num_checks_ok); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_bind_text)(statement1, 7, error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, db); + SAFE_SQLITE3_STEP2(statement1); + rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, db); + rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, db); + } + (*proxy_sqlite3_finalize)(statement1); + pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); +} diff --git a/lib/ProxySQL_Admin.cpp 
b/lib/ProxySQL_Admin.cpp index fa2bff8c13..f34c05a97e 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -42,6 +42,7 @@ using json = nlohmann::json; #include "ProxySQL_Statistics.hpp" #include "MySQL_Logger.hpp" #include "PgSQL_Logger.hpp" +#include "PgSQL_Monitor.hpp" #include "SQLite3_Server.h" #include "Web_Interface.hpp" @@ -322,6 +323,7 @@ extern MySQL_Logger *GloMyLogger; extern PgSQL_Logger* GloPgSQL_Logger; extern MySQL_STMT_Manager_v14 *GloMyStmt; extern MySQL_Monitor *GloMyMon; +extern PgSQL_Monitor* GloPgMon; extern PgSQL_Threads_Handler* GloPTH; extern void (*flush_logs_function)(); @@ -1207,6 +1209,9 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign bool monitor_mysql_server_aws_aurora_log=false; bool monitor_mysql_server_aws_aurora_check_status=false; + bool monitor_pgsql_server_aws_aurora_log=false; + bool monitor_pgsql_server_aws_aurora_check_status=false; + bool stats_proxysql_servers_checksums = false; bool stats_proxysql_servers_metrics = false; bool stats_proxysql_message_metrics = false; @@ -1384,6 +1389,8 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign || strstr(query_no_space, "runtime_pgsql_replication_hostgroups") || + strstr(query_no_space, "runtime_pgsql_aws_aurora_hostgroups") + || strstr(query_no_space, "runtime_pgsql_hostgroup_attributes") ) { runtime_pgsql_servers = true; refresh = true; @@ -1467,6 +1474,12 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign if (strstr(query_no_space,"mysql_server_aws_aurora_check_status")) { monitor_mysql_server_aws_aurora_check_status=true; refresh=true; } + if (strstr(query_no_space,"pgsql_server_aws_aurora_log")) { + monitor_pgsql_server_aws_aurora_log=true; refresh=true; + } + if (strstr(query_no_space,"pgsql_server_aws_aurora_check_status")) { + monitor_pgsql_server_aws_aurora_check_status=true; refresh=true; + } // if (stats_mysql_processlist || stats_mysql_connection_pool || 
stats_mysql_query_digest || stats_mysql_query_digest_reset) { if (refresh==true) { //pthread_mutex_lock(&admin_mutex); @@ -1687,6 +1700,16 @@ bool ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign GloMyMon->populate_monitor_mysql_server_aws_aurora_check_status(); } } + if (monitor_pgsql_server_aws_aurora_log) { + if (GloPgMon) { + GloPgMon->populate_monitor_pgsql_server_aws_aurora_log(); + } + } + if (monitor_pgsql_server_aws_aurora_check_status) { + if (GloPgMon) { + GloPgMon->populate_monitor_pgsql_server_aws_aurora_check_status(); + } + } //pthread_mutex_unlock(&admin_mutex); } @@ -7167,6 +7190,56 @@ void ProxySQL_Admin::save_pgsql_servers_runtime_to_database(bool _runtime) { } if (resultset) delete resultset; resultset = NULL; + + // dump pgsql_aws_aurora_hostgroups + + if (_runtime) { + query = (char*)"DELETE FROM main.runtime_pgsql_aws_aurora_hostgroups"; + } else { + query = (char*)"DELETE FROM main.pgsql_aws_aurora_hostgroups"; + } + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); + admindb->execute(query); + resultset = PgHGM->dump_table_pgsql("pgsql_aws_aurora_hostgroups"); + if (resultset) { + int rc; + sqlite3_stmt* statement = NULL; + + char* q = NULL; + if (_runtime) { + q = (char*)"INSERT INTO runtime_pgsql_aws_aurora_hostgroups(writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms,check_interval_ms,check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + } else { + q = (char*)"INSERT INTO pgsql_aws_aurora_hostgroups(writer_hostgroup,reader_hostgroup,active,aurora_port,domain_name,max_lag_ms,check_interval_ms,check_timeout_ms,writer_is_also_reader,new_reader_weight,add_lag_ms,min_lag_ms,lag_num_checks,comment) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)"; + } + + rc = admindb->prepare_v2(q, &statement); + ASSERT_SQLITE_OK(rc, admindb); + + for 
(std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { + SQLite3_row* r = *it; + rc = (*proxy_sqlite3_bind_int64)(statement, 1, atoi(r->fields[0])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 2, atoi(r->fields[1])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 3, atoi(r->fields[2])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 4, atoi(r->fields[3])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_text)(statement, 5, r->fields[4], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 6, atoi(r->fields[5])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 7, atoi(r->fields[6])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 8, atoi(r->fields[7])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 9, atoi(r->fields[8])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 10, atoi(r->fields[9])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 11, atoi(r->fields[10])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 12, atoi(r->fields[11])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_int64)(statement, 13, atoi(r->fields[12])); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_bind_text)(statement, 14, r->fields[13], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb); + + SAFE_SQLITE3_STEP2(statement); + rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, admindb); + rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, admindb); + } + (*proxy_sqlite3_finalize)(statement); + } + if (resultset) delete resultset; + resultset = NULL; } From eb4e90258a07e45de10eb97fbd3a9f9e9434f656 Mon Sep 17 00:00:00 2001 From: "eden.lee" Date: Tue, 30 Dec 2025 15:34:00 +0900 Subject: 
[PATCH 3/7] Add connection pool for Aurora PostgreSQL monitoring - Add PgSQL_Monitor_Connection_Pool class (equivalent to MySQL_Monitor_Connection_Pool) - Add MonPgSrvC class for per-server connection management - Implement get_connection/put_connection for connection reuse - Add purge_some_connections to limit connections per server (max 4) - Add purge_all_connections for cleanup on shutdown - Wrap verbose logging with #ifdef TEST_AURORA guards - Add print_pgsql_aws_aurora_status_entry for TEST_AURORA debugging - Add action counting (action_yes/no, enabling/disabling) in evaluate function --- include/PgSQL_Monitor.hpp | 2 + lib/PgSQL_Monitor.cpp | 233 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 223 insertions(+), 12 deletions(-) diff --git a/include/PgSQL_Monitor.hpp b/include/PgSQL_Monitor.hpp index e71fa30d90..897fbabe1e 100644 --- a/include/PgSQL_Monitor.hpp +++ b/include/PgSQL_Monitor.hpp @@ -34,6 +34,7 @@ // Forward declarations class PgSQL_AWS_Aurora_monitor_node; class PgSQL_AWS_Aurora_status_entry; +class PgSQL_Monitor_Connection_Pool; struct PgSQL_Monitor { // @brief Flags if monitoring threads should be shutdown. 
@@ -96,6 +97,7 @@ struct PgSQL_Monitor { SQLite3_result* AWS_Aurora_Hosts_resultset; uint64_t AWS_Aurora_Hosts_resultset_checksum; std::map AWS_Aurora_Hosts_Map; + PgSQL_Monitor_Connection_Pool* My_Conn_Pool; // Connection pool for Aurora monitoring /////////////////////////////////////////////////////////////////////////// PgSQL_Monitor(); diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 304b7ae039..72a8832d80 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -100,11 +100,145 @@ void check_and_build_standard_tables(SQLite3DB& db, const vector& t db.execute("PRAGMA foreign_keys = ON"); } +/** + * @brief Server container for PostgreSQL Monitor connection pool + * @details Holds connections per server (hostname:port) for reuse + * Equivalent to MySQL's MonMySrvC class + */ +class MonPgSrvC { +public: + char* address; + uint16_t port; + std::unique_ptr conns; + MonPgSrvC(char* a, uint16_t p) { + address = strdup(a); + port = p; + conns = std::unique_ptr(new PtrArray()); + }; + ~MonPgSrvC() { + free(address); + if (conns) { + while (conns->len) { + PGconn* pg = static_cast(conns->index(0)); + if (pg) { + PQfinish(pg); + pg = nullptr; + } + conns->remove_index_fast(0); + } + } + } +}; + +/** + * @brief Connection pool for PostgreSQL Aurora monitoring + * @details Equivalent to MySQL_Monitor_Connection_Pool + * Pools connections for Aurora health checks to reduce connection overhead + */ +class PgSQL_Monitor_Connection_Pool { +private: + std::mutex mutex; + std::unique_ptr servers; +public: + PGconn* get_connection(char* hostname, int port); + void put_connection(char* hostname, int port, PGconn* pg); + void purge_some_connections(); + void purge_all_connections(); + PgSQL_Monitor_Connection_Pool() { + servers = std::unique_ptr(new PtrArray()); + }; + ~PgSQL_Monitor_Connection_Pool() { + purge_all_connections(); + } +}; + +PGconn* PgSQL_Monitor_Connection_Pool::get_connection(char* hostname, int port) { + std::lock_guard lock(mutex); + 
PGconn* pg = nullptr; + + for (unsigned int i = 0; i < servers->len; i++) { + MonPgSrvC* srv = (MonPgSrvC*)servers->index(i); + if (srv->port == port && strcmp(hostname, srv->address) == 0) { + if (srv->conns->len) { + while (srv->conns->len) { + unsigned int idx = rand() % srv->conns->len; + PGconn* pgconn = (PGconn*)srv->conns->remove_index_fast(idx); + + if (!pgconn) continue; + + // Check if connection is still alive + if (PQstatus(pgconn) != CONNECTION_OK) { + PQfinish(pgconn); + continue; + } + + pg = pgconn; + break; + } + } + return pg; + } + } + return pg; +} + +void PgSQL_Monitor_Connection_Pool::put_connection(char* hostname, int port, PGconn* pg) { + std::lock_guard lock(mutex); + for (unsigned int i = 0; i < servers->len; i++) { + MonPgSrvC* srv = (MonPgSrvC*)servers->index(i); + if (srv->port == port && strcmp(hostname, srv->address) == 0) { + srv->conns->add(pg); + return; + } + } + // if no server was found + MonPgSrvC* srv = new MonPgSrvC(hostname, port); + srv->conns->add(pg); + servers->add(srv); +} + +void PgSQL_Monitor_Connection_Pool::purge_some_connections() { + std::lock_guard lock(mutex); + for (unsigned int i = 0; i < servers->len; i++) { + MonPgSrvC* srv = (MonPgSrvC*)servers->index(i); + // Keep at most 4 connections per server (same as MySQL) + while (srv->conns->len > 4) { + PGconn* pg = (PGconn*)srv->conns->remove_index_fast(0); + if (pg) { + PQfinish(pg); + } + } + // Also check connection status and close dead connections + for (unsigned int j = 0; j < srv->conns->len; j++) { + PGconn* pg = (PGconn*)srv->conns->index(j); + if (pg && PQstatus(pg) != CONNECTION_OK) { + srv->conns->remove_index_fast(j); + PQfinish(pg); + j--; // Recheck this index + } + } + } +} + +void PgSQL_Monitor_Connection_Pool::purge_all_connections() { + std::lock_guard lock(mutex); + if (servers) { + while (servers->len) { + MonPgSrvC* srv = static_cast(servers->index(0)); + if (srv) { + delete srv; + } + servers->remove_index_fast(0); + } + } +} + 
PgSQL_Monitor::PgSQL_Monitor() { // Initialize Aurora mutex and members like MySQL does pthread_mutex_init(&aws_aurora_mutex, NULL); AWS_Aurora_Hosts_resultset = nullptr; AWS_Aurora_Hosts_resultset_checksum = 0; + My_Conn_Pool = new PgSQL_Monitor_Connection_Pool(); int rc = monitordb.open( const_cast("file:mem_monitordb?mode=memory&cache=shared"), @@ -144,6 +278,11 @@ PgSQL_Monitor::~PgSQL_Monitor() { delete it.second; } AWS_Aurora_Hosts_Map.clear(); + // Clean up connection pool + if (My_Conn_Pool) { + delete My_Conn_Pool; + My_Conn_Pool = nullptr; + } } bool PgSQL_Monitor::server_responds_to_ping(const char* addr, int port) { @@ -2433,10 +2572,45 @@ static void shuffle_pgsql_hosts(pgsql_host_def_t* arr, unsigned int n) { } } +#ifdef TEST_AURORA +static void print_pgsql_aws_aurora_status_entry(PgSQL_AWS_Aurora_status_entry* aase) { + if (aase && aase->start_time) { + if (aase->host_statuses->size()) { + for (PgSQL_AWS_Aurora_replica_host_status_entry* hse : *aase->host_statuses) { + if (hse) { + fprintf(stderr, "%s %s %s %f %u\n", hse->server_id, hse->session_id, + hse->last_update_timestamp, hse->replica_lag_ms, hse->estimated_lag_ms); + } + } + } + } +} +#endif // TEST_AURORA + void PgSQL_Monitor::evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned int rHG, PgSQL_AWS_Aurora_status_entry** lasts_ase, unsigned int ase_idx, unsigned int max_latency_ms, unsigned int add_lag_ms, unsigned int min_lag_ms, unsigned int lag_num_checks) { - +#ifdef TEST_AURORA + unsigned int i = 0; + bool verbose = false; + unsigned int action_yes = 0; + unsigned int action_no = 0; + unsigned int enabling = 0; + unsigned int disabling = 0; + if (rand() % 500 == 0) { + verbose = true; + bool ev = false; + if (rand() % 1000 == 0) { + ev = true; + } + for (i = 0; i < N_L_ASE; i++) { + PgSQL_AWS_Aurora_status_entry* aase_tmp = lasts_ase[i]; + if (ev == true || i == ase_idx) { + print_pgsql_aws_aurora_status_entry(aase_tmp); + } + } + } +#endif // TEST_AURORA unsigned int prev_ase_idx 
= ase_idx; if (prev_ase_idx == 0) prev_ase_idx = N_L_ASE; prev_ase_idx--; @@ -2495,8 +2669,17 @@ void PgSQL_Monitor::evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned } if (run_action) { +#ifdef TEST_AURORA + action_yes++; + (enable ? enabling++ : disabling++); + rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer, verbose); +#else rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer); +#endif // TEST_AURORA } else { +#ifdef TEST_AURORA + action_no++; +#endif // TEST_AURORA if (is_writer) { // If the server is a writer we run it anyway for sanity check rla_rc = PgHGM->aws_aurora_replication_lag_action(wHG, rHG, hse->server_id, current_lag_ms, enable, is_writer); @@ -2506,7 +2689,9 @@ void PgSQL_Monitor::evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned if (rla_rc == false) { if (is_writer) { // The server should be a writer but is not configured as one +#ifdef TEST_AURORA proxy_info("Aurora PostgreSQL: Calling update_aws_aurora_set_writer for %s\n", hse->server_id); +#endif // TEST_AURORA PgHGM->update_aws_aurora_set_writer(wHG, rHG, hse->server_id); // Log failover event @@ -2523,13 +2708,20 @@ void PgSQL_Monitor::evaluate_pgsql_aws_aurora_results(unsigned int wHG, unsigned monitordb.execute(q2); free(q2); } else { +#ifdef TEST_AURORA proxy_info("Aurora PostgreSQL: Calling update_aws_aurora_set_reader for %s\n", hse->server_id); +#endif // TEST_AURORA PgHGM->update_aws_aurora_set_reader(wHG, rHG, hse->server_id); } } } } } +#ifdef TEST_AURORA + if (verbose) { + proxy_info("Aurora PostgreSQL replication_lag_actions: YES=%u , NO=%u , enabling=%u , disabling=%u\n", action_yes, action_no, enabling, disabling); + } +#endif // TEST_AURORA } /** @@ -2691,16 +2883,22 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { start_time = t1; char* error_msg = nullptr; - // Build connection string with monitor credentials - // Note: 
dbname=postgres is used because aurora_replica_status() is a system function - char conninfo[1024]; - snprintf(conninfo, sizeof(conninfo), "host=%s port=%d dbname=postgres user=%s password=%s connect_timeout=%d", - hpa[cur_host_idx].host, hpa[cur_host_idx].port, - monitor_user ? monitor_user : "", - monitor_pass ? monitor_pass : "", - check_timeout_ms / 1000); - - PGconn* conn = PQconnectdb(conninfo); + // Try to get connection from pool first (crc=false means from pool, crc=true means new connection) + // Note: crc is kept for MySQL parity and potential future use (e.g., connection timeout tracking) + bool crc __attribute__((unused)) = false; + PGconn* conn = GloPgMon->My_Conn_Pool->get_connection(hpa[cur_host_idx].host, hpa[cur_host_idx].port); + if (!conn) { + // Build connection string with monitor credentials + // Note: dbname=postgres is used because aurora_replica_status() is a system function + char conninfo[1024]; + snprintf(conninfo, sizeof(conninfo), "host=%s port=%d dbname=postgres user=%s password=%s connect_timeout=%d", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, + monitor_user ? monitor_user : "", + monitor_pass ? monitor_pass : "", + check_timeout_ms / 1000); + conn = PQconnectdb(conninfo); + crc = true; // Mark as new connection + } unsigned long long t2 = monotonic_time(); PgSQL_AWS_Aurora_status_entry* ase = nullptr; @@ -2754,10 +2952,21 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { new PgSQL_AWS_Aurora_replica_host_status_entry(server_id, session_id, session_id, replica_lag, is_writer); ase_l->add_host_status(arhse_l); } + // Query succeeded, return connection to pool + // Note: MySQL distinguishes between pool connections (crc=false) and new connections (crc=true) + // with set_wait_timeout() for new connections. PostgreSQL doesn't have this, so we always + // return the connection to pool on success regardless of crc flag. 
+ GloPgMon->My_Conn_Pool->put_connection(hpa[cur_host_idx].host, hpa[cur_host_idx].port, conn); + conn = nullptr; // Mark as handled } PQclear(res); } - PQfinish(conn); + // If connection wasn't returned to pool (error case), close it + // This matches MySQL's behavior: on error, connection is closed (not returned to pool) + if (conn) { + PQfinish(conn); + conn = nullptr; + } // Process results if (lasts_ase[ase_idx]) { From 497bf8971390528f176d9a4129b9d79baf2e8c3a Mon Sep 17 00:00:00 2001 From: "eden.lee" Date: Wed, 31 Dec 2025 11:11:09 +0900 Subject: [PATCH 4/7] Fix Aurora PostgreSQL auto-discovered servers not being monitored by ping Auto-discovered Aurora PostgreSQL servers were not being added to the ping monitoring target list, causing deleted/failed instances to remain in ONLINE status instead of being properly shunned. Root cause: - update_aws_aurora_set_writer() and update_aws_aurora_set_reader() were calling generate_pgsql_servers_table() to sync MyHostGroups to the internal pgsql_servers table - But update_table_pgsql_servers_for_monitor() was not called after, so the monitor's server list (pgsql_servers_to_monitor) was never updated with the new auto-discovered servers - MySQL implementation correctly calls update_table_mysql_servers_for_monitor() in these same locations Fix: Add update_table_pgsql_servers_for_monitor(false) calls in 4 locations: - update_aws_aurora_set_writer(): commit path (line 4979) - update_aws_aurora_set_writer(): auto-discovery path (line 5021) - update_aws_aurora_set_reader(): commit path (line 5107) - update_aws_aurora_set_reader(): auto-discovery path (line 5137) This ensures auto-discovered servers are properly ping monitored and will be shunned when they become unreachable. 
--- lib/PgSQL_HostGroups_Manager.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 4205522d76..0ef3aa8325 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -4974,6 +4974,10 @@ void PgSQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid mydb->execute(query); generate_pgsql_servers_table(&_whid); generate_pgsql_servers_table(&_rhid); + + // Because 'commit' is called, we are required to update 'pgsql_servers_for_monitor'. + update_table_pgsql_servers_for_monitor(false); + wrunlock(); GloAdmin->pgsql_servers_wrunlock(); } else { @@ -5013,6 +5017,9 @@ void PgSQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid generate_pgsql_servers_table(&_whid); generate_pgsql_servers_table(&_rhid); + // Because 'commit' isn't called, we are required to update 'pgsql_servers_for_monitor'. + update_table_pgsql_servers_for_monitor(false); + // Update AWS Aurora resultset used for monitoring update_aws_aurora_hosts_monitor_resultset(true); } @@ -5096,6 +5103,9 @@ void PgSQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid generate_pgsql_servers_table(&_whid); generate_pgsql_servers_table(&_rhid); + // Because 'commit' is called, we are required to update 'pgsql_servers_for_monitor'. + update_table_pgsql_servers_for_monitor(false); + wrunlock(); GloAdmin->pgsql_servers_wrunlock(); free(query); @@ -5123,6 +5133,8 @@ void PgSQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid generate_pgsql_servers_table(&_whid); generate_pgsql_servers_table(&_rhid); + // Because 'commit' isn't called, we are required to update 'pgsql_servers_for_monitor'. 
+ update_table_pgsql_servers_for_monitor(false); // Update AWS Aurora resultset used for monitoring update_aws_aurora_hosts_monitor_resultset(true); } From b6d582f1709a1a395513762d598bb8b17aa8ffa2 Mon Sep 17 00:00:00 2001 From: "eden.lee" Date: Wed, 31 Dec 2025 15:20:55 +0900 Subject: [PATCH 5/7] Change monitor connect error log level from error to debug MySQL does not log connection failures as errors (only SSL errors). Change PostgreSQL monitor to follow the same pattern by using proxy_debug instead of proxy_error for connection failures. This prevents excessive error logs (every 2 seconds) when a server is unreachable (e.g., deleted Aurora instance). Note: MySQL has DNS caching (DNS_Cache class, dns_lookup(), DNS resolver thread running every 60 seconds), while PostgreSQL does direct DNS lookup on every connection attempt. This causes PostgreSQL to log errors every 2 seconds (ping interval) vs MySQL's ~60 seconds. --- lib/PgSQL_Monitor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 72a8832d80..daacaa2409 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -1187,7 +1187,7 @@ pgsql_conn_t create_new_conn(task_st_t& task_st) { if (pgconn.conn) { auto error { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - proxy_error( + proxy_debug(PROXY_DEBUG_MONITOR, 5, "Monitor connect failed addr='%s:%d' error='%s'\n", srv.addr.c_str(), srv.port, error.get() ); @@ -1196,7 +1196,7 @@ pgsql_conn_t create_new_conn(task_st_t& task_st) { task_st.end = monotonic_time(); } else { mf_unique_ptr error { strdup("Out of memory") }; - proxy_error( + proxy_debug(PROXY_DEBUG_MONITOR, 5, "Monitor connect failed addr='%s:%d' error='%s'\n", srv.addr.c_str(), srv.port, "Out of memory" ); From 6141807a16958a2338e1d370fca371eb2de18c86 Mon Sep 17 00:00:00 2001 From: "eden.lee" Date: Tue, 6 Jan 2026 12:12:17 +0900 Subject: [PATCH 6/7] Align Aurora PostgreSQL monitoring with MySQL implementation This commit 
ensures Aurora PostgreSQL monitoring behaves identically to Aurora MySQL monitoring by adding missing functionality: 1. Add runtime variable refresh in Aurora monitoring threads - HG thread (PgSQL_monitor_AWS_Aurora_thread_HG): Now checks global variable version on each loop iteration and calls refresh_variables() when changes are detected, allowing runtime configuration changes to take effect immediately without restarting the thread - Main Aurora thread (PgSQL_monitor_aws_aurora): Added PgSQL_Thread object initialization and variable refresh loop 2. Add monitor_enabled check to Aurora thread loops - Both HG thread and main Aurora thread now check pgsql_thread___monitor_enabled in their while conditions - Setting pgsql-monitor_enabled=false now properly stops Aurora monitoring threads (previously they would continue running) 3. Add checksum comparison to update_aws_aurora_set_writer() - Before committing writer changes, compare checksums of current and incoming server lists - Only commit if there are actual changes, avoiding unnecessary table regeneration and log noise - Add verbose skip logging when hostgroup_manager_verbose > 1 These changes match the behavior of the MySQL Aurora implementation in: - MySQL_Monitor.cpp: monitor_AWS_Aurora_thread_HG() - MySQL_Monitor.cpp: MySQL_Monitor::monitor_aws_aurora() - MySQL_HostGroups_Manager.cpp: update_aws_aurora_set_writer() Benefits: - Runtime variable changes (e.g., hostgroup_manager_verbose) are now reflected immediately in Aurora monitoring threads - pgsql-monitor_enabled=false properly stops Aurora monitoring - Reduced unnecessary commits and log output when no actual changes occur - Full consistency with MySQL Aurora monitoring behavior --- lib/PgSQL_HostGroups_Manager.cpp | 97 +++++++++++++++++++++++++++----- lib/PgSQL_Monitor.cpp | 63 +++++++++++++++++++-- 2 files changed, 141 insertions(+), 19 deletions(-) diff --git a/lib/PgSQL_HostGroups_Manager.cpp b/lib/PgSQL_HostGroups_Manager.cpp index 
0ef3aa8325..5938371733 100644 --- a/lib/PgSQL_HostGroups_Manager.cpp +++ b/lib/PgSQL_HostGroups_Manager.cpp @@ -4963,22 +4963,67 @@ void PgSQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid mydb->execute(query); } - proxy_warning("Aurora PostgreSQL: setting host %s%s:%d as writer\n", _server_id, domain_name, aurora_port); - q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id NOT IN (%d, %d)"; - sprintf(query, q, _rhid, _whid); - mydb->execute(query); - commit(); - wrlock(); - q = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; - sprintf(query, q, _whid, _rhid); - mydb->execute(query); - generate_pgsql_servers_table(&_whid); - generate_pgsql_servers_table(&_rhid); + // Calculate checksums to check if update is actually needed + uint64_t checksum_current = 0; + uint64_t checksum_incoming = 0; + { + int chk_cols = 0; + int chk_affected_rows = 0; + SQLite3_result *resultset_servers = nullptr; + char *chk_query = nullptr; + char *chk_error = nullptr; + const char *q1 = "SELECT DISTINCT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, pgsql_servers.comment FROM pgsql_servers JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; + const char *q2 = "SELECT DISTINCT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, pgsql_servers_incoming.comment FROM pgsql_servers_incoming JOIN pgsql_aws_aurora_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE writer_hostgroup=%d ORDER BY hostgroup_id, hostname, port"; + chk_query = (char *)malloc(strlen(q2) + 128); + sprintf(chk_query, q1, 
_writer_hostgroup); + mydb->execute_statement(chk_query, &chk_error, &chk_cols, &chk_affected_rows, &resultset_servers); + if (chk_error == nullptr) { + if (resultset_servers) { + checksum_current = resultset_servers->raw_checksum(); + } + } + if (chk_error) { free(chk_error); chk_error = nullptr; } + if (resultset_servers) { + delete resultset_servers; + resultset_servers = nullptr; + } + sprintf(chk_query, q2, _writer_hostgroup); + mydb->execute_statement(chk_query, &chk_error, &chk_cols, &chk_affected_rows, &resultset_servers); + if (chk_error == nullptr) { + if (resultset_servers) { + checksum_incoming = resultset_servers->raw_checksum(); + } + } + if (chk_error) { free(chk_error); chk_error = nullptr; } + if (resultset_servers) { + delete resultset_servers; + resultset_servers = nullptr; + } + free(chk_query); + } - // Because 'commit' is called, we are required to update 'pgsql_servers_for_monitor'. - update_table_pgsql_servers_for_monitor(false); + if (checksum_incoming != checksum_current) { + proxy_warning("Aurora PostgreSQL: setting host %s%s:%d as writer\n", _server_id, domain_name, aurora_port); + q = (char *)"INSERT INTO pgsql_servers_incoming SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers WHERE hostgroup_id NOT IN (%d, %d)"; + sprintf(query, q, _rhid, _whid); + mydb->execute(query); + commit(); + wrlock(); + q = (char *)"DELETE FROM pgsql_servers WHERE hostgroup_id IN (%d, %d)"; + sprintf(query, q, _whid, _rhid); + mydb->execute(query); + generate_pgsql_servers_table(&_whid); + generate_pgsql_servers_table(&_rhid); - wrunlock(); + // Because 'commit' is called, we are required to update 'pgsql_servers_for_monitor'. 
+ update_table_pgsql_servers_for_monitor(false); + + wrunlock(); + } else { + if (GloPTH->variables.hostgroup_manager_verbose > 1) { + proxy_warning("Aurora PostgreSQL: skipping setting node %s%s:%d from hostgroup %d as writer because won't change the list of ONLINE nodes in writer hostgroup\n", _server_id, domain_name, aurora_port, _writer_hostgroup); + } + } GloAdmin->pgsql_servers_wrunlock(); } else { // Auto-discovery: server not found, create new entry (matching MySQL approach) @@ -5017,6 +5062,18 @@ void PgSQL_HostGroups_Manager::update_aws_aurora_set_writer(int _whid, int _rhid generate_pgsql_servers_table(&_whid); generate_pgsql_servers_table(&_rhid); + // Update the global checksums after 'pgsql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_pgsql_servers(mydb) }; + string pgsrvs_checksum { get_checksum_from_hash(resultset ? resultset->raw_checksum() : 0) }; + save_runtime_pgsql_servers(resultset.release()); + proxy_info("Checksum for table %s is %s\n", "pgsql_servers", pgsrvs_checksum.c_str()); + + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_pgsql_servers_checksum(pgsrvs_checksum); + pthread_mutex_unlock(&GloVars.checksum_mutex); + } + // Because 'commit' isn't called, we are required to update 'pgsql_servers_for_monitor'. update_table_pgsql_servers_for_monitor(false); // Update AWS Aurora resultset used for monitoring @@ -5133,6 +5190,18 @@ void PgSQL_HostGroups_Manager::update_aws_aurora_set_reader(int _whid, int _rhid generate_pgsql_servers_table(&_whid); generate_pgsql_servers_table(&_rhid); + // Update the global checksums after 'pgsql_servers' regeneration + { + unique_ptr resultset { get_admin_runtime_pgsql_servers(mydb) }; + string pgsrvs_checksum { get_checksum_from_hash(resultset ? 
resultset->raw_checksum() : 0) }; + save_runtime_pgsql_servers(resultset.release()); + proxy_info("Checksum for table %s is %s\n", "pgsql_servers", pgsrvs_checksum.c_str()); + + pthread_mutex_lock(&GloVars.checksum_mutex); + update_glovars_pgsql_servers_checksum(pgsrvs_checksum); + pthread_mutex_unlock(&GloVars.checksum_mutex); + } + // Because 'commit' isn't called, we are required to update 'pgsql_servers_for_monitor'. update_table_pgsql_servers_for_monitor(false); // Update AWS Aurora resultset used for monitoring diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index daacaa2409..1c75340419 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -2742,9 +2742,22 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { proxy_info("Started Aurora PostgreSQL Monitor thread for writer HG %u\n", wHG); + // Initialize thread-local variables (matching MySQL pattern) + unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version; + PgSQL_Thread* pgsql_thr = new PgSQL_Thread(); + pgsql_thr->curtime = monotonic_time(); + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version(); + pgsql_thr->refresh_variables(); + // Quick exit checks - if (!GloPTH) return nullptr; - if (!GloPgMon) return nullptr; + if (!GloPTH) { + delete pgsql_thr; + return nullptr; + } + if (!GloPgMon) { + delete pgsql_thr; + return nullptr; + } // Get monitor credentials from GloPTH char* monitor_user = GloPTH->get_variable_string((char*)"monitor_username"); @@ -2828,13 +2841,22 @@ t1 = monotonic_time(); unsigned long long start_time = t1; - while (GloPgMon->shutdown == false && exit_now == false) { + while (GloPgMon->shutdown == false && pgsql_thread___monitor_enabled == true && exit_now == false) { + unsigned int glover; t1 = monotonic_time(); if (!GloPTH) { goto __exit_pgsql_monitor_AWS_Aurora_thread_HG_now; } + // if variables have changed, trigger new checks + glover =
GloPTH->get_global_version(); + if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < glover) { + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = glover; + pgsql_thr->refresh_variables(); + next_loop_at = 0; + } + pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); current_raw_checksum = GloPgMon->AWS_Aurora_Hosts_resultset_checksum; pthread_mutex_unlock(&GloPgMon->aws_aurora_mutex); @@ -3035,6 +3057,11 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { } } + // Cleanup thread object + if (pgsql_thr) { + delete pgsql_thr; + } + proxy_info("Stopping Aurora PostgreSQL Monitor thread for writer HG %u\n", wHG); return nullptr; } @@ -3047,6 +3074,14 @@ void* PgSQL_monitor_aws_aurora(void* arg) { (void)arg; // unused if (!GloPgMon) return nullptr; + // Initialize the PgSQL Thread (note: this is not a real thread, just the structures associated with it) + unsigned int PgSQL_Monitor__thread_PgSQL_Thread_Variables_version; + PgSQL_Thread* pgsql_thr = new PgSQL_Thread(); + pgsql_thr->curtime = monotonic_time(); + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = GloPTH->get_global_version(); + pgsql_thr->refresh_variables(); + if (!GloPTH) return nullptr; // quick exit during shutdown/restart + uint64_t last_raw_checksum = 0; unsigned int* hgs_array = nullptr; pthread_t* pthreads_array = nullptr; @@ -3054,8 +3089,19 @@ proxy_info("Started Aurora PostgreSQL Monitor main thread\n"); - while (GloPgMon->shutdown == false) { - if (!GloPTH) return nullptr; + while (GloPgMon->shutdown == false && pgsql_thread___monitor_enabled == true) { + unsigned int glover; + + if (!GloPTH) { + goto __exit_pgsql_monitor_aws_aurora; + } + + // if variables have changed, trigger new checks + glover = GloPTH->get_global_version(); + if (PgSQL_Monitor__thread_PgSQL_Thread_Variables_version < glover) { + PgSQL_Monitor__thread_PgSQL_Thread_Variables_version = glover; + pgsql_thr->refresh_variables(); + } // Check if list of servers or HG
or options has changed pthread_mutex_lock(&GloPgMon->aws_aurora_mutex); @@ -3118,6 +3164,13 @@ void* PgSQL_monitor_aws_aurora(void* arg) { usleep(500000); // 500ms } +__exit_pgsql_monitor_aws_aurora: + // Cleanup thread object + if (pgsql_thr) { + delete pgsql_thr; + pgsql_thr = nullptr; + } + // Cleanup on shutdown if (pthreads_array) { for (unsigned int i = 0; i < hgs_num; i++) { From 1e5fdbdcf9140991aa6a0a176c0142905be330e8 Mon Sep 17 00:00:00 2001 From: "eden.lee" Date: Tue, 6 Jan 2026 18:43:26 +0900 Subject: [PATCH 7/7] Fix Aurora PostgreSQL monitor logging to match MySQL behavior Changes: 1. Remove noisy "Monitor connect failed" logging on every connection failure - Errors are recorded in pgsql_server_connect_log table - Shunning logic will log when max_failures is reached (matching MySQL behavior) 2. Add Aurora health check error logging (matching MySQL behavior) - Connection failure: "Error on AWS Aurora PostgreSQL check for :..." - Query failure: "Error on AWS Aurora PostgreSQL check for :... Query failed" - Previous incorrect comment stated "Not logging to match MySQL" but MySQL does log these errors --- lib/PgSQL_Monitor.cpp | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/PgSQL_Monitor.cpp b/lib/PgSQL_Monitor.cpp index 1c75340419..47a3afcbba 100644 --- a/lib/PgSQL_Monitor.cpp +++ b/lib/PgSQL_Monitor.cpp @@ -927,13 +927,9 @@ pair handle_async_connect_cont(state_t& st, short revent) { break; case PGRES_POLLING_FAILED: { // During connection phase use `PQerrorMessage` - const mon_srv_t& srv { st.task.op_st.srv_info }; + // Note: Error is recorded in pgsql_server_connect_log table; logging here would be noisy + // as this fires on every connection failure. The shunning logic will log when max_failures is reached. 
auto err { strdup_no_lf(PQerrorMessage(pgconn.conn)) }; - - proxy_error( - "Monitor connect failed addr='%s:%d' error='%s'\n", - srv.addr.c_str(), srv.port, err.get() - ); set_failed_st(st, ASYNC_CONNECT_FAILED, std::move(err)); break; } @@ -2928,7 +2924,8 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { if (PQstatus(conn) != CONNECTION_OK) { error_msg = strdup(PQerrorMessage(conn)); - // Note: Not logging here to match MySQL behavior - errors are stored in status entry + proxy_error("Error on AWS Aurora PostgreSQL check for %s:%d after %llums. Unable to create a connection. Error: %s\n", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, (t2 - start_time) / 1000, error_msg); ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); free(error_msg); @@ -2945,7 +2942,8 @@ void* PgSQL_monitor_AWS_Aurora_thread_HG(void* arg) { if (PQresultStatus(res) != PGRES_TUPLES_OK) { error_msg = strdup(PQerrorMessage(conn)); - // Note: Not logging here to match MySQL behavior - errors are stored in status entry + proxy_error("Error on AWS Aurora PostgreSQL check for %s:%d after %llums. Query failed. Error: %s\n", + hpa[cur_host_idx].host, hpa[cur_host_idx].port, (t2 - start_time) / 1000, error_msg); ase = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); ase_l = new PgSQL_AWS_Aurora_status_entry(start_time, t2 - start_time, error_msg); free(error_msg);